| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533 |
- import json
- import os
- import re
- import hashlib
- import logging
- import shutil
- import subprocess
- from pathlib import Path
- import cv2
- import librosa
- import numpy as np
- logger = logging.getLogger(__name__)
def get_video_info(video_path):
    """Read basic video metadata through OpenCV.

    Args:
        video_path: Path of the video file to inspect.

    Returns:
        ``None`` when OpenCV cannot open the file; otherwise a dict with
        ``video_path``, ``duration`` (whole seconds, truncated),
        ``size_byte``, ``width``, ``height``, ``horizontal`` (1 when
        width >= height, else 0) and an empty ``scenes`` list.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            return None
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    finally:
        # Release on every path, including the "could not open" return and
        # any failure while reading properties.
        cap.release()

    # A non-positive frame rate would divide by zero; report 0 instead.
    duration = int(frame_count / fps) if fps > 0 else 0
    size_byte = int(os.path.getsize(video_path))
    # Orientation flag: 1 = landscape (or square), 0 = portrait.
    horizontal = 1 if width >= height else 0
    return {
        "video_path": video_path,
        "duration": duration,
        "size_byte": size_byte,
        "width": width,
        "height": height,
        "horizontal": horizontal,
        "scenes": [],
    }
def get_media_info(media_path):
    """Probe a media file with ffprobe and summarise its streams.

    Args:
        media_path: Path of the media file to inspect.

    Returns:
        ``None`` when ffprobe cannot be run or its output cannot be parsed;
        otherwise a dict with ``media_path``, ``duration`` (seconds, rounded
        to 2 decimals), ``size_byte``, ``width``, ``height``, ``horizontal``
        (1 when width >= height, else 0), ``has_audio``, ``audio_info``
        (codec / sample_rate / channels / bit_rate of the first audio
        stream, or ``None`` without audio) and an empty ``scenes`` list.
    """
    cmd = [
        'ffprobe', '-v', 'quiet', '-print_format', 'json',
        '-show_streams', '-show_format', media_path
    ]
    try:
        result = subprocess.check_output(cmd, encoding='utf-8')
        data = json.loads(result)
    except Exception as e:
        # Best-effort by contract: any probing failure yields None.
        logger.error(f"ffprobe 解析失败: {e}")
        return None

    # Tolerate output that lacks a section or a per-stream key.
    streams = data.get('streams') or []
    format_info = data.get('format') or {}

    def first_stream(codec_type):
        return next(
            (s for s in streams if s.get('codec_type') == codec_type), None
        )

    video_stream = first_stream('video')
    audio_stream = first_stream('audio')

    width = int(video_stream.get('width', 0)) if video_stream else 0
    height = int(video_stream.get('height', 0)) if video_stream else 0
    try:
        duration = float(format_info.get('duration', 0))
    except (TypeError, ValueError):
        # Duration missing or not numeric: fall back to 0 rather than crash.
        duration = 0.0
    size_byte = int(os.path.getsize(media_path))
    horizontal = 1 if width >= height else 0

    audio_info = None
    if audio_stream:
        audio_info = {
            "codec": audio_stream.get('codec_name'),
            "sample_rate": audio_stream.get('sample_rate'),
            "channels": audio_stream.get('channels'),
            "bit_rate": audio_stream.get('bit_rate'),
        }

    return {
        "media_path": media_path,
        "duration": round(duration, 2),
        "size_byte": size_byte,
        "width": width,
        "height": height,
        "horizontal": horizontal,
        "has_audio": audio_stream is not None,
        "audio_info": audio_info,
        "scenes": [],
    }
def get_scene_times(video_path, threshold=0.3):
    """Detect scene start times with FFmpeg's ``select`` + ``showinfo`` filters.

    FFmpeg keeps frame 0 and every frame whose scene score exceeds
    *threshold*; the ``pts_time`` of each kept frame is read from the
    ``showinfo`` log on stderr.

    Args:
        video_path: Path of the video to analyse.
        threshold: Scene-change score above which a frame starts a new scene.

    Returns:
        List of scene start times in seconds. ``[0.0]`` when nothing was
        detected; ``0.0`` is prepended when the first detected time is
        later than 0.5 s.

    Raises:
        RuntimeError: FFmpeg is not installed, could not be started, its
            output could not be read, or it exited with a non-zero status.
    """
    cmd = [
        'ffmpeg',
        '-hide_banner',
        '-i', video_path,
        '-threads', '0',
        '-vf', f"select='eq(n,0)+gt(scene,{threshold})',showinfo",
        '-vsync', 'vfr',
        '-f', 'null', '-'  # detection only, nothing is written to disk
    ]
    # pts_time may be printed without a fractional part ("pts_time:0",
    # "pts_time:12") or in exponent form, so the fraction must be optional.
    pts_pattern = re.compile(r"pts_time:\s*(\d+(?:\.\d+)?(?:[eE][+-]?\d+)?)")
    scene_start_times = []

    try:
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            text=True,
            encoding='utf-8',
            errors='replace',  # odd bytes in the log must not abort parsing
        )
    except FileNotFoundError as e:
        raise RuntimeError("系统未安装 FFmpeg 或未将其添加到环境变量 PATH 中") from e
    except Exception as e:
        raise RuntimeError(f"发生未知错误: {e}") from e

    # The context manager closes the pipe and waits for the process.
    with process:
        try:
            for line in process.stderr:
                match = pts_pattern.search(line)
                if match:
                    time_val = float(match.group(1))
                    scene_start_times.append(time_val)
                    logger.info(f"检测到新场景起始点: {time_val}s")
        except Exception as e:
            process.kill()  # do not leave FFmpeg running after a read failure
            raise RuntimeError(f"读取 FFmpeg 输出时发生错误: {e}") from e

    if process.returncode != 0:
        raise RuntimeError(f"FFmpeg 处理视频失败,退出码: {process.returncode}")

    if not scene_start_times:
        return [0.0]
    if scene_start_times[0] > 0.5:
        # Make sure the list always covers the beginning of the video.
        scene_start_times.insert(0, 0.0)
    return scene_start_times
def split_video_by_scenes(video_path, scene_start_times, output_dir="segment"):
    """Cut a video into one file per scene using FFmpeg stream copy.

    Segment ``i`` runs from ``scene_start_times[i]`` to the next start time;
    the last segment runs to the end of the input. Files are written as
    ``<output_dir>/segment_NNN.mp4``. A segment that FFmpeg fails to produce
    is logged and skipped; the remaining segments are still attempted.

    Args:
        video_path: Source video path.
        scene_start_times: Scene start times in seconds.
        output_dir: Directory for the segments; created when missing.

    Returns:
        None.
    """
    if not scene_start_times:
        logger.info("没有检测到场景,跳过分割。")
        return

    # FFmpeg does not create missing directories for its output file.
    os.makedirs(output_dir, exist_ok=True)

    starts = list(scene_start_times)
    # None marks "until the end of the input" for the final segment.
    ends = starts[1:] + [None]
    for i, (start_time, next_time) in enumerate(zip(starts, ends)):
        output_file = f"{output_dir}/segment_{i:03d}.mp4"
        # -ss before -i seeks on the input (fast, keyframe based).
        cmd = [
            'ffmpeg', '-hide_banner', '-y',
            '-ss', str(start_time),
            '-i', video_path
        ]
        if next_time is not None:
            cmd.extend(['-t', str(next_time - start_time)])
        # Stream copy: no re-encoding.
        cmd.extend(['-c', 'copy', output_file])
        try:
            subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
        except subprocess.CalledProcessError as e:
            # stderr may contain bytes that are not valid UTF-8.
            detail = e.stderr.decode('utf-8', errors='replace') if e.stderr else ''
            logger.error(f"分割片段 {i} 失败: {detail}")
        else:
            logger.info(f"完成: {output_file} (起始点: {start_time}s)")
def calculate_mid_points(video_path, scene_start_times):
    """Build per-scene boundaries and the timestamp to sample a frame at.

    Each scene ends where the next one starts; the last scene ends at the
    total duration reported by ffprobe. The sampling position is the scene
    start for scenes shorter than 0.5 s and the midpoint otherwise.

    Args:
        video_path: Path of the video whose duration is queried.
        scene_start_times: List of scene start times in seconds.

    Returns:
        List of dicts with ``scene_start``, ``scene_end`` and ``frame_pos``,
        each rounded to 3 decimals.

    Raises:
        RuntimeError: The duration could not be obtained from ffprobe.
    """
    probe_cmd = [
        'ffprobe', '-v', 'error', '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1', video_path
    ]
    try:
        raw_output = subprocess.check_output(probe_cmd)
        total_duration = float(raw_output.decode().strip())
    except Exception as e:
        raise RuntimeError(f"无法获取视频时长: {e}")

    def describe(start, end):
        # Very short scenes are sampled at their first frame.
        frame_pos = start if end - start < 0.5 else (start + end) / 2
        return {
            "scene_start": round(start, 3),
            "scene_end": round(end, 3),
            "frame_pos": round(frame_pos, 3)
        }

    scene_ends = scene_start_times[1:] + [total_duration]
    return [describe(start, end) for start, end in zip(scene_start_times, scene_ends)]
def extract_frames(video_path, scenes, output_dir="thumbnails"):
    """Grab one scaled JPEG per scene with FFmpeg.

    For every scene the frame at ``scene["frame_pos"]`` is written to
    ``scene_NNN_<pos>s.jpg`` inside *output_dir*. The scene dicts are
    updated in place: ``frame_path`` becomes the absolute file path, or
    ``None`` when FFmpeg fails for that scene.

    Args:
        video_path: Source video path.
        scenes: List of scene dicts carrying a ``frame_pos`` key.
        output_dir: Directory for the images; created when missing.

    Returns:
        The same *scenes* list, with ``frame_path`` filled in.
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    # Absolute directory so that the stored frame paths are complete.
    target_dir = os.path.abspath(output_dir)
    total = len(scenes)
    logger.info(f"开始执行抽帧任务,目标数量: {total}")

    for number, scene in enumerate(scenes, start=1):
        ts = scene["frame_pos"]
        frame_file = os.path.join(target_dir, f"scene_{number:03d}_{ts}s.jpg")
        ffmpeg_cmd = ['ffmpeg', '-hide_banner', '-loglevel', 'error']
        # -ss ahead of -i: seek on the input.
        ffmpeg_cmd += ['-ss', str(ts), '-i', video_path]
        # One frame, scaled to 640 px wide for preview use.
        ffmpeg_cmd += ['-frames:v', '1', '-q:v', '2', '-vf', 'scale=640:-1']
        ffmpeg_cmd += [frame_file, '-y']
        try:
            subprocess.run(ffmpeg_cmd, check=True)
        except subprocess.CalledProcessError:
            logger.info(f"错误: 无法提取 {ts}s 处的帧")
            scene["frame_path"] = None
            continue
        scene["frame_path"] = frame_file
        # Progress is reported every fifth frame and on the last one.
        if number % 5 == 0 or number == total:
            logger.info(f"进度: {number}/{total}")

    logger.info(f"任务完成,存储路径: {target_dir}")
    return scenes
# The subtitles filter sits inside the -filter_complex string, which FFmpeg
# parses a second time; per the original note, a path containing "\", ":" or
# spaces makes that parsing fail.
# Workaround: copy the .srt to a file with a plain, fully legal name, use the
# copy, and delete it once processing is done.
def get_safe_temp_srt(srt_path):
    """Return a temp-file path next to *srt_path* named by a SHA-256 digest.

    The digest is computed from the resolved absolute path string, not from
    the file content. Nothing is created on disk here.

    Args:
        srt_path: Path of the original subtitle file.

    Returns:
        ``pathlib.Path`` of the form ``<parent of srt_path>/<sha256 hex>.srt``.
    """
    resolved = Path(srt_path).resolve()
    digest = hashlib.sha256(str(resolved).encode('utf-8')).hexdigest()
    return resolved.parent / f"{digest}.srt"
def _escape_ffmpeg_filter_arg(value):
    """Escape *value* for use as a filter option inside a filtergraph string."""
    # Level 1: the filter's option parser treats \ ' : as special.
    for ch in ("\\", "'", ":"):
        value = value.replace(ch, "\\" + ch)
    # Level 2: the filtergraph parser treats \ ' [ ] , ; as special.
    for ch in ("\\", "'", "[", "]", ",", ";"):
        value = value.replace(ch, "\\" + ch)
    return value


def generate_video(audio_path, srt_path, video_output):
    """Render an audio file as a waveform video with burned-in subtitles.

    The subtitle file is first copied to a hash-named sibling file (see
    ``get_safe_temp_srt``); the copy is removed afterwards, whether or not
    FFmpeg succeeds. FFmpeg output is read line by line, progress lines are
    logged, and the last 20 lines are kept for the error report.

    Args:
        audio_path: Input audio file.
        srt_path: Subtitle file (.srt) to burn in.
        video_output: Path of the video to write (overwritten if present).

    Raises:
        RuntimeError: FFmpeg exited with a non-zero status; the message
            carries the last 20 log lines.
    """
    font_name = 'WenQuanYi Micro Hei'
    font_size = 20
    font_color = '&H0000FFFF&'

    temp_srt_path = get_safe_temp_srt(srt_path)
    try:
        shutil.copy(srt_path, temp_srt_path)
        # Only the file name is sanitised by the copy; the parent directory
        # can still hold characters that are special to the filtergraph
        # parser (e.g. the ":" of a Windows drive), so escape the full path.
        srt_arg = _escape_ffmpeg_filter_arg(temp_srt_path.as_posix())

        command = [
            'ffmpeg', '-y',
            '-hide_banner',
            '-i', audio_path,
            '-filter_complex',
            f"[0:a]showwaves=s=854x480:mode=line:colors=0x00FFFF[v];"
            f"[v]subtitles={srt_arg}:charenc=UTF-8:force_style='FontName={font_name},FontSize={font_size},PrimaryColour={font_color},Alignment=2'[v_out]",
            '-map', '[v_out]',
            '-map', '0:a',
            '-c:v', 'libx264',       # software H.264 encoder
            '-preset', 'veryfast',   # speed/size trade-off of the encoder
            '-crf', '28',            # constant-quality level
            '-pix_fmt', 'yuv420p',   # widely supported pixel format
            '-c:a', 'aac',
            '-b:a', '128k',
            '-shortest',
            video_output
        ]

        logger.info(f"🎬 开始合成视频: {video_output}")
        recent_lines = []
        # FFmpeg reports progress on stderr; merge it into stdout so a single
        # stream is read. The context manager closes the pipe and waits.
        with subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding='utf-8',
            errors='replace'
        ) as process:
            for line in process.stdout:
                clean_line = line.strip()
                recent_lines.append(clean_line)
                # Keep only the last 20 lines for the error report.
                if len(recent_lines) > 20:
                    del recent_lines[0]
                if "frame=" in clean_line or "size=" in clean_line:
                    logger.info(f"\rFFmpeg 进度: {clean_line}")

        if process.returncode != 0:
            last_errors = "\n".join(recent_lines)
            raise RuntimeError(
                f"FFmpeg 执行失败 (退出码 {process.returncode})\n"
                f"--- 最后 20 行日志 ---\n{last_errors}"
            )
        logger.info(f"\n🚀 视频合成成功: {video_output}")
    finally:
        if temp_srt_path.exists():
            temp_srt_path.unlink()
def get_precise_srt(text_list, timestamp_list, max_chars=20):
    """Align text fragments with per-token timestamps to build SRT cues.

    *text_list* is read as alternating ``text, punctuation`` parts: every
    odd-indexed part is appended to the part before it (a trailing unpaired
    part is kept as is). Each resulting sentence is cut into chunks of at
    most *max_chars* characters. A chunk consumes one timestamp per word
    character (everything matched by ``\\w``); it starts at the start of its
    first timestamp and ends at the end of its last one, clamped to the end
    of *timestamp_list*. Chunks without word characters are skipped, and
    processing of a sentence stops once the timestamps are used up.

    Args:
        text_list: Alternating text / punctuation strings.
        timestamp_list: Sequence of ``(start, end)`` pairs, passed to
            ``format_time_srt`` (which expects milliseconds).
        max_chars: Maximum characters per subtitle line.

    Returns:
        List of dicts with ``line`` (1-based index), ``time``
        (``"start --> end"``) and ``text``.
    """
    ts_total = len(timestamp_list)

    # Glue each punctuation part onto the text part that precedes it.
    merged = [head + tail for head, tail in zip(text_list[0::2], text_list[1::2])]
    if len(text_list) % 2 == 1:
        merged.append(text_list[-1])

    cursor = 0  # index of the next unused timestamp
    cues = []
    for raw_sentence in merged:
        sentence = raw_sentence.strip()
        if not sentence or cursor >= ts_total:
            continue
        # Hard-split over-long sentences into max_chars-sized chunks.
        for offset in range(0, len(sentence), max_chars):
            chunk = sentence[offset:offset + max_chars]
            # Only word characters consume timestamps; punctuation is dropped
            # from the count.
            token_count = len(re.sub(r'[^\w\u4e00-\u9fa5]', '', chunk))
            if token_count == 0:
                continue
            if cursor >= ts_total:
                break
            last = min(cursor + token_count - 1, ts_total - 1)
            begin_ms = timestamp_list[cursor][0]
            finish_ms = timestamp_list[last][1]
            cues.append({
                "line": len(cues) + 1,
                "time": f"{format_time_srt(begin_ms)} --> {format_time_srt(finish_ms)}",
                "text": chunk
            })
            cursor += token_count
    return cues
def format_time_srt(ms):
    """Convert milliseconds to an SRT timestamp ``HH:MM:SS,mmm``.

    Args:
        ms: Time in milliseconds. Fractional values are truncated and
            negative values are clamped to zero.

    Returns:
        The formatted timestamp string.
    """
    # Work on an integer: with a float input divmod would return floats and
    # the zero-padded fields would render as "0.0:0.0:1.0".
    total_ms = max(int(ms), 0)
    seconds, millis = divmod(total_ms, 1000)
    minutes, seconds = divmod(seconds, 60)
    hours, minutes = divmod(minutes, 60)
    return f"{hours:02}:{minutes:02}:{seconds:02},{millis:03}"
def save_srt_file(srt_list, output_path):
    """Write subtitle cues to *output_path* as UTF-8 SRT text.

    Every cue is written as three lines (index, time range, text) followed
    by one empty separator line.

    Args:
        srt_list: List of dicts with ``line``, ``time`` and ``text`` keys.
        output_path: Destination file path (e.g. ``'output.srt'``).
    """
    cue_fields = ("line", "time", "text")
    with open(output_path, 'w', encoding='utf-8') as srt_file:
        for cue in srt_list:
            for field in cue_fields:
                srt_file.write(f"{cue[field]}\n")
            # An empty line terminates the cue.
            srt_file.write("\n")
def parse_srt_to_list(file_path):
    """Parse an SRT file back into ``[{line, time, text}, ...]``.

    Args:
        file_path: Path of a UTF-8 encoded SRT file.

    Returns:
        List of dicts with ``line`` (int index), ``time``
        (``"HH:MM:SS,mmm --> HH:MM:SS,mmm"``) and ``text`` (may span several
        lines; surrounding whitespace stripped).
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read().strip()

    timestamp = r'\d{2}:\d{2}:\d{2},\d{3}'
    time_line = rf'{timestamp} --> {timestamp}'
    # Groups: (index) (time range) (text, lazily matched, may be multi-line).
    # A cue ends only where an index line is followed by a time-range line,
    # or at end of input. Requiring the time range keeps a text line made of
    # digits only (e.g. "3" in a countdown) from being taken for the next
    # cue's index, which would truncate the text.
    pattern = re.compile(
        rf'(\d+)\n({time_line})\n([\s\S]*?)(?=\n+\d+\n{time_line}|\Z)'
    )
    return [
        {
            "line": int(index),
            "time": time_range,
            "text": text.strip(),
        }
        for index, time_range, text in pattern.findall(content)
    ]
def analyze_audio_energy(audio_path, segment_ms=100, silence_threshold=0.005):
    """Log the RMS energy of consecutive audio segments to help pick a silence threshold.

    The audio is loaded through librosa at 16 kHz and cut into
    non-overlapping segments of *segment_ms* milliseconds. One line is
    logged per segment, followed by the maximum, the minimum and the 20th
    percentile of the RMS values as a suggested threshold.

    Args:
        audio_path: Path of the audio file.
        segment_ms: Segment length in milliseconds.
        silence_threshold: RMS value below which a segment is labelled as
            silent in the log.

    Raises:
        ValueError: *segment_ms* is too small to cover at least one sample.
    """
    sr = 16000
    hop_length = int(sr * segment_ms / 1000)
    # Validate before the (potentially slow) decode; a zero step would
    # otherwise surface later as an opaque range() error.
    if hop_length < 1:
        raise ValueError(f"segment_ms 过小,无法覆盖至少一个采样点: {segment_ms!r}")

    y, _ = librosa.load(audio_path, sr=sr)
    if len(y) == 0:
        # Nothing to measure; max()/min() below would fail on an empty list.
        logger.warning(f"音频为空,无法分析能量: {audio_path}")
        return

    energy_list = []
    logger.info(f"{'时间 (秒)':<10} | {'能量值 (RMS)':<15} | {'状态估计'}")
    logger.info("-" * 45)
    for i in range(0, len(y), hop_length):
        segment = y[i: i + hop_length]
        rms = np.sqrt(np.mean(segment ** 2))
        energy_list.append(rms)
        time_sec = i / sr
        status = "🤫 静音" if rms < silence_threshold else "🗣️ 有声"
        logger.info(f"{time_sec:>8.2f}s | {rms:>15.6f} | {status}")

    logger.info("-" * 45)
    logger.info(f"最大能量: {max(energy_list):.6f}")
    logger.info(f"最小能量: {min(energy_list):.6f}")
    logger.info(f"建议阈值: {np.percentile(energy_list, 20):.6f} (取前20%分位数作为参考)")
|