|
|
@@ -6,7 +6,7 @@ import cn.reghao.oss.api.dto.rest.FastUploadResult;
|
|
|
import cn.reghao.oss.api.dto.rest.UploadResult;
|
|
|
import cn.reghao.oss.api.dto.rest.UploadFileRet;
|
|
|
import cn.reghao.oss.api.iface.ConsoleService;
|
|
|
-import cn.reghao.oss.api.util.FileUtil;
|
|
|
+import cn.reghao.oss.store.util.FileUtil;
|
|
|
import cn.reghao.oss.api.util.OssSamplingHash;
|
|
|
import cn.reghao.oss.store.config.OssStoreConfig;
|
|
|
import cn.reghao.oss.store.disk.DiskService;
|
|
|
@@ -14,7 +14,6 @@ import cn.reghao.oss.store.disk.HddFlushService;
|
|
|
import cn.reghao.oss.store.util.ResponseHelper;
|
|
|
import cn.reghao.oss.store.util.UploadProgressManager;
|
|
|
import io.netty.buffer.ByteBuf;
|
|
|
-import io.netty.buffer.Unpooled;
|
|
|
import io.netty.channel.ChannelHandlerContext;
|
|
|
import io.netty.channel.SimpleChannelInboundHandler;
|
|
|
import io.netty.handler.codec.http.*;
|
|
|
@@ -22,7 +21,6 @@ import io.netty.handler.codec.http.multipart.*;
|
|
|
import io.netty.util.AttributeKey;
|
|
|
import io.netty.util.ReferenceCountUtil;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.apache.tika.Tika;
|
|
|
|
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
@@ -48,7 +46,6 @@ public class OssMultipartUploadHandler extends SimpleChannelInboundHandler<HttpO
|
|
|
private HttpPostMultipartRequestDecoder decoder;
|
|
|
|
|
|
// 当前请求的上下文状态
|
|
|
- private static final Tika tika = new Tika();
|
|
|
private final Map<String, String> formData = new HashMap<>();
|
|
|
private final ConsoleService consoleService;
|
|
|
private final HddFlushService hddFlushService;
|
|
|
@@ -56,9 +53,6 @@ public class OssMultipartUploadHandler extends SimpleChannelInboundHandler<HttpO
|
|
|
private FileChannel singleFileChannel;
|
|
|
private String singleFileId;
|
|
|
private final MessageDigest messageDigest;
|
|
|
- // 8 字节
|
|
|
- private final ByteBuf headerCollector = Unpooled.buffer(8);
|
|
|
- private boolean isTypeIdentified = false;
|
|
|
private final OssStoreConfig ossStoreConfig;
|
|
|
|
|
|
public OssMultipartUploadHandler(OssStoreConfig ossStoreConfig, ConsoleService consoleService,
|
|
|
@@ -167,26 +161,6 @@ public class OssMultipartUploadHandler extends SimpleChannelInboundHandler<HttpO
|
|
|
if (chunk != null) {
|
|
|
// Nginx 的 client_body_buffer_size参数会显著影响 Netty 接收到的第一个 chunk 的大小
|
|
|
try {
|
|
|
- if (!isTypeIdentified) {
|
|
|
- // 将新到的数据写入 collector,最多写满 8 字节
|
|
|
- int need = 8 - headerCollector.readableBytes();
|
|
|
- if (need > 0) {
|
|
|
- headerCollector.writeBytes(chunk, Math.min(need, chunk.readableBytes()));
|
|
|
- // 只要凑够了 8 字节,或者文件已经传输完毕(应对极小文件)
|
|
|
- if (headerCollector.readableBytes() == 8 || fileUpload.isCompleted()) {
|
|
|
- // TODO 将检测到的 contentType 和 channel 要求的 contentType 对比, 不符合则立即终止上传
|
|
|
- String sniffedType = tika.detect(toByteArray(headerCollector));
|
|
|
- formData.put("sniffedContentType", sniffedType);
|
|
|
- isTypeIdentified = true;
|
|
|
- // 释放 collector 内存
|
|
|
- headerCollector.clear();
|
|
|
- }
|
|
|
- }
|
|
|
- // 注意:这里需要重置 chunk 的 readerIndex,因为 writeBytes 移动了它
|
|
|
- // 或者在前面使用 slice()/duplicate() 保持 chunk 原封不动写磁盘
|
|
|
- chunk.readerIndex(0);
|
|
|
- }
|
|
|
-
|
|
|
// 关键点:在移动指针前,先更新摘要
|
|
|
// 使用 nioBuffer() 不会移动 readerIndex,方便后续写入
|
|
|
ByteBuffer nioBuffer = chunk.nioBuffer();
|
|
|
@@ -260,12 +234,12 @@ public class OssMultipartUploadHandler extends SimpleChannelInboundHandler<HttpO
|
|
|
// 单文件上传
|
|
|
String uploadId = singleFileId;
|
|
|
String filename = formData.get("filename");
|
|
|
- String contentType = formData.get("sniffedContentType");
|
|
|
String clientSha256sum = formData.get("clientSha256sum");
|
|
|
String sha256sum = formData.get("sha256sum");
|
|
|
|
|
|
String ssdTempPath = diskService.getSsdTempPath(uploadId);
|
|
|
File tmpFile = new File(ssdTempPath);
|
|
|
+ String contentType = FileUtil.getContentType(ssdTempPath);
|
|
|
long size = tmpFile.length();
|
|
|
String objectName = getObjectName(uploadId, filename, contentType, channelPrefix);
|
|
|
UploadResult uploadResult = new UploadResult();
|
|
|
@@ -306,7 +280,7 @@ public class OssMultipartUploadHandler extends SimpleChannelInboundHandler<HttpO
|
|
|
String ssdTempPath = diskService.getSsdTempPath(uploadId);
|
|
|
File tmpFile = new File(ssdTempPath);
|
|
|
long size = tmpFile.length();
|
|
|
- String contentType = getContentType(tmpFile);
|
|
|
+ String contentType = FileUtil.getContentType(ssdTempPath);
|
|
|
String objectName = getObjectName(uploadId, filename, contentType, channelPrefix);
|
|
|
String clientSha256sum = formData.get("clientSha256sum");
|
|
|
|
|
|
@@ -351,25 +325,11 @@ public class OssMultipartUploadHandler extends SimpleChannelInboundHandler<HttpO
|
|
|
return String.format("%s%s.%s", channelPrefix, uploadId, suffix);
|
|
|
}
|
|
|
|
|
|
- private String getContentType(File file) {
|
|
|
- try {
|
|
|
- // Tika 会根据文件头字节进行深度探测
|
|
|
- return tika.detect(file);
|
|
|
- } catch (IOException e) {
|
|
|
- log.error("Tika 探测文件类型失败: {}", file.getAbsolutePath(), e);
|
|
|
- return "application/octet-stream";
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
private void reset() {
|
|
|
cleanDecoder();
|
|
|
if (messageDigest != null) {
|
|
|
messageDigest.reset();
|
|
|
}
|
|
|
-
|
|
|
- if (headerCollector.refCnt() > 0) {
|
|
|
- headerCollector.release();
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
private void cleanDecoder() {
|