Explorar o código

将 RPC 调用抽离出来

reghao %!s(int64=5) %!d(string=hai) anos
pai
achega
b66d1e93fb

+ 2 - 0
common/src/main/java/cn/reghao/autodop/common/amqp/RpcResult.java

@@ -7,6 +7,8 @@ import static cn.reghao.autodop.common.result.ResultCode.FAIL;
 import static cn.reghao.autodop.common.result.ResultCode.SUCCESS;
 
 /**
+ * RPC 调用结果
+ *
  * @author reghao
  * @date 2020-09-07 23:04:16
  */

+ 4 - 0
common/src/main/java/cn/reghao/autodop/common/dagent/app/api/data/AppIdArgs.java

@@ -1,6 +1,8 @@
 package cn.reghao.autodop.common.dagent.app.api.data;
 
+import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 /**
  * 单个 appId 参数,在调用 appStatusOps, appStartOps, appStopOps 以及 appRestartOps 等操作时传入的参数
@@ -8,6 +10,8 @@ import lombok.Data;
  * @author reghao
  * @date 2020-12-25 19:01:15
  */
+@NoArgsConstructor
+@AllArgsConstructor
 @Data
 public class AppIdArgs {
     private String packerType;

+ 0 - 1
dagent/src/main/java/cn/reghao/autodop/dagent/app/AppService.java

@@ -13,7 +13,6 @@ import java.util.List;
  */
 public interface AppService {
     AppStatus deploy(AppDeployArgs appDeployArgs) throws DockerException;
-    @Deprecated
     AppStatus status(String appId);
     List<String> log(AppLogArgs appLogArgs);
     void restart(String appId);

+ 10 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/cache/BuildDeployCache.java

@@ -6,6 +6,8 @@ import cn.reghao.autodop.dmaster.app.entity.log.BuildLog;
 import cn.reghao.autodop.dmaster.app.repository.log.BuildLogRepository;
 import cn.reghao.autodop.dmaster.app.repository.orchestration.AppOrchestrationRepository;
 import cn.reghao.autodop.dmaster.app.repository.orchestration.ProjOrchestrationRepository;
+import cn.reghao.autodop.dmaster.cluster.entity.data.MachineInfo;
+import cn.reghao.autodop.dmaster.cluster.repository.MachineInfoRepository;
 import org.springframework.stereotype.Service;
 
 /**
@@ -16,13 +18,16 @@ import org.springframework.stereotype.Service;
  */
 @Service
 public class BuildDeployCache {
+    private MachineInfoRepository machineInfoRepository;
     private AppOrchestrationRepository appRepository;
     private ProjOrchestrationRepository projRepository;
     private BuildLogRepository buildLogRepository;
 
-    public BuildDeployCache(AppOrchestrationRepository appRepository,
+    public BuildDeployCache(MachineInfoRepository machineInfoRepository,
+                            AppOrchestrationRepository appRepository,
                             ProjOrchestrationRepository projRepository,
                             BuildLogRepository buildLogRepository) {
+        this.machineInfoRepository = machineInfoRepository;
         this.appRepository = appRepository;
         this.projRepository = projRepository;
         this.buildLogRepository = buildLogRepository;
@@ -41,4 +46,8 @@ public class BuildDeployCache {
     public BuildLog findByAppIdAndCommitId(String appId, String commitId) {
         return buildLogRepository.findBuildLogByAppIdAndCommitIdAndStatusCode(appId, commitId, 0);
     }
+
+    public MachineInfo findMachineInfoById(String machineId) {
+        return machineInfoRepository.findByMachineId(machineId);
+    }
 }

+ 42 - 4
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/AppStatusService.java

@@ -1,13 +1,25 @@
 package cn.reghao.autodop.dmaster.app.service;
 
+import cn.reghao.autodop.common.amqp.MessageType;
+import cn.reghao.autodop.common.amqp.MqMessage;
+import cn.reghao.autodop.common.amqp.RabbitProducer;
+import cn.reghao.autodop.common.amqp.RpcResult;
+import cn.reghao.autodop.common.dagent.app.api.AppOps;
+import cn.reghao.autodop.common.dagent.app.api.data.AppIdArgs;
+import cn.reghao.autodop.common.utils.serializer.JsonConverter;
 import cn.reghao.autodop.dmaster.app.cache.BuildDeployCache;
+import cn.reghao.autodop.dmaster.app.entity.config.orchestration.AppOrchestration;
+import cn.reghao.autodop.dmaster.app.entity.deploy.DeployConfig;
 import cn.reghao.autodop.dmaster.app.entity.log.AppStatus;
 import cn.reghao.autodop.dmaster.app.service.crud.log.AppStatusCrudService;
 import cn.reghao.autodop.dmaster.app.vo.LogFile;
 import cn.reghao.autodop.dmaster.common.db.PageList;
 import org.springframework.stereotype.Service;
 
+import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * 应用状态管理
@@ -17,18 +29,44 @@ import java.util.List;
  */
 @Service
 public class AppStatusService {
-    private BuildDeployCache caching;
+    private RemoteCallService callService;
+    private BuildDeployCache cache;
     private AppStatusCrudService statusCrudService;
 
-    public AppStatusService(BuildDeployCache caching, AppStatusCrudService statusCrudService) {
-        this.caching = caching;
-        this.statusCrudService = statusCrudService;
+    public AppStatusService(RemoteCallService callService, BuildDeployCache cache, AppStatusCrudService crudService) {
+        this.callService = callService;
+        this.cache = cache;
+        this.statusCrudService = crudService;
     }
 
     public List<AppStatus> status(int page, int size, String env) {
         PageList<AppStatus> pageList = statusCrudService.getByPage(page, size);
         return null;
     }
+    
+    private AppStatus appStatus(String appId) {
+        AppOrchestration app = cache.findByAppId(appId);
+
+        List<MqMessage> mqMessages = new ArrayList<>();
+        for (DeployConfig deployConfig : app.getDeployConfigs()) {
+            AppIdArgs appIdArgs = new AppIdArgs(app.getBuildConfig().getPackerConfig().getType(), appId);
+
+            MqMessage mqMessage = new MqMessage();
+            mqMessage.setMachineId(deployConfig.getMachineId());
+            mqMessage.setSendTime(System.currentTimeMillis());
+            mqMessage.setRpc(true);
+            mqMessage.setType(MessageType.appType.name());
+            mqMessage.setOps(AppOps.appStatusOps.name());
+            mqMessage.setPayload(JsonConverter.objectToJson(appIdArgs));
+            mqMessages.add(mqMessage);
+        }
+
+        Map<String, RpcResult> rpcResultMap = callService.call(mqMessages);
+        rpcResultMap.forEach((machineId, rpcResult) -> {
+        });
+
+        return null;
+    }
 
     public List<LogFile> logFiles() {
         return null;

+ 10 - 9
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/BuildDeployDispatcher.java

@@ -37,19 +37,20 @@ public class BuildDeployDispatcher {
     private Set<String> onDeploying = new ConcurrentSkipListSet<>();
     private Map<String, AppIntegrate> integrateMap = new ConcurrentHashMap<>();
     private BuildDeployCache cache;
-    private AppDeployer appDeployer;
-
-    private ExecutorService threadPool = ThreadPoolWrapper.threadPool("build-deploy");
     private BuildDeployLogConsumer logConsumer;
     private NotifyService notifyService;
+    private AppDeployer appDeployer;
 
-    public BuildDeployDispatcher(BuildDeployCache cache, RabbitProducer producer, BuildDeployLogService logService) {
+    public BuildDeployDispatcher(BuildDeployCache cache, BuildDeployLogConsumer log,
+                                 NotifyService notify, AppDeployer appDeployer) {
         this.cache = cache;
-        this.logConsumer = new BuildDeployLogConsumer(logService);
-        this.appDeployer = new AppDeployer(producer, threadPool, logConsumer);
-        this.notifyService = new NotifyService();
-        threadPool.execute(logConsumer);
-        threadPool.execute(notifyService);
+        this.logConsumer = log;
+        this.notifyService = notify;
+        this.appDeployer = appDeployer;
+
+        ExecutorService threadPool = ThreadPoolWrapper.threadPool("build-deploy");
+        threadPool.execute(log);
+        threadPool.execute(notify);
     }
 
     /**

+ 93 - 0
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/RemoteCallService.java

@@ -0,0 +1,93 @@
+package cn.reghao.autodop.dmaster.app.service;
+
+import cn.reghao.autodop.common.amqp.MqMessage;
+import cn.reghao.autodop.common.amqp.RabbitProducer;
+import cn.reghao.autodop.common.amqp.RpcResult;
+import cn.reghao.autodop.common.utils.ExceptionUtil;
+import cn.reghao.autodop.dmaster.cluster.entity.data.MachineInfo;
+import cn.reghao.autodop.dmaster.cluster.entity.data.NetworkInfo;
+import cn.reghao.autodop.dmaster.cluster.repository.MachineInfoRepository;
+import cn.reghao.autodop.dmaster.common.thread.ThreadPoolWrapper;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+/**
+ * 远程调用服务
+ *
+ * @author reghao
+ * @date 2021-02-10 02:31:12
+ */
+@Slf4j
+@Service
+public class RemoteCallService {
+    private ExecutorService threadPool = ThreadPoolWrapper.threadPool("remote-call-service");
+    private final String exchange = "amq.direct";
+    private RabbitProducer rabbitProducer;
+    private MachineInfoRepository machineInfoRepository;
+
+    public RemoteCallService(RabbitProducer rabbitProducer, MachineInfoRepository machineInfoRepository) {
+        this.rabbitProducer = rabbitProducer;
+        this.machineInfoRepository = machineInfoRepository;
+    }
+
+    public Map<String, RpcResult> call(List<MqMessage> messages) {
+        Map<String, Future<RpcResult>> futureMap = new HashMap<>(messages.size());
+        messages.forEach(mqMessage ->
+                futureMap.put(mqMessage.getMachineId(), threadPool.submit(new RemoteCallTask(mqMessage))));
+
+        Map<String, RpcResult> resultMap = new HashMap<>(messages.size());
+        for (Map.Entry<String, Future<RpcResult>> entry : futureMap.entrySet()) {
+            Future<RpcResult> future = entry.getValue();
+            while (!future.isDone() && !future.isCancelled()) {
+                // 休眠等待异步任务结束
+                try {
+                    Thread.sleep(1_000);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+            }
+
+            String machineId = entry.getKey();
+            try {
+                RpcResult rpcResult = future.get();
+                if (rpcResult != null) {
+                    resultMap.put(machineId, rpcResult);
+                } else {
+                    resultMap.put(machineId, RpcResult.fail(machineAddress(machineId) + " RPC 调用失败"));
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                resultMap.put(machineId,
+                        RpcResult.error(machineAddress(machineId) + " " + ExceptionUtil.errorMsg(e)));
+            }
+        }
+        return resultMap;
+    }
+
+    private String machineAddress(String machineId) {
+        MachineInfo machineInfo = machineInfoRepository.findByMachineId(machineId);
+        List<NetworkInfo> networkInfos = machineInfo.getNetworkInfo();
+        return networkInfos.get(0).getIpv4();
+    }
+
+    class RemoteCallTask implements Callable<RpcResult> {
+        private MqMessage mqMessage;
+
+        public RemoteCallTask(MqMessage mqMessage) {
+            this.mqMessage = mqMessage;
+        }
+
+        @Override
+        public RpcResult call() {
+            log.info("RPC 调用...");
+            return rabbitProducer.callRemote(exchange, mqMessage.getMachineId(), mqMessage);
+        }
+    }
+}

+ 4 - 2
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/NotifyTask.java → dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/RemoteCallTask.java

@@ -1,9 +1,11 @@
 package cn.reghao.autodop.dmaster.app.service;
 
+import lombok.extern.slf4j.Slf4j;
+
 /**
  * @author reghao
  * @date 2021-02-10 02:31:12
  */
-public class NotifyTask {
-
+@Slf4j
+class RemoteCallTask {
 }

+ 29 - 73
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/deploy/AppDeployer.java

@@ -5,7 +5,6 @@ import cn.reghao.autodop.common.amqp.MessageType;
 import cn.reghao.autodop.common.amqp.RpcResult;
 import cn.reghao.autodop.common.dagent.app.api.AppOps;
 import cn.reghao.autodop.common.dagent.app.api.data.deploy.AppDeployArgs;
-import cn.reghao.autodop.common.amqp.RabbitProducer;
 import cn.reghao.autodop.common.dockerc.pojo.Config;
 import cn.reghao.autodop.common.utils.serializer.JsonConverter;
 import cn.reghao.autodop.dmaster.app.entity.deploy.DeployConfig;
@@ -13,16 +12,13 @@ import cn.reghao.autodop.dmaster.app.entity.log.AppStatus;
 import cn.reghao.autodop.dmaster.app.entity.log.BuildLog;
 import cn.reghao.autodop.dmaster.app.entity.log.DeployLog;
 import cn.reghao.autodop.dmaster.app.entity.log.DeployResult;
-import cn.reghao.autodop.common.utils.ExceptionUtil;
+import cn.reghao.autodop.dmaster.app.service.RemoteCallService;
 import cn.reghao.autodop.dmaster.app.service.log.BuildDeployLogConsumer;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
 
 import java.time.LocalDateTime;
 import java.util.*;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
 
 /**
  * 应用部署
@@ -31,15 +27,13 @@ import java.util.concurrent.Future;
  * @date 2020-03-13 10:26:22
  */
 @Slf4j
+@Service
 public class AppDeployer {
-    private final String exchange = "amq.direct";
-    private RabbitProducer rabbitProducer;
-    private ExecutorService threadPool;
-    BuildDeployLogConsumer logConsumer;
+    private RemoteCallService callService;
+    private BuildDeployLogConsumer logConsumer;
 
-    public AppDeployer(RabbitProducer rabbitProducer, ExecutorService threadPool, BuildDeployLogConsumer logConsumer) {
-        this.rabbitProducer = rabbitProducer;
-        this.threadPool = threadPool;
+    public AppDeployer(RemoteCallService callService, BuildDeployLogConsumer logConsumer) {
+        this.callService = callService;
         this.logConsumer = logConsumer;
     }
 
@@ -51,7 +45,7 @@ public class AppDeployer {
      * @date 2020-03-13 下午1:00
      */
     public DeployResult deploy(BuildLog buildLog, List<DeployConfig> deployConfigs) {
-        Map<String, Future<RpcResult>> futureMap = new HashMap<>(deployConfigs.size());
+        List<MqMessage> mqMessages = new ArrayList<>();
         for (DeployConfig deployConfig : deployConfigs) {
             AppDeployArgs appDeployArgs = new AppDeployArgs();
             appDeployArgs.setAppId(buildLog.getAppId());
@@ -63,10 +57,17 @@ public class AppDeployer {
             String config = deployConfig.getDockerConfig();
             appDeployArgs.setDockerConfig((Config) JsonConverter.jsonToObject(config, Config.class));
 
-            String machineId = deployConfig.getMachineId();
-            futureMap.put(machineId, threadPool.submit(new DeployTask(machineId, appDeployArgs)));
+            MqMessage mqMessage = new MqMessage();
+            mqMessage.setMachineId(deployConfig.getMachineId());
+            mqMessage.setSendTime(System.currentTimeMillis());
+            mqMessage.setRpc(true);
+            mqMessage.setType(MessageType.appType.name());
+            mqMessage.setOps(AppOps.appDeployOps.name());
+            mqMessage.setPayload(JsonConverter.objectToJson(appDeployArgs));
+            mqMessages.add(mqMessage);
         }
 
+        Map<String, RpcResult> rpcResultMap = callService.call(mqMessages);
         DeployResult deployResult = new DeployResult();
         deployResult.setAppId(buildLog.getAppId());
         deployResult.setEnv(buildLog.getEnv());
@@ -74,69 +75,24 @@ public class AppDeployer {
         LocalDateTime deployTime = LocalDateTime.now();
         deployResult.setDeployTime(deployTime);
         List<DeployLog> deployLogs = new ArrayList<>();
-        for (Map.Entry<String, Future<RpcResult>> entry : futureMap.entrySet()) {
-            Future<RpcResult> future = entry.getValue();
-            while (!future.isDone() && !future.isCancelled()) {
-                // 休眠等待异步任务结束
-                try {
-                    Thread.sleep(1_000);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-            }
-
+        rpcResultMap.forEach((machineId, rpcResult) -> {
             DeployLog deployLog = new DeployLog();
             deployLog.setAppId(buildLog.getAppId());
             deployLog.setEnv(buildLog.getEnv());
             deployLog.setCommitId(buildLog.getCommitId());
-            deployLog.setMachineId(entry.getKey());
-            try {
-                RpcResult rpcResult = future.get();
-                if (rpcResult != null) {
-                    deployLog.setDeployTime(deployTime);
-                    deployLog.setStatusCode(rpcResult.getStatusCode());
-                    if (rpcResult.getStatusCode() == 0) {
-                        AppStatus appStatus =
-                                (AppStatus) JsonConverter.jsonToObject(rpcResult.getResult(), AppStatus.class);
-                        logConsumer.addAppStatus(appStatus);
-                    } else {
-                        deployLog.setErrDetail(rpcResult.getResult());
-                    }
-                } else {
-                    deployLog.setStatusCode(1);
-                    deployLog.setErrDetail("RPC 调用失败");
-                }
-            } catch (InterruptedException | ExecutionException e) {
-                deployLog.setStatusCode(1);
-                deployLog.setErrDetail(ExceptionUtil.errorMsg(e));
+            deployLog.setMachineId(machineId);
+            deployLog.setDeployTime(deployTime);
+            deployLog.setStatusCode(rpcResult.getStatusCode());
+            if (rpcResult.getStatusCode() == 0) {
+                AppStatus appStatus =
+                        (AppStatus) JsonConverter.jsonToObject(rpcResult.getResult(), AppStatus.class);
+                logConsumer.addAppStatus(appStatus);
+            } else {
+                deployLog.setErrDetail(rpcResult.getResult());
             }
-            deployLogs.add(deployLog);
-        }
+        });
+
         deployResult.setDeployLogs(deployLogs);
         return deployResult;
     }
-
-    class DeployTask implements Callable<RpcResult> {
-        private String machineId;
-        private AppDeployArgs appDeployArgs;
-
-        public DeployTask(String machineId, AppDeployArgs appDeployArgs) {
-            this.machineId = machineId;
-            this.appDeployArgs = appDeployArgs;
-        }
-
-        @Override
-        public RpcResult call() {
-            MqMessage mqMessage = new MqMessage();
-            mqMessage.setMachineId(machineId);
-            mqMessage.setSendTime(System.currentTimeMillis());
-            mqMessage.setRpc(true);
-            mqMessage.setType(MessageType.appType.name());
-            mqMessage.setOps(AppOps.appDeployOps.name());
-            mqMessage.setPayload(JsonConverter.objectToJson(appDeployArgs));
-
-            log.info("RPC 调用部署应用...");
-            return rabbitProducer.callRemote(exchange, machineId, mqMessage);
-        }
-    }
 }

+ 2 - 0
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/log/BuildDeployLogConsumer.java

@@ -2,6 +2,7 @@ package cn.reghao.autodop.dmaster.app.service.log;
 
 import cn.reghao.autodop.dmaster.app.entity.log.*;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -13,6 +14,7 @@ import java.util.concurrent.LinkedBlockingQueue;
  * @date 2020-05-15 21:29:32
  */
 @Slf4j
+@Service
 public class BuildDeployLogConsumer implements Runnable {
     private static BlockingQueue<Object> logQueue = new LinkedBlockingQueue<>();
     private BuildDeployLogService logService;

+ 2 - 0
dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/notifier/NotifyService.java

@@ -1,6 +1,7 @@
 package cn.reghao.autodop.dmaster.utils.notifier;
 
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -10,6 +11,7 @@ import java.util.concurrent.LinkedBlockingQueue;
  * @date 2021-02-25 00:10:17
  */
 @Slf4j
+@Service
 public class NotifyService implements Runnable {
     private static BlockingQueue<Object> msgQueue = new LinkedBlockingQueue<>();