419 lines
14 KiB
Python
419 lines
14 KiB
Python
import logging
|
||
import json
|
||
from flask import Flask, request, jsonify
|
||
import threading
|
||
import queue
|
||
import time
|
||
import argparse
|
||
import uuid
|
||
from shared_utils import task_registry, file_registry, file_lock, task_lock, add_task_log
|
||
|
||
# Create the Flask application before importing the processing modules, so
# that `app` already exists if one of them imports this module back.
app = Flask(__name__)

# Deferred imports (kept below `app` to avoid circular imports). Each module is
# guarded on its own so that one missing module only disables its own feature
# instead of replacing every processor with an error stub.
try:
    from ffmpeg_utils import process_ffmpeg
except ImportError as e:
    logging.error(f"导入模块时出错: {e}")

    def process_ffmpeg(*args, **kwargs):
        """Fallback used when ffmpeg_utils cannot be imported."""
        return {'error': 'FFmpeg模块未正确导入'}

try:
    from sanjuuni_utils import process_sanjuuni
except ImportError as e:
    logging.error(f"导入模块时出错: {e}")

    def process_sanjuuni(*args, **kwargs):
        """Fallback used when sanjuuni_utils cannot be imported."""
        return {'error': 'Sanjuuni模块未正确导入'}

try:
    from video_frame_utils import process_video_frame_extraction
except ImportError as e:
    logging.error(f"导入模块时出错: {e}")

    def process_video_frame_extraction(*args, **kwargs):
        """Fallback used when video_frame_utils cannot be imported."""
        return {'error': '视频帧提取模块未正确导入'}

try:
    from file_cleanup import start_cleanup_thread
except ImportError as e:
    logging.error(f"导入模块时出错: {e}")

    def start_cleanup_thread():
        """Fallback no-op used when file_cleanup cannot be imported."""
        pass
|
||
|
||
# Request schemas: for every API name, the JSON fields a request must carry
# and the Python type each decoded value has to be.
_CONVERSION_FIELDS = (
    ("input_url", str),
    ("output_format", str),
    ("args", list),
)

_VIDEO_FRAME_FIELDS = (
    ("video_url", str),
    ("w", int),
    ("h", int),
    ("fps", int),
    ("force_resolution", bool),
    ("pad_to_target", bool),
)

enter_parameter_table = {
    # ffmpeg and sanjuuni take the same fields; each gets its own dict object.
    "ffmpeg": dict(_CONVERSION_FIELDS),
    "sanjuuni": dict(_CONVERSION_FIELDS),
    "video_frame": dict(_VIDEO_FRAME_FIELDS),
}
|
||
|
||
def validate_request(data, api_name):
    """Check a decoded JSON request body against the schema for *api_name*.

    Args:
        data: The decoded JSON body, as returned by ``request.get_json()``.
        api_name: Key into ``enter_parameter_table`` selecting the schema.

    Returns:
        ``(None, None)`` when the body is valid, otherwise a
        ``(response, 400)`` pair whose response is a JSON error object.
    """
    # A body such as `null`, a list or a bare string decodes to a non-dict;
    # report it as a client error instead of failing on the lookups below.
    if not isinstance(data, dict):
        logging.warning("请求体不是JSON对象")
        return jsonify({'error': "请求体必须是JSON对象"}), 400

    for key, expected_type in enter_parameter_table[api_name].items():
        if key not in data:
            # "subtitle" is exempt from the presence check (kept from the
            # original logic; no schema currently lists it).
            if key == "subtitle":
                continue
            logging.warning(f"请求中没有提供{key}参数")
            return jsonify({'error': f"未提供{key}参数"}), 400

        value = data[key]
        # bool is a subclass of int, so JSON true/false would otherwise be
        # accepted where an integer is required.
        bool_as_int = expected_type is int and isinstance(value, bool)
        if bool_as_int or not isinstance(value, expected_type):
            logging.warning(f"请求中{key}参数类型错误,应为{expected_type}")
            return jsonify({'error': f"{key}参数类型错误,您输入为{type(value)},应为{expected_type}" }), 400

    return None, None
|
||
|
||
def run_async_task(task_id, process_func, data):
    """Run *process_func* for an already-registered task and record the outcome.

    The task entry in ``task_registry`` is marked ``running`` with a start
    time, then ``completed`` (with ``result``) or ``error`` (with ``error``)
    plus an end time. Any exception raised along the way is logged and stored
    on the task as its error text.

    Args:
        task_id: Key of the task entry in ``task_registry``.
        process_func: Callable invoked as ``process_func(data, file_registry,
            file_lock, task_id, task_registry, task_lock)``.
        data: Request payload handed to ``process_func``.
    """
    try:
        with app.app_context():
            # Mark the task as started.
            with task_lock:
                task_registry[task_id].update({
                    'status': 'running',
                    'start_time': time.time(),
                })

            # Do the actual work outside the lock.
            result = process_func(
                data, file_registry, file_lock, task_id, task_registry, task_lock
            )

            # Store the outcome; a result containing 'error' counts as a failure.
            with task_lock:
                if 'error' in result:
                    outcome = {'status': 'error', 'error': result['error']}
                else:
                    outcome = {'status': 'completed', 'result': result}
                outcome['end_time'] = time.time()
                task_registry[task_id].update(outcome)
    except Exception as e:
        logging.error(f"任务 {task_id} 执行异常: {e}")
        with task_lock:
            failed_entry = task_registry[task_id]
            failed_entry['status'] = 'error'
            failed_entry['error'] = str(e)
            failed_entry['end_time'] = time.time()
|
||
|
||
def _start_async_task(task_type, process_func, log_label):
    """Validate the current request, register a task and run it in a thread.

    Shared implementation of the ``/api/<type>/async`` endpoints.

    Args:
        task_type: Schema name passed to ``validate_request``; also stored as
            the task's ``type``.
        process_func: Processor handed to ``run_async_task``.
        log_label: Human-readable task name used in the creation log line.

    Returns:
        The validation error pair from ``validate_request`` when the body is
        rejected, otherwise a ``(response, 202)`` pair whose JSON body carries
        ``task_id`` and the ``status_url`` to poll.
    """
    data = request.get_json()

    # Validate parameter presence and types.
    error_response, status_code = validate_request(data, task_type)
    if error_response:
        return error_response, status_code

    with task_lock:
        # IDs are only 8 characters long; pick one that is not in use so an
        # existing task entry is never silently overwritten.
        task_id = str(uuid.uuid4())[:8]
        while task_id in task_registry:
            task_id = str(uuid.uuid4())[:8]

        # Initial task state.
        task_registry[task_id] = {
            'status': 'pending',
            'type': task_type,
            'create_time': time.time(),
            'logs': [],
            'last_returned_index': 0,  # index of the first log line not yet returned
            'progress': 0,
        }

    # Run the job in a daemon thread.
    worker = threading.Thread(
        target=run_async_task,
        args=(task_id, process_func, data),
    )
    worker.daemon = True
    worker.start()

    # Build the polling URL from the scheme and host the client used.
    status_url = f"{request.scheme}://{request.host}/api/task/{task_id}"

    logging.info(f"创建异步{log_label}任务: {task_id}")
    return jsonify({
        'status': 'success',
        'task_id': task_id,
        'status_url': status_url,
        'message': '任务已创建,请使用状态URL查询进度'
    }), 202


@app.route('/api/ffmpeg/async', methods=['POST'])
def ffmpeg_async_api():
    """Create an asynchronous FFmpeg task."""
    logging.info("收到异步FFmpeg API请求")
    return _start_async_task('ffmpeg', process_ffmpeg, 'FFmpeg')


@app.route('/api/sanjuuni/async', methods=['POST'])
def sanjuuni_async_api():
    """Create an asynchronous Sanjuuni task."""
    logging.info("收到异步Sanjuuni API请求")
    return _start_async_task('sanjuuni', process_sanjuuni, 'Sanjuuni')


@app.route('/api/video_frame/async', methods=['POST'])
def video_frame_async_api():
    """Create an asynchronous video-frame extraction task."""
    logging.info("收到异步视频帧提取API请求")
    return _start_async_task('video_frame', process_video_frame_extraction, '视频帧提取')
|
||
|
||
@app.route('/api/task/<task_id>', methods=['GET'])
def get_task_status(task_id):
    """Report a task's status and progress, returning only not-yet-seen logs.

    Each call advances the task's ``last_returned_index``, so the ``new_logs``
    field holds the log lines appended since the previous call.

    Returns:
        ``(response, 404)`` when the task is unknown, otherwise
        ``(response, 200)`` with the status fields, plus ``result`` for a
        completed task or ``error`` for a failed one.
    """
    with task_lock:
        if task_id not in task_registry:
            return jsonify({'error': '任务不存在'}), 404

        snapshot = task_registry[task_id].copy()
        log_total = len(snapshot['logs'])
        cursor = snapshot['last_returned_index']

        # Collect the unseen tail of the log and move the cursor past it.
        new_logs = []
        if cursor < log_total:
            new_logs = snapshot['logs'][cursor:]
            task_registry[task_id]['last_returned_index'] = log_total

    # From here on only the snapshot is read.
    status = snapshot['status']
    response = {
        'task_id': task_id,
        'status': status,
        'type': snapshot['type'],
        'create_time': snapshot['create_time'],
        'progress': snapshot.get('progress', 0),
        'total_logs': log_total,
        'new_logs': new_logs,
        'last_index': log_total  # current log count, so clients can track it
    }

    # Timestamps exist only once the task has started / finished.
    for stamp in ('start_time', 'end_time'):
        if stamp in snapshot:
            response[stamp] = snapshot[stamp]

    # Attach the outcome that matches the final status.
    if status == 'completed':
        response['result'] = snapshot['result']
    elif status == 'error':
        response['error'] = snapshot['error']

    return jsonify(response), 200
|
||
|
||
# The original synchronous endpoints: same routes and responses as before.
def _run_sync_job(api_name, process_func):
    """Validate the current request, run *process_func* and wait for its result.

    Shared implementation of the synchronous ``/api/ffmpeg`` and
    ``/api/sanjuuni`` endpoints.

    Args:
        api_name: Schema name passed to ``validate_request``.
        process_func: Callable invoked as
            ``process_func(data, file_registry, file_lock)``.

    Returns:
        The validation error pair when the body is rejected, a
        ``(response, 500)`` pair when processing reports an error, otherwise a
        ``(response, 200)`` pair with ``download_url`` and ``file_id``.
    """
    data = request.get_json()

    # Validate parameter presence and types.
    error_response, status_code = validate_request(data, api_name)
    if error_response:
        return error_response, status_code

    result_queue = queue.Queue()

    def run_process():
        # Always put something on the queue: if the processor raised and
        # nothing were queued, the request would block on get() forever.
        try:
            with app.app_context():
                outcome = process_func(data, file_registry, file_lock)
        except Exception as e:
            logging.exception("同步处理线程执行异常")
            outcome = {'error': str(e)}
        result_queue.put(outcome)

    worker = threading.Thread(target=run_process)
    worker.start()

    # Wait for the worker's result.
    result = result_queue.get()
    if 'error' in result:
        logging.error(f"处理过程中出错: {result['error']}")
        return jsonify({'status': 'error', 'error': result['error']}), 500

    logging.info(f"处理成功,返回下载URL: {result['download_url']}")
    return jsonify({
        'status': 'success',
        'download_url': result['download_url'],
        'file_id': result['file_id']
    }), 200


@app.route('/api/ffmpeg', methods=['POST'])
def ffmpeg_api():
    """Run an FFmpeg job synchronously and return its download URL."""
    logging.info("收到FFmpeg API请求")
    return _run_sync_job("ffmpeg", process_ffmpeg)


@app.route('/api/sanjuuni', methods=['POST'])
def sanjuuni_api():
    """Run a Sanjuuni job synchronously and return its download URL."""
    logging.info("收到Sanjuuni API请求")
    return _run_sync_job("sanjuuni", process_sanjuuni)
|
||
|
||
@app.route('/download/<file_id>/<filename>', methods=['GET'])
def download_file_endpoint(file_id, filename):
    """Send a registered file to the client as an attachment.

    Looks *file_id* up in ``file_registry``, updates its ``last_access`` and
    ``download_count``, and returns the file's bytes with a
    ``Content-Disposition`` header naming it *filename*.

    Returns:
        ``(response, 404)`` for an unknown file ID, ``(response, 500)`` when
        the file cannot be read, otherwise ``(bytes, 200, headers)``.
    """
    import re
    from urllib.parse import quote

    logging.info(f"收到文件下载请求 - 文件ID: {file_id}, 文件名: {filename}")
    with file_lock:
        if file_id not in file_registry:
            logging.warning(f"文件ID: {file_id} 不存在")
            return jsonify({'error': '文件不存在'}), 404
        file_info = file_registry[file_id]
        file_info['last_access'] = time.time()
        file_info['download_count'] += 1

    try:
        with open(file_info['path'], 'rb') as f:
            file_data = f.read()
    except Exception as e:
        logging.error(f"下载文件时出错: {e}")
        return jsonify({'status': 'error', 'error': str(e)}), 500

    # The filename comes from the URL, so it may hold spaces, quotes, control
    # characters or non-ASCII text. Send a sanitized quoted ASCII fallback plus
    # the exact name percent-encoded as UTF-8 (RFC 6266 / RFC 5987).
    ascii_name = re.sub(r'[^\x20-\x7e]|["\\]', '_', filename)
    disposition = "attachment; filename=\"{}\"; filename*=UTF-8''{}".format(
        ascii_name, quote(filename, safe='')
    )
    return file_data, 200, {'Content-Disposition': disposition}
|
||
|
||
@app.route('/frames/<job_id>/<filename>', methods=['GET'])
def serve_video_frames(job_id, filename):
    """Serve an extracted video frame (.png) or audio file (.dfpwm) of a job.

    Returns:
        404 when the job directory or the file does not exist, 400 for any
        other file extension, 500 when sending the file fails, otherwise the
        file response from ``send_from_directory``.
    """
    # Imported here rather than at module level (the file defers these
    # project imports to avoid circular imports).
    import os
    from flask import send_from_directory
    from video_frame_utils import FRAMES_ROOT

    safe_job = os.path.basename(job_id)
    safe_file = os.path.basename(filename)

    # basename() leaves "." and ".." untouched, and joining them would point
    # at FRAMES_ROOT itself or its parent rather than a job directory.
    if safe_job in ('', '.', '..'):
        return jsonify({"error": "文件不存在"}), 404

    dir_path = os.path.join(FRAMES_ROOT, safe_job)
    if not os.path.isdir(dir_path):
        return jsonify({"error": "文件不存在"}), 404

    if not safe_file.endswith(('.png', '.dfpwm')):
        return jsonify({"error": "文件类型不支持"}), 400

    # Report a missing file as 404 here; otherwise the not-found error raised
    # by send_from_directory is caught below and turned into a 500.
    if not os.path.isfile(os.path.join(dir_path, safe_file)):
        return jsonify({"error": "文件不存在"}), 404

    try:
        return send_from_directory(dir_path, safe_file)
    except Exception as e:
        logging.error(f"提供文件时出错: {e}")
        return jsonify({"error": "文件访问失败"}), 500
|
||
|
||
@app.route('/health', methods=['GET'])
def health_check():
    """Health-check endpoint: always answers with the plain body ``OK``."""
    return "OK"
|
||
|
||
@app.route('/api/tasks', methods=['GET'])
def list_tasks():
    """List every registered task with its summary fields (debugging aid).

    Returns:
        ``(response, 200)`` whose JSON body maps each task ID to its
        ``status``, ``type``, ``create_time`` and ``progress``.
    """
    with task_lock:
        # Copy only the summary fields while the registry is locked.
        summary = {
            tid: {
                'status': entry['status'],
                'type': entry['type'],
                'create_time': entry['create_time'],
                'progress': entry.get('progress', 0),
            }
            for tid, entry in task_registry.items()
        }
    return jsonify(summary), 200
|
||
|
||
if __name__ == '__main__':
    # Command-line interface: only the listening port is configurable.
    cli = argparse.ArgumentParser(description='启动FFmpeg处理服务')
    cli.add_argument(
        '--port',
        type=int,
        default=5000,
        help='服务监听的端口号(默认: 5000)',
    )
    args = cli.parse_args()

    logging.info(f"启动应用程序,端口: {args.port}...")

    # Start the cleanup thread, then serve on all interfaces with one thread
    # per request.
    start_cleanup_thread()
    app.run(host='0.0.0.0', port=args.port, threaded=True)