Compare commits

...

2 Commits

Author SHA1 Message Date
nnwang
f422eb32c1 Merge branch 'main' of https://git.liulikeji.cn/xingluo/GMapiServer 2026-01-09 21:40:31 +08:00
nnwang
cad3a033a3 添加视频切片功能 2026-01-09 21:40:03 +08:00
7 changed files with 156248 additions and 22 deletions

1
.gitignore vendored
View File

@@ -2,3 +2,4 @@
__pycache__/
app.log
temp_files
frames

View File

@@ -6,28 +6,50 @@ import logging
from shared_utils import file_registry, file_lock, task_registry, task_lock
UPLOAD_FOLDER = 'temp_files'
FRAMES_FOLDER = 'frames' # 视频帧提取任务文件夹
CLEANUP_INTERVAL = 3600 # 清理临时文件的间隔(秒)
FILE_EXPIRY = 7200 # 文件过期时间(秒)
TASK_EXPIRY = 7200 # 任务记录过期时间(秒)
def cleanup_temp_files():
current_time = time.time()
expired_folders = []
for folder_name in os.listdir(UPLOAD_FOLDER):
folder_path = os.path.join(UPLOAD_FOLDER, folder_name)
# 清理temp_files目录
if os.path.exists(UPLOAD_FOLDER):
for folder_name in os.listdir(UPLOAD_FOLDER):
folder_path = os.path.join(UPLOAD_FOLDER, folder_name)
if not os.path.isdir(folder_path):
continue
if not os.path.isdir(folder_path):
continue
try:
folder_mtime = os.path.getmtime(folder_path)
try:
folder_mtime = os.path.getmtime(folder_path)
if current_time - folder_mtime > FILE_EXPIRY:
expired_folders.append(folder_path)
logging.info(f"检测到过期文件夹: {folder_path} (修改时间: {time.ctime(folder_mtime)})")
except Exception as e:
logging.error(f"获取文件夹 {folder_path} 信息失败: {e}")
if current_time - folder_mtime > FILE_EXPIRY:
expired_folders.append(folder_path)
logging.info(f"检测到过期文件夹: {folder_path} (修改时间: {time.ctime(folder_mtime)})")
except Exception as e:
logging.error(f"获取文件夹 {folder_path} 信息失败: {e}")
# 清理frames目录
if os.path.exists(FRAMES_FOLDER):
for folder_name in os.listdir(FRAMES_FOLDER):
folder_path = os.path.join(FRAMES_FOLDER, folder_name)
if not os.path.isdir(folder_path):
continue
try:
folder_mtime = os.path.getmtime(folder_path)
if current_time - folder_mtime > FILE_EXPIRY:
expired_folders.append(folder_path)
logging.info(f"检测到过期视频帧文件夹: {folder_path} (修改时间: {time.ctime(folder_mtime)})")
except Exception as e:
logging.error(f"获取视频帧文件夹 {folder_path} 信息失败: {e}")
# 删除所有过期文件夹
for folder_path in expired_folders:
try:
shutil.rmtree(folder_path)
@@ -37,17 +59,35 @@ def cleanup_temp_files():
logging.info(f"清理完成,共删除 {len(expired_folders)} 个过期文件夹")
# 同时清理过期的任务记录超过2小时
# 清理过期的任务记录和文件记录
cleanup_expired_registries(current_time)
def cleanup_expired_registries(current_time):
"""清理过期的任务记录和文件记录"""
# 清理过期的任务记录
with task_lock:
expired_tasks = []
for task_id, task_info in list(task_registry.items()):
if current_time - task_info['create_time'] > FILE_EXPIRY:
if current_time - task_info['create_time'] > TASK_EXPIRY:
expired_tasks.append(task_id)
for task_id in expired_tasks:
del task_registry[task_id]
logging.info(f"已清理过期任务记录: {task_id}")
# 清理过期的文件记录(基于最后访问时间)
with file_lock:
expired_files = []
for file_id, file_info in list(file_registry.items()):
if current_time - file_info['last_access'] > FILE_EXPIRY:
expired_files.append(file_id)
for file_id in expired_files:
del file_registry[file_id]
logging.info(f"已清理过期文件记录: {file_id}")
logging.info(f"共清理 {len(expired_tasks)} 个任务记录和 {len(expired_files)} 个文件记录")
def start_cleanup_thread():
def cleanup_loop():
while True:

82
main.py
View File

@@ -15,6 +15,7 @@ app = Flask(__name__)
try:
from ffmpeg_utils import process_ffmpeg
from sanjuuni_utils import process_sanjuuni
from video_frame_utils import process_video_frame_extraction
from file_cleanup import start_cleanup_thread
except ImportError as e:
logging.error(f"导入模块时出错: {e}")
@@ -23,6 +24,8 @@ except ImportError as e:
return {'error': 'FFmpeg模块未正确导入'}
def process_sanjuuni(*args, **kwargs):
return {'error': 'Sanjuuni模块未正确导入'}
def process_video_frame_extraction(*args, **kwargs):
return {'error': '视频帧提取模块未正确导入'}
def start_cleanup_thread():
pass
@@ -37,6 +40,14 @@ enter_parameter_table = {
"input_url": str,
"output_format": str,
"args": list
},
"video_frame": {
"video_url": str,
"w": int,
"h": int,
"fps": int,
"force_resolution": bool,
"pad_to_target": bool
}
}
@@ -174,6 +185,53 @@ def sanjuuni_async_api():
'message': '任务已创建请使用状态URL查询进度'
}), 202
@app.route('/api/video_frame/async', methods=['POST'])
def video_frame_async_api():
"""创建异步视频帧提取任务"""
logging.info("收到异步视频帧提取API请求")
data = request.get_json()
# 检测参数类型
error_response, status_code = validate_request(data, "video_frame")
if error_response:
return error_response, status_code
# 生成任务ID
task_id = str(uuid.uuid4())[:8]
# 获取请求的主机信息用于构建URL
host = request.host
scheme = request.scheme
# 初始化任务状态
with task_lock:
task_registry[task_id] = {
'status': 'pending',
'type': 'video_frame',
'create_time': time.time(),
'logs': [],
'last_returned_index': 0, # 记录最后返回的日志索引
'progress': 0
}
# 启动异步任务
thread = threading.Thread(
target=run_async_task,
args=(task_id, process_video_frame_extraction, data)
)
thread.daemon = True
thread.start()
status_url = f"{scheme}://{host}/api/task/{task_id}"
logging.info(f"创建异步视频帧提取任务: {task_id}")
return jsonify({
'status': 'success',
'task_id': task_id,
'status_url': status_url,
'message': '任务已创建请使用状态URL查询进度'
}), 202
@app.route('/api/task/<task_id>', methods=['GET'])
def get_task_status(task_id):
"""查询任务状态和进度(自动返回新增日志)"""
@@ -307,6 +365,30 @@ def download_file_endpoint(file_id, filename):
logging.error(f"下载文件时出错: {e}")
return jsonify({'status': 'error', 'error': str(e)}), 500
@app.route('/frames/<job_id>/<filename>', methods=['GET'])
def serve_video_frames(job_id, filename):
"""提供视频帧和音频文件访问"""
import os
from flask import send_from_directory
from video_frame_utils import FRAMES_ROOT
safe_job = os.path.basename(job_id)
safe_file = os.path.basename(filename)
dir_path = os.path.join(FRAMES_ROOT, safe_job)
if not os.path.isdir(dir_path):
return jsonify({"error": "文件不存在"}), 404
allowed = safe_file.endswith(('.png', '.dfpwm'))
if not allowed:
return jsonify({"error": "文件类型不支持"}), 400
try:
return send_from_directory(dir_path, safe_file)
except Exception as e:
logging.error(f"提供文件时出错: {e}")
return jsonify({"error": "文件访问失败"}), 500
@app.route('/health', methods=['GET'])
def health_check():
"""健康检查接口,直接返回 'ok'"""

View File

@@ -1,2 +1,3 @@
Flask==3.1.0
Requests==2.32.3
yt-dlp

155640
server.log Normal file

File diff suppressed because it is too large Load Diff

68
start_server.py Normal file
View File

@@ -0,0 +1,68 @@
#!/usr/bin/env python3
"""
启动GMapiServer的启动脚本
包含所有功能FFmpeg处理、Sanjuuni处理、视频帧提取
"""
import os
import sys
import logging
from main import app
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.StreamHandler(sys.stdout),
logging.FileHandler('server.log')
]
)
def main():
print("🚀 GMapiServer 启动中...")
print("=" * 50)
print("📋 可用功能:")
print(" • FFmpeg媒体处理 (同步/异步)")
print(" • Sanjuuni工具处理 (同步/异步)")
print(" • 视频帧提取 (异步,支持B站BV号)")
print()
print("🔧 配置信息:")
print(f" 端口: 5000")
print(f" 临时文件目录: temp_files/")
print(f" 视频帧目录: frames/")
print(f" 清理间隔: 1小时")
print()
print("🔗 API 端点:")
print(" 同步接口:")
print(" POST /api/ffmpeg - FFmpeg处理")
print(" POST /api/sanjuuni - Sanjuuni处理")
print()
print(" 异步接口:")
print(" POST /api/ffmpeg/async - 异步FFmpeg处理")
print(" POST /api/sanjuuni/async - 异步Sanjuuni处理")
print(" POST /api/video_frame/async - 异步视频帧提取")
print(" GET /api/task/<task_id> - 查询异步任务状态")
print(" GET /api/tasks - 所有任务列表")
print()
print(" 文件下载:")
print(" GET /download/<file_id>/<filename> - 下载处理后的文件")
print(" GET /frames/<job_id>/<filename> - 下载视频帧/音频文件")
print()
print(" 其他:")
print(" GET /health - 健康检查")
print(" GET /api/tasks - 任务列表(调试)")
print("=" * 50)
print()
print("🌐 服务器地址: http://0.0.0.0:5000")
print("🔄 自动清理机制已启用 - 临时文件将在2小时后自动删除")
print()
# 启动Flask应用
from file_cleanup import start_cleanup_thread
start_cleanup_thread()
app.run(host='0.0.0.0', port=5000, debug=False, threaded=True)
if __name__ == '__main__':
main()

394
video_frame_utils.py Normal file
View File

@@ -0,0 +1,394 @@
import os
import sys
import uuid
import subprocess
import tempfile
import threading
import time
from datetime import datetime, timedelta
import json
import glob
import requests
import logging
from shared_utils import task_registry, file_registry, file_lock, task_lock, add_task_log
# ======================
# 配置
# ======================
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
FRAMES_ROOT = os.path.abspath("./frames")
TASK_TIMEOUT_HOURS = 1 # 任务过期时间(小时)
# 确保frames目录存在
os.makedirs(FRAMES_ROOT, exist_ok=True)
if sys.platform.startswith('win'):
FFMPEG_PATH = os.path.join(BASE_DIR, 'lib', 'ffmpeg', 'bin', 'ffmpeg.exe')
FFPROBE_PATH = os.path.join(BASE_DIR, 'lib', 'ffmpeg', 'bin', 'ffprobe.exe')
else:
FFMPEG_PATH = os.path.join(BASE_DIR, 'lib', 'ffmpeg', 'bin', 'ffmpeg')
FFPROBE_PATH = os.path.join(BASE_DIR, 'lib', 'ffmpeg', 'bin', 'ffprobe')
def log_subprocess_output(pipe, task_id, task_registry, task_lock, prefix=""):
"""从管道实时读取并记录日志"""
if not pipe:
return
try:
for line in iter(pipe.readline, ''):
if line:
clean_line = line.strip()
if clean_line and task_id and task_registry and task_lock:
add_task_log(task_id, f"{prefix}{clean_line}", task_registry, task_lock)
except Exception as e:
if task_id and task_registry and task_lock:
add_task_log(task_id, f"[日志读取错误] {e}", task_registry, task_lock)
finally:
pipe.close()
# ======================
# 任务处理函数
# ======================
def process_video_frame_extraction(data, file_registry, file_lock, task_id=None, task_registry=None, task_lock=None):
"""处理视频帧提取任务"""
try:
if task_id and task_registry and task_lock:
add_task_log(task_id, "开始处理视频帧提取任务", task_registry, task_lock)
logging.info(f"开始处理视频帧提取任务: {task_id}")
# 提取参数
video_url = data.get('video_url')
video_bv = data.get('video_bv') # 支持BV号
w = data.get('w')
h = data.get('h')
fps = data.get('fps', 30)
force_resolution = data.get('force_resolution', False)
pad_to_target = data.get('pad_to_target', False)
# 验证参数
if not video_url and not video_bv:
raise Exception("缺少必要参数: video_url 或 video_bv")
if w is None or h is None:
raise Exception("缺少必要参数: w, h")
try:
w = int(w)
h = int(h)
fps = float(fps)
if w <= 0 or h <= 0 or fps <= 0:
raise ValueError("参数必须为正数")
except (ValueError, TypeError) as e:
raise Exception(f"参数无效: {str(e)}")
# 如果提供了BV号自动生成Bilibili URL
if not video_url and video_bv:
video_bv = str(video_bv).strip()
if not video_bv.startswith(('BV', 'bv')):
raise Exception("video_bv 必须以 BV 或 bv 开头")
video_url = f"https://www.bilibili.com/video/{video_bv.upper()}"
# 准备目录
job_dir = os.path.join(FRAMES_ROOT, task_id)
os.makedirs(job_dir, exist_ok=True)
if task_id and task_registry and task_lock:
add_task_log(task_id, f"创建任务目录: {job_dir}", task_registry, task_lock)
with task_lock:
if task_id in task_registry:
task_registry[task_id]['progress'] = 10
# 下载视频
temp_base = os.path.join(tempfile.gettempdir(), task_id)
# === 替换 yt-dlp 下载部分 ===
if task_id and task_registry and task_lock:
add_task_log(task_id, f"开始下载视频: {video_url}", task_registry, task_lock)
yt_dlp_cmd = [
sys.executable, '-m', 'yt_dlp',
video_url,
'-o', temp_base,
'-f', 'bv*[height<=720]+ba/b',
'--no-warnings',
'--progress', # 显式启用进度(可选)
]
# 使用 Popen 实时捕获 stderr
proc = subprocess.Popen(
yt_dlp_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
universal_newlines=True
)
# 启动日志线程(只读 stderr因为 yt-dlp 进度在 stderr
stderr_thread = threading.Thread(
target=log_subprocess_output,
args=(proc.stderr, task_id, task_registry, task_lock, "[yt-dlp] "),
daemon=True
)
stderr_thread.start()
try:
proc.wait(timeout=600)
if proc.returncode != 0:
raise subprocess.CalledProcessError(proc.returncode, yt_dlp_cmd)
except subprocess.TimeoutExpired:
proc.kill()
raise Exception("yt-dlp 下载超时超过10分钟")
finally:
stderr_thread.join(timeout=5) # 等待日志线程结束
# 查找实际生成的文件
candidates = glob.glob(temp_base + ".*")
if not candidates:
raise Exception("yt-dlp 执行成功但未生成任何视频文件")
# 按修改时间取最新
temp_video = max(candidates, key=os.path.getmtime)
if task_id and task_registry and task_lock:
add_task_log(task_id, "下载完成,开始获取视频信息", task_registry, task_lock)
with task_lock:
if task_id in task_registry:
task_registry[task_id]['progress'] = 40
# 获取视频信息
duration = get_video_duration(temp_video)
# 构建滤镜
vf = build_video_filter(w, h, fps, force_resolution, pad_to_target)
# 提取帧
if task_id and task_registry and task_lock:
add_task_log(task_id, "开始提取视频帧", task_registry, task_lock)
with task_lock:
if task_id in task_registry:
task_registry[task_id]['progress'] = 50
frame_pattern = os.path.join(job_dir, "frame_%06d.png")
ffmpeg_cmd = [
FFMPEG_PATH, '-y',
'-i', temp_video,
'-vf', vf,
'-pix_fmt', 'rgb24',
'-stats', # 关键:启用进度统计
'-v', 'info', # 确保输出级别足够
frame_pattern
]
proc = subprocess.Popen(
ffmpeg_cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True,
bufsize=1,
universal_newlines=True
)
# FFmpeg 的进度在 stderr
stderr_thread = threading.Thread(
target=log_subprocess_output,
args=(proc.stderr, task_id, task_registry, task_lock, "[FFmpeg] "),
daemon=True
)
stderr_thread.start()
try:
proc.wait()
if proc.returncode != 0:
raise subprocess.CalledProcessError(proc.returncode, ffmpeg_cmd)
finally:
stderr_thread.join(timeout=5)
# 获取输出分辨率
if task_id and task_registry and task_lock:
add_task_log(task_id, "分析输出结果", task_registry, task_lock)
with task_lock:
if task_id in task_registry:
task_registry[task_id]['progress'] = 80
out_w, out_h = get_output_resolution(job_dir)
total_frames = len([f for f in os.listdir(job_dir) if f.endswith('.png')])
# 检查音频
audio_exists = has_audio(temp_video)
if audio_exists:
if task_id and task_registry and task_lock:
add_task_log(task_id, "提取音频(左/右/混合声道)", task_registry, task_lock)
with task_lock:
if task_id in task_registry:
task_registry[task_id]['progress'] = 90
# 公共参数
dfpwm_args = [
FFMPEG_PATH, '-y',
'-i', temp_video,
'-vn',
'-ar', '48000',
'-ac', '1',
'-f', 'dfpwm'
]
# 混合声道
dfpwm_path_mix = os.path.join(job_dir, "audio.dfpwm")
subprocess.run(
dfpwm_args + [dfpwm_path_mix],
check=True, capture_output=True
)
# 左声道
dfpwm_path_left = os.path.join(job_dir, "audio_left.dfpwm")
subprocess.run(
dfpwm_args + ['-af', 'pan=mono|c0=c0'] + [dfpwm_path_left],
check=True, capture_output=True
)
# 右声道
dfpwm_path_right = os.path.join(job_dir, "audio_right.dfpwm")
subprocess.run(
dfpwm_args + ['-af', 'pan=mono|c0=c1'] + [dfpwm_path_right],
check=True, capture_output=True
)
# 生成结果
result_data = {
"duration_seconds": round(duration, 3),
"total_frames": total_frames,
"fps": fps,
"output_resolution": {"w": out_w, "h": out_h},
"frame_urls": [f"/frames/{task_id}/frame_{i:06d}.png" for i in range(1, total_frames + 1)],
"audio_dfpwm_url": f"/frames/{task_id}/audio.dfpwm" if has_audio else None,
"audio_dfpwm_left_url": f"/frames/{task_id}/audio_left.dfpwm" if has_audio else None,
"audio_dfpwm_right_url": f"/frames/{task_id}/audio_right.dfpwm" if has_audio else None
}
# 注册文件到文件注册表
with file_lock:
for frame_file in glob.glob(os.path.join(job_dir, "frame_*.png")):
filename = os.path.basename(frame_file)
file_id = f"frame_{task_id}_{filename}"
file_registry[file_id] = {
'path': os.path.abspath(frame_file),
'filename': filename,
'last_access': time.time(),
'download_count': 0
}
if audio_exists:
for audio_file in ["audio.dfpwm", "audio_left.dfpwm", "audio_right.dfpwm"]:
audio_path = os.path.join(job_dir, audio_file)
if os.path.exists(audio_path):
file_id = f"audio_{task_id}_{audio_file}"
file_registry[file_id] = {
'path': os.path.abspath(audio_path),
'filename': audio_file,
'last_access': time.time(),
'download_count': 0
}
if task_id and task_registry and task_lock:
add_task_log(task_id, "视频帧提取任务完成", task_registry, task_lock)
with task_lock:
if task_id in task_registry:
task_registry[task_id]['progress'] = 100
return {
'status': 'success',
'result': result_data,
'task_id': task_id,
'temp_dir': job_dir
}
except subprocess.CalledProcessError as e:
stderr_str = e.stderr.strip() if e.stderr else ""
stdout_str = e.stdout.strip() if e.stdout else ""
error_output = stderr_str or stdout_str or str(e)
error_msg = error_output[-500:] if len(error_output) > 500 else error_output
if task_id and task_registry and task_lock:
add_task_log(task_id, f"FFmpeg处理失败: {error_msg}", task_registry, task_lock)
return {'error': f'FFmpeg处理失败: {error_msg}'}
except requests.RequestException as e:
error_msg = f'视频下载失败: {str(e)}'
if task_id and task_registry and task_lock:
add_task_log(task_id, error_msg, task_registry, task_lock)
return {'error': error_msg}
except Exception as e:
error_msg = f'处理失败: {str(e)}'
if task_id and task_registry and task_lock:
add_task_log(task_id, error_msg, task_registry, task_lock)
return {'error': error_msg}
finally:
# 清理临时视频文件
for f in glob.glob(os.path.join(tempfile.gettempdir(), f"{task_id}.*")):
try:
os.remove(f)
except:
pass
# ======================
# 辅助函数
# ======================
def get_video_duration(video_path):
"""获取视频时长"""
result = subprocess.run([
FFPROBE_PATH, '-v', 'error',
'-show_entries', 'format=duration',
'-of', 'csv=p=0',
video_path
], capture_output=True, text=True, check=True)
duration_str = result.stdout.strip()
return float(duration_str) if duration_str and duration_str != 'N/A' else 0.0
def build_video_filter(w, h, fps, force_resolution, pad_to_target):
"""构建视频滤镜"""
if force_resolution:
return f"scale={w}:{h},fps={fps}"
else:
scale_expr = f"scale='min({w},iw*min(1,{h}/ih))':'min({h},ih*min(1,{w}/iw))'"
if pad_to_target:
return f"{scale_expr},pad={w}:{h}:(ow-iw)/2:(oh-ih)/2,fps={fps}"
else:
return f"{scale_expr},fps={fps}"
def get_output_resolution(job_dir):
"""获取输出分辨率"""
first_frame = None
for f in sorted(os.listdir(job_dir)):
if f.endswith('.png'):
first_frame = os.path.join(job_dir, f)
break
if first_frame:
probe_res = subprocess.run([
FFPROBE_PATH, '-v', 'error',
'-select_streams', 'v:0',
'-show_entries', 'stream=width,height',
'-of', 'csv=p=0',
first_frame
], capture_output=True, text=True, check=True)
out_w, out_h = map(int, probe_res.stdout.strip().split(','))
return out_w, out_h
return 0, 0
def has_audio(video_path):
"""检查视频是否有音频"""
probe_streams = subprocess.run([
FFPROBE_PATH, '-v', 'error',
'-show_entries', 'stream=codec_type',
'-of', 'csv=p=0',
video_path
], capture_output=True, text=True)
return 'audio' in probe_streams.stdout.lower()