Files
GMapiServer/main.py
2025-11-20 20:54:15 +08:00

337 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import logging
import json
from flask import Flask, request, jsonify
import threading
import queue
import time
import argparse
import uuid
from shared_utils import task_registry, file_registry, file_lock, task_lock, add_task_log
# Define the global Flask application before importing the other modules.
app = Flask(__name__)
# Import the worker modules late to avoid circular imports.
try:
    from ffmpeg_utils import process_ffmpeg
    from sanjuuni_utils import process_sanjuuni
    from file_cleanup import start_cleanup_thread
except ImportError as e:
    logging.error(f"导入模块时出错: {e}")
    # Define stand-ins so the names used by the routes and the startup code
    # still exist when any of the imports above fails.

    def process_ffmpeg(*args, **kwargs):
        # Always reports an error dict instead of doing any processing.
        return {'error': 'FFmpeg模块未正确导入'}

    def process_sanjuuni(*args, **kwargs):
        # Always reports an error dict instead of doing any processing.
        return {'error': 'Sanjuuni模块未正确导入'}

    def start_cleanup_thread():
        # No-op replacement: nothing is started.
        pass
# Request parameters accepted by each API, mapped to the Python type that the
# decoded JSON value must have. Each API gets its own independent inner dict.
enter_parameter_table = {
    api_name: {
        "input_url": str,
        "output_format": str,
        "args": list,
    }
    for api_name in ("ffmpeg", "sanjuuni")
}
# Check that the request payload matches the expected parameters.
def validate_request(data, api_name):
    """Validate a decoded JSON request body against ``enter_parameter_table``.

    Args:
        data: The decoded JSON body of the request.
        api_name: Key of ``enter_parameter_table`` selecting the parameter
            specification to validate against.

    Returns:
        ``(None, None)`` when the payload is acceptable; otherwise a
        ``(response, 400)`` pair whose first element is a JSON error response.
    """
    # A JSON body that is not an object (null, list, string, number) cannot be
    # looked up by key; reject it here instead of failing with a TypeError.
    if not isinstance(data, dict):
        logging.warning("请求体不是JSON对象")
        return jsonify({'error': "请求体必须是JSON对象"}), 400

    for key, expected_type in enter_parameter_table[api_name].items():
        if key not in data:
            # "subtitle" is the only parameter allowed to be absent.
            if key != "subtitle":
                logging.warning(f"请求中没有提供{key}参数")
                return jsonify({'error': f"未提供{key}参数"}), 400
            continue
        if not isinstance(data[key], expected_type):
            logging.warning(f"请求中{key}参数类型错误,应为{expected_type}")
            return jsonify({'error': f"{key}参数类型错误,您输入为{type(data[key])},应为{expected_type}"}), 400
    return None, None
def run_async_task(task_id, process_func, data):
    """Run an asynchronous task and record its outcome in ``task_registry``.

    The registry entry for ``task_id`` is marked ``running`` with a
    ``start_time``, then ``process_func`` is called. If the returned mapping
    contains an ``'error'`` key the task is marked ``error`` and the message is
    stored; otherwise it is marked ``completed`` and the result is stored. An
    ``end_time`` is written in every case.

    Args:
        task_id: Key of the task's entry in ``task_registry``.
        process_func: Callable invoked as ``process_func(data, file_registry,
            file_lock, task_id, task_registry, task_lock)``.
        data: Request payload forwarded to ``process_func``.
    """
    try:
        with app.app_context():
            # Mark the task as running.
            with task_lock:
                task_registry[task_id]['status'] = 'running'
                task_registry[task_id]['start_time'] = time.time()
            # Run the processing function without holding the lock.
            result = process_func(data, file_registry, file_lock, task_id, task_registry, task_lock)
            # Record the outcome.
            with task_lock:
                if 'error' in result:
                    task_registry[task_id]['status'] = 'error'
                    task_registry[task_id]['error'] = result['error']
                else:
                    task_registry[task_id]['status'] = 'completed'
                    task_registry[task_id]['result'] = result
                task_registry[task_id]['end_time'] = time.time()
    except Exception as e:
        # Nothing else observes this thread, so keep the traceback in the log;
        # the message alone rarely identifies where the failure happened.
        logging.error(f"任务 {task_id} 执行异常: {e}", exc_info=True)
        with task_lock:
            task_registry[task_id]['status'] = 'error'
            task_registry[task_id]['error'] = str(e)
            task_registry[task_id]['end_time'] = time.time()
@app.route('/api/ffmpeg/async', methods=['POST'])
def ffmpeg_async_api():
    """Create an asynchronous FFmpeg task.

    Validates the JSON body, registers a new ``pending`` task, starts a daemon
    thread running ``run_async_task`` with ``process_ffmpeg``, and returns
    immediately.

    Returns:
        The validation error response when the body is rejected; otherwise a
        202 JSON response with ``status``, ``task_id``, ``status_url`` and
        ``message``.
    """
    logging.info("收到异步FFmpeg API请求")
    data = request.get_json()
    # Validate the parameter types.
    error_response, status_code = validate_request(data, "ffmpeg")
    if error_response:
        return error_response, status_code

    with task_lock:
        # Only the first 8 hex characters of the UUID are kept, so two tasks
        # can draw the same id. Pick the id under the lock and draw again on a
        # clash so an existing entry is never overwritten.
        task_id = str(uuid.uuid4())[:8]
        while task_id in task_registry:
            task_id = str(uuid.uuid4())[:8]
        # Initial task state.
        task_registry[task_id] = {
            'status': 'pending',
            'type': 'ffmpeg',
            'create_time': time.time(),
            'logs': [],
            'last_returned_index': 0,  # index of the last log line already returned
            'progress': 0
        }

    # Start the background worker.
    worker = threading.Thread(
        target=run_async_task,
        args=(task_id, process_ffmpeg, data),
        daemon=True
    )
    worker.start()

    # Build the status URL from the scheme and host of the incoming request.
    status_url = f"{request.scheme}://{request.host}/api/task/{task_id}"
    logging.info(f"创建异步FFmpeg任务: {task_id}")
    return jsonify({
        'status': 'success',
        'task_id': task_id,
        'status_url': status_url,
        'message': '任务已创建请使用状态URL查询进度'
    }), 202
@app.route('/api/sanjuuni/async', methods=['POST'])
def sanjuuni_async_api():
    """Create an asynchronous Sanjuuni task.

    Validates the JSON body, registers a new ``pending`` task, starts a daemon
    thread running ``run_async_task`` with ``process_sanjuuni``, and returns
    immediately.

    Returns:
        The validation error response when the body is rejected; otherwise a
        202 JSON response with ``status``, ``task_id``, ``status_url`` and
        ``message``.
    """
    logging.info("收到异步Sanjuuni API请求")
    data = request.get_json()
    # Validate the parameter types.
    error_response, status_code = validate_request(data, "sanjuuni")
    if error_response:
        return error_response, status_code

    with task_lock:
        # Only the first 8 hex characters of the UUID are kept, so two tasks
        # can draw the same id. Pick the id under the lock and draw again on a
        # clash so an existing entry is never overwritten.
        task_id = str(uuid.uuid4())[:8]
        while task_id in task_registry:
            task_id = str(uuid.uuid4())[:8]
        # Initial task state.
        task_registry[task_id] = {
            'status': 'pending',
            'type': 'sanjuuni',
            'create_time': time.time(),
            'logs': [],
            'last_returned_index': 0,  # index of the last log line already returned
            'progress': 0
        }

    # Start the background worker.
    worker = threading.Thread(
        target=run_async_task,
        args=(task_id, process_sanjuuni, data),
        daemon=True
    )
    worker.start()

    # Build the status URL from the scheme and host of the incoming request.
    status_url = f"{request.scheme}://{request.host}/api/task/{task_id}"
    logging.info(f"创建异步Sanjuuni任务: {task_id}")
    return jsonify({
        'status': 'success',
        'task_id': task_id,
        'status_url': status_url,
        'message': '任务已创建请使用状态URL查询进度'
    }), 202
@app.route('/api/task/<task_id>', methods=['GET'])
def get_task_status(task_id):
    """Report a task's status and progress, plus log lines not yet returned.

    The registry entry stores the index of the last log line handed out; each
    call returns the lines after that index and moves the index to the current
    end of the log.

    Returns:
        A 404 JSON error when ``task_id`` is not registered; otherwise a 200
        JSON response describing the task.
    """
    with task_lock:
        if task_id not in task_registry:
            return jsonify({'error': '任务不存在'}), 404
        snapshot = task_registry[task_id].copy()
        log_total = len(snapshot['logs'])
        cursor = snapshot['last_returned_index']
        fresh_logs = []
        if cursor < log_total:
            # Hand out everything after the cursor, then move the cursor forward.
            fresh_logs = snapshot['logs'][cursor:]
            task_registry[task_id]['last_returned_index'] = log_total

    payload = {
        'task_id': task_id,
        'status': snapshot['status'],
        'type': snapshot['type'],
        'create_time': snapshot['create_time'],
        'progress': snapshot.get('progress', 0),
        'total_logs': log_total,
        'new_logs': fresh_logs,
        'last_index': log_total  # current log count, so the client can track its position
    }
    # Timestamps exist only once the task has reached the matching stage.
    for stamp in ('start_time', 'end_time'):
        if stamp in snapshot:
            payload[stamp] = snapshot[stamp]

    # Attach the outcome that matches the final state, if any.
    state = snapshot['status']
    if state == 'completed':
        payload['result'] = snapshot['result']
    elif state == 'error':
        payload['error'] = snapshot['error']
    return jsonify(payload), 200
# The original synchronous (blocking) endpoints.
@app.route('/api/ffmpeg', methods=['POST'])
def ffmpeg_api():
    """Run an FFmpeg job and block until it finishes.

    Returns:
        The validation error response when the body is rejected; a 500 JSON
        error when processing reports or raises an error; otherwise a 200 JSON
        response with ``status``, ``download_url`` and ``file_id``.
    """
    logging.info("收到FFmpeg API请求")
    data = request.get_json()
    # Validate the parameter types.
    error_response, status_code = validate_request(data, "ffmpeg")
    if error_response:
        assert status_code is not None
        return error_response, status_code

    # Run the processing in a worker thread and collect its result via a queue.
    result_queue = queue.Queue()

    def run_process(data, file_registry, file_lock, result_queue):
        # Always enqueue something: if an exception escaped without a put(),
        # the request thread would wait on result_queue.get() forever.
        try:
            with app.app_context():
                result = process_ffmpeg(data, file_registry, file_lock)
        except Exception as e:
            logging.error("FFmpeg处理线程异常", exc_info=True)
            result = {'error': str(e)}
        result_queue.put(result)

    thread = threading.Thread(target=run_process, args=(data, file_registry, file_lock, result_queue))
    thread.start()

    # Wait for the processing result.
    result = result_queue.get()
    if 'error' in result:
        logging.error(f"处理过程中出错: {result['error']}")
        return jsonify({'status': 'error', 'error': result['error']}), 500
    else:
        logging.info(f"处理成功返回下载URL: {result['download_url']}")
        return jsonify({
            'status': 'success',
            'download_url': result['download_url'],
            'file_id': result['file_id']
        }), 200
@app.route('/api/sanjuuni', methods=['POST'])
def sanjuuni_api():
    """Run a Sanjuuni job and block until it finishes.

    Returns:
        The validation error response when the body is rejected; a 500 JSON
        error when processing reports or raises an error; otherwise a 200 JSON
        response with ``status``, ``download_url`` and ``file_id``.
    """
    logging.info("收到Sanjuuni API请求")
    data = request.get_json()
    # Validate the parameter types.
    error_response, status_code = validate_request(data, "sanjuuni")
    if error_response:
        assert status_code is not None
        return error_response, status_code

    # Run the processing in a worker thread and collect its result via a queue.
    result_queue = queue.Queue()

    def run_process(data, file_registry, file_lock, result_queue):
        # Always enqueue something: if an exception escaped without a put(),
        # the request thread would wait on result_queue.get() forever.
        try:
            with app.app_context():
                result = process_sanjuuni(data, file_registry, file_lock)
        except Exception as e:
            logging.error("Sanjuuni处理线程异常", exc_info=True)
            result = {'error': str(e)}
        result_queue.put(result)

    thread = threading.Thread(target=run_process, args=(data, file_registry, file_lock, result_queue))
    thread.start()

    # Wait for the processing result.
    result = result_queue.get()
    if 'error' in result:
        logging.error(f"处理过程中出错: {result['error']}")
        return jsonify({'status': 'error', 'error': result['error']}), 500
    else:
        logging.info(f"处理成功返回下载URL: {result['download_url']}")
        return jsonify({
            'status': 'success',
            'download_url': result['download_url'],
            'file_id': result['file_id']
        }), 200
@app.route('/download/<file_id>/<filename>', methods=['GET'])
def download_file_endpoint(file_id, filename):
    """Serve a registered file as an attachment.

    Looks ``file_id`` up in ``file_registry``, updates its ``last_access`` and
    ``download_count``, and returns the file's bytes with a
    ``Content-Disposition`` header naming ``filename``.

    Returns:
        A 404 JSON error when ``file_id`` is not registered, a 500 JSON error
        when reading the file fails, otherwise the file contents with status 200.
    """
    from urllib.parse import quote

    logging.info(f"收到文件下载请求 - 文件ID: {file_id}, 文件名: {filename}")
    with file_lock:
        if file_id not in file_registry:
            logging.warning(f"文件ID: {file_id} 不存在")
            return jsonify({'error': '文件不存在'}), 404
        file_info = file_registry[file_id]
        file_info['last_access'] = time.time()
        file_info['download_count'] += 1

    # `filename` comes from the URL. Header values must be latin-1 encodable, so
    # a non-ASCII name (e.g. Chinese) cannot be placed in the header as-is; send
    # it percent-encoded in the RFC 6266 `filename*` form instead. ASCII names
    # keep the exact header that was sent before.
    try:
        filename.encode('ascii')
    except UnicodeEncodeError:
        disposition = f"attachment; filename*=UTF-8''{quote(filename, safe='')}"
    else:
        disposition = f'attachment; filename={filename}'

    try:
        with open(file_info['path'], 'rb') as f:
            file_data = f.read()
        return file_data, 200, {'Content-Disposition': disposition}
    except Exception as e:
        logging.error(f"下载文件时出错: {e}")
        return jsonify({'status': 'error', 'error': str(e)}), 500
@app.route('/health', methods=['GET'])
def health_check():
    """Health-check endpoint; returns the plain string "OK"."""
    return "OK"
@app.route('/api/tasks', methods=['GET'])
def list_tasks():
    """List every registered task with its summary fields (for debugging).

    Returns:
        A 200 JSON object keyed by task id; each value holds ``status``,
        ``type``, ``create_time`` and ``progress`` (0 when not yet set).
    """
    with task_lock:
        summary = {
            tid: {
                'status': info['status'],
                'type': info['type'],
                'create_time': info['create_time'],
                'progress': info.get('progress', 0),
            }
            for tid, info in task_registry.items()
        }
    return jsonify(summary), 200
if __name__ == '__main__':
    # Parse the command-line options.
    parser = argparse.ArgumentParser(description='启动FFmpeg处理服务')
    # The help text previously had an opening parenthesis with no closing one.
    parser.add_argument('--port', type=int, default=5000, help='服务监听的端口号(默认: 5000)')
    args = parser.parse_args()
    # NOTE(review): no logging configuration is visible in this file; confirm
    # it is set up elsewhere, otherwise INFO-level messages are not emitted.
    logging.info(f"启动应用程序,端口: {args.port}...")
    start_cleanup_thread()
    # Listen on all interfaces; threaded=True lets requests be served concurrently.
    app.run(host='0.0.0.0', port=args.port, threaded=True)