import os
import sys
import uuid
import subprocess
import tempfile
import threading
import time
from datetime import datetime, timedelta
import json
import glob
import requests
import logging

from shared_utils import task_registry, file_registry, file_lock, task_lock, add_task_log

# ======================
# Configuration
# ======================
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
FRAMES_ROOT = os.path.abspath("./frames")
TASK_TIMEOUT_HOURS = 1  # Task expiry time (hours)

# Make sure the frames directory exists
os.makedirs(FRAMES_ROOT, exist_ok=True)

if sys.platform.startswith('win'):
    FFMPEG_PATH = os.path.join(BASE_DIR, 'lib', 'ffmpeg', 'bin', 'ffmpeg.exe')
    FFPROBE_PATH = os.path.join(BASE_DIR, 'lib', 'ffmpeg', 'bin', 'ffprobe.exe')
else:
    FFMPEG_PATH = os.path.join(BASE_DIR, 'lib', 'ffmpeg', 'bin', 'ffmpeg')
    FFPROBE_PATH = os.path.join(BASE_DIR, 'lib', 'ffmpeg', 'bin', 'ffprobe')

# Number of trailing output lines kept per child process for error reporting.
_MAX_TAIL_LINES = 50


def _to_text(value):
    """Return *value* as ``str``; ``None`` becomes ``""`` and bytes are decoded leniently."""
    if value is None:
        return ""
    if isinstance(value, bytes):
        return value.decode('utf-8', errors='replace')
    return str(value)


def _log_to_task(task_id, registry, lock, message):
    """Append *message* to the task log, but only when full task context is available."""
    if task_id and registry and lock:
        add_task_log(task_id, message, registry, lock)


def _set_progress(task_id, registry, lock, value):
    """Store *value* as the task's progress if the task is present in the registry."""
    if not (task_id and registry and lock):
        return
    with lock:
        if task_id in registry:
            registry[task_id]['progress'] = value


def log_subprocess_output(pipe, task_id, task_registry, task_lock, prefix="", tail=None):
    """Read a text pipe line by line and forward each non-empty line to the task log.

    The pipe is always drained until EOF (even when no task context is given)
    and is closed before returning.

    Args:
        pipe: Text-mode file object to read from; a falsy value is a no-op.
        task_id: Task identifier passed to ``add_task_log``.
        task_registry: Task registry passed to ``add_task_log``.
        task_lock: Lock passed to ``add_task_log``.
        prefix: String prepended to every logged line.
        tail: Optional list that receives the stripped lines; only the last
            ``_MAX_TAIL_LINES`` entries are kept.
    """
    if not pipe:
        return
    try:
        for line in iter(pipe.readline, ''):
            clean_line = line.strip()
            if not clean_line:
                continue
            if tail is not None:
                tail.append(clean_line)
                if len(tail) > _MAX_TAIL_LINES:
                    del tail[0]
            _log_to_task(task_id, task_registry, task_lock, f"{prefix}{clean_line}")
    except Exception as e:
        _log_to_task(task_id, task_registry, task_lock, f"[日志读取错误] {e}")
    finally:
        pipe.close()


def _run_logged(cmd, prefix, task_id, registry, lock, timeout=None):
    """Run *cmd*, streaming its combined stdout/stderr into the task log.

    stderr is merged into stdout and the single pipe is drained by a reader
    thread, so the child can never block on a full, unread pipe while this
    function waits for it.

    Raises:
        subprocess.TimeoutExpired: *timeout* elapsed; the child has been
            killed and reaped.
        subprocess.CalledProcessError: Non-zero exit status; ``output`` holds
            the last lines the process printed.
    """
    tail = []
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,
        text=True,
        errors='replace',  # undecodable bytes must not kill the reader thread
        bufsize=1,
    )
    reader = threading.Thread(
        target=log_subprocess_output,
        args=(proc.stdout, task_id, registry, lock, prefix),
        kwargs={'tail': tail},
        daemon=True,
    )
    reader.start()
    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()  # reap the killed child
        raise
    finally:
        reader.join(timeout=5)  # give the log thread a moment to finish

    if proc.returncode != 0:
        raise subprocess.CalledProcessError(proc.returncode, cmd, output="\n".join(tail))


def _extract_audio_tracks(temp_video, job_dir):
    """Write mixed, left and right mono DFPWM tracks for *temp_video* into *job_dir*.

    Raises:
        subprocess.CalledProcessError: An FFmpeg invocation failed.
    """
    # Shared arguments
    dfpwm_args = [
        FFMPEG_PATH, '-y',
        '-i', temp_video,
        '-vn',
        '-ar', '48000',
        '-ac', '1',
        '-f', 'dfpwm',
    ]
    # NOTE(review): 'c0=c1' refers to a second input channel; behaviour for
    # mono sources has not been verified here.
    variants = (
        ("audio.dfpwm", []),                                   # mixed
        ("audio_left.dfpwm", ['-af', 'pan=mono|c0=c0']),       # left channel
        ("audio_right.dfpwm", ['-af', 'pan=mono|c0=c1']),      # right channel
    )
    for filename, extra_args in variants:
        subprocess.run(
            dfpwm_args + extra_args + [os.path.join(job_dir, filename)],
            check=True,
            capture_output=True,
            text=True,
            errors='replace',
        )


def _register_outputs(job_dir, job_id, audio_exists, file_registry, file_lock):
    """Add the generated frame and audio files to *file_registry* under *file_lock*."""
    with file_lock:
        frame_glob = os.path.join(glob.escape(job_dir), "frame_*.png")
        for frame_file in glob.glob(frame_glob):
            filename = os.path.basename(frame_file)
            file_registry[f"frame_{job_id}_{filename}"] = {
                'path': os.path.abspath(frame_file),
                'filename': filename,
                'last_access': time.time(),
                'download_count': 0
            }

        if audio_exists:
            for audio_file in ["audio.dfpwm", "audio_left.dfpwm", "audio_right.dfpwm"]:
                audio_path = os.path.join(job_dir, audio_file)
                if os.path.exists(audio_path):
                    file_registry[f"audio_{job_id}_{audio_file}"] = {
                        'path': os.path.abspath(audio_path),
                        'filename': audio_file,
                        'last_access': time.time(),
                        'download_count': 0
                    }


# ======================
# Task processing
# ======================
def process_video_frame_extraction(data, file_registry, file_lock, task_id=None, task_registry=None, task_lock=None):
    """Download a video, extract its frames as PNG files and its audio as DFPWM.

    Args:
        data: Mapping with the request parameters: ``video_url`` or
            ``video_bv`` (one is required), ``w`` and ``h`` (required,
            positive), ``fps`` (default 30), ``force_resolution`` and
            ``pad_to_target`` (default False).
        file_registry: Mapping that receives one entry per generated file.
        file_lock: Lock guarding *file_registry*.
        task_id: Identifier used for the output directory, temp file name and
            URLs. A random id is generated when it is not supplied.
        task_registry: Optional task registry used for logs and progress.
        task_lock: Lock guarding *task_registry*.

    Returns:
        On success ``{'status': 'success', 'result': ..., 'task_id': ...,
        'temp_dir': ...}``; on any failure ``{'error': <message>}``. No
        exception escapes this function.
    """
    # A concrete id is required for every path below; without one the cleanup
    # glob would otherwise match unrelated "None.*" files in the temp dir.
    job_id = str(task_id) if task_id else uuid.uuid4().hex
    temp_base = os.path.join(tempfile.gettempdir(), job_id)

    try:
        _log_to_task(task_id, task_registry, task_lock, "开始处理视频帧提取任务")
        logging.info(f"开始处理视频帧提取任务: {task_id}")

        # Read parameters
        video_url = data.get('video_url')
        video_bv = data.get('video_bv')  # Bilibili BV id is accepted as well
        w = data.get('w')
        h = data.get('h')
        fps = data.get('fps', 30)
        force_resolution = data.get('force_resolution', False)
        pad_to_target = data.get('pad_to_target', False)

        # Validate parameters
        if not video_url and not video_bv:
            raise Exception("缺少必要参数: video_url 或 video_bv")
        if w is None or h is None:
            raise Exception("缺少必要参数: w, h")

        try:
            w = int(w)
            h = int(h)
            fps = float(fps)
            if w <= 0 or h <= 0 or fps <= 0:
                raise ValueError("参数必须为正数")
        except (ValueError, TypeError) as e:
            raise Exception(f"参数无效: {str(e)}") from e

        # Build the Bilibili URL when only a BV id was supplied
        if not video_url and video_bv:
            video_bv = str(video_bv).strip()
            if not video_bv.startswith(('BV', 'bv')):
                raise Exception("video_bv 必须以 BV 或 bv 开头")
            # Only the prefix is normalised: the remainder of a BV id is
            # case-sensitive and must be passed through untouched.
            video_url = f"https://www.bilibili.com/video/BV{video_bv[2:]}"

        # Prepare the output directory
        job_dir = os.path.join(FRAMES_ROOT, job_id)
        os.makedirs(job_dir, exist_ok=True)

        _log_to_task(task_id, task_registry, task_lock, f"创建任务目录: {job_dir}")
        _set_progress(task_id, task_registry, task_lock, 10)

        # Download the video
        _log_to_task(task_id, task_registry, task_lock, f"开始下载视频: {video_url}")

        yt_dlp_cmd = [
            sys.executable, '-m', 'yt_dlp',
            '-o', temp_base,
            '--',  # end of options: a URL starting with '-' must not be parsed as a flag
            video_url,
        ]
        try:
            _run_logged(yt_dlp_cmd, "[yt-dlp] ", task_id, task_registry, task_lock, timeout=600)
        except subprocess.TimeoutExpired:
            raise Exception("yt-dlp 下载超时(超过10分钟)")

        # Locate the file that was actually produced; depending on the
        # download it is either "<base>.<ext>" or the bare "<base>" path.
        candidates = glob.glob(glob.escape(temp_base) + ".*")
        if os.path.isfile(temp_base):
            candidates.append(temp_base)
        if not candidates:
            raise Exception("yt-dlp 执行成功但未生成任何视频文件")

        # Pick the most recently modified candidate
        temp_video = max(candidates, key=os.path.getmtime)

        _log_to_task(task_id, task_registry, task_lock, "下载完成,开始获取视频信息")
        _set_progress(task_id, task_registry, task_lock, 40)

        # Probe the video and build the filter chain
        duration = get_video_duration(temp_video)
        vf = build_video_filter(w, h, fps, force_resolution, pad_to_target)

        # Extract frames
        _log_to_task(task_id, task_registry, task_lock, "开始提取视频帧")
        _set_progress(task_id, task_registry, task_lock, 50)

        frame_pattern = os.path.join(job_dir, "frame_%06d.png")
        ffmpeg_cmd = [
            FFMPEG_PATH, '-y',
            '-i', temp_video,
            '-vf', vf,
            '-pix_fmt', 'rgb24',
            '-stats',       # enable progress statistics
            '-v', 'info',   # make sure the log level is verbose enough
            frame_pattern
        ]
        _run_logged(ffmpeg_cmd, "[FFmpeg] ", task_id, task_registry, task_lock)

        # Inspect the output
        _log_to_task(task_id, task_registry, task_lock, "分析输出结果")
        _set_progress(task_id, task_registry, task_lock, 80)

        out_w, out_h = get_output_resolution(job_dir)
        total_frames = len([f for f in os.listdir(job_dir) if f.endswith('.png')])

        # Audio
        audio_exists = has_audio(temp_video)
        if audio_exists:
            _log_to_task(task_id, task_registry, task_lock, "提取音频(左/右/混合声道)")
            _set_progress(task_id, task_registry, task_lock, 90)
            _extract_audio_tracks(temp_video, job_dir)

        # Build the result. The audio URLs depend on the probe result
        # (audio_exists), not on the has_audio function object.
        result_data = {
            "duration_seconds": round(duration, 3),
            "total_frames": total_frames,
            "fps": fps,
            "output_resolution": {"w": out_w, "h": out_h},
            "frame_urls": [f"/frames/{job_id}/frame_{i:06d}.png" for i in range(1, total_frames + 1)],
            "audio_dfpwm_url": f"/frames/{job_id}/audio.dfpwm" if audio_exists else None,
            "audio_dfpwm_left_url": f"/frames/{job_id}/audio_left.dfpwm" if audio_exists else None,
            "audio_dfpwm_right_url": f"/frames/{job_id}/audio_right.dfpwm" if audio_exists else None
        }

        # Register generated files in the file registry
        _register_outputs(job_dir, job_id, audio_exists, file_registry, file_lock)

        _log_to_task(task_id, task_registry, task_lock, "视频帧提取任务完成")
        _set_progress(task_id, task_registry, task_lock, 100)

        return {
            'status': 'success',
            'result': result_data,
            'task_id': job_id,
            'temp_dir': job_dir
        }

    except subprocess.CalledProcessError as e:
        stderr_str = _to_text(e.stderr).strip()
        stdout_str = _to_text(e.stdout).strip()
        error_output = stderr_str or stdout_str or str(e)
        error_msg = error_output[-500:]  # keep only the end of long output
        _log_to_task(task_id, task_registry, task_lock, f"FFmpeg处理失败: {error_msg}")
        return {'error': f'FFmpeg处理失败: {error_msg}'}
    except requests.RequestException as e:
        error_msg = f'视频下载失败: {str(e)}'
        _log_to_task(task_id, task_registry, task_lock, error_msg)
        return {'error': error_msg}
    except Exception as e:
        error_msg = f'处理失败: {str(e)}'
        _log_to_task(task_id, task_registry, task_lock, error_msg)
        return {'error': error_msg}
    finally:
        # Best-effort removal of the temporary download(s)
        leftovers = glob.glob(glob.escape(temp_base) + ".*")
        leftovers.append(temp_base)
        for leftover in leftovers:
            try:
                os.remove(leftover)
            except OSError:
                pass


# ======================
# Helpers
# ======================
def get_video_duration(video_path):
    """Return the container duration of *video_path* in seconds via ffprobe.

    Returns ``0.0`` when ffprobe prints nothing or ``N/A``.

    Raises:
        subprocess.CalledProcessError: ffprobe exited with a non-zero status.
    """
    probe_cmd = [
        FFPROBE_PATH, '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'csv=p=0',
        video_path
    ]
    result = subprocess.run(probe_cmd, capture_output=True, text=True, check=True)
    duration_str = result.stdout.strip()
    if not duration_str or duration_str == 'N/A':
        return 0.0
    return float(duration_str)


def build_video_filter(w, h, fps, force_resolution, pad_to_target):
    """Return the FFmpeg ``-vf`` filter string for the requested size and frame rate.

    With *force_resolution* the video is scaled to exactly ``w`` x ``h``.
    Otherwise a bounded scale expression is used, optionally followed by a
    centred pad to ``w`` x ``h`` when *pad_to_target* is set. The ``fps``
    filter is always last.
    """
    if force_resolution:
        return f"scale={w}:{h},fps={fps}"

    scale_expr = f"scale='min({w},iw*min(1,{h}/ih))':'min({h},ih*min(1,{w}/iw))'"
    if pad_to_target:
        return f"{scale_expr},pad={w}:{h}:(ow-iw)/2:(oh-ih)/2,fps={fps}"
    return f"{scale_expr},fps={fps}"


def get_output_resolution(job_dir):
    """Return ``(width, height)`` of the first PNG (in sorted name order) in *job_dir*.

    Returns ``(0, 0)`` when the directory contains no PNG file.

    Raises:
        subprocess.CalledProcessError: ffprobe exited with a non-zero status.
    """
    png_names = (name for name in sorted(os.listdir(job_dir)) if name.endswith('.png'))
    first_name = next(png_names, None)
    if first_name is None:
        return 0, 0

    probe_res = subprocess.run([
        FFPROBE_PATH, '-v', 'error',
        '-select_streams', 'v:0',
        '-show_entries', 'stream=width,height',
        '-of', 'csv=p=0',
        os.path.join(job_dir, first_name)
    ], capture_output=True, text=True, check=True)
    out_w, out_h = map(int, probe_res.stdout.strip().split(','))
    return out_w, out_h


def has_audio(video_path):
    """Return True when ffprobe lists an ``audio`` stream for *video_path*.

    The ffprobe exit status is not checked; a failed probe that prints
    nothing yields False.
    """
    probe_streams = subprocess.run([
        FFPROBE_PATH, '-v', 'error',
        '-show_entries', 'stream=codec_type',
        '-of', 'csv=p=0',
        video_path
    ], capture_output=True, text=True)
    return 'audio' in probe_streams.stdout.lower()