Bläddra i källkod

dfs-store 中添加一个缓存接口

reghao 2 år sedan
förälder
incheckning
32e5896e83

+ 5 - 0
dfs-store/pom.xml

@@ -46,6 +46,11 @@
             <artifactId>dfs-api</artifactId>
             <version>1.0.0-SNAPSHOT</version>
         </dependency>
+        <dependency>
+            <groupId>cn.reghao.oss</groupId>
+            <artifactId>oss-common</artifactId>
+            <version>1.0.0-SNAPSHOT</version>
+        </dependency>
 
         <dependency>
             <groupId>org.projectlombok</groupId>

+ 40 - 0
dfs-store/src/main/java/cn/reghao/dfs/store/cache/CacheTask.java

@@ -0,0 +1,40 @@
+package cn.reghao.dfs.store.cache;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Map;
+
+/**
+ * @author reghao
+ * @date 2023-04-25 18:39:04
+ */
+public class CacheTask implements Runnable {
+    private Map<String, String> cacheMap;
+    private final String cacheDir;
+    private final String objectId;
+    private final String absolutePath;
+
+    public CacheTask(Map<String, String> cacheMap, String cacheDir, String objectId, String absolutePath) {
+        this.cacheMap = cacheMap;
+        this.cacheDir = cacheDir;
+        this.objectId = objectId;
+        this.absolutePath = absolutePath;
+    }
+
+    @Override
+    public void run() {
+        String cachePath = String.format("%s/%s", cacheDir, objectId);
+        try {
+            Files.copy(Path.of(absolutePath), Path.of(cachePath));
+            cacheMap.put(objectId, cachePath);
+        } catch (IOException e) {
+            e.printStackTrace();
+            try {
+                Files.delete(Path.of(cachePath));
+            } catch (IOException ex) {
+                ex.printStackTrace();
+            }
+        }
+    }
+}

+ 49 - 0
dfs-store/src/main/java/cn/reghao/dfs/store/cache/LocalCache.java

@@ -0,0 +1,49 @@
+package cn.reghao.dfs.store.cache;
+
+import cn.reghao.dfs.store.config.DfsProperties;
+import cn.reghao.jutil.jdk.thread.ThreadPoolWrapper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * @author reghao
+ * @date 2023-04-25 18:24:17
+ */
+@Slf4j
+@Service
+public class LocalCache {
+    private final String cacheDir;
+    private final Map<String, String> cacheMap;
+    private final ExecutorService threadPool = ThreadPoolWrapper.threadPool("cache-poll", 10);
+
+    public LocalCache(DfsProperties dfsProperties) {
+        this.cacheDir = dfsProperties.getCacheDir();
+        this.cacheMap = new HashMap<>();
+        init(cacheDir);
+    }
+
+    private void init(String cacheDir) {
+        File dir = new File(cacheDir);
+        Arrays.stream(dir.listFiles()).forEach(file -> {
+            String objectId = file.getName();
+            String absolutePath = file.getAbsolutePath();
+            cacheMap.put(objectId, absolutePath);
+        });
+    }
+
+    public String getCache(String objectId) {
+        return cacheMap.get(objectId);
+    }
+
+    public void putCache(String objectId, String absolutePath) {
+        CacheTask task = new CacheTask(cacheMap, cacheDir, objectId, absolutePath);
+        log.info("添加缓存任务 {}", absolutePath);
+        threadPool.submit(task);
+    }
+}

+ 22 - 12
dfs-store/src/main/java/cn/reghao/dfs/store/controller/ObjectBasicController.java

@@ -5,12 +5,10 @@ import cn.reghao.dfs.store.model.dto.DeleteObjects;
 import cn.reghao.dfs.store.model.vo.PostResponse;
 import cn.reghao.dfs.store.redis.ds.RedisString;
 import cn.reghao.dfs.store.service.ObjectBasicService;
-import cn.reghao.dfs.store.util.OssUtil;
-import cn.reghao.jutil.jdk.converter.ByteHex;
 import cn.reghao.jutil.jdk.result.WebResult;
-import cn.reghao.jutil.jdk.security.Base64Util;
 import cn.reghao.jutil.jdk.security.DigestUtil;
 import cn.reghao.jutil.web.ServletUtil;
+import cn.reghao.oss.common.OssUtil;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import lombok.extern.slf4j.Slf4j;
@@ -63,16 +61,16 @@ public class ObjectBasicController {
             return ResponseEntity.status(500).body("md5 不匹配");
         }
 
-        String accessKey = OssUtil.getAccessKey();
-        if (accessKey == null) {
+        String accessKeyId = OssUtil.getAccessKey();
+        if (accessKeyId == null) {
             return ResponseEntity.status(500).body("签名不正确1");
         } else {
-            String secretKey = redisString.get(accessKey);
-            if (secretKey == null) {
+            String secretAccessKey = redisString.get(accessKeyId);
+            if (secretAccessKey == null) {
                 return ResponseEntity.status(500).body("签名不正确2");
             }
 
-            boolean matched = OssUtil.matchSignature(secretKey, sha256sum1);
+            boolean matched = OssUtil.matchSignature(secretAccessKey, sha256sum1);
             if (!matched) {
                 return ResponseEntity.status(500).body("签名不正确3");
             }
@@ -97,13 +95,27 @@ public class ObjectBasicController {
     @ApiOperation("使用 formdata 上传对象")
     @PostMapping(value = "/")
     public String postObject(@Validated PostObject postObject) throws Exception {
+        String credential = postObject.getXAmzCredential();
+        String signature = postObject.getXAmzSignature();
+        String policy = postObject.getPolicy();
+
+        String str[] = credential.split("/");
+        String accessKeyId = str[0];
+        String dateStr = str[1];
+        String regionName = str[2];
+        String serviceName = str[3];
+
+        String secretAccessKey = redisString.get(accessKeyId);
+        byte[] signingKey = OssUtil.getSigningKey(secretAccessKey, dateStr, regionName, serviceName);
+
         String objectName = postObject.getKey();
-        //String contentType = postObject.getContentType();
+        String contentType = postObject.getContentType();
+
         MultipartFile file = postObject.getFile();
 
         long len = file.getSize();
         InputStream inputStream = file.getInputStream();
-        String contentType = file.getContentType();
+        //String contentType = file.getContentType();
 
         objectBasicService.postObject(objectName, len, contentType, inputStream);
         PostResponse postResponse = new PostResponse();
@@ -134,7 +146,6 @@ public class ObjectBasicController {
     @ApiOperation(value = "删除对象")
     @DeleteMapping(value = "/{objectName}")
     public String deleteObject(@PathVariable("objectName") String objectName) {
-        objectBasicService.deleteObject(objectName);
         return WebResult.success();
     }
 
@@ -147,7 +158,6 @@ public class ObjectBasicController {
     @ApiOperation("创建已存在对象的副本")
     @PutMapping(value = "/{destinationObject}", params = {"destinationObject"})
     public String putObjectCopy(@PathVariable("destinationObject") String destinationObject) {
-        objectBasicService.putObjectCopy();
         return WebResult.success();
     }
 }

+ 27 - 14
dfs-store/src/main/java/cn/reghao/dfs/store/service/ObjectBasicService.java

@@ -1,5 +1,6 @@
 package cn.reghao.dfs.store.service;
 
+import cn.reghao.dfs.store.cache.LocalCache;
 import cn.reghao.dfs.store.db.mapper.DataBlockMapper;
 import cn.reghao.dfs.store.db.mapper.FileMetaMapper;
 import cn.reghao.dfs.store.model.po.ContentRange;
@@ -38,13 +39,14 @@ public class ObjectBasicService {
     private final int blockSize = 1024*1024*20;
     // 10MiB
     private final long partLength = 1024*1024*10;
-    private FileStoreService fileStoreService;
-    private FileUrlService fileUrlService;
-    private FileTypeService fileTypeService;
+    private final FileStoreService fileStoreService;
+    private final FileUrlService fileUrlService;
+    private final FileTypeService fileTypeService;
+    private final LocalCache localCache;
 
     public ObjectBasicService(FileMetaMapper fileMetaMapper, DataBlockMapper dataBlockMapper,
                               FileStoreService fileStoreService, FileUrlService fileUrlService,
-                              FileTypeService fileTypeService) {
+                              FileTypeService fileTypeService, LocalCache localCache) {
         this.objectIdGenerator = new IdGenerator(32, "object-id");
         this.blockIdGenerator = new IdGenerator(32, "block-id");
         this.fileMetaMapper = fileMetaMapper;
@@ -52,16 +54,17 @@ public class ObjectBasicService {
         this.fileStoreService = fileStoreService;
         this.fileUrlService = fileUrlService;
         this.fileTypeService = fileTypeService;
+        this.localCache = localCache;
     }
 
     public void putObject(String objectName, File file, String contentType, String sha256sum) throws Exception {
         String[] names = objectName.split("/");
         String filename = names[names.length-1];
 
-        FileMeta fileMeta2 = fileMetaMapper.findByObjectName(objectName);
-        log.info("暂未实现 PUT 存储");
+        /*FileMeta fileMeta2 = fileMetaMapper.findByObjectName(objectName);
+        log.info("暂未实现 PUT 存储");*/
 
-        /*FileMeta fileMeta = fileMetaMapper.findBySha256sum(sha256sum);
+        FileMeta fileMeta = fileMetaMapper.findBySha256sum(sha256sum);
         if (fileMeta == null) {
             String objectId = objectIdGenerator.stringId();
             long len = file.length();
@@ -74,7 +77,7 @@ public class ObjectBasicService {
         } else {
             FileMeta fileMeta1 = new FileMeta(objectName, filename, fileMeta);
             fileMetaMapper.save(fileMeta1);
-        }*/
+        }
     }
 
     public void postObject(String objectName, long len, String contentType, InputStream inputStream) throws Exception {
@@ -135,11 +138,7 @@ public class ObjectBasicService {
             OutputStream outputStream = response.getOutputStream();
             outputStream.flush();
             outputStream.close();
-            return;
         }
-
-        String objectId = fileMeta.getObjectId();
-
     }
 
     public void getObject(String objectName) throws IOException {
@@ -234,11 +233,25 @@ public class ObjectBasicService {
         response.setHeader("Content-Range", "bytes "+start+"-"+(len-1)+"/"+len);
 
         OutputStream outputStream = response.getOutputStream();
-        List<DataBlock> list = dataBlockMapper.findByObjectId(objectId);
+        RandomAccessFile raf;
+        String cachePath = localCache.getCache(objectId);
+        if (cachePath != null) {
+            raf = new RandomAccessFile(cachePath, "r");
+        } else {
+            List<DataBlock> list = dataBlockMapper.findByObjectId(objectId);
+            DataBlock dataBlock = list.get(0);
+            String blockId = dataBlock.getBlockId();
+            String absolutePath = dataBlock.getAbsolutePath();
+            localCache.putCache(objectId, absolutePath);
+            raf = new RandomAccessFile(absolutePath, "r");
+        }
+
+        /*List<DataBlock> list = dataBlockMapper.findByObjectId(objectId);
         DataBlock dataBlock = list.get(0);
         String blockId = dataBlock.getBlockId();
         String absolutePath = dataBlock.getAbsolutePath();
-        RandomAccessFile raf = new RandomAccessFile(absolutePath, "r");
+
+        RandomAccessFile raf = new RandomAccessFile(absolutePath, "r");*/
         raf.seek(start);
 
         // 10MiB