فهرست منبع

update dfs-store

reghao 2 سال پیش
والد
کامیت
10a8f90049

+ 3 - 13
dfs-store/src/main/java/cn/reghao/dfs/store/controller/ObjectGetController.java

@@ -1,13 +1,10 @@
 package cn.reghao.dfs.store.controller;
 
 import cn.reghao.dfs.store.service.GetObjectService;
-import cn.reghao.jutil.web.ServletUtil;
+import cn.reghao.dfs.store.util.ObjectUtil;
 import org.springframework.web.bind.annotation.*;
 
-import javax.servlet.http.HttpServletRequest;
 import java.io.IOException;
-import java.net.URLDecoder;
-import java.nio.charset.StandardCharsets;
 
 /**
  * @author reghao
@@ -23,20 +20,13 @@ public class ObjectGetController {
 
     @RequestMapping(value = "/**", method = RequestMethod.HEAD)
     public void headObject() throws IOException {
-        String objectName = getObjectName();
+        String objectName = ObjectUtil.getObjectName();
         getObjectService.headObject(objectName);
     }
 
     @GetMapping(value = "/**")
     public void getObject() throws IOException {
-        String objectName = getObjectName();
+        String objectName = ObjectUtil.getObjectName();
         getObjectService.getObject(objectName);
     }
-
-    private String getObjectName() {
-        HttpServletRequest servletRequest = ServletUtil.getRequest();
-        String uri = servletRequest.getRequestURI();
-        String uri1 = URLDecoder.decode(uri, StandardCharsets.UTF_8);
-        return uri1.replaceFirst("/", "");
-    }
 }

+ 10 - 9
dfs-store/src/main/java/cn/reghao/dfs/store/controller/ObjectMultipartUploadController.java

@@ -1,11 +1,12 @@
 package cn.reghao.dfs.store.controller;
 
-import cn.reghao.dfs.store.model.dto.*;
 import cn.reghao.dfs.store.service.ObjectMultipartUploadService;
 import cn.reghao.jutil.jdk.result.WebResult;
+import cn.reghao.oss.common.*;
 import org.springframework.http.MediaType;
 import org.springframework.validation.annotation.Validated;
 import org.springframework.web.bind.annotation.*;
+import org.springframework.web.multipart.MultipartFile;
 
 import java.util.*;
 
@@ -29,17 +30,17 @@ public class ObjectMultipartUploadController {
 
     @GetMapping(value = "/", params = {"multipart"}, produces = MediaType.APPLICATION_JSON_VALUE)
     public String getUploadedPart() {
-        Map<String, Object> map = new HashMap<>();
-        map.put("skipUpload", false);
-        map.put("url", "");
-        map.put("needMerge", false);
-        map.put("uploaded", Collections.emptyList());
-        return WebResult.success(map);
+        UploadedPart uploadedPart = new UploadedPart();
+        uploadedPart.setSkipUpload(false);
+        uploadedPart.setNeedMerge(false);
+        uploadedPart.setUrl("");
+        uploadedPart.setUploaded(Collections.emptyList());
+        return WebResult.success(uploadedPart);
     }
 
     @PostMapping(value = "/", params = {"multipart"}, produces = MediaType.APPLICATION_JSON_VALUE)
-    public String uploadPart(@Validated UploadFilePart uploadFilePart) throws Exception {
-        UploadFileRet uploadFileRet = objectMultipartUploadService.putFilePart(uploadFilePart);
+    public String uploadPart(MultipartFile file, @Validated UploadFilePart uploadFilePart) throws Exception {
+        UploadFileRet uploadFileRet = objectMultipartUploadService.putFilePart(file.getInputStream(), uploadFilePart);
         return WebResult.success(uploadFileRet);
     }
 

+ 44 - 31
dfs-store/src/main/java/cn/reghao/dfs/store/controller/ObjectUploadController.java

@@ -1,12 +1,12 @@
 package cn.reghao.dfs.store.controller;
 
-import cn.reghao.dfs.store.model.constant.UploadChannel;
-import cn.reghao.dfs.store.model.dto.UploadFile;
+import cn.reghao.dfs.store.service.FileStoreService;
 import cn.reghao.dfs.store.service.PutObjectService;
+import cn.reghao.dfs.store.util.ObjectUtil;
 import cn.reghao.jutil.jdk.result.WebResult;
 import cn.reghao.jutil.jdk.security.DigestUtil;
 import cn.reghao.jutil.web.ServletUtil;
-import org.springframework.validation.annotation.Validated;
+import org.apache.commons.io.FileUtils;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.PutMapping;
 import org.springframework.web.bind.annotation.RequestBody;
@@ -16,7 +16,6 @@ import org.springframework.web.multipart.MultipartFile;
 import javax.servlet.http.HttpServletRequest;
 import java.io.File;
 import java.io.FileInputStream;
-import java.nio.file.Path;
 import java.util.UUID;
 
 /**
@@ -25,46 +24,60 @@ import java.util.UUID;
  */
 @RestController
 public class ObjectUploadController {
+    private final FileStoreService fileStoreService;
     private final PutObjectService putObjectService;
 
-    public ObjectUploadController(PutObjectService putObjectService) {
+    public ObjectUploadController(FileStoreService fileStoreService, PutObjectService putObjectService) {
+        this.fileStoreService = fileStoreService;
         this.putObjectService = putObjectService;
     }
 
     //@PutMapping(value = "/**")
     @PutMapping(value = "/")
-    public String putObject(@RequestBody File file) throws Exception {
-        HttpServletRequest servletRequest = ServletUtil.getRequest();
-        int channelId = Integer.parseInt(servletRequest.getHeader("x-channel-id"));
-        String sha256sum = servletRequest.getHeader("x-content-sha256sum");
+    public String putObject(@RequestBody File file) {
+        try {
+            HttpServletRequest servletRequest = ServletUtil.getRequest();
+            int channelId = Integer.parseInt(servletRequest.getHeader("x-channel-id"));
+            String sha256sum = servletRequest.getHeader("x-content-sha256sum");
 
-        FileInputStream fis = new FileInputStream(file);
-        String sha256sum1 = DigestUtil.sha256sum(fis);
-        if (!sha256sum.equals(sha256sum1)) {
-            return WebResult.failWithMsg("sha256sum 不匹配");
-        }
+            FileInputStream fis = new FileInputStream(file);
+            String sha256sum1 = DigestUtil.sha256sum(fis);
+            if (!sha256sum.equals(sha256sum1)) {
+                FileUtils.deleteQuietly(file);
+                return WebResult.failWithMsg("sha256sum 不匹配");
+            }
 
-        String objectName = getObjectName(channelId);
-        putObjectService.putObject(objectName, file, sha256sum);
-        file.delete();
-        return WebResult.success();
+            String objectName = ObjectUtil.getObjectName(channelId);
+            String pid = putObjectService.getPid(objectName);
+            String contentId = UUID.randomUUID().toString().replace("-", "");
+            String originalFilename = file.getName();
+            long size = file.length();
+            File savedFile = fileStoreService.saveFile(file.getAbsolutePath(), contentId, size);
+            putObjectService.putObject(pid, objectName, contentId, savedFile, originalFilename, sha256sum);
+            // TODO PutMessageConverter 中生成的文件需要显式删除
+            FileUtils.deleteQuietly(file);
+            return WebResult.success();
+        } catch (Exception e) {
+            FileUtils.deleteQuietly(file);
+            String errMsg = e.getMessage();
+            return WebResult.failWithMsg(errMsg);
+        }
     }
 
     @PostMapping(value = "/")
-    public String postObject(@Validated UploadFile uploadFile) throws Exception {
-        int channelId = uploadFile.getChannelId();
-        String objectName = getObjectName(channelId);
+    public String postObject(MultipartFile file, String sha256sum, Integer channelId) throws Exception {
+        String contentId = UUID.randomUUID().toString().replace("-", "");
+        long size = file.getSize();
+        File savedFile = fileStoreService.saveFile(file.getInputStream(), contentId, size);
+        String sha256sum1 = DigestUtil.sha256sum(savedFile.getAbsolutePath());
+        if (!sha256sum.equals(sha256sum1)) {
+            return WebResult.failWithMsg("sha256sum 不匹配");
+        }
 
-        MultipartFile multipartFile = uploadFile.getFile();
-        multipartFile.transferTo(Path.of("/opt/tmp/tomcat/1"));
+        String objectName = ObjectUtil.getObjectName(channelId);
+        String pid = putObjectService.getPid(objectName);
+        String originalFilename = file.getOriginalFilename();
+        putObjectService.putObject(pid, objectName, contentId, savedFile, originalFilename, sha256sum);
         return WebResult.success();
     }
-
-    private String getObjectName(int channelId) throws Exception {
-        String prefix = UploadChannel.getPrefix(channelId);
-        if (prefix == null) {
-            throw new Exception("channelId 不合法");
-        }
-        return prefix + UUID.randomUUID().toString().replace("-", "");
-    }
 }

+ 2 - 0
dfs-store/src/main/java/cn/reghao/dfs/store/db/mapper/FileMetaMapper.java

@@ -2,6 +2,7 @@ package cn.reghao.dfs.store.db.mapper;
 
 import cn.reghao.dfs.api.dto.DeleteFile;
 import cn.reghao.dfs.api.dto.FileInfo;
+import cn.reghao.dfs.store.model.po.DataBlock;
 import cn.reghao.dfs.store.model.po.FileMeta;
 import cn.reghao.dfs.store.model.vo.ObjectMeta;
 import cn.reghao.jutil.jdk.db.BaseMapper;
@@ -35,6 +36,7 @@ public interface FileMetaMapper extends BaseMapper<FileMeta> {
     void updateParent(@Param("objectId") String objectId, @Param("pid") String pid);
     void updateSetDelete(String objectId);
     void deleteByObjectId(String objectId);
+    void updateBatch(List<FileMeta> list);
 
     int countByPid(String pid);
     List<FileInfo> findFileInfoByPage(Page page, @Param("pid") String pid);

+ 29 - 3
dfs-store/src/main/java/cn/reghao/dfs/store/db/repository/FileMetaRepository.java → dfs-store/src/main/java/cn/reghao/dfs/store/db/repository/ObjectRepository.java

@@ -1,6 +1,8 @@
 package cn.reghao.dfs.store.db.repository;
 
+import cn.reghao.dfs.store.db.mapper.DataBlockMapper;
 import cn.reghao.dfs.store.db.mapper.FileMetaMapper;
+import cn.reghao.dfs.store.model.po.DataBlock;
 import cn.reghao.dfs.store.model.po.FileMeta;
 import cn.reghao.dfs.store.model.vo.ObjectMeta;
 import lombok.extern.slf4j.Slf4j;
@@ -8,6 +10,9 @@ import org.springframework.cache.annotation.CacheEvict;
 import org.springframework.cache.annotation.CachePut;
 import org.springframework.cache.annotation.Cacheable;
 import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.List;
 
 /**
  * @author reghao
@@ -15,15 +20,28 @@ import org.springframework.stereotype.Service;
  */
 @Slf4j
 @Service
-public class FileMetaRepository {
+public class ObjectRepository {
     private final FileMetaMapper fileMetaMapper;
+    private final DataBlockMapper dataBlockMapper;
 
-    public FileMetaRepository(FileMetaMapper fileMetaMapper) {
+    public ObjectRepository(FileMetaMapper fileMetaMapper, DataBlockMapper dataBlockMapper) {
         this.fileMetaMapper = fileMetaMapper;
+        this.dataBlockMapper = dataBlockMapper;
+    }
+
+    public void saveFileMeta(FileMeta fileMeta) {
+        fileMetaMapper.save(fileMeta);
     }
 
     @CachePut
-    public void save(FileMeta fileMeta) {
+    public void saveFileMetas(List<FileMeta> list) {
+        fileMetaMapper.saveAll(list);
+    }
+
+    @Transactional(rollbackFor = Exception.class)
+    public void saveObject(FileMeta fileMeta, List<DataBlock> list) {
+        fileMetaMapper.save(fileMeta);
+        dataBlockMapper.saveAll(list);
     }
 
     public void update(FileMeta fileMeta) {
@@ -33,6 +51,14 @@ public class FileMetaRepository {
     public void delete(FileMeta fileMeta) {
     }
 
+    public FileMeta getByObjectName(String objectName) {
+        return fileMetaMapper.findByObjectName(objectName);
+    }
+
+    public FileMeta getBySha256sum(String sha256sum) {
+        return fileMetaMapper.findBySha256sum(sha256sum);
+    }
+
     @Cacheable(cacheNames = "objectMeta", key = "#objectName")
     public ObjectMeta getObjectMeta(String objectName) {
         log.info("查找 db");

+ 0 - 26
dfs-store/src/main/java/cn/reghao/dfs/store/model/dto/UploadFile.java

@@ -1,26 +0,0 @@
-package cn.reghao.dfs.store.model.dto;
-
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
-import org.springframework.web.multipart.MultipartFile;
-
-import javax.validation.constraints.NotBlank;
-import javax.validation.constraints.NotNull;
-import java.io.Serializable;
-
-/**
- * @author reghao
- * @date 2023-05-23 10:03:54
- */
-@NoArgsConstructor
-@Getter
-@Setter
-public class UploadFile implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    @NotNull
-    private Integer channelId;
-    @NotNull
-    private MultipartFile file;
-}

+ 0 - 46
dfs-store/src/main/java/cn/reghao/dfs/store/model/dto/UploadFilePart.java

@@ -1,46 +0,0 @@
-package cn.reghao.dfs.store.model.dto;
-
-import lombok.Getter;
-import lombok.Setter;
-import org.springframework.web.multipart.MultipartFile;
-
-import javax.validation.constraints.NotBlank;
-import javax.validation.constraints.NotNull;
-import java.io.Serializable;
-
-/**
- * @author reghao
- * @date 2022-04-25 10:42:38
- */
-@Getter
-@Setter
-public class UploadFilePart implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    @NotNull
-    private int channelId;
-    private String pid;
-    @NotNull
-    private MultipartFile file;
-    // 文件标识
-    @NotBlank
-    private String identifier;
-    private String filename;
-    private String relativePath;
-    // 文件大小
-    @NotNull
-    private Long totalSize;
-    // 分片文件大小
-    @NotNull
-    private Long chunkSize;
-    // 当前分片文件大小
-    @NotNull
-    private Integer currentChunkSize;
-    // 分片文件数量
-    @NotNull
-    private Integer totalChunks;
-    // 当前分片文件索引
-    @NotNull
-    private Integer chunkNumber;
-    private String partSha256sum;
-}

+ 50 - 6
dfs-store/src/main/java/cn/reghao/dfs/store/service/FileStoreService.java

@@ -7,7 +7,9 @@ import org.springframework.stereotype.Service;
 
 import java.io.*;
 import java.nio.file.Files;
+import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
 import java.util.EnumSet;
 
 import static java.nio.file.StandardOpenOption.*;
@@ -25,7 +27,6 @@ public class FileStoreService {
     }
 
     public String genFilePath(String contentId, long size) {
-        //StoreDir storeDir = loadBalancer.getStoreDir(size, sha256sum);
         StoreDir storeDir = loadBalancer.getStoreDir(size);
         String fileDir = storeDir.getAbsoluteDirPath();
         return String.format("%s/%s", fileDir, contentId);
@@ -53,10 +54,11 @@ public class FileStoreService {
         in.close();
     }
 
-    public void saveFile(String absolutePath, byte[] bytes) throws IOException {
+    public File saveFile(byte[] bytes, String contentId) throws IOException {
+        String absolutePath = genFilePath(contentId, bytes.length);
         File file = new File(absolutePath);
         if (file.exists()) {
-            return;
+            throw new IOException(absolutePath + " exist");
         }
 
         File parentDir = file.getParentFile();
@@ -67,14 +69,55 @@ public class FileStoreService {
         FileOutputStream fos = new FileOutputStream(file);
         fos.write(bytes);
         fos.close();
+        return file;
     }
 
-    public void saveFile(String absolutePath, InputStream in) throws IOException {
+    public File saveFile(String srcPath, String contentId, long size) throws IOException {
+        String absolutePath = genFilePath(contentId, size);
         File file = new File(absolutePath);
         if (file.exists()) {
-            return;
+            throw new IOException(absolutePath + " exist");
         }
 
+        Files.move(Path.of(absolutePath), Path.of(srcPath), StandardCopyOption.REPLACE_EXISTING);
+        return file;
+    }
+
+    public File saveFile(InputStream inputStream, String contentId, long size) throws IOException {
+        String absolutePath = genFilePath(contentId, size);
+        File file = new File(absolutePath);
+        if (file.exists()) {
+            throw new IOException(absolutePath + " exist");
+        }
+
+        Files.copy(inputStream, Path.of(absolutePath), StandardCopyOption.REPLACE_EXISTING);
+
+        File parentDir = file.getParentFile();
+        if (!parentDir.exists()) {
+            FileUtils.forceMkdir(parentDir);
+        }
+
+        FileOutputStream fos = new FileOutputStream(file);
+        // 1MiB
+        int len = 1024*1024;
+        byte[] buf = new byte[len];
+        int readLen;
+        while ((readLen = inputStream.read(buf, 0, len)) != -1) {
+            fos.write(buf, 0, readLen);
+        }
+        fos.close();
+        inputStream.close();
+        return file;
+    }
+
+    public void saveFile(InputStream inputStream, String absolutePath) throws IOException {
+        File file = new File(absolutePath);
+        if (file.exists()) {
+            throw new IOException(absolutePath + " exist");
+        }
+
+        Files.copy(inputStream, Path.of(absolutePath), StandardCopyOption.REPLACE_EXISTING);
+
         File parentDir = file.getParentFile();
         if (!parentDir.exists()) {
             FileUtils.forceMkdir(parentDir);
@@ -85,9 +128,10 @@ public class FileStoreService {
         int len = 1024*1024;
         byte[] buf = new byte[len];
         int readLen;
-        while ((readLen = in.read(buf, 0, len)) != -1) {
+        while ((readLen = inputStream.read(buf, 0, len)) != -1) {
             fos.write(buf, 0, readLen);
         }
         fos.close();
+        inputStream.close();
     }
 }

+ 5 - 7
dfs-store/src/main/java/cn/reghao/dfs/store/service/GetObjectService.java

@@ -1,7 +1,7 @@
 package cn.reghao.dfs.store.service;
 
 import cn.reghao.dfs.store.db.mapper.FileMetaMapper;
-import cn.reghao.dfs.store.db.repository.FileMetaRepository;
+import cn.reghao.dfs.store.db.repository.ObjectRepository;
 import cn.reghao.dfs.store.model.dto.ContentRange;
 import cn.reghao.dfs.store.model.po.FileMeta;
 import cn.reghao.dfs.store.model.vo.ObjectMeta;
@@ -14,8 +14,6 @@ import javax.servlet.http.HttpServletResponse;
 import java.io.*;
 import java.net.URLEncoder;
 import java.nio.charset.StandardCharsets;
-import java.nio.file.Files;
-import java.nio.file.Path;
 
 /**
  * @author reghao
@@ -27,11 +25,11 @@ public class GetObjectService {
     private final FileMetaMapper fileMetaMapper;
     // 10MiB
     private final int bufSize = 1024*1024*10;
-    private final FileMetaRepository fileMetaRepository;
+    private final ObjectRepository objectRepository;
 
-    public GetObjectService(FileMetaMapper fileMetaMapper, FileMetaRepository fileMetaRepository) {
+    public GetObjectService(FileMetaMapper fileMetaMapper, ObjectRepository objectRepository) {
         this.fileMetaMapper = fileMetaMapper;
-        this.fileMetaRepository = fileMetaRepository;
+        this.objectRepository = objectRepository;
     }
     
     public void headObject(String objectName) throws IOException {
@@ -59,7 +57,7 @@ public class GetObjectService {
         String host = ServletUtil.getRequest().getHeader("host");
         HttpServletResponse response = ServletUtil.getResponse();
 
-        ObjectMeta objectMeta = fileMetaRepository.getObjectMeta(objectName);
+        ObjectMeta objectMeta = objectRepository.getObjectMeta(objectName);
         if (objectMeta == null) {
             response.setStatus(HttpServletResponse.SC_NOT_FOUND);
             OutputStream outputStream = response.getOutputStream();

+ 35 - 66
dfs-store/src/main/java/cn/reghao/dfs/store/service/ObjectMultipartUploadService.java

@@ -1,15 +1,17 @@
 package cn.reghao.dfs.store.service;
 
-import cn.reghao.dfs.store.db.mapper.DataBlockMapper;
 import cn.reghao.dfs.store.db.mapper.FileMetaMapper;
-import cn.reghao.dfs.store.model.dto.*;
+import cn.reghao.dfs.store.model.dto.PathUrl;
+import cn.reghao.dfs.store.util.ObjectUtil;
+import cn.reghao.oss.common.UploadFilePart;
+import cn.reghao.oss.common.UploadPrepare;
+import cn.reghao.oss.common.UploadPrepareRet;
+import cn.reghao.oss.common.UploadFileRet;
 import cn.reghao.dfs.store.model.po.*;
 import cn.reghao.jutil.jdk.security.DigestUtil;
-import cn.reghao.jutil.jdk.shell.Shell;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
 import org.springframework.transaction.annotation.Transactional;
-import org.springframework.web.multipart.MultipartFile;
 
 import java.io.*;
 import java.util.*;
@@ -24,16 +26,16 @@ public class ObjectMultipartUploadService {
     // 20MiB
     private final static long PART_SIZE = 1024*1024*20;
     private final FileMetaMapper fileMetaMapper;
-    private final DataBlockMapper dataBlockMapper;
     private final FileStoreService fileStoreService;
-    private final Map<String, Set<Integer>> map = new HashMap<>();
+    private final Map<String, Set<Long>> map = new HashMap<>();
     private final Map<String, PathUrl> pathMap = new HashMap<>();
+    private final PutObjectService putObjectService;
 
-    public ObjectMultipartUploadService(FileMetaMapper fileMetaMapper, DataBlockMapper dataBlockMapper,
-                                        FileStoreService fileStoreService) {
+    public ObjectMultipartUploadService(FileMetaMapper fileMetaMapper, FileStoreService fileStoreService,
+                                        PutObjectService putObjectService) {
         this.fileMetaMapper = fileMetaMapper;
-        this.dataBlockMapper = dataBlockMapper;
         this.fileStoreService = fileStoreService;
+        this.putObjectService = putObjectService;
     }
 
     public synchronized UploadPrepareRet prepareUpload(UploadPrepare uploadPrepare) {
@@ -56,96 +58,63 @@ public class ObjectMultipartUploadService {
      * @date 2023-05-19 10:27:01
      */
     @Transactional(rollbackFor = Exception.class)
-    public synchronized UploadFileRet putFilePart(UploadFilePart uploadFilePart) throws Exception {
-        String partSha256sum = uploadFilePart.getPartSha256sum();
+    public synchronized UploadFileRet putFilePart(InputStream inputStream, UploadFilePart uploadFilePart) throws Exception {
+        String filename = uploadFilePart.getFilename();
         long totalSize = uploadFilePart.getTotalSize();
         long chunkSize = uploadFilePart.getChunkSize();
         int currentPartSize = uploadFilePart.getCurrentChunkSize();
-        int totalParts = uploadFilePart.getTotalChunks();
-        int chunkNumber = uploadFilePart.getChunkNumber();
-        MultipartFile multipartFile = uploadFilePart.getFile();
+        long totalParts = uploadFilePart.getTotalChunks();
+        long chunkNumber = uploadFilePart.getChunkNumber();
         log.info("{} -> {}:{}", currentPartSize, totalParts, chunkNumber);
 
         String sha256sum = uploadFilePart.getIdentifier();
-        Set<Integer> set = map.computeIfAbsent(sha256sum, k -> new HashSet<>());
+        FileMeta fileMeta = fileMetaMapper.findBySha256sum(sha256sum);
+        if (fileMeta != null) {
+            String objectName = ObjectUtil.getObjectName(uploadFilePart.getChannelId());
+            putObjectService.copyObject(objectName, filename, fileMeta);
+            String url = String.format("https://file.reghao.cn/%s", objectName);
+            return new UploadFileRet(sha256sum, url);
+        }
+
+        Set<Long> set = map.computeIfAbsent(sha256sum, k -> new HashSet<>());
         if (set.isEmpty()) {
-            String absolutePath = fileStoreService.genFilePath(sha256sum, totalSize);
+            String contentId = UUID.randomUUID().toString().replace("-", "");
+            String absolutePath = fileStoreService.genFilePath(contentId, totalSize);
             fileStoreService.createSparseFile(absolutePath, totalSize);
 
-            PathUrl pathUrl = new PathUrl(sha256sum, absolutePath);
+            PathUrl pathUrl = new PathUrl(contentId, absolutePath);
             pathMap.put(sha256sum, pathUrl);
         }
 
         if (set.add(chunkNumber)) {
             long pos = (chunkNumber-1) * chunkSize;
             String absolutePath = pathMap.get(sha256sum).getAbsolutePath();
-            fileStoreService.writeToFile(multipartFile.getInputStream(), absolutePath, pos);
+            fileStoreService.writeToFile(inputStream, absolutePath, pos);
         }
 
         if (set.size() != totalParts) {
             return new UploadFileRet(sha256sum);
         } else {
+            String contentId = pathMap.get(sha256sum).getContentId();
             String absolutePath = pathMap.get(sha256sum).getAbsolutePath();
             FileInputStream fis = new FileInputStream(absolutePath);
             String sha256sumMerged = DigestUtil.sha256sum(fis);
             if (!sha256sum.equals(sha256sumMerged)) {
                 throw new Exception("分片合并文件的 sha256sum 与原文件不一致!");
+            } else {
+                log.info("合并的文件 {}", absolutePath);
             }
 
-            String objectName1;
             int channelId = uploadFilePart.getChannelId();
             String pid = uploadFilePart.getPid();
-            if (pid.equals("0")) {
-                objectName1 = UUID.randomUUID().toString().replace("-", "");
-            } else {
-                FileMeta fileMeta = fileMetaMapper.findByObjectId(pid);
-                String objectName = fileMeta.getObjectName();
-                objectName1 = objectName + UUID.randomUUID().toString().replace("-", "");
-            }
+            String objectName = ObjectUtil.getObjectName(channelId);
+            File savedFile = new File(absolutePath);
+            putObjectService.putObject(pid, objectName, contentId, savedFile, filename, absolutePath);
 
-            String filename = uploadFilePart.getFilename();
-            saveFile(objectName1, filename, totalSize, sha256sumMerged, pid, absolutePath);
-            log.info("合并的文件 {}: {}", objectName1, absolutePath);
             map.remove(sha256sum);
             pathMap.remove(sha256sum);
-            String url = String.format("https://file.reghao.cn/%s", pid);
+            String url = String.format("https://file.reghao.cn/%s", objectName);
             return new UploadFileRet(sha256sum, url);
         }
     }
-
-    private void saveFile(String objectName, String filename, long totalSize, String sha256sumMerged,
-                          String pid, String absolutePath) {
-        String objectId = UUID.randomUUID().toString().replace("-", "");
-        String contentType = getMediaType(absolutePath);
-        int fileType = getFileType(contentType);
-
-        FileMeta fileMeta = new FileMeta(objectName, objectId, filename, totalSize, fileType, contentType, sha256sumMerged, pid);
-        List<DataBlock> blocks = new ArrayList<>();
-        String blockId = UUID.randomUUID().toString();
-        blocks.add(new DataBlock(sha256sumMerged, 0, blockId, absolutePath));
-
-        fileMetaMapper.save(fileMeta);
-        dataBlockMapper.saveAll(blocks);
-    }
-
-    private String getMediaType(String src) {
-        String cmd = String.format("/bin/file -b --mime-type \"%s\"", src);
-        return Shell.execWithResult(cmd);
-    }
-
-    private Integer getFileType(String contentType) {
-        int fileType = 1005;
-        if (contentType == null) {
-            return fileType;
-        } else if (contentType.startsWith("image")) {
-            fileType = 1001;
-        } else if (contentType.startsWith("video")) {
-            fileType = 1002;
-        } else if (contentType.startsWith("audio")) {
-            fileType = 1003;
-        } else if (contentType.startsWith("text")) {
-            fileType = 1004;
-        }
-        return fileType;
-    }
 }

+ 43 - 66
dfs-store/src/main/java/cn/reghao/dfs/store/service/PutObjectService.java

@@ -1,21 +1,15 @@
 package cn.reghao.dfs.store.service;
 
-import cn.reghao.dfs.store.db.mapper.DataBlockMapper;
-import cn.reghao.dfs.store.db.mapper.FileMetaMapper;
+import cn.reghao.dfs.store.db.repository.ObjectRepository;
 import cn.reghao.dfs.store.model.po.DataBlock;
 import cn.reghao.dfs.store.model.po.FileMeta;
 import cn.reghao.jutil.jdk.security.DigestUtil;
 import cn.reghao.jutil.jdk.shell.Shell;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
 import org.springframework.stereotype.Service;
-import org.springframework.transaction.annotation.Transactional;
 
 import java.io.*;
-import java.nio.file.CopyOption;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.StandardCopyOption;
-import java.security.NoSuchAlgorithmException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
@@ -27,81 +21,69 @@ import java.util.UUID;
 @Slf4j
 @Service
 public class PutObjectService {
-    private final FileMetaMapper fileMetaMapper;
-    private final DataBlockMapper dataBlockMapper;
+    private final ObjectRepository objectRepository;
     private final FileStoreService fileStoreService;
 
-    public PutObjectService(FileMetaMapper fileMetaMapper, DataBlockMapper dataBlockMapper,
-                            FileStoreService fileStoreService) {
-        this.fileMetaMapper = fileMetaMapper;
-        this.dataBlockMapper = dataBlockMapper;
+    public PutObjectService(ObjectRepository objectRepository, FileStoreService fileStoreService) {
+        this.objectRepository = objectRepository;
         this.fileStoreService = fileStoreService;
     }
 
-    @Transactional(rollbackFor = Exception.class)
-    public void putObject(String objectName, File file, String sha256sum) throws Exception {
-        String[] names = objectName.split("/");
-        String filename = names[names.length-1];
-
-        String contentType = getMediaType(file.getAbsolutePath());
-        FileMeta fileMeta2 = fileMetaMapper.findByObjectName(objectName);
-        if (fileMeta2 != null) {
-            String sha256sum1 = fileMeta2.getSha256sum();
-            if (sha256sum1.equals(sha256sum)) {
-                log.info("{} 内容没有变化", objectName);
-                return;
-            }
+    public void putObject(String pid, String objectName, String contentId, File savedFile, String originalFilename, String sha256sum) {
+        FileMeta fileMeta = objectRepository.getBySha256sum(sha256sum);
+        if (fileMeta != null) {
+            copyObject(objectName, originalFilename, fileMeta);
+            FileUtils.deleteQuietly(savedFile);
+        } else {
+            String savedPath = savedFile.getAbsolutePath();
+            long size = savedFile.length();
+            String objectId = UUID.randomUUID().toString().replace("-", "");
+            String contentType = getMediaType(savedPath);
+            int fileType = getFileType(contentType);
 
-            log.info("更新对象操作暂未实现");
-            return;
+            fileMeta = new FileMeta(objectName, objectId, contentId, originalFilename, size, fileType, contentType, sha256sum, pid);
+            String blockId = UUID.randomUUID().toString();
+            List<DataBlock> list = List.of(new DataBlock(contentId, 0, blockId, savedPath));
+            objectRepository.saveObject(fileMeta, list);
         }
+    }
 
-        //Files.move(Path.of(file.getAbsolutePath()), Path.of(""), StandardCopyOption.REPLACE_EXISTING);
+    public String getPid(String objectName) {
         addParent(objectName);
         int idx = objectName.lastIndexOf("/");
         String objectName1 = objectName.substring(0, idx+1);
-        FileMeta fileMeta1 = fileMetaMapper.findByObjectName(objectName1);
-        String pid = fileMeta1.getObjectId();
-
-        FileMeta fileMeta = fileMetaMapper.findBySha256sum(sha256sum);
-        if (fileMeta == null) {
-            String objectId = UUID.randomUUID().toString().replace("-", "");
-            long size = file.length();
-            FileInputStream fis = new FileInputStream(file);
-            List<DataBlock> blocks = store(sha256sum, size, fis);
-            int fileType = getFileType(contentType);
-            fileMeta = new FileMeta(objectName, objectId, filename, size, fileType, contentType, sha256sum, pid);
-
-            fileMetaMapper.save(fileMeta);
-            dataBlockMapper.saveAll(blocks);
-        } else {
-            String objectId = UUID.randomUUID().toString().replace("-", "");
-            FileMeta fileMeta3 = new FileMeta(objectName, objectId, filename, fileMeta);
-            fileMetaMapper.save(fileMeta3);
-        }
+        FileMeta fileMeta1 = objectRepository.getByObjectName(objectName1);
+        return fileMeta1.getObjectId();
     }
 
     public void putObject(String objectName, byte[] bytes) {
         try {
             String sha256sum = DigestUtil.sha256sum(bytes);
+            FileMeta fileMeta = objectRepository.getBySha256sum(sha256sum);
+            if (fileMeta != null) {
+                copyObject(objectName, objectName, fileMeta);
+            } else {
+                String pid = "";
+                String contentId = "";
+                File savedFile = fileStoreService.saveFile(bytes, contentId);
+                String filename = "";
+            }
         } catch (Exception e) {
-
+            e.printStackTrace();
         }
     }
 
-    public void postObject(String pid, String absolutePath) {
-    }
-
-    private String getMediaType(String src) {
-        String cmd = String.format("/bin/file -b --mime-type \"%s\"", src);
-        return Shell.execWithResult(cmd);
+    public void copyObject(String objectName, String filename, FileMeta fileMeta) {
+        String objectId = UUID.randomUUID().toString().replace("-", "");
+        FileMeta fileMeta1 = new FileMeta(objectName, objectId, filename, fileMeta);
+        objectRepository.saveFileMeta(fileMeta1);
     }
 
     private void addParent(String objectName) {
         List<String> list = getParent(objectName);
         List<FileMeta> fileMetas = new ArrayList<>();
         list.forEach(parentName -> {
-            FileMeta fileMeta = fileMetaMapper.findByObjectName(parentName);
+            FileMeta fileMeta = objectRepository.getByObjectName(parentName);
             if (fileMeta == null) {
                 String pid = "0";
                 int index = parentName.lastIndexOf("/");
@@ -112,7 +94,7 @@ public class PutObjectService {
         });
 
         if (!fileMetas.isEmpty()) {
-            fileMetaMapper.saveAll(fileMetas);
+            objectRepository.saveFileMetas(fileMetas);
         }
     }
 
@@ -126,14 +108,9 @@ public class PutObjectService {
         return list;
     }
 
-    private List<DataBlock> store(String sha256sum, long size, InputStream inputStream) throws IOException {
-        String absolutePath = fileStoreService.genFilePath(sha256sum, size);
-        fileStoreService.saveFile(absolutePath, inputStream);
-
-        List<DataBlock> list = new ArrayList<>();
-        String blockId = UUID.randomUUID().toString();
-        list.add(new DataBlock(sha256sum, 0, blockId, absolutePath));
-        return list;
+    private String getMediaType(String src) {
+        String cmd = String.format("/bin/file -b --mime-type \"%s\"", src);
+        return Shell.execWithResult(cmd);
     }
 
     private Integer getFileType(String contentType) {

+ 30 - 0
dfs-store/src/main/java/cn/reghao/dfs/store/util/ObjectUtil.java

@@ -0,0 +1,30 @@
+package cn.reghao.dfs.store.util;
+
+import cn.reghao.dfs.store.model.constant.UploadChannel;
+import cn.reghao.jutil.web.ServletUtil;
+
+import javax.servlet.http.HttpServletRequest;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
+import java.util.UUID;
+
+/**
+ * @author reghao
+ * @date 2023-05-25 09:34:49
+ */
+public class ObjectUtil {
+    public static String getObjectName() {
+        HttpServletRequest servletRequest = ServletUtil.getRequest();
+        String uri = servletRequest.getRequestURI();
+        String uri1 = URLDecoder.decode(uri, StandardCharsets.UTF_8);
+        return uri1.replaceFirst("/", "");
+    }
+
+    public static String getObjectName(int channelId) throws Exception {
+        String prefix = UploadChannel.getPrefix(channelId);
+        if (prefix == null) {
+            throw new Exception("channelId 不合法");
+        }
+        return prefix + UUID.randomUUID().toString().replace("-", "");
+    }
+}

+ 0 - 40
dfs-store/src/main/java/cn/reghao/dfs/store/util/store/LoadBalancer.java

@@ -1,14 +1,7 @@
 package cn.reghao.dfs.store.util.store;
 
-import cn.reghao.jutil.jdk.security.DigestUtil;
-import com.google.common.hash.HashFunction;
-import com.google.common.hash.Hashing;
 import org.springframework.stereotype.Component;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.security.NoSuchAlgorithmException;
 import java.util.*;
 
 /**
@@ -17,43 +10,10 @@ import java.util.*;
  */
 @Component
 public class LoadBalancer {
-    private final HashFunction hashFunction = Hashing.murmur3_32_fixed();
-
     public StoreDir getStoreDir(long fileSize) {
         LocalStore localStore = LocalStores.getMaxStore(fileSize);
         List<StoreDir> subDirs = LocalStores.getSubDirs(localStore.getMountedOn());
         subDirs.sort(Comparator.comparingInt(StoreDir::getCount));
         return subDirs.get(0);
     }
-
-    public StoreDir getStoreDir(long fileSize, String sha256sum) throws IOException, NoSuchAlgorithmException {
-        LocalStore localStore = LocalStores.getMaxStore(fileSize);
-        List<StoreDir> subDirs = LocalStores.getSubDirs(localStore.getMountedOn());
-
-        int hash = hash(sha256sum);
-        int nodeIdx = hash % subDirs.size();
-        return null;
-    }
-
-    private int hash(Object object) throws IOException, NoSuchAlgorithmException {
-        int ret = 0;
-        if (object instanceof Integer) {
-            Integer i = (Integer) object;
-            ret = hashFunction.hashInt(i).asInt();
-        } else if (object instanceof String) {
-            String str = (String) object;
-            ret = hashFunction.hashUnencodedChars(str).asInt();
-        } else if (object instanceof File) {
-            File file = (File) object;
-            FileInputStream fis = new FileInputStream(file);
-            String sha256sum = DigestUtil.sha256sum(fis);
-            ret = hashFunction.hashUnencodedChars(sha256sum).asInt();
-        }
-        return Math.abs(ret);
-    }
-
-    // TODO 实现一致性 hash 以处理添加新磁盘时出现的问题
-    private int consistentHash() {
-        return 0;
-    }
 }

+ 7 - 7
dfs-store/src/main/resources/application-dev.yml

@@ -1,17 +1,17 @@
 dubbo:
   registry:
-    address: zookeeper://127.0.0.1:2181
+    address: zookeeper://192.168.0.110:2181
 spring:
   redis:
     database: 0
-    host: 127.0.0.1
+    host: 192.168.0.110
     port: 6379
-    password: Dev@123456
+    password: Test@123456
   datasource:
-    url: jdbc:mysql://localhost:3306/reghao_oss_rdb?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2b8
-    username: dev
-    password: Dev@123456
+    url: jdbc:mysql://192.168.0.110:3306/reghao_oss_tdb?useSSL=false&useUnicode=true&characterEncoding=UTF-8&serverTimezone=GMT%2b8
+    username: test
+    password: Test@123456
 oss:
   domain: oss.reghao.cn
   mountedDirs:
-    - /opt/oss/disk/00b989fc-991b-4d4e-959e-9b6e19299b72/
+    - /opt/oss/disk/13f654c8-af87-4710-aac9-7aa086c99aec/

+ 10 - 0
dfs-store/src/main/resources/mapper/FileMetaMapper.xml

@@ -22,6 +22,16 @@
         set object_name=#{objectName}
         where object_id=#{objectId}
     </update>
+    <update id="updateBatch">
+        update data_block
+        <trim prefix="set" suffixOverrides=",">
+            <trim prefix="content_id =case" suffix="end,">
+                <foreach collection="list" item="item" index="index">
+                    when object_id=#{item.objectId} then #{item.contentId}
+                </foreach>
+            </trim>
+        </trim>
+    </update>
 
     <select id="count" resultType="java.lang.Integer">
         select count(*) from file_meta

+ 27 - 1
dfs-store/src/test/java/FileMetaTest.java

@@ -21,6 +21,7 @@ import java.nio.file.FileVisitor;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
 
@@ -66,13 +67,38 @@ public class FileMetaTest {
         int pageNumber = 1;
         Page page = new Page(pageNumber, pageSize);
         List<DataBlock> dataBlocks = dataBlockMapper.findDataBlockByPage(page);
+        List<FileMeta> fileMetas = new ArrayList<>();
         while (!dataBlocks.isEmpty()) {
             dataBlocks.forEach(dataBlock -> {
                 String objectId = dataBlock.getObjectId();
                 String contentId = UUID.randomUUID().toString().replace("-", "");
-                dataBlockMapper.updateContentId(objectId, contentId);
+
+                FileMeta fileMeta = fileMetaMapper.findByObjectId(objectId);
+                if (fileMeta == null) {
+                    log.error("{} 不存在", objectId);
+                    return;
+                }
+
+                //dataBlockMapper.updateContentId(objectId, contentId);
+                fileMeta.setContentId(contentId);
+                fileMetas.add(fileMeta);
+                dataBlock.setContentId(contentId);
             });
 
+
+            long start = System.currentTimeMillis();
+            if (!fileMetas.isEmpty()) {
+                fileMetaMapper.updateBatch(fileMetas);
+                fileMetas.clear();
+                log.info("batch update FileMeta cost {}", System.currentTimeMillis()-start);
+            }
+
+            if (!dataBlocks.isEmpty()) {
+                dataBlockMapper.updateBatch(dataBlocks);
+                log.info("batch update DataBlock cost {}", System.currentTimeMillis()-start);
+            }
+
+
             pageNumber++;
             page = new Page(pageNumber, pageSize);
             dataBlocks = dataBlockMapper.findDataBlockByPage(page);