|
@@ -1,10 +1,9 @@
|
|
|
package cn.reghao.oss.store.handler;
|
|
package cn.reghao.oss.store.handler;
|
|
|
|
|
|
|
|
-import cn.reghao.jutil.jdk.security.DigestUtil;
|
|
|
|
|
import cn.reghao.jutil.jdk.web.result.WebResult;
|
|
import cn.reghao.jutil.jdk.web.result.WebResult;
|
|
|
import cn.reghao.oss.api.constant.UploadStatus;
|
|
import cn.reghao.oss.api.constant.UploadStatus;
|
|
|
-import cn.reghao.oss.api.dto.FastUploadResult;
|
|
|
|
|
-import cn.reghao.oss.api.dto.UploadResult;
|
|
|
|
|
|
|
+import cn.reghao.oss.api.dto.rest.FastUploadResult;
|
|
|
|
|
+import cn.reghao.oss.api.dto.rest.UploadResult;
|
|
|
import cn.reghao.oss.api.dto.rest.UploadFileRet;
|
|
import cn.reghao.oss.api.dto.rest.UploadFileRet;
|
|
|
import cn.reghao.oss.api.iface.ConsoleService;
|
|
import cn.reghao.oss.api.iface.ConsoleService;
|
|
|
import cn.reghao.oss.api.util.FileUtil;
|
|
import cn.reghao.oss.api.util.FileUtil;
|
|
@@ -14,7 +13,6 @@ import cn.reghao.oss.store.disk.DiskService;
|
|
|
import cn.reghao.oss.store.disk.HddFlushService;
|
|
import cn.reghao.oss.store.disk.HddFlushService;
|
|
|
import cn.reghao.oss.store.util.ResponseHelper;
|
|
import cn.reghao.oss.store.util.ResponseHelper;
|
|
|
import cn.reghao.oss.store.util.UploadProgressManager;
|
|
import cn.reghao.oss.store.util.UploadProgressManager;
|
|
|
-import cn.reghao.oss.store.util.UploadState;
|
|
|
|
|
import io.netty.buffer.ByteBuf;
|
|
import io.netty.buffer.ByteBuf;
|
|
|
import io.netty.buffer.Unpooled;
|
|
import io.netty.buffer.Unpooled;
|
|
|
import io.netty.channel.ChannelHandlerContext;
|
|
import io.netty.channel.ChannelHandlerContext;
|
|
@@ -25,8 +23,6 @@ import io.netty.util.AttributeKey;
|
|
|
import io.netty.util.ReferenceCountUtil;
|
|
import io.netty.util.ReferenceCountUtil;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.apache.tika.Tika;
|
|
import org.apache.tika.Tika;
|
|
|
-import org.apache.tika.metadata.Metadata;
|
|
|
|
|
-import org.apache.tika.metadata.TikaCoreProperties;
|
|
|
|
|
|
|
|
|
|
import java.io.File;
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
import java.io.IOException;
|
|
@@ -54,7 +50,6 @@ public class OssMultipartUploadHandler extends SimpleChannelInboundHandler<HttpO
|
|
|
// 当前请求的上下文状态
|
|
// 当前请求的上下文状态
|
|
|
private static final Tika tika = new Tika();
|
|
private static final Tika tika = new Tika();
|
|
|
private final Map<String, String> formData = new HashMap<>();
|
|
private final Map<String, String> formData = new HashMap<>();
|
|
|
- private final OssStoreConfig ossStoreConfig;
|
|
|
|
|
private final ConsoleService consoleService;
|
|
private final ConsoleService consoleService;
|
|
|
private final HddFlushService hddFlushService;
|
|
private final HddFlushService hddFlushService;
|
|
|
private final DiskService diskService;
|
|
private final DiskService diskService;
|
|
@@ -64,11 +59,10 @@ public class OssMultipartUploadHandler extends SimpleChannelInboundHandler<HttpO
|
|
|
// 8 字节
|
|
// 8 字节
|
|
|
private final ByteBuf headerCollector = Unpooled.buffer(8);
|
|
private final ByteBuf headerCollector = Unpooled.buffer(8);
|
|
|
private boolean isTypeIdentified = false;
|
|
private boolean isTypeIdentified = false;
|
|
|
- private final String hostPort;
|
|
|
|
|
|
|
+ private final OssStoreConfig ossStoreConfig;
|
|
|
|
|
|
|
|
public OssMultipartUploadHandler(OssStoreConfig ossStoreConfig, ConsoleService consoleService,
|
|
public OssMultipartUploadHandler(OssStoreConfig ossStoreConfig, ConsoleService consoleService,
|
|
|
HddFlushService hddFlushService, DiskService diskService) {
|
|
HddFlushService hddFlushService, DiskService diskService) {
|
|
|
- this.hostPort = String.format("%s:%s", ossStoreConfig.getStoreHost(), ossStoreConfig.getPort());
|
|
|
|
|
this.ossStoreConfig = ossStoreConfig;
|
|
this.ossStoreConfig = ossStoreConfig;
|
|
|
this.consoleService = consoleService;
|
|
this.consoleService = consoleService;
|
|
|
this.hddFlushService = hddFlushService;
|
|
this.hddFlushService = hddFlushService;
|
|
@@ -130,16 +124,11 @@ public class OssMultipartUploadHandler extends SimpleChannelInboundHandler<HttpO
|
|
|
String name = attr.getName();
|
|
String name = attr.getName();
|
|
|
String value = attr.getValue();
|
|
String value = attr.getValue();
|
|
|
formData.put(name, value);
|
|
formData.put(name, value);
|
|
|
- /*if ("identifier".equals(name)) currentUploadId = value;
|
|
|
|
|
- if ("chunkNumber".equals(name)) currentChunkNumber = Integer.parseInt(value);
|
|
|
|
|
- if ("chunkSize".equals(name)) chunkSize = Long.parseLong(value);
|
|
|
|
|
- if ("totalChunks".equals(name)) totalParts = Integer.parseInt(value);
|
|
|
|
|
- if ("filename".equals(name)) filename = value;*/
|
|
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// 处理 multipart/form-data 的文件
|
|
// 处理 multipart/form-data 的文件
|
|
|
private void processFileUpload(FileUpload fileUpload) throws IOException {
|
|
private void processFileUpload(FileUpload fileUpload) throws IOException {
|
|
|
- String currentUploadId = formData.get("identifier");
|
|
|
|
|
|
|
+ String currentUploadId = formData.get("uploadId");
|
|
|
if (currentUploadId == null) {
|
|
if (currentUploadId == null) {
|
|
|
handleSingleFileUploadStreamingly(fileUpload);
|
|
handleSingleFileUploadStreamingly(fileUpload);
|
|
|
return;
|
|
return;
|
|
@@ -163,8 +152,6 @@ public class OssMultipartUploadHandler extends SimpleChannelInboundHandler<HttpO
|
|
|
// 1. 初始化文件 ID 和 Channel (仅在第一次进入时)
|
|
// 1. 初始化文件 ID 和 Channel (仅在第一次进入时)
|
|
|
if (singleFileId == null) {
|
|
if (singleFileId == null) {
|
|
|
singleFileId = UUID.randomUUID().toString().replace("-", "");
|
|
singleFileId = UUID.randomUUID().toString().replace("-", "");
|
|
|
- formData.put("uploadId", singleFileId);
|
|
|
|
|
-
|
|
|
|
|
String filename = fileUpload.getFilename();
|
|
String filename = fileUpload.getFilename();
|
|
|
formData.put("filename", filename);
|
|
formData.put("filename", filename);
|
|
|
|
|
|
|
@@ -228,7 +215,6 @@ public class OssMultipartUploadHandler extends SimpleChannelInboundHandler<HttpO
|
|
|
byte[] hashBytes = messageDigest.digest();
|
|
byte[] hashBytes = messageDigest.digest();
|
|
|
String sha256sum = OssSamplingHash.bytesToHex(hashBytes);
|
|
String sha256sum = OssSamplingHash.bytesToHex(hashBytes);
|
|
|
formData.put("sha256sum", sha256sum);
|
|
formData.put("sha256sum", sha256sum);
|
|
|
- log.info("文件上传完成,SHA-256: {}", sha256sum);
|
|
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|
|
@@ -261,7 +247,6 @@ public class OssMultipartUploadHandler extends SimpleChannelInboundHandler<HttpO
|
|
|
|
|
|
|
|
// 写入成功后标记位图
|
|
// 写入成功后标记位图
|
|
|
UploadProgressManager.markPart(uploadId, currentChunkNumber);
|
|
UploadProgressManager.markPart(uploadId, currentChunkNumber);
|
|
|
- System.out.printf("分片写入成功: ID=%s, Offset=%s, Size=%s\n", uploadId, offset, fileUpload.length());
|
|
|
|
|
} catch (Exception e) {
|
|
} catch (Exception e) {
|
|
|
log.error("{}", e.getMessage());
|
|
log.error("{}", e.getMessage());
|
|
|
}
|
|
}
|
|
@@ -270,12 +255,13 @@ public class OssMultipartUploadHandler extends SimpleChannelInboundHandler<HttpO
|
|
|
private void finalizeUpload(ChannelHandlerContext ctx) throws Exception {
|
|
private void finalizeUpload(ChannelHandlerContext ctx) throws Exception {
|
|
|
String channelPrefix = (String) ctx.channel().attr(AttributeKey.valueOf("channelPrefix")).get();
|
|
String channelPrefix = (String) ctx.channel().attr(AttributeKey.valueOf("channelPrefix")).get();
|
|
|
long uploadBy = (Long) ctx.channel().attr(AttributeKey.valueOf("uploadBy")).get();
|
|
long uploadBy = (Long) ctx.channel().attr(AttributeKey.valueOf("uploadBy")).get();
|
|
|
- String currentUploadId = formData.get("identifier");
|
|
|
|
|
|
|
+ String currentUploadId = formData.get("uploadId");
|
|
|
if (currentUploadId == null) {
|
|
if (currentUploadId == null) {
|
|
|
// 单文件上传
|
|
// 单文件上传
|
|
|
- String uploadId = formData.get("uploadId");
|
|
|
|
|
|
|
+ String uploadId = singleFileId;
|
|
|
String filename = formData.get("filename");
|
|
String filename = formData.get("filename");
|
|
|
String contentType = formData.get("sniffedContentType");
|
|
String contentType = formData.get("sniffedContentType");
|
|
|
|
|
+ String clientSha256sum = formData.get("clientSha256sum");
|
|
|
String sha256sum = formData.get("sha256sum");
|
|
String sha256sum = formData.get("sha256sum");
|
|
|
|
|
|
|
|
String ssdTempPath = diskService.getSsdTempPath(uploadId);
|
|
String ssdTempPath = diskService.getSsdTempPath(uploadId);
|
|
@@ -292,12 +278,12 @@ public class OssMultipartUploadHandler extends SimpleChannelInboundHandler<HttpO
|
|
|
uploadResult.setObjectId(uploadId);
|
|
uploadResult.setObjectId(uploadId);
|
|
|
uploadResult.setObjectName(objectName);
|
|
uploadResult.setObjectName(objectName);
|
|
|
uploadResult.setUploadBy(uploadBy);
|
|
uploadResult.setUploadBy(uploadBy);
|
|
|
- uploadResult.setHostPort(hostPort);
|
|
|
|
|
|
|
+ uploadResult.setHost(ossStoreConfig.getStoreHost());
|
|
|
|
|
+ uploadResult.setPort(ossStoreConfig.getPort());
|
|
|
uploadResult.setUploadStatus(UploadStatus.FLUSHING.getCode());
|
|
uploadResult.setUploadStatus(UploadStatus.FLUSHING.getCode());
|
|
|
|
|
|
|
|
FastUploadResult fastUploadResult = new FastUploadResult(uploadResult);
|
|
FastUploadResult fastUploadResult = new FastUploadResult(uploadResult);
|
|
|
if (consoleService.checkExists(uploadResult.getSha256sum())) {
|
|
if (consoleService.checkExists(uploadResult.getSha256sum())) {
|
|
|
- log.info("文件 {} 触发秒传, 删除 ssd 中的临时文件 {}", uploadResult.getSha256sum(), ssdTempPath);
|
|
|
|
|
consoleService.bindOnly(fastUploadResult);
|
|
consoleService.bindOnly(fastUploadResult);
|
|
|
} else {
|
|
} else {
|
|
|
consoleService.registerAndBind(uploadResult);
|
|
consoleService.registerAndBind(uploadResult);
|
|
@@ -315,7 +301,6 @@ public class OssMultipartUploadHandler extends SimpleChannelInboundHandler<HttpO
|
|
|
// 1. 业务逻辑处理 (Bitmap 等)
|
|
// 1. 业务逻辑处理 (Bitmap 等)
|
|
|
int totalParts = Integer.parseInt(formData.get("totalChunks"));
|
|
int totalParts = Integer.parseInt(formData.get("totalChunks"));
|
|
|
if (UploadProgressManager.isComplete(currentUploadId, totalParts)) {
|
|
if (UploadProgressManager.isComplete(currentUploadId, totalParts)) {
|
|
|
- log.info("文件 {} 校验通过,准备触发 HDD 异步搬运", currentUploadId);
|
|
|
|
|
String uploadId = currentUploadId;
|
|
String uploadId = currentUploadId;
|
|
|
String filename = formData.get("filename");
|
|
String filename = formData.get("filename");
|
|
|
String ssdTempPath = diskService.getSsdTempPath(uploadId);
|
|
String ssdTempPath = diskService.getSsdTempPath(uploadId);
|
|
@@ -323,9 +308,10 @@ public class OssMultipartUploadHandler extends SimpleChannelInboundHandler<HttpO
|
|
|
long size = tmpFile.length();
|
|
long size = tmpFile.length();
|
|
|
String contentType = getContentType(tmpFile);
|
|
String contentType = getContentType(tmpFile);
|
|
|
String objectName = getObjectName(uploadId, filename, contentType, channelPrefix);
|
|
String objectName = getObjectName(uploadId, filename, contentType, channelPrefix);
|
|
|
|
|
+ String clientSha256sum = formData.get("clientSha256sum");
|
|
|
|
|
|
|
|
UploadResult uploadResult = new UploadResult();
|
|
UploadResult uploadResult = new UploadResult();
|
|
|
- //uploadResult.setSha256sum("");
|
|
|
|
|
|
|
+ uploadResult.setSha256sum(clientSha256sum);
|
|
|
//uploadResult.setAbsolutePath("");
|
|
//uploadResult.setAbsolutePath("");
|
|
|
uploadResult.setSize(size);
|
|
uploadResult.setSize(size);
|
|
|
uploadResult.setFilename(filename);
|
|
uploadResult.setFilename(filename);
|
|
@@ -334,12 +320,13 @@ public class OssMultipartUploadHandler extends SimpleChannelInboundHandler<HttpO
|
|
|
uploadResult.setObjectId(uploadId);
|
|
uploadResult.setObjectId(uploadId);
|
|
|
uploadResult.setObjectName(objectName);
|
|
uploadResult.setObjectName(objectName);
|
|
|
uploadResult.setUploadBy(uploadBy);
|
|
uploadResult.setUploadBy(uploadBy);
|
|
|
- uploadResult.setHostPort(hostPort);
|
|
|
|
|
|
|
+ uploadResult.setHost(ossStoreConfig.getStoreHost());
|
|
|
|
|
+ uploadResult.setPort(ossStoreConfig.getPort());
|
|
|
uploadResult.setUploadStatus(UploadStatus.FLUSHING.getCode());
|
|
uploadResult.setUploadStatus(UploadStatus.FLUSHING.getCode());
|
|
|
consoleService.registerAndBind(uploadResult);
|
|
consoleService.registerAndBind(uploadResult);
|
|
|
|
|
|
|
|
// 2. 触发合并逻辑 (合并 SSD 上的碎片)
|
|
// 2. 触发合并逻辑 (合并 SSD 上的碎片)
|
|
|
- hddFlushService.triggerFlush(currentUploadId);
|
|
|
|
|
|
|
+ hddFlushService.triggerFlush(currentUploadId, clientSha256sum);
|
|
|
// 3. 清理内存位图
|
|
// 3. 清理内存位图
|
|
|
UploadProgressManager.remove(currentUploadId);
|
|
UploadProgressManager.remove(currentUploadId);
|
|
|
|
|
|
|
@@ -348,14 +335,10 @@ public class OssMultipartUploadHandler extends SimpleChannelInboundHandler<HttpO
|
|
|
UploadFileRet uploadFileRet = new UploadFileRet(currentUploadId, url, true);
|
|
UploadFileRet uploadFileRet = new UploadFileRet(currentUploadId, url, true);
|
|
|
String jsonResult = WebResult.success(uploadFileRet);
|
|
String jsonResult = WebResult.success(uploadFileRet);
|
|
|
ResponseHelper.sendJsonResponse(ctx, jsonResult);
|
|
ResponseHelper.sendJsonResponse(ctx, jsonResult);
|
|
|
- log.info("{} 的分片上传已完成并已回写响应", currentUploadId);
|
|
|
|
|
} else {
|
|
} else {
|
|
|
UploadFileRet uploadFileRet = new UploadFileRet(currentUploadId);
|
|
UploadFileRet uploadFileRet = new UploadFileRet(currentUploadId);
|
|
|
String jsonResult = WebResult.success(uploadFileRet);
|
|
String jsonResult = WebResult.success(uploadFileRet);
|
|
|
ResponseHelper.sendJsonResponse(ctx, jsonResult);
|
|
ResponseHelper.sendJsonResponse(ctx, jsonResult);
|
|
|
-
|
|
|
|
|
- int currentChunkNumber = Integer.parseInt(formData.get("chunkNumber"));
|
|
|
|
|
- log.info("分片 {} 写入完成并已回写响应", currentChunkNumber);
|
|
|
|
|
}
|
|
}
|
|
|
}
|
|
}
|
|
|
|
|
|