Преглед на файлове

1.oss-mgr 添加 UploadTaskController
2.oss-sdk 添加 OssClient#setObjectActive

reghao преди 2 седмици
родител
ревизия
9212074bf5

+ 8 - 0
oss-mgr/src/main/java/cn/reghao/oss/mgr/controller/OssSdkController.java

@@ -89,6 +89,14 @@ public class OssSdkController {
         return WebResult.success(objectChannel);
     }
 
+    @Operation(summary = "激活上传的对象", description = "N")
+    @PostMapping(value = "/object/activate", produces = MediaType.APPLICATION_JSON_VALUE)
+    public String setObjectActive(@RequestBody @Validated FileInitRequest req) {
+        String objectId = req.getObjectId();
+        ossClientService.setObjectActive(objectId);
+        return WebResult.success();
+    }
+
     @Operation(summary = "设置对象的可见范围", description = "N")
     @PostMapping(value = "/object/scope", produces = MediaType.APPLICATION_JSON_VALUE)
     public String checkAndSetScope(@RequestBody @Validated FileInitRequest req) {

+ 49 - 0
oss-mgr/src/main/java/cn/reghao/oss/mgr/controller/UploadTaskController.java

@@ -0,0 +1,49 @@
+package cn.reghao.oss.mgr.controller;
+
+import cn.reghao.jutil.jdk.web.db.PageList;
+import cn.reghao.jutil.jdk.web.result.WebResult;
+import cn.reghao.oss.mgr.model.dto.UploadTaskQuery;
+import cn.reghao.oss.mgr.model.vo.UploadTaskInfo;
+import cn.reghao.oss.mgr.service.UploadTaskService;
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.*;
+
+/**
+ * @author reghao
+ * @date 2026-06-01 11:05:30
+ */
+@Slf4j
+@Tag(name = "上传任务接口")
+@RestController
+@RequestMapping("/api/oss/upload_task")
+public class UploadTaskController {
+    private final UploadTaskService uploadTaskService;
+
+    public UploadTaskController(UploadTaskService uploadTaskService) {
+        this.uploadTaskService = uploadTaskService;
+    }
+
+    @Operation(summary = "获取上传任务列表", description = "N")
+    @GetMapping(value = "/list", produces = MediaType.APPLICATION_JSON_VALUE)
+    public String getUploadTasks(@RequestParam("pn") Integer pageNumber,
+                        @RequestParam(value = "status", required = false) Integer status) {
+        UploadTaskQuery uploadTaskQuery;
+        if (status == null) {
+            uploadTaskQuery = new UploadTaskQuery.Builder().build();
+        } else {
+            uploadTaskQuery = new UploadTaskQuery.Builder().status(status).build();
+        }
+
+        PageList<UploadTaskInfo> pageList = uploadTaskService.getUploadTasks(pageNumber, uploadTaskQuery);
+        return WebResult.success(pageList);
+    }
+
+    @Operation(summary = "清空已完成的上传任务", description = "N")
+    @PostMapping(value = "/clear", produces = MediaType.APPLICATION_JSON_VALUE)
+    public String clearUploadTask() {
+        return WebResult.success();
+    }
+}

+ 1 - 0
oss-mgr/src/main/java/cn/reghao/oss/mgr/db/mapper/FileMetaMapper.java

@@ -31,4 +31,5 @@ public interface FileMetaMapper extends BaseMapper<FileMeta> {
     ObjectInfo findObjectInfoById(@Param("objectId") String objectId);
     List<FileMeta> findBySha256List(List<String> list);
     List<FileMeta> findAllByDeleted();
+    List<FileMeta> findAllByInactive();
 }

+ 7 - 1
oss-mgr/src/main/java/cn/reghao/oss/mgr/db/mapper/UploadTaskMapper.java

@@ -1,6 +1,8 @@
 package cn.reghao.oss.mgr.db.mapper;
 
 import cn.reghao.jutil.jdk.web.db.BaseMapper;
+import cn.reghao.jutil.jdk.web.db.Page;
+import cn.reghao.oss.mgr.model.dto.UploadTaskQuery;
 import cn.reghao.oss.mgr.model.po.UploadTask;
 import org.apache.ibatis.annotations.Mapper;
 import org.apache.ibatis.annotations.Param;
@@ -13,13 +15,17 @@ import java.util.List;
  */
 @Mapper
 public interface UploadTaskMapper extends BaseMapper<UploadTask> {
+    void deleteByUploadId(String uploadId);
+
     void updateTask(@Param("uploadId") String uploadId,
                     @Param("status") int status,
                     @Param("host") String host,
                     @Param("httpPort") int httpPort);
     void updateStatus(@Param("uploadId") String uploadId, @Param("status") int status);
-    void deleteByUploadId(String uploadId);
 
     UploadTask findByUploadId(String uploadId);
     List<UploadTask> findByExpired();
+
+    int countByQuery(UploadTaskQuery uploadTaskQuery);
+    List<UploadTask> findQueryByPage(@Param("page") Page page, @Param("uploadTaskQuery") UploadTaskQuery uploadTaskQuery);
 }

+ 18 - 7
oss-mgr/src/main/java/cn/reghao/oss/mgr/db/repository/ObjectRepository.java

@@ -58,12 +58,6 @@ public class ObjectRepository {
         fileMetaMapper.save(fileMeta);
     }
 
-    @Transactional(rollbackFor = Exception.class)
-    public void saveObject(FileMeta fileMeta, List<DataBlock> list) {
-        fileMetaMapper.save(fileMeta);
-        dataBlockMapper.saveAll(list);
-    }
-
     @Transactional(rollbackFor = Exception.class)
     public void saveFastUpload(FileMeta fileMeta, UploadTask uploadTask) {
         fileMetaMapper.save(fileMeta);
@@ -76,10 +70,27 @@ public class ObjectRepository {
         log.info("evict {}", objectName);
     }
 
-    public void deleteObject(String objectId) {
+    /**
+     * 逻辑删除
+     *
+     * @param
+     * @return
+     * @date 2026-06-01 14:00:48
+     */
+    public void deleteObjectLogical(String objectId) {
         fileMetaMapper.updateSetDeleteByObjectId(objectId);
     }
 
+    /**
+     * 物理删除
+     *
+     * @param
+     * @return
+     * @date 2026-06-01 14:00:58
+     */
+    public void deleteObjectPhysical(String objectId) {
+    }
+
     public DataBlock getBySha256sum(String sha256sum) {
         DataBlock dataBlock = dataBlockMapper.findBySha256sum(sha256sum);
         return dataBlock;

+ 32 - 0
oss-mgr/src/main/java/cn/reghao/oss/mgr/model/dto/UploadTaskQuery.java

@@ -0,0 +1,32 @@
+package cn.reghao.oss.mgr.model.dto;
+
+/**
+ * @author reghao
+ * @date 2026-06-01 11:37:11
+ */
+public class UploadTaskQuery {
+    private Integer status;
+
+    public UploadTaskQuery() {
+    }
+
+    private UploadTaskQuery(Builder builder) {
+        this.status = builder.status;
+    }
+
+    public static final class Builder {
+        private Integer status;
+
+        public Builder() {
+        }
+
+        public Builder status(int status) {
+            this.status = status;
+            return this;
+        }
+
+        public UploadTaskQuery build() {
+            return new UploadTaskQuery(this);
+        }
+    }
+}

+ 36 - 0
oss-mgr/src/main/java/cn/reghao/oss/mgr/model/vo/UploadTaskInfo.java

@@ -0,0 +1,36 @@
+package cn.reghao.oss.mgr.model.vo;
+
+import cn.reghao.jutil.jdk.converter.DateTimeConverter;
+import cn.reghao.oss.mgr.model.po.UploadTask;
+import lombok.Data;
+
+import java.time.LocalDateTime;
+
+/**
+ * @author reghao
+ * @date 2026-06-01 11:54:12
+ */
+@Data
+public class UploadTaskInfo {
+    private String uploadId;
+    private String sha256sum;
+    private String filename;
+    private String size;
+    private String status;
+    private String expireTime;
+    private Integer channelCode;
+    private Long uploadBy;
+    private String hostPort;
+
+    public UploadTaskInfo(UploadTask uploadTask, String sizeStr, String statusStr) {
+        this.uploadId = uploadTask.getUploadId();
+        this.sha256sum = uploadTask.getSha256sum();
+        this.filename = uploadTask.getFilename();
+        this.size = sizeStr;
+        this.status = statusStr;
+        this.expireTime = DateTimeConverter.format(uploadTask.getExpireTime());
+        this.channelCode = uploadTask.getChannelCode();
+        this.uploadBy = uploadTask.getUploadBy();
+        this.hostPort = String.format("%s:%s", uploadTask.getHost(), uploadTask.getHttpPort());
+    }
+}

+ 1 - 1
oss-mgr/src/main/java/cn/reghao/oss/mgr/rpc/ConsoleServiceImpl.java

@@ -105,7 +105,7 @@ public class ConsoleServiceImpl implements ConsoleService {
         String sha256sum = uploadDoneResult.getSha256sum();
 
         String objectId = uploadDoneResult.getObjectId();
-        FileMeta fileMeta = fileMetaMapper.findByObjectId(objectId);
+        FileMeta fileMeta = objectRepository.getFileMetaById(objectId);
         if (fileMeta == null) {
             return;
         }

+ 8 - 6
oss-mgr/src/main/java/cn/reghao/oss/mgr/service/OssClientService.java

@@ -63,6 +63,10 @@ public class OssClientService {
         this.userKeyService = userKeyService;
     }
 
+    public void setObjectActive(String objectId) {
+        fileMetaMapper.updateSetActiveByObjectId(objectId);
+    }
+
     public Result getSignedUrl(String objectId, String action, String objectUrl) {
         if (objectUrl != null && !objectUrl.isBlank()) {
             return getSignedUrl0(objectId, objectUrl);
@@ -237,7 +241,7 @@ public class OssClientService {
             return;
         }
 
-        objectRepository.deleteObject(objectId);
+        objectRepository.deleteObjectLogical(objectId);
         /*DataBlock dataBlock = dataBlockMapper.findByObjectId(objectId);
         String hostPort = dataBlock.getHostPort();
         StoreService storeService = rpcService.getStoreService(hostPort);
@@ -249,7 +253,7 @@ public class OssClientService {
             scope = ObjectScope.PRIVATE.getCode();
         }
 
-        FileMeta fileMeta = fileMetaMapper.findByObjectId(objectId);
+        FileMeta fileMeta = objectRepository.getFileMetaById(objectId);
         if (fileMeta != null) {
             int currentScope = fileMeta.getScope();
             if (currentScope != scope) {
@@ -262,7 +266,7 @@ public class OssClientService {
     }
 
     public ObjectInfo getObjectInfo(String objectId) {
-        FileMeta fileMeta = fileMetaMapper.findByObjectId(objectId);
+        FileMeta fileMeta = objectRepository.getFileMetaById(objectId);
         if (fileMeta != null) {
             String host = "";
             int httpPort = 0;
@@ -294,7 +298,6 @@ public class OssClientService {
 
                 ObjectInfo objectInfo = objectRepository.getObjectInfo(objectId);
                 objectInfo.setUrl(objectUrl);
-                fileMetaMapper.updateSetActiveByObjectId(objectId);
                 return objectInfo;
             }
         }
@@ -305,7 +308,7 @@ public class OssClientService {
     }
 
     public VideoInfo getVideoInfo(String objectId) {
-        FileMeta fileMeta = fileMetaMapper.findByObjectId(objectId);
+        FileMeta fileMeta = objectRepository.getFileMetaById(objectId);
         UploadTask uploadTask = uploadTaskMapper.findByUploadId(objectId);
         if (fileMeta == null || uploadTask == null) {
             String errorMsg = String.format("object with id %s not exist", objectId);
@@ -338,7 +341,6 @@ public class OssClientService {
 
         MediaResolution mediaResolution = MediaQuality.getQuality(videoInfo.getWidth(), videoInfo.getHeight());
         videoInfo.setQuality(mediaResolution.getQualityStr());
-        fileMetaMapper.updateSetActiveByObjectId(objectId);
         return videoInfo;
     }
 }

+ 41 - 3
oss-mgr/src/main/java/cn/reghao/oss/mgr/service/TaskService.java

@@ -1,7 +1,13 @@
 package cn.reghao.oss.mgr.service;
 
+import cn.reghao.oss.api.constant.UploadStatus;
+import cn.reghao.oss.api.iface.StoreService;
+import cn.reghao.oss.mgr.db.mapper.FileMetaMapper;
 import cn.reghao.oss.mgr.db.mapper.UploadTaskMapper;
+import cn.reghao.oss.mgr.model.po.FileMeta;
+import cn.reghao.oss.mgr.model.po.StoreNode;
 import cn.reghao.oss.mgr.model.po.UploadTask;
+import cn.reghao.oss.mgr.rpc.RpcService;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.scheduling.annotation.EnableScheduling;
 import org.springframework.scheduling.annotation.Scheduled;
@@ -19,23 +25,55 @@ import java.util.List;
 @Service
 public class TaskService {
     private final UploadTaskMapper uploadTaskMapper;
+    private final FileMetaMapper fileMetaMapper;
+    private final StoreNodeService storeNodeService;
+    private final RpcService rpcService;
 
-    public TaskService(UploadTaskMapper uploadTaskMapper) {
+    public TaskService(UploadTaskMapper uploadTaskMapper, FileMetaMapper fileMetaMapper,
+                       StoreNodeService storeNodeService, RpcService rpcService) {
         this.uploadTaskMapper = uploadTaskMapper;
+        this.fileMetaMapper = fileMetaMapper;
+        this.storeNodeService = storeNodeService;
+        this.rpcService = rpcService;
     }
 
-    // 每天凌晨 4:00 执行清理任务
-    @Scheduled(cron = "0 0 4 * * ?")
+    // 每天凌晨 2:00 执行清理任务
+    @Scheduled(cron = "0 0 2 * * ?")
     @Transactional
     public void cleanExpiredTasks() {
         log.info("执行过期 UploadTask 清理任务...");
         // 1. 找出过期的任务
         List<UploadTask> expiredTasks = uploadTaskMapper.findByExpired();
         for (UploadTask task : expiredTasks) {
+            int status = task.getStatus();
+            if (status == UploadStatus.UPLOADING.getCode()) {
+                String host = task.getHost();
+                int httpPort = task.getHttpPort();
+                if (host != null && httpPort != 0) {
+                    StoreNode storeNode = storeNodeService.getStoreNode(host, httpPort);
+                    StoreService storeService = rpcService.getStoreService(storeNode);
+                    //storeService.deleteFile();
+                }
+            } else if (status == UploadStatus.FLUSHING.getCode()) {
+
+            } else if (status == UploadStatus.AVAILABLE.getCode()) {
+                uploadTaskMapper.deleteByUploadId(task.getUploadId());
+            }
+
             // 2. 物理删除 SSD 上的残余文件块(重要!防止 SSD 满)
             //FileUtil.deleteQuietly(task.getSsdPath());
             // 3. 逻辑删除数据库记录
             //jdbcTemplate.update("DELETE FROM upload_tasks WHERE upload_id = ?", task.getUploadId());
         }
     }
+
+    // 每天凌晨 4:00 执行清理任务
+    @Scheduled(cron = "0 0 4 * * ?")
+    @Transactional
+    public void cleanInactiveObject() {
+        log.info("清理未激活的 object...");
+        List<FileMeta> list = fileMetaMapper.findAllByInactive();
+        for (FileMeta fileMeta : list) {
+        }
+    }
 }

+ 41 - 0
oss-mgr/src/main/java/cn/reghao/oss/mgr/service/UploadTaskService.java

@@ -0,0 +1,41 @@
+package cn.reghao.oss.mgr.service;
+
+import cn.reghao.jutil.jdk.converter.ByteConverter;
+import cn.reghao.jutil.jdk.web.db.Page;
+import cn.reghao.jutil.jdk.web.db.PageList;
+import cn.reghao.oss.api.constant.UploadStatus;
+import cn.reghao.oss.mgr.db.mapper.UploadTaskMapper;
+import cn.reghao.oss.mgr.model.dto.UploadTaskQuery;
+import cn.reghao.oss.mgr.model.vo.UploadTaskInfo;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+
+/**
+ * @author reghao
+ * @date 2026-06-01 11:29:38
+ */
+@Service
+public class UploadTaskService {
+    private int pageSize = 12;
+    private final UploadTaskMapper uploadTaskMapper;
+    private final ByteConverter byteConverter = new ByteConverter();
+
+    public UploadTaskService(UploadTaskMapper uploadTaskMapper) {
+        this.uploadTaskMapper = uploadTaskMapper;
+    }
+
+    public PageList<UploadTaskInfo> getUploadTasks(int pageNumber, UploadTaskQuery uploadTaskQuery) {
+        Page page = new Page(pageNumber, pageSize);
+        int total = uploadTaskMapper.countByQuery(uploadTaskQuery);
+        List<UploadTaskInfo> list = uploadTaskMapper.findQueryByPage(page, uploadTaskQuery).stream()
+                .map(uploadTask -> {
+                    String sizeStr = byteConverter.convert(uploadTask.getFileSize());
+                    int status = uploadTask.getStatus();
+                    String statusStr = UploadStatus.getByCode(status).getName();
+                    return new UploadTaskInfo(uploadTask, sizeStr, statusStr);
+                })
+                .toList();
+        return PageList.pageList(page, total, list);
+    }
+}

+ 6 - 0
oss-mgr/src/main/resources/mapper/FileMetaMapper.xml

@@ -59,6 +59,12 @@
         where `deleted` is true
         limit 1000
     </select>
+    <select id="findAllByInactive" resultType="cn.reghao.oss.mgr.model.po.FileMeta">
+        select *
+        from file_meta
+        where `active` is false
+        limit 1000
+    </select>
     <select id="findBySha256sum" resultType="cn.reghao.oss.mgr.model.po.FileMeta">
         select *
         from file_meta

+ 21 - 0
oss-mgr/src/main/resources/mapper/UploadTaskMapper.xml

@@ -36,4 +36,25 @@
         where expire_time &lt; now()
         LIMIT 100
     </select>
+
+    <select id="countByQuery" resultType="java.lang.Integer">
+        select count(*)
+        from upload_task
+        <where>
+            deleted=0
+            <if test="status != null">
+                and `status`=#{status}
+            </if>
+        </where>
+    </select>
+    <select id="findQueryByPage" resultType="cn.reghao.oss.mgr.model.po.UploadTask">
+        select *
+        from upload_task
+        <where>
+            deleted=0
+            <if test="uploadTaskQuery.status != null">
+                and status=#{uploadTaskQuery.status}
+            </if>
+        </where>
+    </select>
 </mapper>

+ 22 - 0
oss-sdk/src/main/java/cn/reghao/oss/sdk/OssClient.java

@@ -485,6 +485,23 @@ public class OssClient {
         }
     }
 
+    public void setObjectActive(String objectId) {
+        Map<String, String> map = Map.of("objectId", objectId);
+        String jsonBody = JsonConverter.objectToJson(map);
+
+        String api = "/api/oss/sdk/object/activate";
+        try {
+            String responseBody = getPostResponseBody(api, jsonBody);
+            Type type = new TypeToken<WebResult<String>>(){}.getType();
+            WebResult<String> webResult = JsonConverter.jsonToObject(responseBody, type);
+            if (webResult.getCode() != 0) {
+                throw new RuntimeException(webResult.getMsg());
+            }
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
     public void checkAndSetScope(String objectId, int scope) {
         Map<String, Object> body = Map.of(
                 "objectId", objectId,
@@ -494,6 +511,11 @@ public class OssClient {
         String api = "/api/oss/sdk/object/scope";
         try {
             String responseBody = getPostResponseBody(api, jsonBody);
+            Type type = new TypeToken<WebResult<String>>(){}.getType();
+            WebResult<String> webResult = JsonConverter.jsonToObject(responseBody, type);
+            if (webResult.getCode() != 0) {
+                throw new RuntimeException(webResult.getMsg());
+            }
         } catch (Exception e) {
             throw new RuntimeException(e);
         }