import logging import json from flask import Flask, request, jsonify import threading import queue import time import argparse import uuid from shared_utils import task_registry, file_registry, file_lock, task_lock, add_task_log # 在导入其他模块之前先定义全局变量 app = Flask(__name__) # 延迟导入,避免循环导入 try: from ffmpeg_utils import process_ffmpeg from sanjuuni_utils import process_sanjuuni from video_frame_utils import process_video_frame_extraction from file_cleanup import start_cleanup_thread except ImportError as e: logging.error(f"导入模块时出错: {e}") # 定义空函数作为备用 def process_ffmpeg(*args, **kwargs): return {'error': 'FFmpeg模块未正确导入'} def process_sanjuuni(*args, **kwargs): return {'error': 'Sanjuuni模块未正确导入'} def process_video_frame_extraction(*args, **kwargs): return {'error': '视频帧提取模块未正确导入'} def start_cleanup_thread(): pass # 输入参数列表 enter_parameter_table = { "ffmpeg": { "input_url": str, "output_format": str, "args": list }, "sanjuuni": { "input_url": str, "output_format": str, "args": list }, "video_frame": { "video_url": str, "w": int, "h": int, "fps": int, "force_resolution": bool, "pad_to_target": bool } } # 检测输入参数是否符合要求 def validate_request(data, api_name): for key in enter_parameter_table[api_name]: if key not in data and key != "subtitle": logging.warning(f"请求中没有提供{key}参数") return jsonify({'error': f"未提供{key}参数"}), 400 if key in data and isinstance(data[key], enter_parameter_table[api_name][key]) == False: logging.warning(f"请求中{key}参数类型错误,应为{enter_parameter_table[api_name][key]}") return jsonify({'error': f"{key}参数类型错误,您输入为{type(data[key])},应为{enter_parameter_table[api_name][key]}" }), 400 return None, None def run_async_task(task_id, process_func, data): """运行异步任务""" try: with app.app_context(): # 更新任务状态为运行中 with task_lock: task_registry[task_id]['status'] = 'running' task_registry[task_id]['start_time'] = time.time() # 执行处理函数 result = process_func(data, file_registry, file_lock, task_id, task_registry, task_lock) # 更新任务状态 with task_lock: if 'error' in result: task_registry[task_id]['status'] = 'error' task_registry[task_id]['error'] = result['error'] else: task_registry[task_id]['status'] = 'completed' task_registry[task_id]['result'] = result task_registry[task_id]['end_time'] = time.time() except Exception as e: logging.error(f"任务 {task_id} 执行异常: {e}") with task_lock: task_registry[task_id]['status'] = 'error' task_registry[task_id]['error'] = str(e) task_registry[task_id]['end_time'] = time.time() @app.route('/api/ffmpeg/async', methods=['POST']) def ffmpeg_async_api(): """创建异步FFmpeg任务""" logging.info("收到异步FFmpeg API请求") data = request.get_json() # 检测参数类型 error_response, status_code = validate_request(data, "ffmpeg") if error_response: return error_response, status_code # 生成任务ID task_id = str(uuid.uuid4())[:8] # 获取请求的主机信息用于构建URL host = request.host scheme = request.scheme # 初始化任务状态 with task_lock: task_registry[task_id] = { 'status': 'pending', 'type': 'ffmpeg', 'create_time': time.time(), 'logs': [], 'last_returned_index': 0, # 记录最后返回的日志索引 'progress': 0 } # 启动异步任务 thread = threading.Thread( target=run_async_task, args=(task_id, process_ffmpeg, data) ) thread.daemon = True thread.start() status_url = f"{scheme}://{host}/api/task/{task_id}" logging.info(f"创建异步FFmpeg任务: {task_id}") return jsonify({ 'status': 'success', 'task_id': task_id, 'status_url': status_url, 'message': '任务已创建,请使用状态URL查询进度' }), 202 @app.route('/api/sanjuuni/async', methods=['POST']) def sanjuuni_async_api(): """创建异步Sanjuuni任务""" logging.info("收到异步Sanjuuni API请求") data = request.get_json() # 检测参数类型 error_response, status_code = validate_request(data, "sanjuuni") if error_response: return error_response, status_code # 生成任务ID task_id = str(uuid.uuid4())[:8] # 获取请求的主机信息用于构建URL host = request.host scheme = request.scheme # 初始化任务状态 with task_lock: task_registry[task_id] = { 'status': 'pending', 'type': 'sanjuuni', 'create_time': time.time(), 'logs': [], 'last_returned_index': 0, # 记录最后返回的日志索引 'progress': 0 } # 启动异步任务 thread = threading.Thread( target=run_async_task, args=(task_id, process_sanjuuni, data) ) thread.daemon = True thread.start() status_url = f"{scheme}://{host}/api/task/{task_id}" logging.info(f"创建异步Sanjuuni任务: {task_id}") return jsonify({ 'status': 'success', 'task_id': task_id, 'status_url': status_url, 'message': '任务已创建,请使用状态URL查询进度' }), 202 @app.route('/api/video_frame/async', methods=['POST']) def video_frame_async_api(): """创建异步视频帧提取任务""" logging.info("收到异步视频帧提取API请求") data = request.get_json() # 检测参数类型 error_response, status_code = validate_request(data, "video_frame") if error_response: return error_response, status_code # 生成任务ID task_id = str(uuid.uuid4())[:8] # 获取请求的主机信息用于构建URL host = request.host scheme = request.scheme # 初始化任务状态 with task_lock: task_registry[task_id] = { 'status': 'pending', 'type': 'video_frame', 'create_time': time.time(), 'logs': [], 'last_returned_index': 0, # 记录最后返回的日志索引 'progress': 0 } # 启动异步任务 thread = threading.Thread( target=run_async_task, args=(task_id, process_video_frame_extraction, data) ) thread.daemon = True thread.start() status_url = f"{scheme}://{host}/api/task/{task_id}" logging.info(f"创建异步视频帧提取任务: {task_id}") return jsonify({ 'status': 'success', 'task_id': task_id, 'status_url': status_url, 'message': '任务已创建,请使用状态URL查询进度' }), 202 @app.route('/api/task/', methods=['GET']) def get_task_status(task_id): """查询任务状态和进度(自动返回新增日志)""" with task_lock: if task_id not in task_registry: return jsonify({'error': '任务不存在'}), 404 task_info = task_registry[task_id].copy() current_log_count = len(task_info['logs']) last_returned_index = task_info['last_returned_index'] # 计算新增日志 if last_returned_index < current_log_count: new_logs = task_info['logs'][last_returned_index:] # 更新最后返回的日志索引 task_registry[task_id]['last_returned_index'] = current_log_count else: new_logs = [] # 构建响应 response = { 'task_id': task_id, 'status': task_info['status'], 'type': task_info['type'], 'create_time': task_info['create_time'], 'progress': task_info.get('progress', 0), 'total_logs': current_log_count, 'new_logs': new_logs, 'last_index': current_log_count # 返回当前日志总数,方便客户端跟踪 } if 'start_time' in task_info: response['start_time'] = task_info['start_time'] if 'end_time' in task_info: response['end_time'] = task_info['end_time'] # 根据状态返回不同信息 if task_info['status'] == 'completed': response['result'] = task_info['result'] elif task_info['status'] == 'error': response['error'] = task_info['error'] return jsonify(response), 200 # 原有的同步接口保持不变 @app.route('/api/ffmpeg', methods=['POST']) def ffmpeg_api(): logging.info("收到FFmpeg API请求") data = request.get_json() # 检测参数类型 error_response, status_code = validate_request(data, "ffmpeg") if error_response: assert status_code is not None return error_response, status_code # 创建处理进程 result_queue = queue.Queue() def run_process(data, file_registry, file_lock, result_queue): with app.app_context(): result = process_ffmpeg(data, file_registry, file_lock) result_queue.put(result) thread = threading.Thread(target=run_process, args=(data, file_registry, file_lock, result_queue)) thread.start() # 等待处理结果 result = result_queue.get() if 'error' in result: logging.error(f"处理过程中出错: {result['error']}") return jsonify({'status': 'error', 'error': result['error']}), 500 else: logging.info(f"处理成功,返回下载URL: {result['download_url']}") return jsonify({ 'status': 'success', 'download_url': result['download_url'], 'file_id': result['file_id'] }), 200 @app.route('/api/sanjuuni', methods=['POST']) def sanjuuni_api(): logging.info("收到Sanjuuni API请求") data = request.get_json() # 检测参数类型 error_response, status_code = validate_request(data, "sanjuuni") if error_response: assert status_code is not None return error_response, status_code # 创建处理进程 result_queue = queue.Queue() def run_process(data, file_registry, file_lock, result_queue): with app.app_context(): result = process_sanjuuni(data, file_registry, file_lock) result_queue.put(result) thread = threading.Thread(target=run_process, args=(data, file_registry, file_lock, result_queue)) thread.start() # 等待处理结果 result = result_queue.get() if 'error' in result: logging.error(f"处理过程中出错: {result['error']}") return jsonify({'status': 'error', 'error': result['error']}), 500 else: logging.info(f"处理成功,返回下载URL: {result['download_url']}") return jsonify({ 'status': 'success', 'download_url': result['download_url'], 'file_id': result['file_id'] }), 200 @app.route('/download//', methods=['GET']) def download_file_endpoint(file_id, filename): logging.info(f"收到文件下载请求 - 文件ID: {file_id}, 文件名: {filename}") with file_lock: if file_id not in file_registry: logging.warning(f"文件ID: {file_id} 不存在") return jsonify({'error': '文件不存在'}), 404 file_info = file_registry[file_id] file_info['last_access'] = time.time() file_info['download_count'] += 1 try: with open(file_info['path'], 'rb') as f: file_data = f.read() return file_data, 200, {'Content-Disposition': f'attachment; filename={filename}'} except Exception as e: logging.error(f"下载文件时出错: {e}") return jsonify({'status': 'error', 'error': str(e)}), 500 @app.route('/frames//', methods=['GET']) def serve_video_frames(job_id, filename): """提供视频帧和音频文件访问""" import os from flask import send_from_directory from video_frame_utils import FRAMES_ROOT safe_job = os.path.basename(job_id) safe_file = os.path.basename(filename) dir_path = os.path.join(FRAMES_ROOT, safe_job) if not os.path.isdir(dir_path): return jsonify({"error": "文件不存在"}), 404 allowed = safe_file.endswith(('.png', '.dfpwm')) if not allowed: return jsonify({"error": "文件类型不支持"}), 400 try: return send_from_directory(dir_path, safe_file) except Exception as e: logging.error(f"提供文件时出错: {e}") return jsonify({"error": "文件访问失败"}), 500 @app.route('/health', methods=['GET']) def health_check(): """健康检查接口,直接返回 'ok'""" return "OK" @app.route('/api/tasks', methods=['GET']) def list_tasks(): """列出所有任务(用于调试)""" with task_lock: tasks = {} for task_id, task_info in task_registry.items(): tasks[task_id] = { 'status': task_info['status'], 'type': task_info['type'], 'create_time': task_info['create_time'], 'progress': task_info.get('progress', 0) } return jsonify(tasks), 200 if __name__ == '__main__': # 配置命令行参数解析 parser = argparse.ArgumentParser(description='启动FFmpeg处理服务') parser.add_argument('--port', type=int, default=5000, help='服务监听的端口号(默认: 5000)') args = parser.parse_args() logging.info(f"启动应用程序,端口: {args.port}...") start_cleanup_thread() app.run(host='0.0.0.0', port=args.port, threaded=True)