Browse Source

update oss-sdk

reghao 1 year ago
parent
commit
9520cd34b4

+ 85 - 34
oss-sdk/src/main/java/cn/reghao/oss/sdk/ObjectMultipartUploadService.java

@@ -3,7 +3,7 @@ package cn.reghao.oss.sdk;
 import cn.reghao.jutil.jdk.http.UploadParam;
 import cn.reghao.jutil.jdk.http.WebRequest;
 import cn.reghao.jutil.jdk.http.WebResponse;
-import cn.reghao.jutil.jdk.io.FilePart;
+import cn.reghao.jutil.jdk.io.FileSplitter;
 import cn.reghao.jutil.jdk.result.WebResult;
 import cn.reghao.jutil.jdk.security.DigestUtil;
 import cn.reghao.jutil.jdk.serializer.JsonConverter;
@@ -22,7 +22,9 @@ import java.net.URISyntaxException;
 import java.net.http.HttpClient;
 import java.net.http.HttpRequest;
 import java.net.http.HttpResponse;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -32,25 +34,31 @@ import java.util.Map;
 @Slf4j
 public class ObjectMultipartUploadService {
     private final String endpoint;
-    private final FilePart filePart = new FilePart();
+    private final String token;
+    // 10MB
+    private final FileSplitter fileSplitter = new FileSplitter();
     private final HttpClient httpClient = HttpClient.newBuilder().build();
     private final WebRequest webRequest = new DefaultWebRequest();
 
-    public ObjectMultipartUploadService(String endpoint) {
+    public ObjectMultipartUploadService(String endpoint, String token) {
         this.endpoint = endpoint;
+        this.token = token;
     }
 
-    public void create() throws URISyntaxException, IOException, InterruptedException {
+    public UploadPrepareRet create(File file) throws Exception {
+        String filename = file.getName();
+        long len = file.length();
+        String sha256sum = DigestUtil.sha256sum(file.getAbsolutePath());
+
         MultiPartBodyPublisher publisher = new MultiPartBodyPublisher();
-        publisher.addPart("filename", "1.zip")
-                .addPart("size", 1+"")
-                .addPart("sha256sum", "1234567890");
+        publisher.addPart("filename", filename)
+                .addPart("size", ""+len)
+                .addPart("sha256sum", sha256sum);
 
-        String authorization = "";
         String api = endpoint + "/?create";
         HttpRequest httpRequest = HttpRequest.newBuilder(new URI(api))
                 .version(HttpClient.Version.HTTP_1_1)
-                .header("Authorization", authorization)
+                .header("Authorization", "Bearer " + token)
                 .header("Content-Type", "multipart/form-data; boundary=" + publisher.getBoundary())
                 .POST(publisher.build())
                 .build();
@@ -61,20 +69,26 @@ public class ObjectMultipartUploadService {
         WebResult<UploadPrepareRet> webResult = JsonConverter.jsonToObject(body, type);
         if (webResult.getCode() != 0) {
             String errMsg = webResult.getMsg();
+            return null;
         }
 
-        UploadPrepareRet uploadPrepareRet = webResult.getData();
-        System.out.println();
+        return webResult.getData();
     }
 
-    public void get() throws URISyntaxException, IOException, InterruptedException {
-        String authorization = "";
-        String api = endpoint + "/?multipart";
+    public UploadedPart get(File file) throws Exception {
+        //String sha256sum = DigestUtil.sha256sum(file.getAbsolutePath());
+        String sha256sum = "";
+        MultiPartBodyPublisher publisher = new MultiPartBodyPublisher();
+        publisher
+                .addPart("sha256sum", sha256sum);
+
+        String api = endpoint + "/?get_multipart";
         HttpRequest httpRequest = HttpRequest.newBuilder(new URI(api))
                 .version(HttpClient.Version.HTTP_1_1)
-                .header("Authorization", authorization)
-                //.header("Content-Type", "multipart/form-data; boundary=" + publisher.getBoundary())
-                .GET()
+                .header("Authorization", token)
+                .header("Content-Type", "multipart/form-data; boundary=" + publisher.getBoundary())
+                .POST(publisher.build())
+                //.GET()
                 .build();
 
         HttpResponse<String> httpResponse = httpClient.send(httpRequest, HttpResponse.BodyHandlers.ofString());
@@ -83,9 +97,10 @@ public class ObjectMultipartUploadService {
         WebResult<UploadedPart> webResult = JsonConverter.jsonToObject(body, type);
         if (webResult.getCode() != 0) {
             String errMsg = webResult.getMsg();
+            return null;
         }
 
-        UploadedPart uploadedPart = webResult.getData();
+        return webResult.getData();
     }
 
     /**
@@ -96,7 +111,7 @@ public class ObjectMultipartUploadService {
      * @return
      * @date 2023-05-24 14:56:50
      */
-    public void postObject1(byte[] bytes, UploadFilePart uploadFilePart)
+    private void postObject1(byte[] bytes, UploadFilePart uploadFilePart)
             throws URISyntaxException, IOException, InterruptedException {
         MultiPartBodyPublisher publisher = new MultiPartBodyPublisher();
         ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
@@ -125,7 +140,7 @@ public class ObjectMultipartUploadService {
         System.out.println();
     }
 
-    public void postObject(byte[] bytes, UploadFilePart uploadFilePart) {
+    private UploadFileRet postObject(byte[] bytes, UploadFilePart uploadFilePart) {
         UploadParam uploadParam = new UploadParam(bytes, "");
         Map<String, String> params = new HashMap<>();
         params.put("channelId", uploadFilePart.getChannelId()+"");
@@ -140,10 +155,11 @@ public class ObjectMultipartUploadService {
         uploadParam.setTextParams(params);
 
         String api = endpoint + "/?multipart";
-        WebResponse webResponse = webRequest.upload(api, uploadParam, null);
+        WebResponse webResponse = webRequest.upload(api, uploadParam, token);
         int statusCode = webResponse.getStatusCode();
         if (statusCode != 200) {
             log.error("请求失败");
+            return null;
         }
 
         String body = webResponse.getBody();
@@ -151,37 +167,72 @@ public class ObjectMultipartUploadService {
         WebResult<UploadFileRet> webResult = JsonConverter.jsonToObject(body, type);
         if (webResult.getCode() != 0) {
             String errMsg = webResult.getMsg();
+            log.error("请求失败 -> {}", errMsg);
+            return null;
         }
 
-        UploadFileRet uploadFileRet = webResult.getData();
-        if (!uploadFileRet.isMerged()) {
-            log.info("继续上传");
-        } else {
-            log.info("分片上传并合并完成");
-        }
+        return webResult.getData();
     }
 
-    public void upload(File file, int channelId) throws Exception {
+    public UploadFileRet upload(File file, int channelId) throws Exception {
         String identifier = DigestUtil.sha256sum(new FileInputStream(file));
         String filename = file.getName();
         String relativePath = file.getAbsolutePath();
 
         long totalSize = file.length();
-        // 分段大小在 5MB - 5GB 之间,只有最后一个分段才允许小于 5MB,不可避免的
-        int chunkSize = filePart.getPartSize();
+        // 分片大小在 10MB - 100MB 之间,只有最后一个分片才允许小于 10MB(这无法避免)
+        long chunkSize = fileSplitter.getPartSize();
         long totalChunks = totalSize/chunkSize;
         if (totalSize % chunkSize != 0) {
             totalChunks += 1;
         }
 
-        for (long i = 0; i < totalChunks; i++) {
+        Map<Integer, Long> map = new HashMap<>();
+        List<Integer> failedChunkNumbers = new ArrayList<>();
+        for (int i = 0; i < totalChunks; i++) {
             long start = i*chunkSize;
-            byte[] part = filePart.getPart(file.getAbsolutePath(), chunkSize, start);
-            long chunkNumber = i + 1;
+            // 从 start 位置开始读取 chunkSize 的数据
+            byte[] part = fileSplitter.getPart(file.getAbsolutePath(), start);
+            int chunkNumber = i + 1;
+            map.put(chunkNumber, start);
+
             int currentChunkSize = part.length;
             UploadFilePart uploadFilePart = new UploadFilePart(channelId, identifier, filename, relativePath,
                     totalSize, chunkSize, totalChunks, chunkNumber, currentChunkSize);
-            postObject(part, uploadFilePart);
+
+            UploadFileRet uploadFileRet = postObject(part, uploadFilePart);
+            if (uploadFileRet == null) {
+                log.info("{}:{} upload failed", chunkNumber, currentChunkSize);
+                failedChunkNumbers.add(chunkNumber);
+            } else {
+                log.info("{}:{} uploaded {} bytes", totalChunks, chunkNumber, currentChunkSize);
+                if (uploadFileRet.isMerged()) {
+                    return uploadFileRet;
+                }
+            }
+        }
+
+        // 重传上传失败的文件分片
+        if (!failedChunkNumbers.isEmpty()) {
+            for (int chunkNumber : failedChunkNumbers) {
+                long start = map.get(chunkNumber);
+                byte[] part = fileSplitter.getPart(file.getAbsolutePath(), start);
+                int currentChunkSize = part.length;
+                UploadFilePart uploadFilePart = new UploadFilePart(channelId, identifier, filename, relativePath,
+                        totalSize, chunkSize, totalChunks, chunkNumber, currentChunkSize);
+
+                UploadFileRet uploadFileRet = postObject(part, uploadFilePart);
+                if (uploadFileRet == null) {
+                    log.info("{}:{} upload failed", chunkNumber, currentChunkSize);
+                } else {
+                    log.info("{}:{} uploaded {} bytes", totalChunks, chunkNumber, currentChunkSize);
+                    if (uploadFileRet.isMerged()) {
+                        return uploadFileRet;
+                    }
+                }
+            }
         }
+
+        return null;
     }
 }

+ 13 - 0
oss-sdk/src/main/java/cn/reghao/oss/sdk/OssConsoleClient.java

@@ -425,4 +425,17 @@ public class OssConsoleClient {
         UploadFileRet uploadFileRet = ossStoreClient.postObjectWithJdkHttp(file, channelId, token);
         return uploadFileRet;
     }
+
+    public UploadFileRet uploadFilePart(int channelId, File file) throws Exception {
+        ServerInfo serverInfo = getUploadStore(channelId);
+        if (serverInfo == null) {
+            throw new Exception("获取 server_info 失败");
+        }
+        String ossUrl = serverInfo.getOssUrl();
+        String token = serverInfo.getToken();
+
+        ObjectMultipartUploadService multipartUploadService = new ObjectMultipartUploadService(ossUrl, token);
+        UploadFileRet uploadFileRet = multipartUploadService.upload(file, channelId);
+        return uploadFileRet;
+    }
 }

+ 49 - 19
oss-sdk/src/test/java/OssConsoleClientTest.java

@@ -1,3 +1,5 @@
+import cn.reghao.oss.api.rest.UploadPrepareRet;
+import cn.reghao.oss.api.rest.UploadedPart;
 import cn.reghao.oss.sdk.model.OssConsoleConfig;
 import cn.reghao.oss.api.dto.ServerInfo;
 import cn.reghao.oss.api.rest.UploadFileRet;
@@ -7,6 +9,7 @@ import cn.reghao.oss.sdk.OssConsoleClient;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.*;
+import java.util.List;
 
 /**
  * @author reghao
@@ -14,24 +17,55 @@ import java.io.*;
  */
 @Slf4j
 public class OssConsoleClientTest {
-    static void multipartUpload() throws Exception {
-        String storeEndpoint = "";
-        ObjectMultipartUploadService multipartUploadService = new ObjectMultipartUploadService(storeEndpoint);
+    static void multipartUpload(String endpoint, int channelId, String token) throws Exception {
+        ObjectMultipartUploadService multipartUploadService = new ObjectMultipartUploadService(endpoint, token);
         String filePath = "";
-        int channelId = 1;
-        multipartUploadService.upload(new File(filePath), channelId);
-        //multipartUploadService.create();
-        //multipartUploadService.get();
+        File file = new File(filePath);
+
+        /*UploadPrepareRet uploadPrepareRet = multipartUploadService.create(file);
+        if (uploadPrepareRet == null) {
+            return;
+        }
+
+        if (uploadPrepareRet.isExist()) {
+            return;
+        }
+
+        String uploadId = uploadPrepareRet.getUploadId();
+        long splitSize = uploadPrepareRet.getSplitSize();*/
+
+        UploadedPart uploadedPart = multipartUploadService.get(file);
+        if (uploadedPart != null) {
+            List<String> list = uploadedPart.getUploaded();
+        }
+
+        /*UploadFileRet uploadFileRet = multipartUploadService.upload(file, channelId);
+        if (uploadFileRet != null && uploadFileRet.isMerged()) {
+            String uploadId = uploadFileRet.getUploadId();
+            log.info("file uploaded, uploadId -> {}", uploadId);
+        }*/
+    }
+
+    static void upload(String ossUrl, int channelId, String token) throws Exception {
+        OssStoreClient ossStoreClient = new OssStoreClient(ossUrl);
+        String filePath = "/home/reghao/data/video/output.mp4";
+        File file = new File(filePath);
+        UploadFileRet uploadFileRet = ossStoreClient.postObjectWithJdkHttp(file, channelId, token);
+        if (uploadFileRet == null) {
+            log.info("文件上传失败");
+        } else {
+            log.info("{} -> {}", uploadFileRet.getUploadId(), uploadFileRet.getUrl());
+        }
     }
 
     public static void main(String[] args) throws Exception {
-        String consoleUrl = "http://ossconsole.reghao.cn";
-        String accessKeyId = "mWYMpbrL";
-        String accessKeySecret = "uvuTJKKbRfaSxNPM38";
+        String consoleUrl = "http://bnt.reghao.cn";
+        String accessKeyId = "ESCKn3Cd";
+        String accessKeySecret = "OL9SIOLoOqUjhMiQMv";
         OssConsoleConfig ossProperties = new OssConsoleConfig(consoleUrl, accessKeyId, accessKeySecret);
         OssConsoleClient ossConsoleClient = new OssConsoleClient(ossProperties);
 
-        int channelId = 101;
+        int channelId = 114;
         ServerInfo serverInfo = ossConsoleClient.getUploadStore(channelId);
         if (serverInfo == null) {
             log.info("获取 server_info 失败");
@@ -40,15 +74,11 @@ public class OssConsoleClientTest {
         String ossUrl = serverInfo.getOssUrl();
         String token = serverInfo.getToken();
 
-        OssStoreClient ossStoreClient = new OssStoreClient(ossUrl);
-        String filePath = "/home/reghao/data/video/output.mp4";
+        multipartUpload(ossUrl, channelId, token);
+
+        /*String filePath = "";
         File file = new File(filePath);
-        UploadFileRet uploadFileRet = ossStoreClient.postObjectWithJdkHttp(file, channelId, token);
-        if (uploadFileRet == null) {
-            log.info("文件上传失败");
-        } else {
-            log.info("{} -> {}", uploadFileRet.getUploadId(), uploadFileRet.getUrl());
-        }
+        ossConsoleClient.uploadFilePart(channelId, file);*/
 
         /*String objectName = "video/playback/28d0fd95e224499c9f2cf1d98b4551a5.flv";
         ossStoreClient.getObject(objectName);*/