添加创建请求和读取进度接口

This commit is contained in:
HKXluo
2025-11-20 20:54:15 +08:00
parent eede77a848
commit 0e3f229260
7 changed files with 404 additions and 55 deletions

1
.gitignore vendored
View File

@@ -1,3 +1,4 @@
.history
__pycache__/
app.log
temp_files

View File

@@ -37,6 +37,10 @@
- **功能**: 在线调用 [Sanjuuni 工具](https://github.com/MCJack123/sanjuuni/tree/master)(具体功能需参考其官方文档)。
- **接口文档**: [SanjuuniApi 文档](https://www.liulikeji.cn/archives/SanjuuniApi)
### 新增异步接口调用
- **接口文档**: [异步任务处理接口文档](https://www.liulikeji.cn/archives/wei-ming-ming-wen-zhang)
---
## 📦 部署与使用

View File

@@ -8,6 +8,7 @@ import logging
import threading
import time
from contextlib import contextmanager
from shared_utils import add_task_log
UPLOAD_FOLDER = 'temp_files'
@@ -17,26 +18,39 @@ def temp_directory(dir=None):
try:
yield temp_dir
finally:
# 不再在这里自动删除临时目录
pass
def download_file(url, temp_dir):
def download_file(url, temp_dir, task_id=None, task_registry=None, task_lock=None):
try:
headers = {
'User-Agent': 'Mozilla/5.0',
'Accept': '*/*'
}
if task_id and task_registry and task_lock:
add_task_log(task_id, f"开始从URL下载文件: {url}", task_registry, task_lock)
logging.info(f"开始从URL下载文件: {url}")
response = requests.get(url, headers=headers, stream=True, timeout=30)
response.raise_for_status()
file_path = os.path.join(temp_dir, 'input_audio')
total_size = int(response.headers.get('content-length', 0))
downloaded_size = 0
with open(file_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
downloaded_size += len(chunk)
# 更新下载进度
if total_size > 0 and task_id and task_registry and task_lock:
progress = min(50, int((downloaded_size / total_size) * 50)) # 下载占50%进度
with task_lock:
if task_id in task_registry:
task_registry[task_id]['progress'] = progress
file_size = os.path.getsize(file_path)
if file_size < 1024:
@@ -44,56 +58,104 @@ def download_file(url, temp_dir):
logging.error(error_msg)
raise Exception(error_msg)
if task_id and task_registry and task_lock:
add_task_log(task_id, f"文件下载成功,保存到: {file_path} (大小: {file_size}字节)", task_registry, task_lock)
logging.info(f"文件下载成功,保存到: {file_path} (大小: {file_size}字节)")
return file_path
except Exception as e:
logging.error(f"{url} 下载文件时出错: {e}")
error_msg = f"{url} 下载文件时出错: {e}"
if task_id and task_registry and task_lock:
add_task_log(task_id, error_msg, task_registry, task_lock)
logging.error(error_msg)
raise
def execute_ffmpeg(input_path, output_path, ffmpeg_args):
def execute_ffmpeg(input_path, output_path, ffmpeg_args, task_id=None, task_registry=None, task_lock=None):
try:
filtered_args = [arg for arg in ffmpeg_args if not (arg.lower() in ['-i', '-input'] or
(ffmpeg_args.index(arg) > 0 and ffmpeg_args[ffmpeg_args.index(arg) - 1].lower() == '-i'))]
cmd = ['lib/ffmpeg/bin/ffmpeg', '-i', input_path] + filtered_args + [output_path]
if task_id and task_registry and task_lock:
add_task_log(task_id, f"执行FFmpeg命令: {' '.join(cmd)}", task_registry, task_lock)
logging.info(f"执行FFmpeg命令: {' '.join(cmd)}")
result = subprocess.run(
# 使用Popen来实时捕获输出
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stderr=subprocess.STDOUT, # 将stderr重定向到stdout
universal_newlines=True,
encoding='utf-8',
errors='replace'
errors='replace',
bufsize=1
)
if result.returncode != 0:
error_msg = f"FFmpeg处理失败返回码: {result.returncode}, 错误: {result.stderr}"
# 实时读取输出
while True:
output = process.stdout.readline()
if output == '' and process.poll() is not None:
break
if output:
output = output.strip()
if output and task_id and task_registry and task_lock:
add_task_log(task_id, output, task_registry, task_lock)
# 简单的进度解析可以根据FFmpeg的输出格式进行优化
if 'time=' in output and 'bitrate=' in output:
# 这里可以添加更复杂的进度解析逻辑
if task_id and task_registry and task_lock:
with task_lock:
if task_id in task_registry:
# 处理进度从50%开始到100%
task_registry[task_id]['progress'] = 50 + 50 // 2 # 简单示例
returncode = process.poll()
if returncode != 0:
error_msg = f"FFmpeg处理失败返回码: {returncode}"
if task_id and task_registry and task_lock:
add_task_log(task_id, error_msg, task_registry, task_lock)
logging.error(error_msg)
raise Exception(error_msg)
if task_id and task_registry and task_lock:
add_task_log(task_id, "FFmpeg处理成功完成", task_registry, task_lock)
logging.info("FFmpeg处理成功完成")
return True
except Exception as e:
logging.error(f"执行FFmpeg时出错: {e}")
error_msg = f"执行FFmpeg时出错: {e}"
if task_id and task_registry and task_lock:
add_task_log(task_id, error_msg, task_registry, task_lock)
logging.error(error_msg)
raise
def process_ffmpeg(data, file_registry, file_lock):
def process_ffmpeg(data, file_registry, file_lock, task_id=None, task_registry=None, task_lock=None):
try:
temp_dir = tempfile.mkdtemp(dir=UPLOAD_FOLDER)
if task_id and task_registry and task_lock:
add_task_log(task_id, f"创建临时目录: {temp_dir}", task_registry, task_lock)
logging.info(f"创建临时目录: {temp_dir}")
input_url = data.get('input_url')
ffmpeg_args = data.get('args', [])
input_path = download_file(input_url, temp_dir)
input_path = download_file(input_url, temp_dir, task_id, task_registry, task_lock)
output_id = str(uuid.uuid4())[:8]
output_format = data.get('output_format', 'mp4')
output_filename = f"{output_id}.{output_format}"
output_path = os.path.join(temp_dir, output_filename)
execute_ffmpeg(input_path, output_path, ffmpeg_args)
if task_id and task_registry and task_lock:
add_task_log(task_id, f"开始FFmpeg处理输出格式: {output_format}", task_registry, task_lock)
with task_lock:
if task_id in task_registry:
task_registry[task_id]['progress'] = 50 # 开始处理进度50%
execute_ffmpeg(input_path, output_path, ffmpeg_args, task_id, task_registry, task_lock)
with file_lock:
file_registry[output_id] = {
@@ -102,16 +164,26 @@ def process_ffmpeg(data, file_registry, file_lock):
'last_access': time.time(),
'download_count': 0
}
logging.info(f"已注册新文件ID: {output_id}, 路径: {output_path}")
if task_id and task_registry and task_lock:
add_task_log(task_id, f"已注册新文件ID: {output_id}, 路径: {output_path}", task_registry, task_lock)
with task_lock:
if task_id in task_registry:
task_registry[task_id]['progress'] = 100 # 处理完成进度100%
logging.info(f"已注册新文件ID: {output_id}, 路径: {output_path}")
# 返回临时目录路径以便在主函数中删除
return {
'status': 'success',
'download_url': f"http://ffmpeg.liulikeji.cn/download/{output_id}/{output_filename}",
'file_id': output_id,
'temp_dir': temp_dir # 返回临时目录路径
'temp_dir': temp_dir
}
except Exception as e:
logging.error(f"处理过程中出错: {e}")
return {'error': str(e)}
error_msg = f"处理过程中出错: {e}"
if task_id and task_registry and task_lock:
add_task_log(task_id, error_msg, task_registry, task_lock)
logging.error(error_msg)
return {'error': str(e)}

View File

@@ -3,14 +3,12 @@ import shutil
import threading
import time
import logging
from shared_utils import file_registry, file_lock, task_registry, task_lock
UPLOAD_FOLDER = 'temp_files'
CLEANUP_INTERVAL = 3600 # 清理临时文件的间隔(秒)
FILE_EXPIRY = 7200 # 文件过期时间(秒)
file_registry = {}
file_lock = threading.Lock()
def cleanup_temp_files():
current_time = time.time()
expired_folders = []
@@ -38,6 +36,17 @@ def cleanup_temp_files():
logging.error(f"删除文件夹 {folder_path} 失败: {e}")
logging.info(f"清理完成,共删除 {len(expired_folders)} 个过期文件夹")
# 同时清理过期的任务记录超过2小时
with task_lock:
expired_tasks = []
for task_id, task_info in list(task_registry.items()):
if current_time - task_info['create_time'] > FILE_EXPIRY:
expired_tasks.append(task_id)
for task_id in expired_tasks:
del task_registry[task_id]
logging.info(f"已清理过期任务记录: {task_id}")
def start_cleanup_thread():
def cleanup_loop():

213
main.py
View File

@@ -5,15 +5,26 @@ import threading
import queue
import time
import argparse
from ffmpeg_utils import process_ffmpeg
from sanjuuni_utils import process_sanjuuni
from file_cleanup import start_cleanup_thread
import uuid
from shared_utils import task_registry, file_registry, file_lock, task_lock, add_task_log
# 在导入其他模块之前先定义全局变量
app = Flask(__name__)
# 假设这些是主文件中已定义的变量
file_registry = {}
file_lock = threading.Lock()
# 延迟导入,避免循环导入
try:
from ffmpeg_utils import process_ffmpeg
from sanjuuni_utils import process_sanjuuni
from file_cleanup import start_cleanup_thread
except ImportError as e:
logging.error(f"导入模块时出错: {e}")
# 定义空函数作为备用
def process_ffmpeg(*args, **kwargs):
return {'error': 'FFmpeg模块未正确导入'}
def process_sanjuuni(*args, **kwargs):
return {'error': 'Sanjuuni模块未正确导入'}
def start_cleanup_thread():
pass
# 输入参数列表
enter_parameter_table = {
@@ -40,6 +51,175 @@ def validate_request(data, api_name):
return jsonify({'error': f"{key}参数类型错误,您输入为{type(data[key])},应为{enter_parameter_table[api_name][key]}" }), 400
return None, None
def run_async_task(task_id, process_func, data):
"""运行异步任务"""
try:
with app.app_context():
# 更新任务状态为运行中
with task_lock:
task_registry[task_id]['status'] = 'running'
task_registry[task_id]['start_time'] = time.time()
# 执行处理函数
result = process_func(data, file_registry, file_lock, task_id, task_registry, task_lock)
# 更新任务状态
with task_lock:
if 'error' in result:
task_registry[task_id]['status'] = 'error'
task_registry[task_id]['error'] = result['error']
else:
task_registry[task_id]['status'] = 'completed'
task_registry[task_id]['result'] = result
task_registry[task_id]['end_time'] = time.time()
except Exception as e:
logging.error(f"任务 {task_id} 执行异常: {e}")
with task_lock:
task_registry[task_id]['status'] = 'error'
task_registry[task_id]['error'] = str(e)
task_registry[task_id]['end_time'] = time.time()
@app.route('/api/ffmpeg/async', methods=['POST'])
def ffmpeg_async_api():
"""创建异步FFmpeg任务"""
logging.info("收到异步FFmpeg API请求")
data = request.get_json()
# 检测参数类型
error_response, status_code = validate_request(data, "ffmpeg")
if error_response:
return error_response, status_code
# 生成任务ID
task_id = str(uuid.uuid4())[:8]
# 获取请求的主机信息用于构建URL
host = request.host
scheme = request.scheme
# 初始化任务状态
with task_lock:
task_registry[task_id] = {
'status': 'pending',
'type': 'ffmpeg',
'create_time': time.time(),
'logs': [],
'last_returned_index': 0, # 记录最后返回的日志索引
'progress': 0
}
# 启动异步任务
thread = threading.Thread(
target=run_async_task,
args=(task_id, process_ffmpeg, data)
)
thread.daemon = True
thread.start()
status_url = f"{scheme}://{host}/api/task/{task_id}"
logging.info(f"创建异步FFmpeg任务: {task_id}")
return jsonify({
'status': 'success',
'task_id': task_id,
'status_url': status_url,
'message': '任务已创建请使用状态URL查询进度'
}), 202
@app.route('/api/sanjuuni/async', methods=['POST'])
def sanjuuni_async_api():
"""创建异步Sanjuuni任务"""
logging.info("收到异步Sanjuuni API请求")
data = request.get_json()
# 检测参数类型
error_response, status_code = validate_request(data, "sanjuuni")
if error_response:
return error_response, status_code
# 生成任务ID
task_id = str(uuid.uuid4())[:8]
# 获取请求的主机信息用于构建URL
host = request.host
scheme = request.scheme
# 初始化任务状态
with task_lock:
task_registry[task_id] = {
'status': 'pending',
'type': 'sanjuuni',
'create_time': time.time(),
'logs': [],
'last_returned_index': 0, # 记录最后返回的日志索引
'progress': 0
}
# 启动异步任务
thread = threading.Thread(
target=run_async_task,
args=(task_id, process_sanjuuni, data)
)
thread.daemon = True
thread.start()
status_url = f"{scheme}://{host}/api/task/{task_id}"
logging.info(f"创建异步Sanjuuni任务: {task_id}")
return jsonify({
'status': 'success',
'task_id': task_id,
'status_url': status_url,
'message': '任务已创建请使用状态URL查询进度'
}), 202
@app.route('/api/task/<task_id>', methods=['GET'])
def get_task_status(task_id):
"""查询任务状态和进度(自动返回新增日志)"""
with task_lock:
if task_id not in task_registry:
return jsonify({'error': '任务不存在'}), 404
task_info = task_registry[task_id].copy()
current_log_count = len(task_info['logs'])
last_returned_index = task_info['last_returned_index']
# 计算新增日志
if last_returned_index < current_log_count:
new_logs = task_info['logs'][last_returned_index:]
# 更新最后返回的日志索引
task_registry[task_id]['last_returned_index'] = current_log_count
else:
new_logs = []
# 构建响应
response = {
'task_id': task_id,
'status': task_info['status'],
'type': task_info['type'],
'create_time': task_info['create_time'],
'progress': task_info.get('progress', 0),
'total_logs': current_log_count,
'new_logs': new_logs,
'last_index': current_log_count # 返回当前日志总数,方便客户端跟踪
}
if 'start_time' in task_info:
response['start_time'] = task_info['start_time']
if 'end_time' in task_info:
response['end_time'] = task_info['end_time']
# 根据状态返回不同信息
if task_info['status'] == 'completed':
response['result'] = task_info['result']
elif task_info['status'] == 'error':
response['error'] = task_info['error']
return jsonify(response), 200
# 原有的同步接口保持不变
@app.route('/api/ffmpeg', methods=['POST'])
def ffmpeg_api():
logging.info("收到FFmpeg API请求")
@@ -54,7 +234,7 @@ def ffmpeg_api():
# 创建处理进程
result_queue = queue.Queue()
def run_process(data, file_registry, file_lock, result_queue):
with app.app_context(): # 设置应用上下文
with app.app_context():
result = process_ffmpeg(data, file_registry, file_lock)
result_queue.put(result)
@@ -88,7 +268,7 @@ def sanjuuni_api():
# 创建处理进程
result_queue = queue.Queue()
def run_process(data, file_registry, file_lock, result_queue):
with app.app_context(): # 设置应用上下文
with app.app_context():
result = process_sanjuuni(data, file_registry, file_lock)
result_queue.put(result)
@@ -127,15 +307,24 @@ def download_file_endpoint(file_id, filename):
logging.error(f"下载文件时出错: {e}")
return jsonify({'status': 'error', 'error': str(e)}), 500
@app.route('/health', methods=['GET'])
def health_check():
"""健康检查接口,直接返回 'ok'"""
return "OK"
@app.route('/api/tasks', methods=['GET'])
def list_tasks():
"""列出所有任务(用于调试)"""
with task_lock:
tasks = {}
for task_id, task_info in task_registry.items():
tasks[task_id] = {
'status': task_info['status'],
'type': task_info['type'],
'create_time': task_info['create_time'],
'progress': task_info.get('progress', 0)
}
return jsonify(tasks), 200
if __name__ == '__main__':
# 配置命令行参数解析

View File

@@ -6,7 +6,8 @@ import tempfile
import shutil
import logging
from contextlib import contextmanager
import time # 导入 time 模块
import time
from shared_utils import add_task_log
@contextmanager
def temp_directory(dir=None):
@@ -16,13 +17,16 @@ def temp_directory(dir=None):
finally:
pass
def download_file(url, temp_dir):
def download_file(url, temp_dir, task_id=None, task_registry=None, task_lock=None):
try:
headers = {
'User-Agent': 'Mozilla/5.0',
'Accept': '*/*'
}
if task_id and task_registry and task_lock:
add_task_log(task_id, f"开始从URL下载文件: {url}", task_registry, task_lock)
logging.info(f"开始从URL下载文件: {url}")
response = requests.get(url, headers=headers, stream=True, timeout=30)
response.raise_for_status()
@@ -40,44 +44,79 @@ def download_file(url, temp_dir):
logging.error(error_msg)
raise Exception(error_msg)
if task_id and task_registry and task_lock:
add_task_log(task_id, f"文件下载成功,保存到: {file_path} (大小: {file_size}字节)", task_registry, task_lock)
logging.info(f"文件下载成功,保存到: {file_path} (大小: {file_size}字节)")
return file_path
except Exception as e:
logging.error(f"{url} 下载文件时出错: {e}")
error_msg = f"{url} 下载文件时出错: {e}"
if task_id and task_registry and task_lock:
add_task_log(task_id, error_msg, task_registry, task_lock)
logging.error(error_msg)
raise
def execute_sanjuuni(input_path, output_path, sanjuuni_args):
def execute_sanjuuni(input_path, output_path, sanjuuni_args, task_id=None, task_registry=None, task_lock=None):
try:
cmd = ['lib/sanjuuni/sanjuuni', '-i', input_path] + sanjuuni_args + ['-o', output_path]
if task_id and task_registry and task_lock:
add_task_log(task_id, f"执行Sanjuuni命令: {' '.join(cmd)}", task_registry, task_lock)
logging.info(f"执行Sanjuuni命令: {' '.join(cmd)}")
result = subprocess.run(
# 使用Popen来实时捕获输出
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
stderr=subprocess.STDOUT,
universal_newlines=True,
encoding='utf-8',
errors='replace'
errors='replace',
bufsize=1
)
if result.returncode != 0:
error_msg = f"Sanjuuni处理失败返回码: {result.returncode}, 错误: {result.stderr}"
# 实时读取输出
while True:
output = process.stdout.readline()
if output == '' and process.poll() is not None:
break
if output:
output = output.strip()
if output and task_id and task_registry and task_lock:
add_task_log(task_id, output, task_registry, task_lock)
returncode = process.poll()
if returncode != 0:
error_msg = f"Sanjuuni处理失败返回码: {returncode}"
if task_id and task_registry and task_lock:
add_task_log(task_id, error_msg, task_registry, task_lock)
logging.error(error_msg)
raise Exception(error_msg)
if task_id and task_registry and task_lock:
add_task_log(task_id, "Sanjuuni处理成功完成", task_registry, task_lock)
logging.info("Sanjuuni处理成功完成")
return True
except Exception as e:
logging.error(f"执行Sanjuuni时出错: {e}")
error_msg = f"执行Sanjuuni时出错: {e}"
if task_id and task_registry and task_lock:
add_task_log(task_id, error_msg, task_registry, task_lock)
logging.error(error_msg)
raise
def process_sanjuuni(data, file_registry, file_lock):
def process_sanjuuni(data, file_registry, file_lock, task_id=None, task_registry=None, task_lock=None):
try:
temp_dir = tempfile.mkdtemp(dir='temp_files')
if task_id and task_registry and task_lock:
add_task_log(task_id, f"创建临时目录: {temp_dir}", task_registry, task_lock)
logging.info(f"创建临时目录: {temp_dir}")
input_url = data.get('input_url')
sanjuuni_args = data.get('args', []) # 从请求数据中获取 args 参数
sanjuuni_args = data.get('args', [])
# 定义不允许的参数
disallowed_params = [
@@ -114,33 +153,46 @@ def process_sanjuuni(data, file_registry, file_lock):
else:
raise ValueError(f"Unsupported output format: {output_format}")
input_path = download_file(input_url, temp_dir)
input_path = download_file(input_url, temp_dir, task_id, task_registry, task_lock)
output_id = str(uuid.uuid4())[:8]
output_filename = f"{output_id}.{output_format}"
output_path = os.path.join(temp_dir, output_filename)
execute_sanjuuni(input_path, output_path, sanjuuni_args)
if task_id and task_registry and task_lock:
add_task_log(task_id, f"开始Sanjuuni处理输出格式: {output_format}", task_registry, task_lock)
with task_lock:
if task_id in task_registry:
task_registry[task_id]['progress'] = 50
execute_sanjuuni(input_path, output_path, sanjuuni_args, task_id, task_registry, task_lock)
with file_lock:
file_registry[output_id] = {
'path': os.path.abspath(output_path),
'filename': output_filename,
'last_access': time.time(), # 使用 time.time() 记录最后访问时间
'last_access': time.time(),
'download_count': 0
}
logging.info(f"已注册新文件ID: {output_id}, 路径: {output_path}")
if task_id and task_registry and task_lock:
add_task_log(task_id, f"已注册新文件ID: {output_id}, 路径: {output_path}", task_registry, task_lock)
with task_lock:
if task_id in task_registry:
task_registry[task_id]['progress'] = 100
logging.info(f"已注册新文件ID: {output_id}, 路径: {output_path}")
return {
'status': 'success',
'download_url': f"http://ffmpeg.liulikeji.cn/download/{output_id}/{output_filename}",
'file_id': output_id,
'temp_dir': temp_dir # 返回临时目录路径
'temp_dir': temp_dir
}
except Exception as e:
logging.error(f"处理过程中出错: {e}")
return {'error': str(e)}
error_msg = f"处理过程中出错: {e}"
if task_id and task_registry and task_lock:
add_task_log(task_id, error_msg, task_registry, task_lock)
logging.error(error_msg)
return {'error': str(e)}

22
shared_utils.py Normal file
View File

@@ -0,0 +1,22 @@
# shared_utils.py
import time
import logging
from threading import Lock
# 全局变量
task_registry = {}
file_registry = {}
file_lock = Lock()
task_lock = Lock()
def add_task_log(task_id, message, task_registry, task_lock):
"""添加任务日志"""
with task_lock:
if task_id in task_registry:
timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
log_entry = f"[{timestamp}] {message}"
task_registry[task_id]['logs'].append(log_entry)
# 保持日志数量不超过1000条
if len(task_registry[task_id]['logs']) > 1000:
task_registry[task_id]['logs'] = task_registry[task_id]['logs'][-1000:]