Переглянути джерело

oss-store 大文件分片上传接口实现中

reghao 1 рік тому
батько
коміт
4b43186cc7

+ 5 - 5
oss-api/src/main/java/cn/reghao/oss/api/rest/UploadFilePart.java

@@ -22,13 +22,13 @@ public class UploadFilePart implements Serializable {
     private String filename;
     private String relativePath;
     // 文件大小
-    private Long totalSize;
+    private long totalSize;
     // 分片文件大小
-    private Integer chunkSize;
+    private long chunkSize;
     // 分片文件数量
-    private Integer totalChunks;
+    private long totalChunks;
     // 当前分片文件索引
-    private Integer chunkNumber;
+    private int chunkNumber;
     // 当前分片文件大小
-    private Integer currentChunkSize;
+    private int currentChunkSize;
 }

+ 3 - 19
oss-store/src/main/java/cn/reghao/oss/store/controller/ObjectMultipartUploadController.java

@@ -76,9 +76,8 @@ public class ObjectMultipartUploadController {
     }
 
     @ApiOperation(value = "获取已上传的对象分片")
-    @GetMapping(value = "/", params = {"multipart"}, produces = MediaType.APPLICATION_JSON_VALUE)
-    public ResponseEntity<String> getUploadedPart() {
-        int channelId = 1;
+    @PostMapping(value = "/", params = {"get_multipart"}, produces = MediaType.APPLICATION_JSON_VALUE)
+    public ResponseEntity<String> getUploadedPart(String sha256sum) {
         /* permission check */
         String token = ServletUtil.getBearerToken();
         if (token == null) {
@@ -94,22 +93,7 @@ public class ObjectMultipartUploadController {
                     .body(WebResult.failWithMsg("it's not upload token"));
         }
 
-        int channelId1 = ossPayload.getChannelId();
-        if (channelId != channelId1) {
-            return ResponseEntity.status(HttpStatus.FORBIDDEN)
-                    .body(WebResult.failWithMsg("channel not match in token"));
-        }
-
-        int loginUser = ossPayload.getUserId();
-        ObjectChannel objectChannel = consoleService.getChannelById(loginUser, channelId);
-        if (objectChannel == null) {
-            String errMsg = String.format("channel validate failed, channel %s not exist", channelId);
-            return ResponseEntity.status(HttpStatus.FORBIDDEN)
-                    .body(WebResult.failWithMsg(errMsg));
-        }
-        AuthContext context = new AuthContext(loginUser);
-
-        UploadedPart uploadedPart = objectMultipartUploadService.getUploadedPart();
+        UploadedPart uploadedPart = objectMultipartUploadService.getUploadedPart(sha256sum);
         return ResponseEntity.status(HttpStatus.OK).body(WebResult.success(uploadedPart));
     }
 

+ 16 - 0
oss-store/src/main/java/cn/reghao/oss/store/db/mapper/FileMultipartMapper.java

@@ -0,0 +1,16 @@
+package cn.reghao.oss.store.db.mapper;
+
+import cn.reghao.jutil.jdk.db.BaseMapper;
+import cn.reghao.oss.store.model.po.FileMultipart;
+import org.apache.ibatis.annotations.Mapper;
+import org.apache.ibatis.annotations.Param;
+
+/**
+ * @author reghao
+ * @date 2024-10-24 16:35:48
+ */
+@Mapper
+public interface FileMultipartMapper extends BaseMapper<FileMultipart> {
+    void updateSetUploaded(String sha256sum);
+    FileMultipart findBySha256sumAndUploadBy(@Param("sha256sum") String sha256sum, @Param("uploadBy") int uploadBy);
+}

+ 1 - 1
oss-store/src/main/java/cn/reghao/oss/store/db/mapper/FilePartMapper.java

@@ -12,5 +12,5 @@ import java.util.List;
  */
 @Mapper
 public interface FilePartMapper extends BaseMapper<FilePart> {
-    List<FilePart> findByObjectId(String objectId);
+    List<FilePart> findByContentId(String contentId);
 }

+ 0 - 16
oss-store/src/main/java/cn/reghao/oss/store/db/mapper/FilePartsMapper.java

@@ -1,16 +0,0 @@
-package cn.reghao.oss.store.db.mapper;
-
-import cn.reghao.jutil.jdk.db.BaseMapper;
-import cn.reghao.oss.store.model.po.FilePart;
-import cn.reghao.oss.store.model.po.FileParts;
-import org.apache.ibatis.annotations.Mapper;
-
-/**
- * @author reghao
- * @date 2024-10-24 16:35:48
- */
-@Mapper
-public interface FilePartsMapper extends BaseMapper<FileParts> {
-    FilePart findByObjectId(String objectId);
-    FilePart findBySha256sum(String sha256sum);
-}

+ 24 - 10
oss-store/src/main/java/cn/reghao/oss/store/db/repository/FilePartRepository.java

@@ -1,11 +1,13 @@
 package cn.reghao.oss.store.db.repository;
 
+import cn.reghao.oss.api.util.AuthContext;
 import cn.reghao.oss.store.db.mapper.FilePartMapper;
-import cn.reghao.oss.store.db.mapper.FilePartsMapper;
+import cn.reghao.oss.store.db.mapper.FileMultipartMapper;
 import cn.reghao.oss.store.model.po.FilePart;
-import cn.reghao.oss.store.model.po.FileParts;
+import cn.reghao.oss.store.model.po.FileMultipart;
 import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
+
+import java.util.List;
 
 /**
  * @author reghao
@@ -13,21 +15,33 @@ import org.springframework.transaction.annotation.Transactional;
  */
 @Service
 public class FilePartRepository {
-    private final FilePartsMapper filePartsMapper;
+    private final FileMultipartMapper fileMultipartMapper;
     private final FilePartMapper filePartMapper;
 
-    public FilePartRepository(FilePartsMapper filePartsMapper, FilePartMapper filePartMapper) {
-        this.filePartsMapper = filePartsMapper;
+    public FilePartRepository(FileMultipartMapper fileMultipartMapper, FilePartMapper filePartMapper) {
+        this.fileMultipartMapper = fileMultipartMapper;
         this.filePartMapper = filePartMapper;
     }
 
-    @Transactional(rollbackFor = Exception.class)
-    public void save(FileParts fileParts, FilePart filePart) {
-        filePartsMapper.save(fileParts);
-        filePartMapper.save(filePart);
+    public void saveFileParts(FileMultipart fileMultipart) {
+        fileMultipartMapper.save(fileMultipart);
     }
 
     public void saveFilePart(FilePart filePart) {
         filePartMapper.save(filePart);
     }
+
+    public void updateSetUploaded(String sha256sum) {
+        fileMultipartMapper.updateSetUploaded(sha256sum);
+    }
+
+    public List<FilePart> getFileParts(String contentId) {
+        return filePartMapper.findByContentId(contentId);
+    }
+
+    public FileMultipart getFileMultipart(String sha256sum) {
+        int loginUser = AuthContext.getUserId();
+        FileMultipart fileMultipart = fileMultipartMapper.findBySha256sumAndUploadBy(sha256sum, loginUser);
+        return fileMultipart;
+    }
 }

+ 2 - 1
oss-store/src/main/java/cn/reghao/oss/store/disk/LocalStores.java

@@ -46,7 +46,8 @@ public class LocalStores {
                     throw new IOException(msg);
                 }
 
-                storeDirMap.get(storeDir).put(baseDir, new StoreDir(baseDir));
+                String baseDir1 = dir.getAbsolutePath();
+                storeDirMap.get(storeDir).put(baseDir1, new StoreDir(baseDir1));
             }
         }
     }

+ 0 - 18
oss-store/src/main/java/cn/reghao/oss/store/model/dto/PathUrl.java

@@ -1,18 +0,0 @@
-package cn.reghao.oss.store.model.dto;
-
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-
-/**
- * 文件的本地路径和 URL
- *
- * @author reghao
- * @date 2022-04-26 15:10:04
- */
-@AllArgsConstructor
-@Getter
-public class PathUrl {
-    private String contentId;
-    // 本地文件路径
-    private String absolutePath;
-}

+ 11 - 8
oss-store/src/main/java/cn/reghao/oss/store/model/po/FileParts.java → oss-store/src/main/java/cn/reghao/oss/store/model/po/FileMultipart.java

@@ -2,6 +2,7 @@ package cn.reghao.oss.store.model.po;
 
 import cn.reghao.jutil.jdk.db.BaseObject;
 import cn.reghao.oss.api.rest.UploadFilePart;
+import cn.reghao.oss.api.util.AuthContext;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 
@@ -11,22 +12,24 @@ import lombok.NoArgsConstructor;
  */
 @NoArgsConstructor
 @Getter
-public class FileParts extends BaseObject<Integer> {
-    private String objectId;
-    private String absolutePath;
+public class FileMultipart extends BaseObject<Integer> {
+    private String contentId;
     private String sha256sum;
+    private String absolutePath;
     private Long totalSize;
     private Integer chunkSize;
     private Integer totalChunks;
     private Boolean uploaded;
+    private Integer uploadBy;
 
-    public FileParts(String objectId, String absolutePath, UploadFilePart uploadFilePart) {
-        this.objectId = objectId;
-        this.absolutePath = absolutePath;
+    public FileMultipart(String contentId, UploadFilePart uploadFilePart, String absolutePath) {
+        this.contentId = contentId;
         this.sha256sum = uploadFilePart.getIdentifier();
+        this.absolutePath = absolutePath;
         this.totalSize = uploadFilePart.getTotalSize();
-        this.chunkSize = uploadFilePart.getChunkSize();
-        this.totalChunks = uploadFilePart.getTotalChunks();
+        this.chunkSize = (int)uploadFilePart.getChunkSize();
+        this.totalChunks = (int)uploadFilePart.getTotalChunks();
         this.uploaded = false;
+        this.uploadBy = AuthContext.getUserId();
     }
 }

+ 3 - 3
oss-store/src/main/java/cn/reghao/oss/store/model/po/FilePart.java

@@ -12,12 +12,12 @@ import lombok.NoArgsConstructor;
 @NoArgsConstructor
 @Getter
 public class FilePart extends BaseObject<Integer> {
-    private String objectId;
+    private String contentId;
     private Integer chunkNumber;
     private Integer currentChunkSize;
 
-    public FilePart(String objectId, UploadFilePart uploadFilePart) {
-        this.objectId = objectId;
+    public FilePart(String contentId, UploadFilePart uploadFilePart) {
+        this.contentId = contentId;
         this.chunkNumber = uploadFilePart.getChunkNumber();
         this.currentChunkSize = uploadFilePart.getCurrentChunkSize();
     }

+ 46 - 41
oss-store/src/main/java/cn/reghao/oss/store/service/ObjectMultipartUploadService.java

@@ -5,9 +5,8 @@ import cn.reghao.oss.api.dto.ObjectChannel;
 import cn.reghao.oss.api.rest.*;
 import cn.reghao.oss.store.db.repository.FilePartRepository;
 import cn.reghao.oss.store.db.repository.ObjectRepository;
-import cn.reghao.oss.store.model.dto.PathUrl;
 import cn.reghao.oss.store.model.po.FilePart;
-import cn.reghao.oss.store.model.po.FileParts;
+import cn.reghao.oss.store.model.po.FileMultipart;
 import cn.reghao.oss.store.model.vo.ObjectProp;
 import cn.reghao.oss.store.util.FileType;
 import cn.reghao.oss.store.util.StringUtil;
@@ -31,8 +30,6 @@ public class ObjectMultipartUploadService {
     private final static long PART_SIZE = 1024*1024*20;
     private final ObjectRepository objectRepository;
     private final FileStoreService fileStoreService;
-    private final Map<String, Set<Long>> map = new HashMap<>();
-    private final Map<String, PathUrl> pathMap = new HashMap<>();
     private final ObjectNameService objectNameService;
     private final PutObjectService putObjectService;
     private final FilePartRepository filePartRepository;
@@ -81,7 +78,14 @@ public class ObjectMultipartUploadService {
         }
     }
 
-    public UploadedPart getUploadedPart() {
+    public UploadedPart getUploadedPart(String sha256sum) {
+        FileMultipart fileMultipart = filePartRepository.getFileMultipart(sha256sum);
+        if (fileMultipart == null) {
+            return null;
+        }
+
+        String contentId = fileMultipart.getContentId();
+        List<FilePart> list = filePartRepository.getFileParts(contentId);
         UploadedPart uploadedPart = new UploadedPart();
         uploadedPart.setSkipUpload(false);
         uploadedPart.setNeedMerge(false);
@@ -107,7 +111,6 @@ public class ObjectMultipartUploadService {
         int currentPartSize = uploadFilePart.getCurrentChunkSize();
         long totalParts = uploadFilePart.getTotalChunks();
         long chunkNumber = uploadFilePart.getChunkNumber();
-        log.info("{} -> {}:{}", currentPartSize, totalParts, chunkNumber);
 
         String sha256sum = uploadFilePart.getIdentifier();
         FileMeta fileMeta = objectRepository.getBySha256sum(sha256sum);
@@ -123,49 +126,51 @@ public class ObjectMultipartUploadService {
             return new UploadFileRet(sha256sum, url);
         }
 
-        Set<Long> set = map.computeIfAbsent(sha256sum, k -> new HashSet<>());
-        if (set.isEmpty()) {
-            String contentId = UUID.randomUUID().toString().replace("-", "");
+        List<FilePart> list = new ArrayList<>();
+        String contentId = filePartRepository.getFileMultipart(sha256sum).getContentId();
+        if (contentId == null) {
+            contentId = UUID.randomUUID().toString().replace("-", "");
             String absolutePath = fileStoreService.genFilePath(contentId, totalSize, "");
             fileStoreService.createSparseFile(absolutePath, totalSize);
 
-            String objectId = "";
-            FileParts fileParts = new FileParts(objectId, absolutePath, uploadFilePart);
-            FilePart filePart = new FilePart(objectId, uploadFilePart);
-            filePartRepository.save(fileParts, filePart);
-
-            PathUrl pathUrl = new PathUrl(contentId, absolutePath);
-            pathMap.put(sha256sum, pathUrl);
-        }
-
-        if (set.add(chunkNumber)) {
-            long pos = (chunkNumber-1) * chunkSize;
-            String absolutePath = pathMap.get(sha256sum).getAbsolutePath();
-            fileStoreService.writeToFile(inputStream, absolutePath, pos);
+            FileMultipart fileMultipart = new FileMultipart(contentId, uploadFilePart, absolutePath);
+            filePartRepository.saveFileParts(fileMultipart);
+        } else {
+            list = filePartRepository.getFileParts(contentId);
         }
 
-        if (set.size() != totalParts) {
+        FilePart filePart = new FilePart(contentId, uploadFilePart);
+        filePartRepository.saveFilePart(filePart);
+        long pos = (chunkNumber-1) * chunkSize;
+        String absolutePath = filePartRepository.getFileMultipart(sha256sum).getAbsolutePath();
+        fileStoreService.writeToFile(inputStream, absolutePath, pos);
+        if (list.size()+1 != totalParts) {
             return new UploadFileRet(sha256sum);
         } else {
-            String contentId = pathMap.get(sha256sum).getContentId();
-            String absolutePath = pathMap.get(sha256sum).getAbsolutePath();
-            FileInputStream fis = new FileInputStream(absolutePath);
-            String sha256sumMerged = DigestUtil.sha256sum(fis);
-            if (!sha256sum.equals(sha256sumMerged)) {
-                throw new Exception("分片合并文件的 sha256sum 与原文件不一致!");
-            }
-
-            log.info("合并的文件 {}", absolutePath);
-            String suffix = StringUtil.getSuffix(filename);
-            ObjectProp objectProp = objectNameService.getObjectProp(objectChannel, suffix);
-            File savedFile = new File(absolutePath);
-            putObjectService.putObject(objectProp, contentId, savedFile, filename, sha256sum);
+            return mergeFileParts(absolutePath, sha256sum, objectChannel, filename);
+        }
+    }
 
-            map.remove(sha256sum);
-            pathMap.remove(sha256sum);
-            String host = ServletUtil.getHeader("host");
-            String url = String.format("//%s/%s", host, objectProp);
-            return new UploadFileRet(sha256sum, url);
+    private UploadFileRet mergeFileParts(String absolutePath,
+                                         String sha256sum,
+                                         ObjectChannel objectChannel,
+                                         String filename) throws Exception {
+        FileInputStream fis = new FileInputStream(absolutePath);
+        String sha256sumMerged = DigestUtil.sha256sum(fis);
+        if (!sha256sum.equals(sha256sumMerged)) {
+            throw new Exception("分片合并文件的 sha256sum 与原文件不一致!");
         }
+
+        log.info("合并的文件 {}", absolutePath);
+        String contentId = filePartRepository.getFileMultipart(sha256sum).getContentId();
+        String suffix = StringUtil.getSuffix(filename);
+        ObjectProp objectProp = objectNameService.getObjectProp(objectChannel, suffix);
+        File savedFile = new File(absolutePath);
+        putObjectService.putObject(objectProp, contentId, savedFile, filename, sha256sum);
+
+        filePartRepository.updateSetUploaded(sha256sum);
+        String host = ServletUtil.getHeader("host");
+        String url = String.format("//%s/%s", host, objectProp);
+        return new UploadFileRet(sha256sum, url);
     }
 }

+ 23 - 0
oss-store/src/main/resources/mapper/FileMultipartMapper.xml

@@ -0,0 +1,23 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+
+<mapper namespace="cn.reghao.oss.store.db.mapper.FileMultipartMapper">
+    <insert id="save" useGeneratedKeys="true" keyProperty="id">
+        insert into file_parts
+        (`content_id`,`sha256sum`,`absolute_path`,`total_size`,`chunk_size`,`total_chunks`,`uploaded`,`upload_by`)
+        values
+        (#{contentId},#{sha256sum},#{absolutePath},#{totalSize},#{chunkSize},#{totalChunks},#{uploaded},#{uploadBy})
+    </insert>
+
+    <update id="updateSetUploaded">
+        update file_parts
+        set uploaded=1
+        where sha256sum=#{sha256sum}
+    </update>
+
+    <select id="findBySha256sumAndUploadBy" resultType="cn.reghao.oss.store.model.po.FileMultipart">
+        select *
+        from file_parts
+        where sha256sum=#{sha256sum} and upload_by=#{uploadBy}
+    </select>
+</mapper>

+ 6 - 6
oss-store/src/main/resources/mapper/FilePartMapper.xml

@@ -4,22 +4,22 @@
 <mapper namespace="cn.reghao.oss.store.db.mapper.FilePartMapper">
     <insert id="save" useGeneratedKeys="true" keyProperty="id">
         insert into file_part
-        (`object_id`,`chunk_number`,`current_chunk_size`)
+        (`content_id`,`chunk_number`,`current_chunk_size`)
         values
-        (#{objectId},#{chunkNumber},#{current_chunkSize})
+        (#{contentId},#{chunkNumber},#{currentChunkSize})
     </insert>
     <insert id="saveAll" useGeneratedKeys="true" keyProperty="id">
         insert into file_part
-        (`object_id`,`chunk_number`,`current_chunk_size`)
+        (`content_id`,`chunk_number`,`current_chunk_size`)
         values
         <foreach collection="list" item="item" index="index" separator=",">
-            (#{item.objectId},#{item.chunkNumber},#{item.current_chunkSize})
+            (#{item.contentId},#{item.chunkNumber},#{item.currentChunkSize})
         </foreach>
     </insert>
 
-    <select id="findByObjectId" resultType="cn.reghao.oss.store.model.po.FilePart">
+    <select id="findByContentId" resultType="cn.reghao.oss.store.model.po.FilePart">
         select *
         from file_part
-        where object_id=#{objectId}
+        where content_id=#{contentId}
     </select>
 </mapper>

+ 0 - 22
oss-store/src/main/resources/mapper/FilePartsMapper.xml

@@ -1,22 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
-
-<mapper namespace="cn.reghao.oss.store.db.mapper.FilePartsMapper">
-    <insert id="save" useGeneratedKeys="true" keyProperty="id">
-        insert into file_parts
-        (`object_id`,`absolute_path`,`sha256sum`,`total_size`,`chunk_size`,`total_chunks`,`uploaded`)
-        values
-        (#{objectId},#{absolutePath},#{sha256sum},#{totalSize},#{chunkSize},#{totalChunks},#{uploaded})
-    </insert>
-    
-    <select id="findByObjectId" resultType="cn.reghao.oss.store.model.po.FileParts">
-        select * 
-        from file_parts
-        where object_id=#{objectId}
-    </select>
-    <select id="findBySha256sum" resultType="cn.reghao.oss.store.model.po.FileParts">
-        select *
-        from file_parts
-        where sha256sum=#{sha256sum}
-    </select>
-</mapper>