Files
GMapiServer/video_frame_utils.py
2026-01-10 02:07:34 +08:00

391 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import sys
import uuid
import subprocess
import tempfile
import threading
import time
from datetime import datetime, timedelta
import json
import glob
import requests
import logging
from shared_utils import task_registry, file_registry, file_lock, task_lock, add_task_log
# ======================
# Configuration
# ======================
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
# NOTE: resolved against the process's current working directory, not BASE_DIR.
FRAMES_ROOT = os.path.abspath("./frames")
TASK_TIMEOUT_HOURS = 1  # task expiry time, in hours
# Create the frames output directory at import time so jobs can write into it.
os.makedirs(FRAMES_ROOT, exist_ok=True)

# Bundled ffmpeg / ffprobe binaries live in <BASE_DIR>/lib/ffmpeg/bin;
# Windows builds carry an ".exe" suffix.
FFMPEG_PATH, FFPROBE_PATH = (
    os.path.join(BASE_DIR, 'lib', 'ffmpeg', 'bin', tool_name)
    for tool_name in (
        ('ffmpeg.exe', 'ffprobe.exe') if sys.platform.startswith('win')
        else ('ffmpeg', 'ffprobe')
    )
)
def log_subprocess_output(pipe, task_id, task_registry, task_lock, prefix=""):
    """Forward each non-blank line of a text ``pipe`` to the task log.

    Reads until EOF and strips every line before prefixing it with ``prefix``.
    A line is forwarded only when ``task_id``, ``task_registry`` and
    ``task_lock`` are all truthy at that moment; otherwise the pipe is just
    drained. A read error is recorded in the task log (same condition) rather
    than raised. The pipe is always closed. Does nothing if ``pipe`` is falsy.
    """
    if not pipe:
        return
    try:
        for raw_line in iter(pipe.readline, ''):
            text = raw_line.strip()
            if not text:
                continue
            if task_id and task_registry and task_lock:
                add_task_log(task_id, f"{prefix}{text}", task_registry, task_lock)
    except Exception as err:
        if task_id and task_registry and task_lock:
            add_task_log(task_id, f"[日志读取错误] {err}", task_registry, task_lock)
    finally:
        pipe.close()
# ======================
# 任务处理函数
# ======================
def _task_log(task_id, task_registry, task_lock, message):
    """Append ``message`` to the task's log when task tracking is enabled.

    Tracking is enabled only when ``task_id``, ``task_registry`` and
    ``task_lock`` are all truthy (an empty registry dict counts as disabled).
    """
    if task_id and task_registry and task_lock:
        add_task_log(task_id, message, task_registry, task_lock)


def _set_task_progress(task_id, task_registry, task_lock, value):
    """Set ``progress`` on the task's registry entry when tracking is enabled."""
    if task_id and task_registry and task_lock:
        with task_lock:
            if task_id in task_registry:
                task_registry[task_id]['progress'] = value


def _run_logged(cmd, task_id, task_registry, task_lock, prefix, timeout=None):
    """Run ``cmd`` to completion, streaming its stderr into the task log.

    stdout goes to DEVNULL. yt-dlp prints its progress there, and a PIPE that
    nobody reads eventually fills up and blocks the child indefinitely.
    stderr is decoded as UTF-8 with replacement, so undecodable bytes cannot
    raise inside the reader thread (which would close the pipe under the
    still-running child).

    Raises:
        subprocess.TimeoutExpired: ``timeout`` seconds elapsed; the child has
            already been killed and reaped.
        subprocess.CalledProcessError: the child exited non-zero (its stderr
            is in the task log, not on the exception).
    """
    proc = subprocess.Popen(
        cmd,
        stdout=subprocess.DEVNULL,
        stderr=subprocess.PIPE,
        text=True,
        encoding='utf-8',
        errors='replace',
        bufsize=1,
    )
    reader = threading.Thread(
        target=log_subprocess_output,
        args=(proc.stderr, task_id, task_registry, task_lock, prefix),
        daemon=True,
    )
    reader.start()
    try:
        proc.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        proc.kill()
        proc.wait()  # reap the killed child so it does not linger as a zombie
        raise
    finally:
        reader.join(timeout=5)
    if proc.returncode != 0:
        raise subprocess.CalledProcessError(proc.returncode, cmd)


def _process_error_text(err, limit=500):
    """Return the last ``limit`` characters of a CalledProcessError's output.

    Prefers stderr, then stdout, then ``str(err)``. ``bytes`` output (from
    ``subprocess.run(..., capture_output=True)`` without ``text=True``) is
    decoded as UTF-8 with replacement instead of being rendered as ``b'...'``.
    """
    def as_text(output):
        if isinstance(output, bytes):
            return output.decode('utf-8', errors='replace')
        return output or ""

    text = as_text(err.stderr).strip() or as_text(err.stdout).strip() or str(err)
    return text[-limit:]


def process_video_frame_extraction(data, file_registry, file_lock, task_id=None, task_registry=None, task_lock=None):
    """Download a video, extract its frames as PNG and its audio as DFPWM.

    Args:
        data: request parameters. Reads ``video_url`` or ``video_bv``
            (a Bilibili BV id used to build the URL), ``w``/``h`` (target size,
            required, positive ints), ``fps`` (default 30), and the
            ``force_resolution`` / ``pad_to_target`` flags passed to
            ``build_video_filter``.
        file_registry, file_lock: every produced frame/audio file is
            registered in ``file_registry`` while holding ``file_lock``.
        task_id: job id; names the output directory under FRAMES_ROOT and
            the temp download file. A random hex id is generated if omitted.
        task_registry, task_lock: optional task tracking. When set (together
            with task_id), log lines and ``progress`` are written to the
            registry entry.

    Returns:
        ``{'status': 'success', 'result': ..., 'task_id': ..., 'temp_dir': ...}``
        on success, otherwise ``{'error': <message>}``. Failures are reported
        through the returned dict rather than raised.
    """
    if not task_id:
        # Every path below is derived from task_id; never build them from None.
        task_id = uuid.uuid4().hex
    # yt-dlp writes <temp_base>.<ext>; the finally clause removes it.
    temp_base = os.path.join(tempfile.gettempdir(), task_id)

    def log(message):
        _task_log(task_id, task_registry, task_lock, message)

    def set_progress(value):
        _set_task_progress(task_id, task_registry, task_lock, value)

    try:
        log("开始处理视频帧提取任务")
        logging.info("开始处理视频帧提取任务: %s", task_id)

        # --- parameters -----------------------------------------------------
        video_url = data.get('video_url')
        video_bv = data.get('video_bv')  # Bilibili BV id, alternative to video_url
        w = data.get('w')
        h = data.get('h')
        fps = data.get('fps', 30)
        force_resolution = data.get('force_resolution', False)
        pad_to_target = data.get('pad_to_target', False)

        if not video_url and not video_bv:
            raise Exception("缺少必要参数: video_url 或 video_bv")
        if w is None or h is None:
            raise Exception("缺少必要参数: w, h")
        try:
            w = int(w)
            h = int(h)
            fps = float(fps)
            if w <= 0 or h <= 0 or fps <= 0:
                raise ValueError("参数必须为正数")
        except (ValueError, TypeError) as e:
            raise Exception(f"参数无效: {str(e)}")

        if not video_url and video_bv:
            video_bv = str(video_bv).strip()
            if not video_bv.startswith(('BV', 'bv')):
                raise Exception("video_bv 必须以 BV 或 bv 开头")
            bv_body = video_bv[2:]
            # BV ids are case-sensitive, so only the prefix is normalised.
            # Restricting the rest to ASCII alphanumerics also stops the value
            # from smuggling extra path/query segments into the URL.
            if not (bv_body and bv_body.isascii() and bv_body.isalnum()):
                raise Exception("video_bv 格式无效")
            video_url = f"https://www.bilibili.com/video/BV{bv_body}"

        # --- output directory -----------------------------------------------
        job_dir = os.path.join(FRAMES_ROOT, task_id)
        os.makedirs(job_dir, exist_ok=True)
        log(f"创建任务目录: {job_dir}")
        set_progress(10)

        # --- download ---------------------------------------------------------
        log(f"开始下载视频: {video_url}")
        yt_dlp_cmd = [
            sys.executable, '-m', 'yt_dlp',
            '-o', temp_base,
            '--',  # end of options: a URL starting with '-' must not become a flag
            video_url,
        ]
        try:
            _run_logged(yt_dlp_cmd, task_id, task_registry, task_lock, "[yt-dlp] ", timeout=600)
        except subprocess.TimeoutExpired:
            raise Exception("yt-dlp 下载超时超过10分钟") from None

        # yt-dlp chooses the extension itself. Skip its in-progress leftovers,
        # and also accept an output without any extension.
        candidates = [
            path for path in glob.glob(glob.escape(temp_base) + ".*")
            if not path.endswith(('.part', '.ytdl'))
        ]
        if os.path.isfile(temp_base):
            candidates.append(temp_base)
        if not candidates:
            raise Exception("yt-dlp 执行成功但未生成任何视频文件")
        temp_video = max(candidates, key=os.path.getmtime)  # newest file wins

        log("下载完成,开始获取视频信息")
        set_progress(40)
        duration = get_video_duration(temp_video)
        vf = build_video_filter(w, h, fps, force_resolution, pad_to_target)

        # --- frame extraction -----------------------------------------------
        log("开始提取视频帧")
        set_progress(50)
        frame_pattern = os.path.join(job_dir, "frame_%06d.png")
        ffmpeg_cmd = [
            FFMPEG_PATH, '-y',
            '-i', temp_video,
            '-vf', vf,
            '-pix_fmt', 'rgb24',
            '-stats',      # periodic progress lines on stderr
            '-v', 'info',  # log level high enough for -stats to be shown
            frame_pattern,
        ]
        _run_logged(ffmpeg_cmd, task_id, task_registry, task_lock, "[FFmpeg] ")

        log("分析输出结果")
        set_progress(80)
        out_w, out_h = get_output_resolution(job_dir)
        total_frames = sum(1 for name in os.listdir(job_dir) if name.endswith('.png'))

        # --- audio --------------------------------------------------------------
        audio_outputs = (
            ("audio.dfpwm", []),                               # all channels mixed to mono
            ("audio_left.dfpwm", ['-af', 'pan=mono|c0=c0']),   # first (left) channel
            ("audio_right.dfpwm", ['-af', 'pan=mono|c0=c1']),  # second (right) channel
        )
        audio_exists = has_audio(temp_video)
        if audio_exists:
            log("提取音频(左/右/混合声道)")
            set_progress(90)
            dfpwm_args = [
                FFMPEG_PATH, '-y',
                '-i', temp_video,
                '-vn',
                '-ar', '48000',
                '-ac', '1',
                '-f', 'dfpwm',
            ]
            # NOTE(review): for a mono source 'pan=mono|c0=c1' references a
            # channel that does not exist. Confirm ffmpeg accepts this;
            # otherwise mono videos fail the whole task here.
            for audio_name, extra_args in audio_outputs:
                subprocess.run(
                    dfpwm_args + extra_args + [os.path.join(job_dir, audio_name)],
                    check=True, capture_output=True,
                )

        # --- result -------------------------------------------------------------
        result_data = {
            "duration_seconds": round(duration, 3),
            "total_frames": total_frames,
            "fps": fps,
            "output_resolution": {"w": out_w, "h": out_h},
            # ffmpeg's image2 muxer numbers frames from 1.
            "frame_urls": [f"/frames/{task_id}/frame_{i:06d}.png" for i in range(1, total_frames + 1)],
            # Only advertise audio files that were actually produced.
            "audio_dfpwm_url": f"/frames/{task_id}/audio.dfpwm" if audio_exists else None,
            "audio_dfpwm_left_url": f"/frames/{task_id}/audio_left.dfpwm" if audio_exists else None,
            "audio_dfpwm_right_url": f"/frames/{task_id}/audio_right.dfpwm" if audio_exists else None,
        }

        with file_lock:
            for frame_file in glob.glob(os.path.join(glob.escape(job_dir), "frame_*.png")):
                filename = os.path.basename(frame_file)
                file_registry[f"frame_{task_id}_{filename}"] = {
                    'path': os.path.abspath(frame_file),
                    'filename': filename,
                    'last_access': time.time(),
                    'download_count': 0,
                }
            if audio_exists:
                for audio_name, _ in audio_outputs:
                    audio_path = os.path.join(job_dir, audio_name)
                    if os.path.exists(audio_path):
                        file_registry[f"audio_{task_id}_{audio_name}"] = {
                            'path': os.path.abspath(audio_path),
                            'filename': audio_name,
                            'last_access': time.time(),
                            'download_count': 0,
                        }

        log("视频帧提取任务完成")
        set_progress(100)
        return {
            'status': 'success',
            'result': result_data,
            'task_id': task_id,
            'temp_dir': job_dir,
        }
    except subprocess.CalledProcessError as e:
        detail = _process_error_text(e)
        # Only the yt-dlp command carries the 'yt_dlp' module argument.
        if 'yt_dlp' in (e.cmd or ()):
            error_msg = f'视频下载失败: {detail}'
        else:
            error_msg = f'FFmpeg处理失败: {detail}'
        log(error_msg)
        return {'error': error_msg}
    except Exception as e:
        error_msg = f'处理失败: {str(e)}'
        log(error_msg)
        return {'error': error_msg}
    finally:
        # Remove the downloaded video (and any yt-dlp leftovers) from the temp dir.
        leftovers = glob.glob(glob.escape(temp_base) + ".*")
        if os.path.isfile(temp_base):
            leftovers.append(temp_base)
        for path in leftovers:
            try:
                os.remove(path)
            except OSError:
                pass  # best-effort: a stale temp file must not mask the result
# ======================
# 辅助函数
# ======================
def get_video_duration(video_path):
    """Return the container duration of ``video_path`` in seconds, via ffprobe.

    Returns 0.0 when ffprobe prints no duration (empty output or ``N/A``).
    Raises subprocess.CalledProcessError if ffprobe exits non-zero.
    """
    probe = subprocess.run(
        [
            FFPROBE_PATH, '-v', 'error',
            '-show_entries', 'format=duration',
            '-of', 'csv=p=0',
            video_path,
        ],
        capture_output=True, text=True, check=True,
    )
    raw_value = probe.stdout.strip()
    if raw_value in ('', 'N/A'):
        return 0.0
    return float(raw_value)
def build_video_filter(w, h, fps, force_resolution, pad_to_target):
    """Build the ffmpeg ``-vf`` filter chain used for frame extraction.

    - ``force_resolution``: stretch to exactly ``w`` x ``h`` (aspect ignored);
      ``pad_to_target`` is then irrelevant.
    - otherwise: shrink to fit inside ``w`` x ``h`` while keeping the aspect
      ratio, never upscaling. With ``pad_to_target`` the frame is then
      centred on a ``w`` x ``h`` canvas.

    The chain always ends with an ``fps`` filter resampling to ``fps``.
    """
    if force_resolution:
        return f"scale={w}:{h},fps={fps}"
    # The single quotes keep the commas inside min() from splitting the chain.
    chain = [f"scale='min({w},iw*min(1,{h}/ih))':'min({h},ih*min(1,{w}/iw))'"]
    if pad_to_target:
        chain.append(f"pad={w}:{h}:(ow-iw)/2:(oh-ih)/2")
    chain.append(f"fps={fps}")
    return ",".join(chain)
def get_output_resolution(job_dir):
    """Return ``(width, height)`` of the first PNG in ``job_dir``.

    "First" means first by sorted file name. Dimensions are read with
    ffprobe (CalledProcessError if it fails). Returns ``(0, 0)`` when the
    directory contains no ``.png`` file.
    """
    png_names = sorted(name for name in os.listdir(job_dir) if name.endswith('.png'))
    if not png_names:
        return 0, 0
    probe = subprocess.run(
        [
            FFPROBE_PATH, '-v', 'error',
            '-select_streams', 'v:0',
            '-show_entries', 'stream=width,height',
            '-of', 'csv=p=0',
            os.path.join(job_dir, png_names[0]),
        ],
        capture_output=True, text=True, check=True,
    )
    width, height = (int(part) for part in probe.stdout.strip().split(','))
    return width, height
def has_audio(video_path):
    """Return True if ffprobe reports an audio stream in ``video_path``.

    Checks whether ``audio`` appears (case-insensitively) in ffprobe's list
    of stream codec types. ffprobe's exit status is not checked: if it fails,
    its stdout is typically empty and the result is False.
    """
    codec_listing = subprocess.run(
        [
            FFPROBE_PATH, '-v', 'error',
            '-show_entries', 'stream=codec_type',
            '-of', 'csv=p=0',
            video_path,
        ],
        capture_output=True, text=True,
    ).stdout
    return 'audio' in codec_listing.lower()