import os import uuid import subprocess import requests import tempfile import shutil import logging import threading import time from contextlib import contextmanager from shared_utils import add_task_log UPLOAD_FOLDER = 'temp_files' @contextmanager def temp_directory(dir=None): temp_dir = tempfile.mkdtemp(dir=dir) try: yield temp_dir finally: pass def download_file(url, temp_dir, task_id=None, task_registry=None, task_lock=None): try: headers = { 'User-Agent': 'Mozilla/5.0', 'Accept': '*/*' } if task_id and task_registry and task_lock: add_task_log(task_id, f"开始从URL下载文件: {url}", task_registry, task_lock) logging.info(f"开始从URL下载文件: {url}") response = requests.get(url, headers=headers, stream=True, timeout=30) response.raise_for_status() file_path = os.path.join(temp_dir, 'input_audio') total_size = int(response.headers.get('content-length', 0)) downloaded_size = 0 with open(file_path, 'wb') as f: for chunk in response.iter_content(chunk_size=8192): if chunk: f.write(chunk) downloaded_size += len(chunk) # 更新下载进度 if total_size > 0 and task_id and task_registry and task_lock: progress = min(50, int((downloaded_size / total_size) * 50)) # 下载占50%进度 with task_lock: if task_id in task_registry: task_registry[task_id]['progress'] = progress file_size = os.path.getsize(file_path) if file_size < 1024: error_msg = f"下载的文件太小(仅{file_size}字节),可能无效" logging.error(error_msg) raise Exception(error_msg) if task_id and task_registry and task_lock: add_task_log(task_id, f"文件下载成功,保存到: {file_path} (大小: {file_size}字节)", task_registry, task_lock) logging.info(f"文件下载成功,保存到: {file_path} (大小: {file_size}字节)") return file_path except Exception as e: error_msg = f"从 {url} 下载文件时出错: {e}" if task_id and task_registry and task_lock: add_task_log(task_id, error_msg, task_registry, task_lock) logging.error(error_msg) raise def execute_ffmpeg(input_path, output_path, ffmpeg_args, task_id=None, task_registry=None, task_lock=None): try: filtered_args = [arg for arg in ffmpeg_args if not (arg.lower() in ['-i', '-input'] or (ffmpeg_args.index(arg) > 0 and ffmpeg_args[ffmpeg_args.index(arg) - 1].lower() == '-i'))] cmd = ['lib/ffmpeg/bin/ffmpeg', '-i', input_path] + filtered_args + [output_path] if task_id and task_registry and task_lock: add_task_log(task_id, f"执行FFmpeg命令: {' '.join(cmd)}", task_registry, task_lock) logging.info(f"执行FFmpeg命令: {' '.join(cmd)}") # 使用Popen来实时捕获输出 process = subprocess.Popen( cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, # 将stderr重定向到stdout universal_newlines=True, encoding='utf-8', errors='replace', bufsize=1 ) # 实时读取输出 while True: output = process.stdout.readline() if output == '' and process.poll() is not None: break if output: output = output.strip() if output and task_id and task_registry and task_lock: add_task_log(task_id, output, task_registry, task_lock) # 简单的进度解析(可以根据FFmpeg的输出格式进行优化) if 'time=' in output and 'bitrate=' in output: # 这里可以添加更复杂的进度解析逻辑 if task_id and task_registry and task_lock: with task_lock: if task_id in task_registry: # 处理进度从50%开始到100% task_registry[task_id]['progress'] = 50 + 50 // 2 # 简单示例 returncode = process.poll() if returncode != 0: error_msg = f"FFmpeg处理失败,返回码: {returncode}" if task_id and task_registry and task_lock: add_task_log(task_id, error_msg, task_registry, task_lock) logging.error(error_msg) raise Exception(error_msg) if task_id and task_registry and task_lock: add_task_log(task_id, "FFmpeg处理成功完成", task_registry, task_lock) logging.info("FFmpeg处理成功完成") return True except Exception as e: error_msg = f"执行FFmpeg时出错: {e}" if task_id and task_registry and task_lock: add_task_log(task_id, error_msg, task_registry, task_lock) logging.error(error_msg) raise def process_ffmpeg(data, file_registry, file_lock, task_id=None, task_registry=None, task_lock=None): try: temp_dir = tempfile.mkdtemp(dir=UPLOAD_FOLDER) if task_id and task_registry and task_lock: add_task_log(task_id, f"创建临时目录: {temp_dir}", task_registry, task_lock) logging.info(f"创建临时目录: {temp_dir}") input_url = data.get('input_url') ffmpeg_args = data.get('args', []) input_path = download_file(input_url, temp_dir, task_id, task_registry, task_lock) output_id = str(uuid.uuid4())[:8] output_format = data.get('output_format', 'mp4') output_filename = f"{output_id}.{output_format}" output_path = os.path.join(temp_dir, output_filename) if task_id and task_registry and task_lock: add_task_log(task_id, f"开始FFmpeg处理,输出格式: {output_format}", task_registry, task_lock) with task_lock: if task_id in task_registry: task_registry[task_id]['progress'] = 50 # 开始处理,进度50% execute_ffmpeg(input_path, output_path, ffmpeg_args, task_id, task_registry, task_lock) with file_lock: file_registry[output_id] = { 'path': os.path.abspath(output_path), 'filename': output_filename, 'last_access': time.time(), 'download_count': 0 } if task_id and task_registry and task_lock: add_task_log(task_id, f"已注册新文件ID: {output_id}, 路径: {output_path}", task_registry, task_lock) with task_lock: if task_id in task_registry: task_registry[task_id]['progress'] = 100 # 处理完成,进度100% logging.info(f"已注册新文件ID: {output_id}, 路径: {output_path}") # 返回临时目录路径以便在主函数中删除 return { 'status': 'success', 'download_url': f"http://newgmapi.liulikeji.cn/download/{output_id}/{output_filename}", 'file_id': output_id, 'temp_dir': temp_dir } except Exception as e: error_msg = f"处理过程中出错: {e}" if task_id and task_registry and task_lock: add_task_log(task_id, error_msg, task_registry, task_lock) logging.error(error_msg) return {'error': str(e)}