|
@@ -0,0 +1,533 @@
|
|
|
|
|
+import json
|
|
|
|
|
+import os
|
|
|
|
|
+import re
|
|
|
|
|
+import hashlib
|
|
|
|
|
+import logging
|
|
|
|
|
+import shutil
|
|
|
|
|
+import subprocess
|
|
|
|
|
+from pathlib import Path
|
|
|
|
|
+import cv2
|
|
|
|
|
+import librosa
|
|
|
|
|
+import numpy as np
|
|
|
|
|
+
|
|
|
|
|
# Module-level logger, named after this module; the helpers below log through it.
logger = logging.getLogger(__name__)
|
|
|
|
|
+
|
|
|
|
|
def get_video_info(video_path):
    """Return basic metadata for a video file, read through OpenCV.

    Args:
        video_path: Path of the video file to inspect.

    Returns:
        ``None`` when OpenCV cannot open the file. Otherwise a dict with the
        keys ``video_path``, ``duration`` (frame count / fps, truncated to
        an int, or 0 when fps is not positive), ``size_byte``, ``width``,
        ``height``, ``horizontal`` (1 when width >= height, else 0) and an
        empty ``scenes`` list.

    Raises:
        Whatever ``os.path.getsize`` raises when the file size cannot be read.
    """
    cap = cv2.VideoCapture(video_path)
    try:
        if not cap.isOpened():
            return None

        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        fps = cap.get(cv2.CAP_PROP_FPS)
        frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
    finally:
        # Release the capture on every path, including the early return and
        # any exception raised while reading the properties.
        cap.release()

    duration = int(frame_count / fps) if fps > 0 else 0
    size_byte = int(os.path.getsize(video_path))

    # Orientation flag: 1 = landscape (or square), 0 = portrait.
    horizontal = 1 if width >= height else 0

    return {
        "video_path": video_path,
        "duration": duration,
        "size_byte": size_byte,
        "width": width,
        "height": height,
        "horizontal": horizontal,
        "scenes": [],
    }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
def get_media_info(media_path):
    """Return basic media metadata (video and audio) as reported by ffprobe.

    Args:
        media_path: Path of the media file to inspect.

    Returns:
        ``None`` when ffprobe cannot be run or its output cannot be parsed.
        Otherwise a dict with the keys ``media_path``, ``duration`` (seconds,
        rounded to 2 decimals, 0 when ffprobe reports none), ``size_byte``,
        ``width``, ``height`` (both 0 without a video stream), ``horizontal``
        (1 when width >= height, else 0), ``has_audio``, ``audio_info``
        (``codec`` / ``sample_rate`` / ``channels`` / ``bit_rate`` of the
        first audio stream, or ``None``) and an empty ``scenes`` list.

    Raises:
        Whatever ``os.path.getsize`` raises when the file size cannot be read.
    """
    # 1. Ask ffprobe for the stream and container details as JSON.
    cmd = [
        'ffprobe', '-v', 'quiet', '-print_format', 'json',
        '-show_streams', '-show_format', media_path
    ]

    try:
        result = subprocess.check_output(cmd, encoding='utf-8')
        data = json.loads(result)
    except Exception as e:
        # Best-effort probe: report through the module logger (like the rest
        # of this file) and signal failure with None.
        logger.error(f"ffprobe 解析失败: {e}")
        return None

    # Tolerate incomplete ffprobe output instead of raising KeyError.
    streams = data.get('streams') or []
    container = data.get('format') or {}

    # First video stream and first audio stream, if any.
    video_stream = next((s for s in streams if s.get('codec_type') == 'video'), None)
    audio_stream = next((s for s in streams if s.get('codec_type') == 'audio'), None)

    # 2. Basic video information.
    width = int(video_stream.get('width', 0)) if video_stream else 0
    height = int(video_stream.get('height', 0)) if video_stream else 0
    try:
        duration = float(container.get('duration', 0))
    except (TypeError, ValueError):
        # A missing or non-numeric duration is reported as 0.
        duration = 0.0
    size_byte = int(os.path.getsize(media_path))
    horizontal = 1 if width >= height else 0

    # 3. Assemble the result, including the audio fields.
    audio_info = None
    if audio_stream:
        audio_info = {
            "codec": audio_stream.get('codec_name'),
            "sample_rate": audio_stream.get('sample_rate'),
            "channels": audio_stream.get('channels'),
            "bit_rate": audio_stream.get('bit_rate'),
        }

    return {
        "media_path": media_path,
        "duration": round(duration, 2),
        "size_byte": size_byte,
        "width": width,
        "height": height,
        "horizontal": horizontal,
        "has_audio": audio_stream is not None,
        "audio_info": audio_info,
        "scenes": [],
    }
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
def get_scene_times(video_path, threshold=0.3):
    """Detect scene changes with FFmpeg and return their start times.

    FFmpeg is run with a ``select`` filter that keeps the first frame and
    every frame whose scene score exceeds ``threshold``, followed by
    ``showinfo``; the ``pts_time`` values that ``showinfo`` writes to stderr
    are collected.

    Args:
        video_path: Path of the video to analyse.
        threshold: Scene-score threshold passed to ``gt(scene, ...)``.

    Returns:
        A list of start times in seconds. ``[0.0]`` when nothing was
        detected; ``0.0`` is prepended when the first detected time is later
        than 0.5 s.

    Raises:
        RuntimeError: If ffmpeg is not installed, exits with a non-zero
            status, or its output cannot be read.
    """
    cmd = [
        'ffmpeg',
        '-hide_banner',
        '-i', video_path,
        '-threads', '0',
        '-vf', f"select='eq(n,0)+gt(scene,{threshold})',showinfo",
        '-vsync', 'vfr',
        '-f', 'null', '-'  # detection only: decoded frames are discarded
    ]

    # The fractional part is optional: whole-second timestamps are printed
    # without one (e.g. "pts_time:0" for the first frame).
    pts_pattern = re.compile(r"pts_time:(\d+(?:\.\d+)?)")

    scene_start_times = []
    try:
        # stderr carries the showinfo log; regular output is discarded.
        # errors='replace' keeps odd bytes in the log from aborting the scan.
        with subprocess.Popen(
            cmd,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            encoding='utf-8',
            errors='replace'
        ) as process:
            try:
                for line in process.stderr:
                    match = pts_pattern.search(line)
                    if match:
                        time_val = float(match.group(1))
                        scene_start_times.append(time_val)
                        logger.info(f"检测到新场景起始点: {time_val}s")
            except Exception as e:
                # Reading failed: stop ffmpeg rather than leave it running.
                process.kill()
                raise RuntimeError(f"读取 FFmpeg 输出时发生错误: {e}") from e

            returncode = process.wait()
    except FileNotFoundError as e:
        # The 'ffmpeg' executable could not be found on PATH.
        raise RuntimeError("系统未安装 FFmpeg 或未将其添加到环境变量 PATH 中") from e
    except RuntimeError:
        # Already a descriptive error from above; do not wrap it again.
        raise
    except Exception as e:
        raise RuntimeError(f"发生未知错误: {e}") from e

    if returncode != 0:
        # ffmpeg itself failed (e.g. decode error, bad arguments).
        raise RuntimeError(f"FFmpeg 处理视频失败，退出码: {returncode}")

    if not scene_start_times:
        return [0.0]

    if scene_start_times[0] > 0.5:
        # Make sure the list starts at the beginning of the video.
        scene_start_times.insert(0, 0.0)

    return scene_start_times
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
def split_video_by_scenes(video_path, scene_start_times, output_dir="segment"):
    """Split a video into one file per scene using FFmpeg stream copy.

    Segment ``i`` runs from ``scene_start_times[i]`` to the next start time;
    the last segment has no explicit duration and runs to the end of the
    input. Files are written as ``<output_dir>/segment_NNN.mp4``. A segment
    that FFmpeg fails to produce is logged and skipped.

    Args:
        video_path: Path of the source video.
        scene_start_times: Scene start times in seconds, in order.
        output_dir: Directory for the segments; created when missing.

    Returns:
        None.
    """
    if not scene_start_times:
        logger.info("没有检测到场景，跳过分割。")
        return

    # ffmpeg does not create missing directories for its output file.
    os.makedirs(output_dir, exist_ok=True)

    # None marks "no following scene": the last segment gets no -t, so
    # ffmpeg copies through to the end of the input.
    next_starts = list(scene_start_times[1:]) + [None]

    for i, (start_time, next_time) in enumerate(zip(scene_start_times, next_starts)):
        output_file = f"{output_dir}/segment_{i:03d}.mp4"

        # -ss before -i selects fast, keyframe-based seeking.
        cmd = [
            'ffmpeg', '-hide_banner', '-y',
            '-ss', str(start_time),
            '-i', video_path
        ]

        if next_time is not None:
            cmd.extend(['-t', str(next_time - start_time)])

        # Stream copy: no re-encoding.
        cmd.extend(['-c', 'copy', output_file])

        try:
            subprocess.run(cmd, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.PIPE)
            logger.info(f"完成: {output_file} (起始点: {start_time}s)")
        except subprocess.CalledProcessError as e:
            # errors='replace': a non-UTF-8 byte in ffmpeg's output must not
            # turn the failure report itself into an exception.
            logger.error(f"分割片段 {i} 失败: {e.stderr.decode(errors='replace')}")
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
def calculate_mid_points(video_path, scene_start_times):
    """Describe every scene by its start, end and frame-sampling position.

    The total duration is read with ffprobe and used as the end of the last
    scene; every other scene ends where the next one starts. The sampling
    position is the scene start for scenes shorter than 0.5 s and the scene
    midpoint otherwise.

    Args:
        video_path: Path of the video the scenes belong to.
        scene_start_times: List of scene start times in seconds, in order.

    Returns:
        A list of dicts with ``scene_start``, ``scene_end`` and
        ``frame_pos``, each rounded to 3 decimals.

    Raises:
        RuntimeError: If the duration cannot be obtained from ffprobe.
    """
    probe_cmd = [
        'ffprobe', '-v', 'error', '-show_entries', 'format=duration',
        '-of', 'default=noprint_wrappers=1:nokey=1', video_path
    ]
    try:
        probe_output = subprocess.check_output(probe_cmd)
        total_duration = float(probe_output.decode().strip())
    except Exception as e:
        raise RuntimeError(f"无法获取视频时长: {e}")

    # Each scene ends at the next scene's start; the last ends with the video.
    scene_ends = scene_start_times[1:] + [total_duration]

    scenes = []
    for begin, finish in zip(scene_start_times, scene_ends):
        # Very short scenes are sampled at their first frame, others mid-way.
        sample_at = begin if finish - begin < 0.5 else (begin + finish) / 2
        scenes.append({
            "scene_start": round(begin, 3),
            "scene_end": round(finish, 3),
            "frame_pos": round(sample_at, 3)
        })
    return scenes
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
def extract_frames(video_path, scenes, output_dir="thumbnails"):
    """Extract one JPEG preview frame per scene with FFmpeg.

    For every scene the frame at ``scene["frame_pos"]`` seconds is written to
    ``<output_dir>/scene_NNN_<frame_pos>s.jpg``, scaled to a width of 640.

    Args:
        video_path: Path of the source video.
        scenes: List of scene dicts, each holding a ``frame_pos`` entry.
        output_dir: Directory for the images; created when missing.

    Returns:
        The same ``scenes`` list. Each dict is updated in place with a
        ``frame_path`` key: the absolute image path, or ``None`` when FFmpeg
        failed for that scene.
    """
    # exist_ok avoids the check-then-create race of exists() + makedirs().
    os.makedirs(output_dir, exist_ok=True)

    # Absolute directory, so the stored frame paths are complete.
    abs_output_dir = os.path.abspath(output_dir)

    logger.info(f"开始执行抽帧任务，目标数量: {len(scenes)}")

    for i, scene in enumerate(scenes):
        ts = scene["frame_pos"]
        # File name pattern: <index>_<timestamp>s.jpg
        file_name = f"scene_{i + 1:03d}_{ts}s.jpg"
        output_file_path = os.path.join(abs_output_dir, file_name)

        # -ss before -i selects fast seeking; -y sits with the other global
        # options, as in the rest of this file.
        cmd = [
            'ffmpeg', '-hide_banner', '-loglevel', 'error', '-y',
            '-ss', str(ts),
            '-i', video_path,
            '-frames:v', '1',
            '-q:v', '2',
            '-vf', 'scale=640:-1',  # previews are downscaled
            output_file_path
        ]

        try:
            subprocess.run(cmd, check=True)
            scene["frame_path"] = output_file_path

            # Report progress every 5 frames and on the last one.
            if (i + 1) % 5 == 0 or (i + 1) == len(scenes):
                logger.info(f"进度: {i + 1}/{len(scenes)}")
        except subprocess.CalledProcessError:
            logger.error(f"错误: 无法提取 {ts}s 处的帧")
            scene["frame_path"] = None  # marks a failed extraction

    logger.info(f"任务完成，存储路径: {abs_output_dir}")
    return scenes
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
# The subtitles filter sits inside the filter_complex string, which FFmpeg
# parses a second time; a path containing \, : or spaces breaks that parsing.
# Workaround: copy the srt file under a temporary name made only of safe
# characters, and delete the copy once processing is finished.
def get_safe_temp_srt(srt_path):
    """Build a temporary .srt path next to the original, named by a SHA-256 hash.

    The hash is taken over the resolved path string (not the file content),
    so no file is read. Nothing is created on disk here.

    Args:
        srt_path: Path of the original subtitle file.

    Returns:
        A ``pathlib.Path`` in the same directory as the resolved original,
        named ``<sha256 hex digest>.srt``.
    """
    resolved = Path(srt_path).resolve()
    digest = hashlib.sha256(str(resolved).encode('utf-8')).hexdigest()
    return resolved.parent.joinpath(f"{digest}.srt")
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
def _escape_subtitles_path(path):
    """Escape a file path for use as the filename of FFmpeg's subtitles filter."""
    text = Path(path).as_posix()
    # Level 1 (filter option value): backslash, quote and colon are special.
    text = text.replace('\\', '\\\\').replace("'", "\\'").replace(':', '\\:')
    # Level 2 (filtergraph): wrap in single quotes so , ; [ ] and spaces are
    # literal; a quote inside is written as '\'' (close, escaped quote, reopen).
    return "'" + text.replace("'", "'\\''") + "'"


def generate_video(audio_path, srt_path, video_output):
    """Render a waveform video with burned-in subtitles from an audio file.

    The subtitle file is copied to a temporary hash-named file (see
    ``get_safe_temp_srt``), FFmpeg draws the audio waveform (854x480), burns
    the subtitles onto it and encodes H.264 video plus AAC audio into
    ``video_output``. The temporary copy is removed afterwards, whether or
    not FFmpeg succeeded.

    Args:
        audio_path: Path of the input audio file.
        srt_path: Path of the SRT subtitle file.
        video_output: Path of the video file to write (overwritten if present).

    Returns:
        None.

    Raises:
        RuntimeError: If FFmpeg exits with a non-zero status; the message
            carries the last 20 lines of its output.
    """
    from collections import deque

    temp_srt_path = get_safe_temp_srt(srt_path)

    font_name = 'WenQuanYi Micro Hei'
    font_size = 20
    font_color = '&H0000FFFF&'

    try:
        # Copy inside the try block so a partially written copy is cleaned up.
        shutil.copy(srt_path, temp_srt_path)

        # The hash file name is safe, but its parent directories may still
        # contain ':', quotes or backslashes, so the whole path is escaped.
        subtitles_file = _escape_subtitles_path(temp_srt_path)

        command = [
            'ffmpeg', '-y',
            '-hide_banner',
            '-i', audio_path,
            '-filter_complex',
            f"[0:a]showwaves=s=854x480:mode=line:colors=0x00FFFF[v];"
            f"[v]subtitles={subtitles_file}:charenc=UTF-8:force_style='FontName={font_name},FontSize={font_size},PrimaryColour={font_color},Alignment=2'[v_out]",
            '-map', '[v_out]',
            '-map', '0:a',
            '-c:v', 'libx264',      # CPU encoder
            '-preset', 'veryfast',  # speed preset; 'medium' is smaller but slower
            '-crf', '28',           # quality: 23 is the default, 28 gives smaller files
            '-pix_fmt', 'yuv420p',  # widest player compatibility
            '-c:a', 'aac',
            '-b:a', '128k',         # audio bitrate
            '-shortest',
            video_output
        ]

        # Only the last 20 output lines are kept, for the error report.
        tail = deque(maxlen=20)
        logger.info(f"🎬 开始合成视频: {video_output}")

        # FFmpeg reports progress on stderr; it is merged into stdout so a
        # single stream can be read line by line.
        with subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            encoding='utf-8',
            errors='replace'
        ) as process:
            try:
                for line in process.stdout:
                    clean_line = line.strip()
                    tail.append(clean_line)

                    # Only progress lines are forwarded to the log.
                    if "frame=" in clean_line or "size=" in clean_line:
                        logger.info(f"\rFFmpeg 进度: {clean_line}")
            except BaseException:
                # Do not leave ffmpeg running if reading is interrupted.
                process.kill()
                raise

            returncode = process.wait()

        if returncode != 0:
            last_errors = "\n".join(tail)
            raise RuntimeError(
                f"FFmpeg 执行失败 (退出码 {returncode})\n"
                f"--- 最后 20 行日志 ---\n{last_errors}"
            )

        logger.info(f"\n🚀 视频合成成功: {video_output}")
    finally:
        if temp_srt_path.exists():
            temp_srt_path.unlink()
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
def get_precise_srt(text_list, timestamp_list, max_chars=20):
    """Build SRT entries by aligning text with per-character timestamps.

    Consecutive items of ``text_list`` are joined in pairs (the original
    comment describes this as attaching punctuation to the preceding phrase);
    an unpaired last item is kept on its own. Each joined sentence is cut
    into pieces of at most ``max_chars`` characters. A piece consumes one
    timestamp per word character (punctuation is not counted) and becomes one
    entry spanning from its first to its last consumed timestamp.

    Args:
        text_list: Text fragments; presumably alternating text and
            punctuation (TODO confirm against the caller).
        timestamp_list: Sequence of ``(start, end)`` pairs; passed to
            ``format_time_srt``, which documents milliseconds.
        max_chars: Maximum number of characters per subtitle line.

    Returns:
        A list of dicts with ``line`` (1-based index), ``time``
        (``"start --> end"`` in SRT format) and ``text``.
    """
    ts_total = len(timestamp_list)

    # Join the fragments pairwise; an odd trailing fragment stands alone.
    merged = [text_list[k] + text_list[k + 1] for k in range(0, len(text_list) - 1, 2)]
    if len(text_list) % 2 == 1:
        merged.append(text_list[-1])

    entries = []
    cursor = 0  # index of the next unused timestamp

    for raw_sentence in merged:
        stripped = raw_sentence.strip()
        if cursor >= ts_total or not stripped:
            continue

        # Hard-split over-long sentences into max_chars-sized pieces.
        for offset in range(0, len(stripped), max_chars):
            piece = stripped[offset:offset + max_chars]

            # Count only word characters: the timestamps are expected not to
            # cover punctuation.
            counted = len(re.sub(r'[^\w\u4e00-\u9fa5]', '', piece))
            if counted == 0:
                continue

            # Guard: stop once the timestamps are used up.
            if cursor >= ts_total:
                break

            first_ts = timestamp_list[cursor][0]
            # Clamp the last index to the available timestamps.
            last_index = min(cursor + counted - 1, ts_total - 1)
            last_ts = timestamp_list[last_index][1]

            entries.append({
                "line": len(entries) + 1,
                "time": f"{format_time_srt(first_ts)} --> {format_time_srt(last_ts)}",
                "text": piece
            })
            cursor += counted

    return entries
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
def format_time_srt(ms):
    """Convert a millisecond count to an SRT timestamp ``HH:MM:SS,mmm``.

    Args:
        ms: Time in milliseconds. Fractional values are truncated and
            negative values are treated as 0.

    Returns:
        The formatted timestamp string.
    """
    # Work on a non-negative int: with float input divmod returns floats,
    # which would format as e.g. "1.0" instead of "01".
    total_ms = max(int(ms), 0)
    seconds, millis = divmod(total_ms, 1000)
    minutes, seconds = divmod(seconds, 60)
    hours, minutes = divmod(minutes, 60)
    return f"{hours:02}:{minutes:02}:{seconds:02},{millis:03}"
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
def save_srt_file(srt_list, output_path):
    """Write a list of SRT entries to a UTF-8 text file.

    Every entry is written as three lines (index, time range, text)
    followed by one empty separator line.

    :param srt_list: list of dicts with the keys ``line``, ``time``, ``text``
    :param output_path: destination path (e.g. 'output.srt')
    """
    with open(output_path, 'w', encoding='utf-8') as srt_file:
        for item in srt_list:
            # Index, time range and text, each on its own line.
            for field in ('line', 'time', 'text'):
                srt_file.write(f"{item[field]}\n")
            # Empty line that separates consecutive entries.
            srt_file.write("\n")
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
def parse_srt_to_list(file_path):
    """Parse an SRT file back into a list of ``{line, time, text}`` dicts.

    Args:
        file_path: Path of the SRT file (UTF-8, with or without a BOM).

    Returns:
        A list of dicts with ``line`` (int index), ``time`` (the
        ``"HH:MM:SS,mmm --> HH:MM:SS,mmm"`` string) and ``text`` (stripped;
        may span several lines).
    """
    with open(file_path, 'r', encoding='utf-8-sig') as f:
        content = f.read().strip()

    # (\d+)                 index line
    # (HH:MM:SS,mmm --> …)  time range line
    # ([\s\S]*?)            text, possibly multi-line
    # lookahead             the next cue (index line followed by a time
    #                       range) or the end of the file. Requiring the time
    #                       range keeps a digits-only text line from ending
    #                       the cue early.
    pattern = re.compile(
        r'(\d+)\n'
        r'(\d{2}:\d{2}:\d{2},\d{3} --> \d{2}:\d{2}:\d{2},\d{3})\n'
        r'([\s\S]*?)'
        r'(?=\n+\d+\n\d{2}:\d{2}:\d{2},\d{3} --> |\Z)'
    )

    return [
        {"line": int(index), "time": time_range, "text": text.strip()}
        for index, time_range, text in pattern.findall(content)
    ]
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
def analyze_audio_energy(audio_path, segment_ms=100, sr=16000, silence_threshold=0.005):
    """Log the RMS energy of consecutive audio segments to help pick a silence threshold.

    The audio is loaded with librosa at ``sr`` Hz and cut into consecutive
    segments of ``segment_ms`` milliseconds. For every segment the start
    time, the RMS value and a silent/voiced label are logged, followed by
    the maximum, the minimum and the 20th percentile of all RMS values.

    :param audio_path: path of the audio file
    :param segment_ms: length of each analysed segment in milliseconds
    :param sr: sample rate the audio is loaded at
    :param silence_threshold: RMS value below which a segment is labelled silent
    :raises ValueError: if ``segment_ms`` and ``sr`` give a segment shorter
        than one sample
    """
    # 1. Load the audio.
    y, _ = librosa.load(audio_path, sr=sr)

    # 2. RMS energy of every segment.
    hop_length = int(sr * segment_ms / 1000)
    if hop_length <= 0:
        raise ValueError("segment_ms is too small: a segment must span at least one sample")

    if len(y) == 0:
        # Without samples there is nothing to measure, and max()/min() below
        # would fail on an empty list.
        logger.warning("音频为空，无法分析能量")
        return

    energy_list = []

    logger.info(f"{'时间 (秒)':<10} | {'能量值 (RMS)':<15} | {'状态估计'}")
    logger.info("-" * 45)

    # range() stops before len(y), so every slice holds at least one sample.
    for i in range(0, len(y), hop_length):
        segment = y[i: i + hop_length]

        rms = np.sqrt(np.mean(segment ** 2))
        energy_list.append(rms)

        time_sec = i / sr
        status = "🤫 静音" if rms < silence_threshold else "🗣️ 有声"
        logger.info(f"{time_sec:>8.2f}s | {rms:>15.6f} | {status}")

    # 3. Summary statistics and a suggested threshold.
    logger.info("-" * 45)
    logger.info(f"最大能量: {max(energy_list):.6f}")
    logger.info(f"最小能量: {min(energy_list):.6f}")
    logger.info(f"建议阈值: {np.percentile(energy_list, 20):.6f} (取前20%分位数作为参考)")
|