import os
import sys
import uuid
import subprocess
import tempfile
import threading
import time
from datetime import datetime, timedelta
import json
import glob
import re
import logging

import requests

from shared_utils import task_registry, file_registry, file_lock, task_lock, add_task_log

# ======================
# Configuration
# ======================
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
FRAMES_ROOT = os.path.abspath("./frames")
TASK_TIMEOUT_HOURS = 1  # task expiry time (hours)

# Make sure the frames directory exists.
os.makedirs(FRAMES_ROOT, exist_ok=True)

if sys.platform.startswith('win'):
    FFMPEG_PATH = os.path.join(BASE_DIR, 'lib', 'ffmpeg', 'bin', 'ffmpeg.exe')
    FFPROBE_PATH = os.path.join(BASE_DIR, 'lib', 'ffmpeg', 'bin', 'ffprobe.exe')
else:
    FFMPEG_PATH = os.path.join(BASE_DIR, 'lib', 'ffmpeg', 'bin', 'ffmpeg')
    FFPROBE_PATH = os.path.join(BASE_DIR, 'lib', 'ffmpeg', 'bin', 'ffprobe')

# Matches the frame counter in FFmpeg stats output, e.g. "frame=  180 fps= 90".
_FRAME_COUNT_RE = re.compile(r'frame=\s*(\d+)')


def _as_text(value):
    """Return *value* as ``str``; bytes are decoded as UTF-8 with replacement."""
    if not value:
        return ""
    if isinstance(value, bytes):
        return value.decode('utf-8', errors='replace')
    return str(value)


def _update_task(task_id, task_registry, task_lock, message=None, **fields):
    """Log *message* for the task, then store *fields* on its registry entry.

    Does nothing unless ``task_id``, ``task_registry`` and ``task_lock`` are
    all truthy. ``add_task_log`` is called *before* ``task_lock`` is taken
    (never while holding it), matching how the rest of this module calls it.
    """
    if not (task_id and task_registry and task_lock):
        return
    if message is not None:
        add_task_log(task_id, message, task_registry, task_lock)
    if fields:
        with task_lock:
            if task_id in task_registry:
                for key, value in fields.items():
                    task_registry[task_id][key] = value


def parse_ffmpeg_frame_progress(line, task_id, task_registry, task_lock):
    """Parse one FFmpeg stats line and update the task's frame progress.

    Stores the parsed frame number as ``current_frames``. When the entry has a
    positive ``estimated_total_frames``, ``progress`` is mapped into the
    20-80 range and only ever raised, so it cannot jump backwards from a
    value set earlier by the main task flow.
    """
    frame_match = _FRAME_COUNT_RE.search(line)
    if not frame_match:
        return
    current_frame = int(frame_match.group(1))

    with task_lock:
        if task_id not in task_registry:
            return
        task_registry[task_id]['current_frames'] = current_frame

        if 'estimated_total_frames' not in task_registry[task_id]:
            return
        total_frames = task_registry[task_id]['estimated_total_frames']
        if total_frames > 0:
            # 20-80% of the overall progress is reserved for frame conversion.
            progress = min(80, int((current_frame / total_frames) * 60) + 20)
            previous = task_registry[task_id].get('progress', 0) or 0
            if progress > previous:
                task_registry[task_id]['progress'] = progress


def log_subprocess_output(pipe, task_id, task_registry, task_lock, prefix=""):
    """Read *pipe* line by line until EOF and record each line in the task log.

    Lines are only recorded when ``task_id``, ``task_registry`` and
    ``task_lock`` are all truthy. yt-dlp bracketed progress lines are
    throttled, and FFmpeg ``frame=`` lines additionally feed
    :func:`parse_ffmpeg_frame_progress`. The pipe is always closed on exit.
    """
    if not pipe:
        return

    try:
        for line in iter(pipe.readline, ''):
            if not line:
                continue
            clean_line = line.strip()
            if not (clean_line and task_id and task_registry and task_lock):
                continue

            # Drop stray carriage returns left over from progress output.
            clean_line = clean_line.replace('\r', '')

            if prefix.startswith("[yt-dlp") and clean_line.startswith('['):
                # Keep only meaningful progress lines; skip the very frequent
                # intermediate percentage updates.
                if ('%' not in clean_line
                        or '100%' in clean_line
                        or 'Downloading' in clean_line):
                    add_task_log(task_id, f"{prefix}{clean_line}", task_registry, task_lock)
            elif prefix == "[FFmpeg] " and "frame=" in clean_line:
                # FFmpeg progress line: log it and update the frame counters.
                add_task_log(task_id, f"{prefix}{clean_line}", task_registry, task_lock)
                parse_ffmpeg_frame_progress(clean_line, task_id, task_registry, task_lock)
            else:
                # Any other output is logged as-is.
                add_task_log(task_id, f"{prefix}{clean_line}", task_registry, task_lock)
    except Exception as e:
        if task_id and task_registry and task_lock:
            add_task_log(task_id, f"[日志读取错误] {e}", task_registry, task_lock)
    finally:
        try:
            pipe.close()
        except Exception:
            # Best effort: the pipe may already be closed or broken.
            pass


# ======================
# Task processing
# ======================
def _download_video(video_url, temp_base, task_id, task_registry, task_lock):
    """Download *video_url* with yt-dlp and return the path of the newest output file.

    Raises:
        subprocess.CalledProcessError: yt-dlp exited with a non-zero status.
        Exception: the download took longer than 10 minutes, or no file
            matching ``temp_base + ".*"`` exists afterwards.
    """
    yt_dlp_cmd = [
        sys.executable, '-m', 'yt_dlp',
        video_url,
        '-o', temp_base,
        '-f', 'bv*[height<=720]+ba/b',
        '--no-warnings',
        '--progress',       # enable progress output
        '--newline',        # one progress update per line
        '--console-title',  # kept from the original command line
        '--no-colors',      # no colour codes, avoids control characters in logs
    ]

    # Popen so stdout and stderr can be streamed into the task log live.
    proc = subprocess.Popen(
        yt_dlp_cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
        bufsize=1,
        universal_newlines=True,
        encoding='utf-8',
        errors='replace'
    )

    # yt-dlp progress is mainly written to stdout.
    stdout_thread = threading.Thread(
        target=log_subprocess_output,
        args=(proc.stdout, task_id, task_registry, task_lock, "[yt-dlp] "),
        daemon=True
    )
    stdout_thread.start()

    # stderr may carry important information as well.
    stderr_thread = threading.Thread(
        target=log_subprocess_output,
        args=(proc.stderr, task_id, task_registry, task_lock, "[yt-dlp-error] "),
        daemon=True
    )
    stderr_thread.start()

    try:
        proc.wait(timeout=600)
        if proc.returncode != 0:
            # Try to collect extra details. The logger threads normally drain
            # (and close) the pipes already, so this is best effort only.
            error_details = []
            try:
                if proc.stderr:
                    stderr_content = proc.stderr.read()
                    if stderr_content:
                        error_details.append(f"stderr: {stderr_content}")
                if proc.stdout:
                    stdout_content = proc.stdout.read()
                    if stdout_content:
                        error_details.append(f"stdout: {stdout_content}")
            except Exception:
                pass

            error_msg = f"yt-dlp 下载失败 (返回码: {proc.returncode})"
            if error_details:
                # Limit the length of the appended details (string, not list).
                error_msg += f" - {' '.join(error_details)[:500]}"

            # NOTE(review): this is reported by the caller's CalledProcessError
            # handler, whose message prefix mentions FFmpeg.
            raise subprocess.CalledProcessError(proc.returncode, yt_dlp_cmd, output=error_msg)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()  # reap the killed process
        if task_id and task_registry and task_lock:
            add_task_log(task_id, "yt-dlp 下载超时(超过10分钟)", task_registry, task_lock)
        raise Exception("yt-dlp 下载超时(超过10分钟)")
    finally:
        # Give the logger threads a moment to flush remaining output.
        stdout_thread.join(timeout=5)
        stderr_thread.join(timeout=5)

    # Locate the file yt-dlp actually produced.
    candidates = glob.glob(temp_base + ".*")
    if not candidates:
        raise Exception("yt-dlp 执行成功但未生成任何视频文件")

    # Pick the most recently modified candidate.
    return max(candidates, key=os.path.getmtime)


def _extract_audio_tracks(temp_video, job_dir, task_id):
    """Write mixed/left/right DFPWM audio files into *job_dir* and return their URLs.

    Raises:
        subprocess.CalledProcessError: an FFmpeg invocation failed.
    """
    # Arguments shared by all three conversions.
    dfpwm_args = [
        FFMPEG_PATH, '-y', '-i', temp_video,
        '-vn', '-ar', '48000', '-ac', '1', '-f', 'dfpwm'
    ]

    # Mixed-down channel.
    dfpwm_path_mix = os.path.join(job_dir, "audio.dfpwm")
    subprocess.run(dfpwm_args + [dfpwm_path_mix], check=True, capture_output=True)

    # Left channel.
    dfpwm_path_left = os.path.join(job_dir, "audio_left.dfpwm")
    subprocess.run(
        dfpwm_args + ['-af', 'pan=mono|c0=c0'] + [dfpwm_path_left],
        check=True, capture_output=True
    )

    # Right channel.
    dfpwm_path_right = os.path.join(job_dir, "audio_right.dfpwm")
    subprocess.run(
        dfpwm_args + ['-af', 'pan=mono|c0=c1'] + [dfpwm_path_right],
        check=True, capture_output=True
    )

    return {
        "audio_dfpwm_url": f"/frames/{task_id}/audio.dfpwm",
        "audio_dfpwm_left_url": f"/frames/{task_id}/audio_left.dfpwm",
        "audio_dfpwm_right_url": f"/frames/{task_id}/audio_right.dfpwm"
    }


def _register_output_files(job_dir, task_id, audio_exists, file_registry, file_lock):
    """Register the generated frame (and audio) files in *file_registry*."""
    with file_lock:
        for frame_file in glob.glob(os.path.join(job_dir, "frame_*.png")):
            filename = os.path.basename(frame_file)
            file_id = f"frame_{task_id}_{filename}"
            file_registry[file_id] = {
                'path': os.path.abspath(frame_file),
                'filename': filename,
                'last_access': time.time(),
                'download_count': 0
            }

        if audio_exists:
            for audio_file in ["audio.dfpwm", "audio_left.dfpwm", "audio_right.dfpwm"]:
                audio_path = os.path.join(job_dir, audio_file)
                if os.path.exists(audio_path):
                    file_id = f"audio_{task_id}_{audio_file}"
                    file_registry[file_id] = {
                        'path': os.path.abspath(audio_path),
                        'filename': audio_file,
                        'last_access': time.time(),
                        'download_count': 0
                    }


def process_video_frame_extraction(data, file_registry, file_lock, task_id=None, task_registry=None, task_lock=None):
    """Download a video and extract PNG frames plus DFPWM audio from it.

    Args:
        data: mapping with ``video_url`` or ``video_bv`` (a BV id is turned
            into a bilibili.com URL), required ``w`` and ``h``, and optional
            ``fps`` (default 30), ``force_resolution`` and ``pad_to_target``.
        file_registry: mapping that receives one entry per generated file.
        file_lock: lock guarding ``file_registry``.
        task_id: task identifier; also used as the output directory name
            under ``FRAMES_ROOT`` and as the temp-file base name.
        task_registry: optional mapping of task id to task state; progress
            and log messages are only recorded when it, ``task_id`` and
            ``task_lock`` are all truthy.
        task_lock: lock guarding ``task_registry``.

    Returns:
        On success a dict with ``status``, ``result``, ``task_id`` and
        ``temp_dir``; on any failure a dict with a single ``error`` message.
        Exceptions are not propagated to the caller.
    """
    try:
        _update_task(task_id, task_registry, task_lock, "开始处理视频帧提取任务")
        logging.info(f"开始处理视频帧提取任务: {task_id}")

        # Extract parameters.
        video_url = data.get('video_url')
        video_bv = data.get('video_bv')  # BV ids are supported too
        w = data.get('w')
        h = data.get('h')
        fps = data.get('fps', 30)
        force_resolution = data.get('force_resolution', False)
        pad_to_target = data.get('pad_to_target', False)

        # Validate parameters.
        if not video_url and not video_bv:
            raise Exception("缺少必要参数: video_url 或 video_bv")
        if w is None or h is None:
            raise Exception("缺少必要参数: w, h")

        try:
            w = int(w)
            h = int(h)
            fps = float(fps)
            if w <= 0 or h <= 0 or fps <= 0:
                raise ValueError("参数必须为正数")
        except (ValueError, TypeError) as e:
            raise Exception(f"参数无效: {str(e)}")

        # Build the Bilibili URL automatically when only a BV id is given.
        if not video_url and video_bv:
            video_bv = str(video_bv).strip()
            if not video_bv.startswith(('BV', 'bv')):
                raise Exception("video_bv 必须以 BV 或 bv 开头")
            video_url = f"https://www.bilibili.com/video/{video_bv.upper()}"

        # Prepare the output directory.
        job_dir = os.path.join(FRAMES_ROOT, task_id)
        os.makedirs(job_dir, exist_ok=True)
        _update_task(task_id, task_registry, task_lock,
                     f"创建任务目录: {job_dir}", progress=10)

        # Download the video.
        temp_base = os.path.join(tempfile.gettempdir(), task_id)
        _update_task(task_id, task_registry, task_lock,
                     f"开始下载视频: {video_url}", progress=20)
        temp_video = _download_video(video_url, temp_base, task_id, task_registry, task_lock)
        _update_task(task_id, task_registry, task_lock,
                     "下载完成,开始获取视频信息", progress=40)

        # Audio is handled first, before the video frames.
        audio_exists = has_audio(temp_video)
        audio_urls = {}
        if audio_exists:
            _update_task(task_id, task_registry, task_lock,
                         "提取音频(左/右/混合声道)", progress=40)
            audio_urls = _extract_audio_tracks(temp_video, job_dir, task_id)

        # Publish the audio URLs on the task state.
        _update_task(task_id, task_registry, task_lock, audio_urls=audio_urls)

        # Video information and an estimate of the total frame count.
        duration = get_video_duration(temp_video)
        estimated_total_frames = int(duration * fps)

        # Initialise the frame-related fields of the task state.
        _update_task(
            task_id, task_registry, task_lock,
            estimated_total_frames=estimated_total_frames,
            current_frames=0,
            generated_frame_urls=[],
            frame_job_dir=job_dir,
            frame_params={
                'w': w,
                'h': h,
                'fps': fps,
                'force_resolution': force_resolution,
                'pad_to_target': pad_to_target
            },
        )

        # Build the filter chain.
        vf = build_video_filter(w, h, fps, force_resolution, pad_to_target)

        # Extract frames.
        _update_task(task_id, task_registry, task_lock, "开始提取视频帧", progress=50)

        frame_pattern = os.path.join(job_dir, "frame_%06d.png")
        ffmpeg_cmd = [
            FFMPEG_PATH, '-y', '-i', temp_video,
            '-vf', vf,
            '-pix_fmt', 'rgb24',
            '-stats',      # enable progress statistics
            '-v', 'info',  # make sure the log level is verbose enough
            frame_pattern
        ]

        # stdout is discarded: nothing reads it, and an unread PIPE could
        # block the child. stderr is decoded leniently so that a stray byte
        # cannot kill the logger thread and stall FFmpeg on a full pipe.
        proc = subprocess.Popen(
            ffmpeg_cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            text=True,
            bufsize=1,
            universal_newlines=True,
            encoding='utf-8',
            errors='replace'
        )

        # FFmpeg writes its progress to stderr.
        stderr_thread = threading.Thread(
            target=log_subprocess_output,
            args=(proc.stderr, task_id, task_registry, task_lock, "[FFmpeg] "),
            daemon=True
        )
        stderr_thread.start()

        try:
            proc.wait()
            if proc.returncode != 0:
                raise subprocess.CalledProcessError(proc.returncode, ffmpeg_cmd)
        finally:
            stderr_thread.join(timeout=5)

        # Inspect the output.
        _update_task(task_id, task_registry, task_lock, "分析输出结果", progress=80)

        out_w, out_h = get_output_resolution(job_dir)
        total_frames = len([f for f in os.listdir(job_dir) if f.endswith('.png')])

        # Store the output resolution and the actual frame count.
        _update_task(
            task_id, task_registry, task_lock,
            output_resolution={"w": out_w, "h": out_h},
            estimated_total_frames=total_frames,
        )

        # Build the result. The audio URLs depend on whether the video
        # actually has audio (audio_exists), not on the has_audio function.
        result_data = {
            "duration_seconds": round(duration, 3),
            "total_frames": total_frames,
            "fps": fps,
            "output_resolution": {"w": out_w, "h": out_h},
            "frame_urls": [f"/frames/{task_id}/frame_{i:06d}.png" for i in range(1, total_frames + 1)],
            "audio_dfpwm_url": f"/frames/{task_id}/audio.dfpwm" if audio_exists else None,
            "audio_dfpwm_left_url": f"/frames/{task_id}/audio_left.dfpwm" if audio_exists else None,
            "audio_dfpwm_right_url": f"/frames/{task_id}/audio_right.dfpwm" if audio_exists else None
        }

        # Register the generated files.
        _register_output_files(job_dir, task_id, audio_exists, file_registry, file_lock)

        _update_task(task_id, task_registry, task_lock, "视频帧提取任务完成", progress=100)

        return {
            'status': 'success',
            'result': result_data,
            'task_id': task_id,
            'temp_dir': job_dir
        }

    except subprocess.CalledProcessError as e:
        # stderr/stdout are bytes when the failing call used capture_output
        # without text mode, so normalise them to str first.
        stderr_str = _as_text(e.stderr).strip()
        stdout_str = _as_text(e.stdout).strip()
        error_output = stderr_str or stdout_str or str(e)
        error_msg = error_output[-500:]  # keep only the tail of long output
        if task_id and task_registry and task_lock:
            add_task_log(task_id, f"FFmpeg处理失败: {error_msg}", task_registry, task_lock)
        return {'error': f'FFmpeg处理失败: {error_msg}'}
    except requests.RequestException as e:
        error_msg = f'视频下载失败: {str(e)}'
        if task_id and task_registry and task_lock:
            add_task_log(task_id, error_msg, task_registry, task_lock)
        return {'error': error_msg}
    except Exception as e:
        error_msg = f'处理失败: {str(e)}'
        if task_id and task_registry and task_lock:
            add_task_log(task_id, error_msg, task_registry, task_lock)
        return {'error': error_msg}
    finally:
        # Remove the temporary video file(s). Skipped without a task id so a
        # "None.*" pattern can never match unrelated files in the temp dir.
        if task_id:
            for f in glob.glob(os.path.join(tempfile.gettempdir(), f"{task_id}.*")):
                try:
                    os.remove(f)
                except OSError:
                    pass


# ======================
# Helper functions
# ======================
def get_video_duration(video_path):
    """Return the container duration reported by ffprobe, in seconds.

    Returns ``0.0`` when ffprobe prints nothing or ``N/A``.

    Raises:
        subprocess.CalledProcessError: ffprobe exited with a non-zero status.
    """
    result = subprocess.run([
        FFPROBE_PATH, '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'csv=p=0', video_path
    ], capture_output=True, text=True, check=True)

    duration_str = result.stdout.strip()
    if duration_str and duration_str != 'N/A':
        return float(duration_str)
    return 0.0


def build_video_filter(w, h, fps, force_resolution, pad_to_target):
    """Build the FFmpeg ``-vf`` filter string for scaling and frame rate.

    With *force_resolution* the video is scaled to exactly ``w`` x ``h``.
    Otherwise a bounded ``scale`` expression is used, optionally followed by
    a centred ``pad`` to ``w`` x ``h`` when *pad_to_target* is set.
    """
    if force_resolution:
        return f"scale={w}:{h},fps={fps}"

    scale_expr = f"scale='min({w},iw*min(1,{h}/ih))':'min({h},ih*min(1,{w}/iw))'"
    if pad_to_target:
        return f"{scale_expr},pad={w}:{h}:(ow-iw)/2:(oh-ih)/2,fps={fps}"
    return f"{scale_expr},fps={fps}"


def get_output_resolution(job_dir):
    """Return ``(width, height)`` of the first PNG in *job_dir* (sorted by name).

    Returns ``(0, 0)`` when the directory contains no PNG file.

    Raises:
        subprocess.CalledProcessError: ffprobe exited with a non-zero status.
    """
    first_frame = None
    for name in sorted(os.listdir(job_dir)):
        if name.endswith('.png'):
            first_frame = os.path.join(job_dir, name)
            break

    if not first_frame:
        return 0, 0

    probe_res = subprocess.run([
        FFPROBE_PATH, '-v', 'error',
        '-select_streams', 'v:0',
        '-show_entries', 'stream=width,height',
        '-of', 'csv=p=0', first_frame
    ], capture_output=True, text=True, check=True)
    out_w, out_h = map(int, probe_res.stdout.strip().split(','))
    return out_w, out_h


def has_audio(video_path):
    """Return True when ffprobe lists an audio stream for *video_path*."""
    probe_streams = subprocess.run([
        FFPROBE_PATH, '-v', 'error',
        '-show_entries', 'stream=codec_type',
        '-of', 'csv=p=0', video_path
    ], capture_output=True, text=True)
    return 'audio' in probe_streams.stdout.lower()