GMapiServer/main.py
2025-05-04 17:22:21 +08:00

128 lines
4.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import json
from flask import Flask, request, jsonify
import threading
import queue
import time
from ffmpeg_utils import process_ffmpeg
from sanjuuni_utils import process_sanjuuni
from file_cleanup import start_cleanup_thread
app = Flask(__name__)

# Shared state used by the request handlers below: `file_registry` maps a
# file ID to that file's bookkeeping dict, and `file_lock` guards access to it.
# NOTE(review): the original comment says these are assumed to be defined in
# the main file -- confirm that the helper modules receive these same objects.
file_registry = {}
file_lock = threading.Lock()

# Expected request parameters per API: parameter name -> required Python type.
# Both APIs currently accept the same three parameters; each API gets its own
# dict so the schemas can diverge independently.
enter_parameter_table = {
    api_name: {"input_url": str, "output_format": str, "args": list}
    for api_name in ("ffmpeg", "sanjuuni")
}
# Check whether the request payload satisfies the schema for the given API.
def validate_request(data, api_name):
    """Validate a decoded JSON payload against ``enter_parameter_table``.

    Args:
        data: The decoded request body, as returned by ``request.get_json()``.
        api_name: Key into ``enter_parameter_table`` selecting the schema.

    Returns:
        ``(None, None)`` when the payload is acceptable; otherwise a
        ``(response, 400)`` pair describing the first problem found.
    """
    # A body such as ``null``, ``[1, 2]`` or ``"text"`` is valid JSON but not
    # an object. Membership tests and indexing on it would raise TypeError
    # (a 500 for the client), so reject it up front as a bad request.
    if not isinstance(data, dict):
        logging.warning(f"请求体不是JSON对象,实际类型为{type(data)}")
        return jsonify({'error': "请求体必须为JSON对象"}), 400

    for key, expected_type in enter_parameter_table[api_name].items():
        if key not in data:
            # "subtitle" is the only parameter allowed to be absent.
            if key != "subtitle":
                logging.warning(f"请求中没有提供{key}参数")
                return jsonify({'error': f"未提供{key}参数"}), 400
            continue
        if not isinstance(data[key], expected_type):
            logging.warning(f"请求中{key}参数类型错误,应为{expected_type}")
            return jsonify({'error': f"{key}参数类型错误,您输入为{type(data[key])},应为{expected_type}" }), 400
    return None, None
@app.route('/api/ffmpeg', methods=['POST'])
def ffmpeg_api():
    """Handle ``POST /api/ffmpeg``.

    Validates the JSON body, runs ``process_ffmpeg`` on a worker thread and
    waits for its result.

    Returns:
        A ``(response, status)`` pair: the validation error (400), an
        ``{'status': 'error', ...}`` body (500) when processing failed, or a
        ``{'status': 'success', 'download_url', 'file_id'}`` body (200).
    """
    logging.info("收到FFmpeg API请求")
    data = request.get_json()

    # Check parameter presence and types.
    error_response, status_code = validate_request(data, "ffmpeg")
    if error_response is not None:
        return error_response, status_code

    # Run the job on a worker thread; the result comes back through a queue.
    result_queue = queue.Queue()

    def run_process(data, file_registry, file_lock, result_queue):
        with app.app_context():  # set up the application context for this thread
            try:
                result = process_ffmpeg(data, file_registry, file_lock)
            except Exception as exc:
                # If the worker died without queueing anything, the blocking
                # get() below would never return and the request would hang.
                # Report the failure as an ordinary error result instead.
                logging.exception("FFmpeg处理线程发生异常")
                result = {'error': str(exc)}
            result_queue.put(result)

    thread = threading.Thread(
        target=run_process,
        args=(data, file_registry, file_lock, result_queue),
    )
    thread.start()

    # Wait for the processing result.
    result = result_queue.get()
    if 'error' in result:
        logging.error(f"处理过程中出错: {result['error']}")
        return jsonify({'status': 'error', 'error': result['error']}), 500

    logging.info(f"处理成功返回下载URL: {result['download_url']}")
    return jsonify({
        'status': 'success',
        'download_url': result['download_url'],
        'file_id': result['file_id']
    }), 200
@app.route('/api/sanjuuni', methods=['POST'])
def sanjuuni_api():
    """Handle ``POST /api/sanjuuni``.

    Validates the JSON body, runs ``process_sanjuuni`` on a worker thread and
    waits for its result.

    Returns:
        A ``(response, status)`` pair: the validation error (400), an
        ``{'status': 'error', ...}`` body (500) when processing failed, or a
        ``{'status': 'success', 'download_url', 'file_id'}`` body (200).
    """
    logging.info("收到Sanjuuni API请求")
    data = request.get_json()

    # Check parameter presence and types.
    error_response, status_code = validate_request(data, "sanjuuni")
    if error_response is not None:
        return error_response, status_code

    # Run the job on a worker thread; the result comes back through a queue.
    result_queue = queue.Queue()

    def run_process(data, file_registry, file_lock, result_queue):
        with app.app_context():  # set up the application context for this thread
            try:
                result = process_sanjuuni(data, file_registry, file_lock)
            except Exception as exc:
                # If the worker died without queueing anything, the blocking
                # get() below would never return and the request would hang.
                # Report the failure as an ordinary error result instead.
                logging.exception("Sanjuuni处理线程发生异常")
                result = {'error': str(exc)}
            result_queue.put(result)

    thread = threading.Thread(
        target=run_process,
        args=(data, file_registry, file_lock, result_queue),
    )
    thread.start()

    # Wait for the processing result.
    result = result_queue.get()
    if 'error' in result:
        logging.error(f"处理过程中出错: {result['error']}")
        return jsonify({'status': 'error', 'error': result['error']}), 500

    logging.info(f"处理成功返回下载URL: {result['download_url']}")
    return jsonify({
        'status': 'success',
        'download_url': result['download_url'],
        'file_id': result['file_id']
    }), 200
def _attachment_disposition(filename):
    """Build a ``Content-Disposition`` value offering *filename* as a download.

    The name comes straight from the request URL, so it may contain spaces,
    quotes, semicolons or non-ASCII text. The result carries a sanitized
    ASCII ``filename="..."`` fallback plus an RFC 6266 ``filename*``
    parameter holding the percent-encoded UTF-8 name.
    """
    from urllib.parse import quote  # local import keeps this block self-contained

    ascii_name = filename.encode('ascii', 'ignore').decode('ascii')
    # Drop control characters and the two characters that would terminate or
    # escape the quoted-string.
    ascii_name = ''.join(
        ch for ch in ascii_name if ch.isprintable() and ch not in '"\\'
    ) or 'download'
    encoded_name = quote(filename, safe='')
    return f"attachment; filename=\"{ascii_name}\"; filename*=UTF-8''{encoded_name}"


@app.route('/download/<file_id>/<filename>', methods=['GET'])
def download_file_endpoint(file_id, filename):
    """Handle ``GET /download/<file_id>/<filename>``.

    Looks up *file_id* in ``file_registry``, updates the entry's
    ``last_access`` and ``download_count`` fields, and returns the bytes of
    the file at the entry's ``path`` as an attachment named *filename*.

    Returns:
        The file bytes with status 200, a JSON error with status 404 when the
        ID is not registered, or a JSON error with status 500 when the file
        cannot be read.
    """
    logging.info(f"收到文件下载请求 - 文件ID: {file_id}, 文件名: {filename}")

    # NOTE(review): the original indentation was lost, so the intended extent
    # of the lock is unclear. Here it covers only registry access, so the file
    # read does not block other users of the lock -- confirm.
    with file_lock:
        if file_id not in file_registry:
            logging.warning(f"文件ID: {file_id} 不存在")
            return jsonify({'error': '文件不存在'}), 404
        file_info = file_registry[file_id]
        file_info['last_access'] = time.time()
        file_info['download_count'] += 1

    try:
        with open(file_info['path'], 'rb') as f:
            file_data = f.read()
    except Exception as e:
        logging.error(f"下载文件时出错: {e}")
        return jsonify({'status': 'error', 'error': str(e)}), 500

    return file_data, 200, {'Content-Disposition': _attachment_disposition(filename)}
def main():
    """Configure logging, call ``start_cleanup_thread()``, then run the app."""
    logging.basicConfig(level=logging.INFO)
    start_cleanup_thread()
    # NOTE(review): debug=True enables Flask's interactive debugger and
    # auto-reloader -- confirm this entry point is for development only.
    app.run(debug=True)


if __name__ == '__main__':
    main()