|
|
@@ -1,24 +1,47 @@
|
|
|
package cn.reghao.oss.store.handler;
|
|
|
|
|
|
-import cn.reghao.oss.api.dto.WebBody;
|
|
|
+import cn.reghao.jutil.jdk.security.DigestUtil;
|
|
|
+import cn.reghao.jutil.jdk.web.result.WebResult;
|
|
|
+import cn.reghao.oss.api.constant.UploadStatus;
|
|
|
+import cn.reghao.oss.api.dto.FastUploadResult;
|
|
|
+import cn.reghao.oss.api.dto.UploadResult;
|
|
|
import cn.reghao.oss.api.dto.rest.UploadFileRet;
|
|
|
+import cn.reghao.oss.api.iface.ConsoleService;
|
|
|
+import cn.reghao.oss.api.util.FileUtil;
|
|
|
+import cn.reghao.oss.api.util.OssSamplingHash;
|
|
|
+import cn.reghao.oss.store.config.OssStoreConfig;
|
|
|
import cn.reghao.oss.store.disk.DiskService;
|
|
|
import cn.reghao.oss.store.disk.HddFlushService;
|
|
|
import cn.reghao.oss.store.util.ResponseHelper;
|
|
|
import cn.reghao.oss.store.util.UploadProgressManager;
|
|
|
+import cn.reghao.oss.store.util.UploadState;
|
|
|
+import io.netty.buffer.ByteBuf;
|
|
|
import io.netty.buffer.Unpooled;
|
|
|
import io.netty.channel.ChannelHandlerContext;
|
|
|
import io.netty.channel.SimpleChannelInboundHandler;
|
|
|
import io.netty.handler.codec.http.*;
|
|
|
import io.netty.handler.codec.http.multipart.*;
|
|
|
import io.netty.util.AttributeKey;
|
|
|
+import io.netty.util.ReferenceCountUtil;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.tika.Tika;
|
|
|
+import org.apache.tika.metadata.Metadata;
|
|
|
+import org.apache.tika.metadata.TikaCoreProperties;
|
|
|
|
|
|
+import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
import java.io.RandomAccessFile;
|
|
|
+import java.nio.ByteBuffer;
|
|
|
import java.nio.channels.FileChannel;
|
|
|
+import java.security.MessageDigest;
|
|
|
+import java.security.NoSuchAlgorithmException;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
+import java.util.UUID;
|
|
|
|
|
|
/**
|
|
|
+ * 处理 multipart/form-data 上传, 每个线程一个 handler
|
|
|
+ *
|
|
|
* @author reghao
|
|
|
* @date 2026-02-01 12:39:37
|
|
|
*/
|
|
|
@@ -29,19 +52,32 @@ public class OssMultipartUploadHandler extends SimpleChannelInboundHandler<HttpO
|
|
|
private HttpPostMultipartRequestDecoder decoder;
|
|
|
|
|
|
// 当前请求的上下文状态
|
|
|
- private String currentUploadId;
|
|
|
- private long currentOffset = -1;
|
|
|
- private int currentChunkNumber;
|
|
|
- private int totalParts = 0;
|
|
|
- private long totalSize = 0;
|
|
|
- private long chunkSize = 0;
|
|
|
- private String filename = "";
|
|
|
- private HddFlushService hddFlushService;
|
|
|
- private DiskService diskService;
|
|
|
-
|
|
|
- public OssMultipartUploadHandler(HddFlushService hddFlushService, DiskService diskService) {
|
|
|
+ private static final Tika tika = new Tika();
|
|
|
+ private final Map<String, String> formData = new HashMap<>();
|
|
|
+ private final OssStoreConfig ossStoreConfig;
|
|
|
+ private final ConsoleService consoleService;
|
|
|
+ private final HddFlushService hddFlushService;
|
|
|
+ private final DiskService diskService;
|
|
|
+ private FileChannel singleFileChannel;
|
|
|
+ private String singleFileId;
|
|
|
+ private final MessageDigest messageDigest;
|
|
|
+ // 8 字节
|
|
|
+ private final ByteBuf headerCollector = Unpooled.buffer(8);
|
|
|
+ private boolean isTypeIdentified = false;
|
|
|
+ private final String hostPort;
|
|
|
+
|
|
|
+ public OssMultipartUploadHandler(OssStoreConfig ossStoreConfig, ConsoleService consoleService,
|
|
|
+ HddFlushService hddFlushService, DiskService diskService) {
|
|
|
+ this.hostPort = String.format("%s:%s", ossStoreConfig.getStoreHost(), ossStoreConfig.getPort());
|
|
|
+ this.ossStoreConfig = ossStoreConfig;
|
|
|
+ this.consoleService = consoleService;
|
|
|
this.hddFlushService = hddFlushService;
|
|
|
this.diskService = diskService;
|
|
|
+ try {
|
|
|
+ this.messageDigest = MessageDigest.getInstance("SHA-256");
|
|
|
+ } catch (NoSuchAlgorithmException e) {
|
|
|
+ throw new RuntimeException("SHA-256 algorithm not found", e);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
@@ -93,18 +129,26 @@ public class OssMultipartUploadHandler extends SimpleChannelInboundHandler<HttpO
|
|
|
private void processAttribute(Attribute attr) throws IOException {
|
|
|
String name = attr.getName();
|
|
|
String value = attr.getValue();
|
|
|
- if ("identifier".equals(name)) currentUploadId = value;
|
|
|
+ formData.put(name, value);
|
|
|
+ /*if ("identifier".equals(name)) currentUploadId = value;
|
|
|
if ("chunkNumber".equals(name)) currentChunkNumber = Integer.parseInt(value);
|
|
|
if ("chunkSize".equals(name)) chunkSize = Long.parseLong(value);
|
|
|
if ("totalChunks".equals(name)) totalParts = Integer.parseInt(value);
|
|
|
- if ("filename".equals(name)) filename = value;
|
|
|
+ if ("filename".equals(name)) filename = value;*/
|
|
|
}
|
|
|
|
|
|
// 处理 multipart/form-data 的文件
|
|
|
private void processFileUpload(FileUpload fileUpload) throws IOException {
|
|
|
+ String currentUploadId = formData.get("identifier");
|
|
|
+ if (currentUploadId == null) {
|
|
|
+ handleSingleFileUploadStreamingly(fileUpload);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
// 在非聚合模式下,即使 decoder.next() 返回了 fileUpload,
|
|
|
// 也要判断它是否真的接收完毕。
|
|
|
if (fileUpload.isCompleted()) {
|
|
|
+ long currentChunkNumber = Long.parseLong(formData.get("chunkNumber"));
|
|
|
// 执行你的 SSD 写入逻辑
|
|
|
handleChunkUpload(currentUploadId, currentChunkNumber, fileUpload);
|
|
|
} else {
|
|
|
@@ -115,6 +159,90 @@ public class OssMultipartUploadHandler extends SimpleChannelInboundHandler<HttpO
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ private void handleSingleFileUploadStreamingly(FileUpload fileUpload) throws IOException {
|
|
|
+ // 1. 初始化文件 ID 和 Channel (仅在第一次进入时)
|
|
|
+ if (singleFileId == null) {
|
|
|
+ singleFileId = UUID.randomUUID().toString().replace("-", "");
|
|
|
+ formData.put("uploadId", singleFileId);
|
|
|
+
|
|
|
+ String filename = fileUpload.getFilename();
|
|
|
+ formData.put("filename", filename);
|
|
|
+
|
|
|
+ String ssdPath = diskService.getSsdTempPath(singleFileId);
|
|
|
+ RandomAccessFile raf = new RandomAccessFile(ssdPath, "rw");
|
|
|
+ singleFileChannel = raf.getChannel();
|
|
|
+ messageDigest.reset();
|
|
|
+ }
|
|
|
+
|
|
|
+ // 2. 获取当前已经解码出来的、尚未读取的新增字节(即获取当前新增的增量数据)
|
|
|
+ // 注意:不要直接用 fileUpload.getByteBuf(),那个可能包含已经写过的数据
|
|
|
+ ByteBuf chunk = fileUpload.getChunk((int) fileUpload.length() - (int) singleFileChannel.size());
|
|
|
+ if (chunk != null) {
|
|
|
+ // Nginx 的 client_body_buffer_size参数会显著影响 Netty 接收到的第一个 chunk 的大小
|
|
|
+ try {
|
|
|
+ if (!isTypeIdentified) {
|
|
|
+ // 将新到的数据写入 collector,最多写满 8 字节
|
|
|
+ int need = 8 - headerCollector.readableBytes();
|
|
|
+ if (need > 0) {
|
|
|
+ headerCollector.writeBytes(chunk, Math.min(need, chunk.readableBytes()));
|
|
|
+ // 只要凑够了 8 字节,或者文件已经传输完毕(应对极小文件)
|
|
|
+ if (headerCollector.readableBytes() == 8 || fileUpload.isCompleted()) {
|
|
|
+ String sniffedType = tika.detect(toByteArray(headerCollector));
|
|
|
+ formData.put("sniffedContentType", sniffedType);
|
|
|
+ isTypeIdentified = true;
|
|
|
+ // 释放 collector 内存
|
|
|
+ headerCollector.clear();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 注意:这里需要重置 chunk 的 readerIndex,因为 writeBytes 移动了它
|
|
|
+ // 或者在前面使用 slice()/duplicate() 保持 chunk 原封不动写磁盘
|
|
|
+ chunk.readerIndex(0);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 关键点:在移动指针前,先更新摘要
|
|
|
+ // 使用 nioBuffer() 不会移动 readerIndex,方便后续写入
|
|
|
+ ByteBuffer nioBuffer = chunk.nioBuffer();
|
|
|
+ // 1. 更新 SHA-256 摘要
|
|
|
+ messageDigest.update(nioBuffer.duplicate());
|
|
|
+
|
|
|
+ // 3. 将这部分新增字节写入磁盘
|
|
|
+ while (chunk.isReadable()) {
|
|
|
+ // write 方法会返回实际写入字节,手动移动 readerIndex 避免死循环
|
|
|
+ int written = singleFileChannel.write(chunk.nioBuffer());
|
|
|
+ chunk.skipBytes(written);
|
|
|
+ }
|
|
|
+ } finally {
|
|
|
+ // 4. 必须手动释放这个 chunk,否则会堆外内存泄露
|
|
|
+ ReferenceCountUtil.release(chunk);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // 5. 检查是否完成
|
|
|
+ if (fileUpload.isCompleted()) {
|
|
|
+ singleFileChannel.force(true); // 确保落盘
|
|
|
+ singleFileChannel.close();
|
|
|
+ singleFileChannel = null;
|
|
|
+
|
|
|
+ // 3. 计算最终的十六进制摘要串
|
|
|
+ byte[] hashBytes = messageDigest.digest();
|
|
|
+ String sha256sum = OssSamplingHash.bytesToHex(hashBytes);
|
|
|
+ formData.put("sha256sum", sha256sum);
|
|
|
+ log.info("文件上传完成,SHA-256: {}", sha256sum);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private byte[] toByteArray(ByteBuf buf) {
|
|
|
+ // 1. 获取可读字节数
|
|
|
+ int length = buf.readableBytes();
|
|
|
+ byte[] array = new byte[length];
|
|
|
+
|
|
|
+ // 2. 将数据从 ByteBuf(可能是堆外内存)拷贝到堆内的 byte[]
|
|
|
+ // getBytes(index, array) 不会移动 readerIndex
|
|
|
+ buf.getBytes(buf.readerIndex(), array);
|
|
|
+
|
|
|
+ return array;
|
|
|
+ }
|
|
|
+
|
|
|
private void handleChunkUpload(String uploadId, long offset, FileUpload fileUpload) throws IOException {
|
|
|
// SSD 上的临时预分配文件路径
|
|
|
String ssdPath = diskService.getSsdTempPath(uploadId);
|
|
|
@@ -122,57 +250,142 @@ public class OssMultipartUploadHandler extends SimpleChannelInboundHandler<HttpO
|
|
|
try (RandomAccessFile raf = new RandomAccessFile(ssdPath, "rw");
|
|
|
FileChannel channel = raf.getChannel()) {
|
|
|
|
|
|
+ long chunkSize = Long.parseLong(formData.get("chunkSize"));
|
|
|
// 定位到 SSD 文件中的指定偏移量
|
|
|
+ int currentChunkNumber = Integer.parseInt(formData.get("chunkNumber"));
|
|
|
long pos = (currentChunkNumber - 1)*chunkSize;
|
|
|
channel.position(pos);
|
|
|
// fileUpload.get() 可能是内存 Buffer 也可能是临时文件,Netty 自动处理
|
|
|
channel.write(fileUpload.getByteBuf().nioBuffer());
|
|
|
|
|
|
// 写入成功后标记位图
|
|
|
- UploadProgressManager.markPart(currentUploadId, currentChunkNumber);
|
|
|
+ UploadProgressManager.markPart(uploadId, currentChunkNumber);
|
|
|
System.out.printf("分片写入成功: ID=%s, Offset=%s, Size=%s\n", uploadId, offset, fileUpload.length());
|
|
|
} catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
+ log.error("{}", e.getMessage());
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- private void finalizeUpload(ChannelHandlerContext ctx) throws IOException {
|
|
|
+ private void finalizeUpload(ChannelHandlerContext ctx) throws Exception {
|
|
|
+ String channelPrefix = (String) ctx.channel().attr(AttributeKey.valueOf("channelPrefix")).get();
|
|
|
+ long uploadBy = (Long) ctx.channel().attr(AttributeKey.valueOf("uploadBy")).get();
|
|
|
+ String currentUploadId = formData.get("identifier");
|
|
|
+ if (currentUploadId == null) {
|
|
|
+ // 单文件上传
|
|
|
+ String uploadId = formData.get("uploadId");
|
|
|
+ String filename = formData.get("filename");
|
|
|
+ String contentType = formData.get("sniffedContentType");
|
|
|
+ String sha256sum = formData.get("sha256sum");
|
|
|
+
|
|
|
+ String ssdTempPath = diskService.getSsdTempPath(uploadId);
|
|
|
+ File tmpFile = new File(ssdTempPath);
|
|
|
+ long size = tmpFile.length();
|
|
|
+ String objectName = getObjectName(uploadId, filename, contentType, channelPrefix);
|
|
|
+ UploadResult uploadResult = new UploadResult();
|
|
|
+ uploadResult.setSha256sum(sha256sum);
|
|
|
+ //uploadResult.setAbsolutePath("");
|
|
|
+ uploadResult.setSize(size);
|
|
|
+ uploadResult.setFilename(filename);
|
|
|
+ uploadResult.setContentType(contentType);
|
|
|
+ uploadResult.setChannelPrefix(channelPrefix);
|
|
|
+ uploadResult.setObjectId(uploadId);
|
|
|
+ uploadResult.setObjectName(objectName);
|
|
|
+ uploadResult.setUploadBy(uploadBy);
|
|
|
+ uploadResult.setHostPort(hostPort);
|
|
|
+ uploadResult.setUploadStatus(UploadStatus.FLUSHING.getCode());
|
|
|
+
|
|
|
+ FastUploadResult fastUploadResult = new FastUploadResult(uploadResult);
|
|
|
+ if (consoleService.checkExists(uploadResult.getSha256sum())) {
|
|
|
+ log.info("文件 {} 触发秒传, 删除 ssd 中的临时文件 {}", uploadResult.getSha256sum(), ssdTempPath);
|
|
|
+ consoleService.bindOnly(fastUploadResult);
|
|
|
+ } else {
|
|
|
+ consoleService.registerAndBind(uploadResult);
|
|
|
+ hddFlushService.moveSsdToHdd(uploadId, sha256sum, size, ssdTempPath);
|
|
|
+ }
|
|
|
+
|
|
|
+ // 返回成功响应
|
|
|
+ String url = "";
|
|
|
+ UploadFileRet uploadFileRet = new UploadFileRet(uploadId, url, true);
|
|
|
+ String webResult = WebResult.success(uploadFileRet);
|
|
|
+ ResponseHelper.sendJsonResponse(ctx, webResult);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
// 1. 业务逻辑处理 (Bitmap 等)
|
|
|
+ int totalParts = Integer.parseInt(formData.get("totalChunks"));
|
|
|
if (UploadProgressManager.isComplete(currentUploadId, totalParts)) {
|
|
|
log.info("文件 {} 校验通过,准备触发 HDD 异步搬运", currentUploadId);
|
|
|
+ String uploadId = currentUploadId;
|
|
|
+ String filename = formData.get("filename");
|
|
|
+ String ssdTempPath = diskService.getSsdTempPath(uploadId);
|
|
|
+ File tmpFile = new File(ssdTempPath);
|
|
|
+ long size = tmpFile.length();
|
|
|
+ String contentType = getContentType(tmpFile);
|
|
|
+ String objectName = getObjectName(uploadId, filename, contentType, channelPrefix);
|
|
|
|
|
|
- String channelPrefix = (String) ctx.channel().attr(AttributeKey.valueOf("channelPrefix")).get();
|
|
|
+ UploadResult uploadResult = new UploadResult();
|
|
|
+ //uploadResult.setSha256sum("");
|
|
|
+ //uploadResult.setAbsolutePath("");
|
|
|
+ uploadResult.setSize(size);
|
|
|
+ uploadResult.setFilename(filename);
|
|
|
+ uploadResult.setContentType(contentType);
|
|
|
+ uploadResult.setChannelPrefix(channelPrefix);
|
|
|
+ uploadResult.setObjectId(uploadId);
|
|
|
+ uploadResult.setObjectName(objectName);
|
|
|
+ uploadResult.setUploadBy(uploadBy);
|
|
|
+ uploadResult.setHostPort(hostPort);
|
|
|
+ uploadResult.setUploadStatus(UploadStatus.FLUSHING.getCode());
|
|
|
+ consoleService.registerAndBind(uploadResult);
|
|
|
|
|
|
// 2. 触发合并逻辑 (合并 SSD 上的碎片)
|
|
|
- hddFlushService.triggerFlush(currentUploadId, channelPrefix, filename);
|
|
|
-
|
|
|
+ hddFlushService.triggerFlush(currentUploadId);
|
|
|
// 3. 清理内存位图
|
|
|
UploadProgressManager.remove(currentUploadId);
|
|
|
|
|
|
// 4. 返回成功响应
|
|
|
- UploadFileRet uploadFileRet = new UploadFileRet(currentUploadId, "");
|
|
|
- WebBody webBody = new WebBody(uploadFileRet);
|
|
|
- ResponseHelper.sendJsonResponse(ctx, HttpResponseStatus.OK, webBody);
|
|
|
- System.out.println(currentUploadId + " 的分片已完成并已回写响应");
|
|
|
+ String url = "";
|
|
|
+ UploadFileRet uploadFileRet = new UploadFileRet(currentUploadId, url, true);
|
|
|
+ String webResult = WebResult.success(uploadFileRet);
|
|
|
+ ResponseHelper.sendJsonResponse(ctx, webResult);
|
|
|
+ log.info("{} 的分片上传已完成并已回写响应", currentUploadId);
|
|
|
} else {
|
|
|
- // 2. 创建响应
|
|
|
- // 明确指定 Content-Length,否则某些客户端会一直等待 Body 结束
|
|
|
- FullHttpResponse response = new DefaultFullHttpResponse(
|
|
|
- HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.EMPTY_BUFFER);
|
|
|
- response.headers().set(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_JSON);
|
|
|
- response.headers().setInt(HttpHeaderNames.CONTENT_LENGTH, response.content().readableBytes());
|
|
|
-
|
|
|
- UploadFileRet uploadFileRet = new UploadFileRet("");
|
|
|
- WebBody webBody = new WebBody(uploadFileRet);
|
|
|
- ResponseHelper.sendJsonResponse(ctx, HttpResponseStatus.OK, webBody);
|
|
|
- System.out.println("分片 " + currentChunkNumber + " 写入完成并已回写响应");
|
|
|
+ UploadFileRet uploadFileRet = new UploadFileRet(currentUploadId);
|
|
|
+ String webResult = WebResult.success(uploadFileRet);
|
|
|
+ ResponseHelper.sendJsonResponse(ctx, webResult);
|
|
|
+
|
|
|
+ int currentChunkNumber = Integer.parseInt(formData.get("chunkNumber"));
|
|
|
+ log.info("分片 {} 写入完成并已回写响应", currentChunkNumber);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private String getObjectName(String uploadId, String filename, String contentType, String channelPrefix) {
|
|
|
+ String suffix = FileUtil.getSuffix(filename);
|
|
|
+ if (suffix.isBlank()) {
|
|
|
+ suffix = FileUtil.getSuffixByMime(contentType);
|
|
|
+ }
|
|
|
+
|
|
|
+ return String.format("%s%s.%s", channelPrefix, uploadId, suffix);
|
|
|
+ }
|
|
|
+
|
|
|
+ private String getContentType(File file) {
|
|
|
+ try {
|
|
|
+ // Tika 会根据文件头字节进行深度探测
|
|
|
+ return tika.detect(file);
|
|
|
+ } catch (IOException e) {
|
|
|
+ log.error("Tika 探测文件类型失败: {}", file.getAbsolutePath(), e);
|
|
|
+ return "application/octet-stream";
|
|
|
}
|
|
|
}
|
|
|
|
|
|
private void reset() {
|
|
|
cleanDecoder();
|
|
|
- currentUploadId = null;
|
|
|
- currentChunkNumber = -1;
|
|
|
+ if (messageDigest != null) {
|
|
|
+ messageDigest.reset();
|
|
|
+ }
|
|
|
+
|
|
|
+ if (headerCollector.refCnt() > 0) {
|
|
|
+ headerCollector.release();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
private void cleanDecoder() {
|
|
|
@@ -191,10 +404,7 @@ public class OssMultipartUploadHandler extends SimpleChannelInboundHandler<HttpO
|
|
|
*/
|
|
|
@Override
|
|
|
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
|
|
|
- if (decoder != null) {
|
|
|
- // 如果文件没写完就断了,这里可以决定是保留残片还是删除
|
|
|
- reset();
|
|
|
- }
|
|
|
+ reset();
|
|
|
super.channelInactive(ctx);
|
|
|
}
|
|
|
|