|
|
@@ -1,15 +1,19 @@
|
|
|
package cn.reghao.oss.store.config.spring;
|
|
|
|
|
|
import cn.reghao.jutil.jdk.machine.id.MachineId;
|
|
|
-import cn.reghao.jutil.jdk.result.WebResult;
|
|
|
import cn.reghao.jutil.jdk.serializer.JsonConverter;
|
|
|
+import cn.reghao.jutil.jdk.store.LocalStore;
|
|
|
import cn.reghao.jutil.jdk.store.LocalStores;
|
|
|
import cn.reghao.jutil.jdk.store.SubDirCount;
|
|
|
import cn.reghao.oss.sdk.OssConsoleClient;
|
|
|
+import cn.reghao.oss.store.api.dto.ObjectChannel;
|
|
|
+import cn.reghao.oss.store.api.dto.StoreDiskDto;
|
|
|
+import cn.reghao.oss.store.api.dto.StoreNodeDto;
|
|
|
+import cn.reghao.oss.store.api.dto.StoreProperties;
|
|
|
import cn.reghao.oss.store.config.SpringProperties;
|
|
|
import cn.reghao.oss.store.db.mapper.DataBlockMapper;
|
|
|
+import cn.reghao.oss.store.service.StoreLocalCache;
|
|
|
import cn.reghao.oss.store.task.FileTask;
|
|
|
-import com.google.gson.reflect.TypeToken;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.springframework.beans.factory.DisposableBean;
|
|
|
import org.springframework.boot.ApplicationArguments;
|
|
|
@@ -18,11 +22,7 @@ import org.springframework.stereotype.Component;
|
|
|
|
|
|
import java.io.File;
|
|
|
import java.io.IOException;
|
|
|
-import java.lang.reflect.Type;
|
|
|
-import java.net.URI;
|
|
|
-import java.net.http.HttpClient;
|
|
|
-import java.net.http.HttpRequest;
|
|
|
-import java.net.http.HttpResponse;
|
|
|
+import java.util.ArrayList;
|
|
|
import java.util.HashMap;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
|
@@ -38,26 +38,23 @@ public class AppLifecycle implements ApplicationRunner, DisposableBean {
|
|
|
private final FileTask fileTask;
|
|
|
private final SpringProperties springProperties;
|
|
|
private final MachineId machineId;
|
|
|
- private OssConsoleClient ossConsoleClient;
|
|
|
+ private final OssConsoleClient ossConsoleClient;
|
|
|
+ private final StoreLocalCache storeLocalCache;
|
|
|
|
|
|
public AppLifecycle(DataBlockMapper dataBlockMapper, FileTask fileTask, SpringProperties springProperties,
|
|
|
- MachineId machineId, OssConsoleClient ossConsoleClient) {
|
|
|
+ MachineId machineId, OssConsoleClient ossConsoleClient, StoreLocalCache storeLocalCache) {
|
|
|
this.dataBlockMapper = dataBlockMapper;
|
|
|
this.fileTask = fileTask;
|
|
|
this.springProperties = springProperties;
|
|
|
this.machineId = machineId;
|
|
|
this.ossConsoleClient = ossConsoleClient;
|
|
|
+ this.storeLocalCache = storeLocalCache;
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void run(ApplicationArguments args) throws Exception {
|
|
|
- try {
|
|
|
- register();
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
initLocalStore();
|
|
|
-
|
|
|
+ initStoreNode();
|
|
|
log.info("执行文件任务...");
|
|
|
fileTask.exec();
|
|
|
log.info("文件任务执行完成...");
|
|
|
@@ -91,35 +88,27 @@ public class AppLifecycle implements ApplicationRunner, DisposableBean {
|
|
|
log.info("本地磁盘数据初始化完成...");
|
|
|
}
|
|
|
|
|
|
- private void register() throws Exception {
|
|
|
- Map<String, String> map = new HashMap<>();
|
|
|
- map.put("ipv4Addr", machineId.ipv4());
|
|
|
- map.put("httpPort", springProperties.getHttpPort()+"");
|
|
|
- map.put("rpcPort", springProperties.getRpcPort()+"");
|
|
|
- String jsonPayload = JsonConverter.objectToJson(map);
|
|
|
+ private void initStoreNode() throws Exception {
|
|
|
+ log.info("初始化 StoreNode...");
|
|
|
+ List<StoreDiskDto> list = new ArrayList<>();
|
|
|
+ for (LocalStore localStore : LocalStores.getLocalStores()) {
|
|
|
+ String diskDir = localStore.getDiskDir();
|
|
|
+ long total = localStore.getTotal();
|
|
|
+ long avail = localStore.getAvailable();
|
|
|
+ list.add(new StoreDiskDto(diskDir, total, avail));
|
|
|
+ }
|
|
|
|
|
|
+ String nodeAddr = machineId.ipv4();
|
|
|
+ int httpPort = springProperties.getHttpPort();
|
|
|
+ int rpcPort = springProperties.getRpcPort();
|
|
|
+ StoreNodeDto storeNodeDto = new StoreNodeDto(nodeAddr, httpPort, rpcPort, list);
|
|
|
+ String jsonPayload = JsonConverter.objectToJson(storeNodeDto);
|
|
|
ossConsoleClient.registerNode(jsonPayload);
|
|
|
|
|
|
- String api = String.format("%s/api/oss/store/register", springProperties.getConsoleEndpoint());
|
|
|
- HttpClient httpClient = HttpClient.newBuilder().build();
|
|
|
- HttpRequest httpRequest = HttpRequest.newBuilder(new URI(api))
|
|
|
- .header("content-type", "application/json")
|
|
|
- .POST(HttpRequest.BodyPublishers.ofString(jsonPayload))
|
|
|
- .build();
|
|
|
- HttpResponse<String> httpResponse = httpClient.send(httpRequest, HttpResponse.BodyHandlers.ofString());
|
|
|
- int statusCode = httpResponse.statusCode();
|
|
|
- String body = httpResponse.body();
|
|
|
- if (statusCode != 200) {
|
|
|
- String errMsg = String.format("%s -> %s", statusCode, body);
|
|
|
- throw new Exception(errMsg);
|
|
|
- }
|
|
|
-
|
|
|
- Type type = new TypeToken<WebResult<String>>(){}.getType();
|
|
|
- WebResult<String> webResult = JsonConverter.jsonToObject(body, type);
|
|
|
- if (webResult.getCode() != 0) {
|
|
|
- String errMsg = String.format("%s - %s", webResult.getCode(), webResult.getMsg());
|
|
|
- throw new Exception(errMsg);
|
|
|
- }
|
|
|
+ StoreProperties storeProperties = ossConsoleClient.getStoreProperties(nodeAddr);
|
|
|
+ List<ObjectChannel> objectChannels = ossConsoleClient.getObjectChannels(nodeAddr);
|
|
|
+ storeLocalCache.initOssStore(storeProperties, objectChannels);
|
|
|
+ log.info("StoreNode 初始化完成...");
|
|
|
}
|
|
|
|
|
|
@Override
|