from fastapi import APIRouter, HTTPException from fastapi.responses import FileResponse import os from setting import UPLOAD_DIR, OUTPUT_DIR router = APIRouter(prefix="/api1/file", tags=["file"]) @router.get("/image/{filename}") async def get_image(filename: str): # 1. 构建完整路径 file_path = os.path.join(UPLOAD_DIR, filename) return get_file(file_path) # # # 2. 安全检查:防止目录穿越漏洞 (Directory Traversal) # # 确保用户请求的文件确实在 UPLOAD_DIR 目录下 # real_path = os.path.realpath(file_path) # if not real_path.startswith(os.path.realpath(UPLOAD_DIR)): # raise HTTPException(status_code=403, detail="拒绝访问该路径") # # # 3. 检查文件是否存在 # if not os.path.exists(real_path): # raise HTTPException(status_code=404, detail="图片不存在") # # # 4. 返回文件流 # # media_type 会根据后缀自动识别(如 image/jpeg),也可以手动指定 # return FileResponse(real_path) @router.get("/audio/{filename}") async def get_audio(filename: str): # 1. 构建完整路径 file_path = os.path.join(UPLOAD_DIR, filename) return get_file(file_path) @router.get("/video/{filename}") async def get_video(filename: str): # 1. 构建完整路径 file_path = os.path.join(OUTPUT_DIR, filename) return get_file(file_path) def get_file(file_path): # 2. 安全检查:防止目录穿越漏洞 (Directory Traversal) # 确保用户请求的文件确实在 UPLOAD_DIR 目录下 real_path = os.path.realpath(file_path) if not (real_path.startswith(os.path.realpath(UPLOAD_DIR)) or real_path.startswith(os.path.realpath(OUTPUT_DIR))): raise HTTPException(status_code=403, detail="拒绝访问该路径") # 3. 检查文件是否存在 if not os.path.exists(real_path): raise HTTPException(status_code=404, detail="视频不存在") # 4. 返回文件流 # media_type 会根据后缀自动识别(如 image/jpeg),也可以手动指定 return FileResponse(real_path)