"""Media helpers: metadata probing, scene detection/splitting, thumbnails and SRT tools."""

import hashlib
import json
import logging
import os
import re
import shutil
import subprocess
from collections import deque
from pathlib import Path

import numpy as np

# OpenCV and librosa are each used by exactly one function below.  Guarding
# the imports keeps the ffmpeg/SRT helpers importable on machines that lack
# them; the functions that need them raise ImportError when called.
try:
    import cv2
except ImportError:
    cv2 = None

try:
    import librosa
except ImportError:
    librosa = None

logger = logging.getLogger(__name__)

# showinfo prints pts_time with "%g"-style formatting, so whole seconds appear
# without a decimal point ("pts_time:0", "pts_time:12") and tiny values may use
# exponent notation.  All of these forms must be accepted.
_PTS_TIME_RE = re.compile(r"pts_time:\s*(-?\d+(?:\.\d+)?(?:[eE][-+]?\d+)?)")

# Everything that is not a word character or a CJK ideograph.
_NON_WORD_RE = re.compile(r'[^\w\u4e00-\u9fa5]')

_SRT_TIME_LINE = r"\d{2}:\d{2}:\d{2},\d{3} --> \d{2}:\d{2}:\d{2},\d{3}"
# index line, time line, then lazily the text up to the next "index + time
# line" pair (or end of input).  Requiring the time line in the lookahead keeps
# a text line that consists only of digits from terminating the cue early.
_SRT_ENTRY_RE = re.compile(
    r"(\d+)\n(" + _SRT_TIME_LINE + r")\n([\s\S]*?)(?=\n\d+\n" + _SRT_TIME_LINE + r"|\Z)"
)


def get_video_info(video_path):
    """Return basic video metadata read through OpenCV.

    :param video_path: path of the video file.
    :return: dict with ``video_path``, ``duration`` (whole seconds),
        ``size_byte``, ``width``, ``height``, ``horizontal`` (1 when
        width >= height, else 0) and an empty ``scenes`` list; ``None`` when
        the file cannot be opened.
    :raises ImportError: if OpenCV is not installed.
    """
    if cv2 is None:
        raise ImportError("get_video_info requires OpenCV (the 'cv2' module)")

    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            return None
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    finally:
        # Release the capture handle on every path, including the early return.
        cap.release()

    return {
        "video_path": video_path,
        "duration": int(frame_count / fps) if fps > 0 else 0,
        "size_byte": os.path.getsize(video_path),
        "width": width,
        "height": height,
        "horizontal": 1 if width >= height else 0,
        "scenes": []
    }


def get_media_info(media_path):
    """Return media metadata (video and audio) collected with ffprobe.

    :param media_path: path of the media file.
    :return: dict with ``media_path``, ``duration`` (seconds, 2 decimals),
        ``size_byte``, ``width``, ``height``, ``horizontal``, ``has_audio``,
        ``audio_info`` (dict or ``None``) and an empty ``scenes`` list;
        ``None`` when ffprobe cannot be run or its output cannot be parsed.
    """
    cmd = [
        'ffprobe', '-v', 'quiet',
        '-print_format', 'json',
        '-show_streams', '-show_format',
        media_path
    ]
    try:
        result = subprocess.check_output(cmd, encoding='utf-8')
        data = json.loads(result)
    except (OSError, subprocess.CalledProcessError, ValueError) as e:
        # ValueError covers both JSON and text decoding failures.
        logger.error(f"ffprobe 解析失败: {e}")
        return None

    streams = data.get('streams') or []
    video_stream = next((s for s in streams if s.get('codec_type') == 'video'), None)
    audio_stream = next((s for s in streams if s.get('codec_type') == 'audio'), None)

    width = int(video_stream.get('width', 0)) if video_stream else 0
    height = int(video_stream.get('height', 0)) if video_stream else 0
    try:
        duration = float((data.get('format') or {}).get('duration', 0))
    except (TypeError, ValueError):
        # A non-numeric duration field is treated as "unknown".
        duration = 0.0

    return {
        "media_path": media_path,
        "duration": round(duration, 2),
        "size_byte": os.path.getsize(media_path),
        "width": width,
        "height": height,
        "horizontal": 1 if width >= height else 0,
        "has_audio": audio_stream is not None,
        "audio_info": {
            "codec": audio_stream.get('codec_name'),
            "sample_rate": audio_stream.get('sample_rate'),
            "channels": audio_stream.get('channels'),
            "bit_rate": audio_stream.get('bit_rate')
        } if audio_stream else None,
        "scenes": []
    }


def _iter_pts_times(lines):
    """Yield every ``pts_time`` value found in an iterable of log lines."""
    for line in lines:
        match = _PTS_TIME_RE.search(line)
        if match:
            yield float(match.group(1))


def get_scene_times(video_path, threshold=0.3):
    """Detect scene cuts with ffmpeg's ``select``/``showinfo`` filters.

    :param video_path: path of the video file.
    :param threshold: scene-change score above which a frame starts a scene.
    :return: list of scene start times in seconds; ``[0.0]`` when nothing is
        detected, and ``0.0`` is prepended when the first hit is later than
        0.5 s.
    :raises RuntimeError: if ffmpeg is missing, its output cannot be read, or
        it exits with a non-zero status.
    """
    cmd = [
        'ffmpeg', '-hide_banner',
        '-i', video_path,
        '-threads', '0',
        '-vf', f"select='eq(n,0)+gt(scene,{threshold})',showinfo",
        '-vsync', 'vfr',
        '-f', 'null', '-'  # detection only; nothing is written to disk
    ]

    try:
        # showinfo writes to stderr.  errors='replace' keeps stray non-UTF-8
        # bytes in the log from aborting the scan.
        process = subprocess.Popen(
            cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            text=True,
            encoding='utf-8',
            errors='replace'
        )
    except FileNotFoundError as e:
        raise RuntimeError("系统未安装 FFmpeg 或未将其添加到环境变量 PATH 中") from e
    except Exception as e:
        raise RuntimeError(f"发生未知错误: {e}") from e

    scene_start_times = []
    with process:  # closes the pipe and waits for ffmpeg on exit
        try:
            for time_val in _iter_pts_times(process.stderr):
                scene_start_times.append(time_val)
                logger.info(f"检测到新场景起始点: {time_val}s")
        except Exception as e:
            process.kill()  # do not leave ffmpeg running if reading fails
            raise RuntimeError(f"读取 FFmpeg 输出时发生错误: {e}") from e

    if process.returncode != 0:
        raise RuntimeError(f"FFmpeg 处理视频失败,退出码: {process.returncode}")

    if not scene_start_times:
        return [0.0]
    if scene_start_times[0] > 0.5:
        # The first scene always starts at the beginning of the file.
        scene_start_times.insert(0, 0.0)
    return scene_start_times


def split_video_by_scenes(video_path, scene_start_times, output_dir="segment"):
    """Cut the video into one stream-copied file per scene.

    Segments are written to ``{output_dir}/segment_NNN.mp4``.  A segment that
    ffmpeg fails to produce is logged and skipped.

    :param video_path: path of the source video.
    :param scene_start_times: scene start times in seconds.
    :param output_dir: directory for the segments; created if missing.
    :return: ``None``.
    """
    if not scene_start_times:
        logger.info("没有检测到场景,跳过分割。")
        return

    os.makedirs(output_dir, exist_ok=True)

    # The last segment has no end time, so it is given no "-t".
    end_times = list(scene_start_times[1:]) + [None]

    for i, (start_time, next_time) in enumerate(zip(scene_start_times, end_times)):
        output_file = f"{output_dir}/segment_{i:03d}.mp4"

        # "-ss" is placed before "-i" (input-side seek).
        cmd = [
            'ffmpeg', '-hide_banner', '-y',
            '-ss', str(start_time),
            '-i', video_path
        ]
        if next_time is not None:
            cmd.extend(['-t', str(next_time - start_time)])
        # Stream copy: no re-encoding.
        cmd.extend(['-c', 'copy', output_file])

        try:
            subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
        except subprocess.CalledProcessError as e:
            logger.error(f"分割片段 {i} 失败: {e.stderr.decode(errors='replace')}")
        else:
            logger.info(f"完成: {output_file} (起始点: {start_time}s)")


def calculate_mid_points(video_path, scene_start_times):
    """Build per-scene boundaries and the timestamp at which to grab a frame.

    The last scene ends at the total duration reported by ffprobe.  Scenes
    shorter than 0.5 s are sampled at their start, all others at their
    midpoint.

    :param video_path: path of the video file.
    :param scene_start_times: scene start times in seconds.
    :return: list of dicts with ``scene_start``, ``scene_end`` and
        ``frame_pos``, each rounded to 3 decimals.
    :raises RuntimeError: if the duration cannot be obtained.
    """
    duration_cmd = [
        'ffprobe', '-v', 'error',
        '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1',
        video_path
    ]
    try:
        total_duration = float(subprocess.check_output(duration_cmd).decode().strip())
    except Exception as e:
        raise RuntimeError(f"无法获取视频时长: {e}") from e

    # The start of the next scene is the end of the current one.
    end_times = list(scene_start_times[1:]) + [total_duration]

    scenes = []
    for start, end in zip(scene_start_times, end_times):
        # Guard against a reported duration shorter than the last detected
        # cut, which would otherwise yield a negative-length scene.
        end = max(end, start)
        frame_pos = start if end - start < 0.5 else (start + end) / 2
        scenes.append({
            "scene_start": round(start, 3),
            "scene_end": round(end, 3),
            "frame_pos": round(frame_pos, 3)
        })
    return scenes


def extract_frames(video_path, scenes, output_dir="thumbnails"):
    """Grab one 640-px-wide JPEG per scene at its ``frame_pos``.

    :param video_path: path of the video file.
    :param scenes: list of scene dicts containing ``frame_pos``; each dict is
        updated in place.
    :param output_dir: directory for the thumbnails; created if missing.
    :return: the same ``scenes`` list, each dict carrying ``frame_path`` (an
        absolute path, or ``None`` when extraction failed).
    """
    os.makedirs(output_dir, exist_ok=True)
    abs_output_dir = os.path.abspath(output_dir)

    total = len(scenes)
    logger.info(f"开始执行抽帧任务,目标数量: {total}")

    for i, scene in enumerate(scenes):
        ts = scene["frame_pos"]
        file_name = f"scene_{i + 1:03d}_{ts}s.jpg"
        output_file_path = os.path.join(abs_output_dir, file_name)

        cmd = [
            'ffmpeg', '-hide_banner', '-loglevel', 'error',
            '-y',  # given before the output file rather than as a trailing option
            '-ss', str(ts),  # input-side seek
            '-i', video_path,
            '-frames:v', '1',
            '-q:v', '2',
            '-vf', 'scale=640:-1',
            output_file_path
        ]

        try:
            subprocess.run(cmd, check=True)
        except subprocess.CalledProcessError:
            logger.error(f"错误: 无法提取 {ts}s 处的帧")
            scene["frame_path"] = None
            continue

        scene["frame_path"] = output_file_path
        if (i + 1) % 5 == 0 or (i + 1) == total:
            logger.info(f"进度: {i + 1}/{total}")

    logger.info(f"任务完成,存储路径: {abs_output_dir}")
    return scenes


def get_safe_temp_srt(srt_path):
    """Return a sibling path for ``srt_path`` named ``<sha256>.srt``.

    The hash is computed over the resolved path string, not the file content.
    Nothing is created on disk.

    :param srt_path: path of the original subtitle file.
    :return: ``pathlib.Path`` in the same directory as the resolved original.
    """
    resolved = Path(srt_path).resolve()
    digest = hashlib.sha256(str(resolved).encode('utf-8')).hexdigest()
    return resolved.parent / f"{digest}.srt"


def _escape_filter_path(path):
    """Escape a filesystem path for use as an option value in a filtergraph.

    FFmpeg parses filter descriptions twice, so two escaping passes are
    applied: one for the option value, one for the filtergraph description.
    """
    text = Path(path).as_posix()  # forward slashes only
    text = re.sub(r"([\\':])", r"\\\1", text)  # option-value level
    return re.sub(r"([\\'\[\],;])", r"\\\1", text)  # filtergraph level


def generate_video(audio_path, srt_path, video_output):
    """Render an audio file as a waveform video with burned-in subtitles.

    The subtitle file is copied to a hash-named sibling for the duration of
    the run and removed afterwards.

    :param audio_path: input audio file.
    :param srt_path: subtitle file to burn in.
    :param video_output: path of the video to write.
    :raises RuntimeError: if ffmpeg exits with a non-zero status; the message
        carries the last 20 log lines.
    """
    temp_srt_path = get_safe_temp_srt(srt_path)
    shutil.copy(srt_path, temp_srt_path)

    font_name = 'WenQuanYi Micro Hei'
    font_size = 20
    font_color = '&H0000FFFF&'

    # The hash only sanitizes the file name; the parent directory can still
    # hold characters the filtergraph parser treats specially.
    subtitle_arg = _escape_filter_path(temp_srt_path)

    command = [
        'ffmpeg', '-y', '-hide_banner',
        '-i', audio_path,
        '-filter_complex',
        f"[0:a]showwaves=s=854x480:mode=line:colors=0x00FFFF[v];"
        f"[v]subtitles={subtitle_arg}:charenc=UTF-8:force_style='FontName={font_name},FontSize={font_size},PrimaryColour={font_color},Alignment=2'[v_out]",
        '-map', '[v_out]',
        '-map', '0:a',
        '-c:v', 'libx264',
        '-preset', 'veryfast',
        '-crf', '28',
        '-pix_fmt', 'yuv420p',
        '-c:a', 'aac',
        '-b:a', '128k',
        '-shortest',
        video_output
    ]

    try:
        logger.info(f"🎬 开始合成视频: {video_output}")

        # Only the most recent lines are kept, for the error report.
        log_tail = deque(maxlen=20)

        # stderr is merged into stdout so a single stream carries both the
        # progress lines and any diagnostics.
        with subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding='utf-8',
            errors='replace'
        ) as process:
            try:
                for line in process.stdout:
                    clean_line = line.strip()
                    if not clean_line:
                        continue
                    log_tail.append(clean_line)
                    if "frame=" in clean_line or "size=" in clean_line:
                        logger.info(f"\rFFmpeg 进度: {clean_line}")
            except BaseException:
                process.kill()  # never leave ffmpeg running behind a failure
                raise

        if process.returncode != 0:
            last_errors = "\n".join(log_tail)
            raise RuntimeError(
                f"FFmpeg 执行失败 (退出码 {process.returncode})\n"
                f"--- 最后 20 行日志 ---\n{last_errors}"
            )

        logger.info(f"\n🚀 视频合成成功: {video_output}")
    finally:
        if temp_srt_path.exists():
            temp_srt_path.unlink()


def get_precise_srt(text_list, timestamp_list, max_chars=20):
    """Align text fragments with timestamps and produce SRT entries.

    ``text_list`` is consumed in pairs: each element at an even index is
    joined with the element after it (the original comment describes this as
    attaching punctuation to the preceding phrase).  Each joined sentence is
    hard-split into chunks of at most ``max_chars`` characters, and each chunk
    consumes one timestamp per remaining character after punctuation is
    stripped.

    NOTE(review): counting one timestamp per character presumably matches
    per-character recognizer output; confirm for non-CJK text.

    :param text_list: list of text fragments.
    :param timestamp_list: list of ``(start, end)`` pairs, passed to
        ``format_time_srt``.
    :param max_chars: maximum characters per subtitle line; must be positive.
    :return: list of dicts with ``line``, ``time`` and ``text``.
    :raises ValueError: if ``max_chars`` is not positive.
    """
    if max_chars <= 0:
        raise ValueError("max_chars must be a positive integer")

    total_ts = len(timestamp_list)

    sentences = [
        text_list[i] + text_list[i + 1]
        for i in range(0, len(text_list) - 1, 2)
    ]
    if len(text_list) % 2 == 1:
        sentences.append(text_list[-1])

    srt_list = []
    ts_idx = 0
    line_count = 1

    for sentence in sentences:
        if ts_idx >= total_ts:
            break  # timestamps exhausted; remaining text cannot be timed
        sentence = sentence.strip()
        if not sentence:
            continue

        for start in range(0, len(sentence), max_chars):
            chunk = sentence[start:start + max_chars]
            num_words = len(_NON_WORD_RE.sub('', chunk))
            if num_words == 0:
                continue
            if ts_idx >= total_ts:
                break

            start_t = timestamp_list[ts_idx][0]
            # Clamp so a chunk longer than the remaining timestamps still
            # receives a valid end time.
            end_pos = min(ts_idx + num_words - 1, total_ts - 1)
            end_t = timestamp_list[end_pos][1]

            srt_list.append({
                "line": line_count,
                "time": f"{format_time_srt(start_t)} --> {format_time_srt(end_t)}",
                "text": chunk
            })
            ts_idx += num_words
            line_count += 1

    return srt_list


def format_time_srt(ms):
    """Convert milliseconds to the SRT timestamp format ``HH:MM:SS,mmm``.

    Fractional milliseconds are truncated and negative values are clamped to
    zero, so float input yields a well-formed timestamp.
    """
    total_ms = max(0, int(ms))
    seconds, millis = divmod(total_ms, 1000)
    minutes, seconds = divmod(seconds, 60)
    hours, minutes = divmod(minutes, 60)
    return f"{hours:02}:{minutes:02}:{seconds:02},{millis:03}"


def save_srt_file(srt_list, output_path):
    """Write SRT entries to ``output_path`` as UTF-8.

    :param srt_list: list of dicts with ``line``, ``time`` and ``text``.
    :param output_path: destination file, e.g. ``output.srt``.
    """
    with open(output_path, 'w', encoding='utf-8') as f:
        # index, time line, text, then a blank separator line
        f.writelines(
            f"{entry['line']}\n{entry['time']}\n{entry['text']}\n\n"
            for entry in srt_list
        )


def parse_srt_to_list(file_path):
    """Parse an SRT file back into ``[{line, time, text}, ...]``.

    :param file_path: path of a UTF-8 encoded SRT file.
    :return: list of dicts with ``line`` (int), ``time`` and ``text`` (may
        span several lines; surrounding whitespace stripped).
    """
    with open(file_path, 'r', encoding='utf-8') as f:
        content = f.read().strip()

    return [
        {"line": int(index), "time": time_line, "text": text.strip()}
        for index, time_line, text in _SRT_ENTRY_RE.findall(content)
    ]


def analyze_audio_energy(audio_path, segment_ms=100, silence_threshold=0.005):
    """Log per-segment RMS energy to help pick a silence threshold.

    The audio is loaded at 16 kHz and scanned in consecutive windows of
    ``segment_ms`` milliseconds; each window's RMS is logged, followed by the
    maximum, the minimum and the 20th percentile.

    :param audio_path: path of the audio file.
    :param segment_ms: window size in milliseconds; must be positive.
    :param silence_threshold: RMS below which a window is labelled silent.
    :raises ImportError: if librosa is not installed.
    :raises ValueError: if ``segment_ms`` is not positive.
    """
    if librosa is None:
        raise ImportError("analyze_audio_energy requires librosa")
    if segment_ms <= 0:
        raise ValueError("segment_ms must be positive")

    sr = 16000
    y, _ = librosa.load(audio_path, sr=sr)

    # At least one sample per window, so the range() step is never zero.
    hop_length = max(1, int(sr * segment_ms / 1000))
    energy_list = []

    logger.info(f"{'时间 (秒)':<10} | {'能量值 (RMS)':<15} | {'状态估计'}")
    logger.info("-" * 45)

    for i in range(0, len(y), hop_length):
        segment = y[i: i + hop_length]
        rms = np.sqrt(np.mean(segment ** 2))
        energy_list.append(rms)

        time_sec = i / sr
        status = "🤫 静音" if rms < silence_threshold else "🗣️ 有声"
        logger.info(f"{time_sec:>8.2f}s | {rms:>15.6f} | {status}")

    logger.info("-" * 45)
    if not energy_list:
        # An empty signal has no statistics to report.
        logger.warning("音频为空,无法统计能量")
        return

    logger.info(f"最大能量: {max(energy_list):.6f}")
    logger.info(f"最小能量: {min(energy_list):.6f}")
    logger.info(f"建议阈值: {np.percentile(energy_list, 20):.6f} (取前20%分位数作为参考)")