reghao 5 лет назад
Родитель
Сommit
cc7bc92ffc

+ 2 - 2
common/src/main/java/cn/reghao/autodop/common/amqp/RabbitProducer.java

@@ -63,8 +63,8 @@ public class RabbitProducer {
      * @date 2021-02-23 下午3:57
      */
     public RpcResult callRemote(String exchange, String routeKey, MqMessage mqMessage) {
-        // TODO 处理 RPC 调用在超时后才成功的情况
-        long timeout = 60_000;
+        // TODO 处理 RPC 调用的写操作在超时后才返回的情况
+        long timeout = 180_000;
         rabbitTemplate.setReplyTimeout(timeout);
         String msg = JsonConverter.objectToJson(mqMessage);
         String result = (String) rabbitTemplate.convertSendAndReceive(exchange, routeKey, msg);

+ 2 - 2
common/src/main/java/cn/reghao/autodop/common/dagent/app/api/data/AppStatus.java → common/src/main/java/cn/reghao/autodop/common/dagent/app/api/data/DeployedAppStatus.java

@@ -6,13 +6,13 @@ import java.time.LocalDateTime;
 
 /**
  * 应用运行状态
- * 等价于 cn.reghao.autodop.dmaster.app.entity.log.AppStatus
+ * 等价于 cn.reghao.autodop.dmaster.app.entity.status.DeployedAppStatus
  *
  * @author reghao
  * @date 2021-02-22 16:24:08
  */
 @Data
-public class AppStatus {
+public class DeployedAppStatus {
     private String machineId;
     private String appId;
     private String env;

+ 21 - 21
dagent/src/main/java/cn/reghao/autodop/dagent/app/App.java

@@ -2,7 +2,7 @@ package cn.reghao.autodop.dagent.app;
 
 import cn.reghao.autodop.common.amqp.RpcResult;
 import cn.reghao.autodop.common.dagent.app.api.data.AppIdArgs;
-import cn.reghao.autodop.common.dagent.app.api.data.AppStatus;
+import cn.reghao.autodop.common.dagent.app.api.data.DeployedAppStatus;
 import cn.reghao.autodop.common.dagent.app.api.data.deploy.AppDeployArgs;
 import cn.reghao.autodop.common.dagent.app.api.data.deploy.PackerType;
 import cn.reghao.autodop.common.dagent.app.api.data.log.AppLogArgs;
@@ -35,22 +35,22 @@ public class App {
         AppDeployArgs appDeployArgs = (AppDeployArgs) JsonConverter.jsonToObject(payload, AppDeployArgs.class);
         String packerType = appDeployArgs.getPackerType();
 
-        AppStatus appStatus;
+        DeployedAppStatus deployedAppStatus;
         switch (PackerType.valueOf(packerType)) {
             case docker:
                 try {
-                    appStatus = dockerAppServiceImpl.deploy(appDeployArgs);
+                    deployedAppStatus = dockerAppServiceImpl.deploy(appDeployArgs);
                 } catch (DockerException e) {
                     return RpcResult.fail(ExceptionUtil.errorMsg(e));
                 }
                 break;
             case zip:
-                appStatus = zipAppServiceImpl.deploy(appDeployArgs);
+                deployedAppStatus = zipAppServiceImpl.deploy(appDeployArgs);
                 break;
             default:
                 return RpcResult.error("打包类型 " + appDeployArgs.getPackerType() + " 不存在");
         }
-        return RpcResult.success(JsonConverter.objectToJson(appStatus));
+        return RpcResult.success(JsonConverter.objectToJson(deployedAppStatus));
     }
 
     public RpcResult log(String payload) {
@@ -89,22 +89,22 @@ public class App {
         String appId = appIdArgs.getAppId();
         String packerType = appIdArgs.getPackerType();
 
-        AppStatus appStatus;
+        DeployedAppStatus deployedAppStatus;
         switch (PackerType.valueOf(packerType)) {
             case docker:
                 try {
-                    appStatus = dockerAppServiceImpl.status(appId);
+                    deployedAppStatus = dockerAppServiceImpl.status(appId);
                 } catch (DockerException e) {
                     return RpcResult.fail(ExceptionUtil.errorMsg(e));
                 }
                 break;
             case zip:
-                appStatus = zipAppServiceImpl.status(appId);
+                deployedAppStatus = zipAppServiceImpl.status(appId);
                 break;
             default:
                 return RpcResult.error("打包类型 " + appIdArgs.getPackerType() + " 不存在");
         }
-        return RpcResult.success(JsonConverter.objectToJson(appStatus));
+        return RpcResult.success(JsonConverter.objectToJson(deployedAppStatus));
     }
 
     public RpcResult restart(String payload) {
@@ -112,22 +112,22 @@ public class App {
         String appId = appIdArgs.getAppId();
         String packerType = appIdArgs.getPackerType();
 
-        AppStatus appStatus;
+        DeployedAppStatus deployedAppStatus;
         switch (PackerType.valueOf(packerType)) {
             case docker:
                 try {
-                    appStatus = dockerAppServiceImpl.restart(appId);
+                    deployedAppStatus = dockerAppServiceImpl.restart(appId);
                 } catch (DockerException e) {
                     return RpcResult.fail(ExceptionUtil.errorMsg(e));
                 }
                 break;
             case zip:
-                appStatus = zipAppServiceImpl.restart(appId);
+                deployedAppStatus = zipAppServiceImpl.restart(appId);
                 break;
             default:
                 return RpcResult.error("打包类型 " + appIdArgs.getPackerType() + " 不存在");
         }
-        return RpcResult.success(JsonConverter.objectToJson(appStatus));
+        return RpcResult.success(JsonConverter.objectToJson(deployedAppStatus));
     }
 
     public RpcResult stop(String payload) {
@@ -135,22 +135,22 @@ public class App {
         String appId = appIdArgs.getAppId();
         String packerType = appIdArgs.getPackerType();
 
-        AppStatus appStatus;
+        DeployedAppStatus deployedAppStatus;
         switch (PackerType.valueOf(packerType)) {
             case docker:
                 try {
-                    appStatus = dockerAppServiceImpl.stop(appId);
+                    deployedAppStatus = dockerAppServiceImpl.stop(appId);
                 } catch (DockerException e) {
                     return RpcResult.fail(ExceptionUtil.errorMsg(e));
                 }
                 break;
             case zip:
-                appStatus = zipAppServiceImpl.stop(appId);
+                deployedAppStatus = zipAppServiceImpl.stop(appId);
                 break;
             default:
                 return RpcResult.error("打包类型 " + appIdArgs.getPackerType() + " 不存在");
         }
-        return RpcResult.success(JsonConverter.objectToJson(appStatus));
+        return RpcResult.success(JsonConverter.objectToJson(deployedAppStatus));
     }
 
     public RpcResult start(String payload) {
@@ -158,21 +158,21 @@ public class App {
         String appId = appIdArgs.getAppId();
         String packerType = appIdArgs.getPackerType();
 
-        AppStatus appStatus;
+        DeployedAppStatus deployedAppStatus;
         switch (PackerType.valueOf(packerType)) {
             case docker:
                 try {
-                    appStatus = dockerAppServiceImpl.start(appId);
+                    deployedAppStatus = dockerAppServiceImpl.start(appId);
                 } catch (DockerException e) {
                     return RpcResult.fail(ExceptionUtil.errorMsg(e));
                 }
                 break;
             case zip:
-                appStatus = zipAppServiceImpl.start(appId);
+                deployedAppStatus = zipAppServiceImpl.start(appId);
                 break;
             default:
                 return RpcResult.error("打包类型 " + appIdArgs.getPackerType() + " 不存在");
         }
-        return RpcResult.success(JsonConverter.objectToJson(appStatus));
+        return RpcResult.success(JsonConverter.objectToJson(deployedAppStatus));
     }
 }

+ 6 - 6
dagent/src/main/java/cn/reghao/autodop/dagent/app/AppService.java

@@ -1,6 +1,6 @@
 package cn.reghao.autodop.dagent.app;
 
-import cn.reghao.autodop.common.dagent.app.api.data.AppStatus;
+import cn.reghao.autodop.common.dagent.app.api.data.DeployedAppStatus;
 import cn.reghao.autodop.common.dagent.app.api.data.deploy.AppDeployArgs;
 import cn.reghao.autodop.common.dagent.app.api.data.log.AppLogArgs;
 import cn.reghao.autodop.common.dagent.app.api.data.log.LogFile;
@@ -13,7 +13,7 @@ import java.util.List;
  * @date 2021-02-22 16:21:28
  */
 public interface AppService {
-    AppStatus deploy(AppDeployArgs appDeployArgs) throws DockerException;
+    DeployedAppStatus deploy(AppDeployArgs appDeployArgs) throws DockerException;
     /**
      * 日志文件列表
      *
@@ -31,8 +31,8 @@ public interface AppService {
      * @date 2021-02-26 下午3:26
      */
     List<String> logContent(AppLogArgs appLogArgs) throws DockerException;
-    AppStatus status(String appId) throws DockerException;
-    AppStatus restart(String appId) throws DockerException;
-    AppStatus stop(String appId) throws DockerException;
-    AppStatus start(String appId) throws DockerException;
+    DeployedAppStatus status(String appId) throws DockerException;
+    DeployedAppStatus restart(String appId) throws DockerException;
+    DeployedAppStatus stop(String appId) throws DockerException;
+    DeployedAppStatus start(String appId) throws DockerException;
 }

+ 24 - 19
dagent/src/main/java/cn/reghao/autodop/dagent/app/DockerAppServiceImpl.java

@@ -1,6 +1,6 @@
 package cn.reghao.autodop.dagent.app;
 
-import cn.reghao.autodop.common.dagent.app.api.data.AppStatus;
+import cn.reghao.autodop.common.dagent.app.api.data.DeployedAppStatus;
 import cn.reghao.autodop.common.dagent.app.api.data.deploy.AppDeployArgs;
 import cn.reghao.autodop.common.dagent.app.api.data.log.AppLogArgs;
 import cn.reghao.autodop.common.dagent.app.api.data.log.LogConfig;
@@ -10,6 +10,7 @@ import cn.reghao.autodop.common.dagent.machine.Machine;
 import cn.reghao.autodop.common.dockerc.Docker;
 import cn.reghao.autodop.common.dockerc.exception.DockerException;
 import cn.reghao.autodop.common.dockerc.pojo.Config;
+import cn.reghao.autodop.common.dockerc.pojo.State;
 import cn.reghao.autodop.common.dockerc.pojo.result.ContainerInfo;
 import cn.reghao.autodop.common.utils.DateTimeConverter;
 import cn.reghao.autodop.common.utils.ExceptionUtil;
@@ -22,8 +23,10 @@ import java.util.List;
  * @date 2021-02-22 16:21:37
  */
 public class DockerAppServiceImpl implements AppService {
+    private final long sleep = 10_000;
+
     @Override
-    public AppStatus deploy(AppDeployArgs appDeployArgs) throws DockerException {
+    public DeployedAppStatus deploy(AppDeployArgs appDeployArgs) throws DockerException {
         String appId = appDeployArgs.getAppId();
         String packagePath = appDeployArgs.getPackagePath();
         Config dockerConfig = appDeployArgs.getDockerConfig();
@@ -32,7 +35,7 @@ public class DockerAppServiceImpl implements AppService {
             String repoTag = appDeployArgs.getPackagePath();
             docker.pull(repoTag);
             String containerId = docker.run(appId, packagePath, dockerConfig);
-            Thread.sleep(5_000);
+            Thread.sleep(sleep);
 
             ContainerInfo containerInfo = docker.inspectContainer(containerId);
             return getAppStatus(appId, containerInfo);
@@ -41,19 +44,21 @@ public class DockerAppServiceImpl implements AppService {
         }
     }
 
-    private AppStatus getAppStatus(String appId, ContainerInfo containerInfo) {
-        AppStatus appStatus = new AppStatus();
-        appStatus.setMachineId(Machine.machineId());
-        appStatus.setAppId(appId);
-        boolean isRunning = containerInfo.getState().isRunning();
-        appStatus.setRunning(isRunning);
+    private DeployedAppStatus getAppStatus(String appId, ContainerInfo containerInfo) {
+        DeployedAppStatus deployedAppStatus = new DeployedAppStatus();
+        deployedAppStatus.setMachineId(Machine.machineId());
+        deployedAppStatus.setAppId(appId);
+
+        State state = containerInfo.getState();
+        boolean isRunning = state.isRunning();
+        deployedAppStatus.setRunning(isRunning);
         if (isRunning) {
-            appStatus.setStartTime(DateTimeConverter.localDateTime(containerInfo.getState().getStartedAt()));
-            appStatus.setPid(containerInfo.getState().getPid());
+            deployedAppStatus.setStartTime(DateTimeConverter.localDateTime(state.getStartedAt()));
+            deployedAppStatus.setPid(state.getPid());
         } else {
-            appStatus.setPid(0);
+            deployedAppStatus.setPid(0);
         }
-        return appStatus;
+        return deployedAppStatus;
     }
 
     @Override
@@ -109,7 +114,7 @@ public class DockerAppServiceImpl implements AppService {
     }
 
     @Override
-    public AppStatus status(String appId) throws DockerException {
+    public DeployedAppStatus status(String appId) throws DockerException {
         try (Docker docker = new Docker()) {
             String containerId = docker.getIdByName(appId);
             ContainerInfo containerInfo = docker.inspectContainer(containerId);
@@ -118,11 +123,11 @@ public class DockerAppServiceImpl implements AppService {
     }
 
     @Override
-    public AppStatus restart(String appId) throws DockerException {
+    public DeployedAppStatus restart(String appId) throws DockerException {
         try (Docker docker = new Docker()) {
             String containerId = docker.getIdByName(appId);
             docker.restart(containerId);
-            Thread.sleep(5_000);
+            Thread.sleep(sleep);
 
             ContainerInfo containerInfo = docker.inspectContainer(containerId);
             return getAppStatus(appId, containerInfo);
@@ -132,7 +137,7 @@ public class DockerAppServiceImpl implements AppService {
     }
 
     @Override
-    public AppStatus stop(String appId) throws DockerException {
+    public DeployedAppStatus stop(String appId) throws DockerException {
         try (Docker docker = new Docker()) {
             String containerId = docker.getIdByName(appId);
             docker.stop(containerId);
@@ -143,11 +148,11 @@ public class DockerAppServiceImpl implements AppService {
     }
 
     @Override
-    public AppStatus start(String appId) throws DockerException {
+    public DeployedAppStatus start(String appId) throws DockerException {
         try (Docker docker = new Docker()) {
             String containerId = docker.getIdByName(appId);
             docker.start(containerId);
-            Thread.sleep(5_000);
+            Thread.sleep(sleep);
 
             ContainerInfo containerInfo = docker.inspectContainer(containerId);
             return getAppStatus(appId, containerInfo);

+ 6 - 7
dagent/src/main/java/cn/reghao/autodop/dagent/app/ZipAppServiceImpl.java

@@ -1,13 +1,12 @@
 package cn.reghao.autodop.dagent.app;
 
-import cn.reghao.autodop.common.dagent.app.api.data.AppStatus;
+import cn.reghao.autodop.common.dagent.app.api.data.DeployedAppStatus;
 import cn.reghao.autodop.common.dagent.app.api.data.deploy.AppDeployArgs;
 import cn.reghao.autodop.common.dagent.app.api.data.log.AppLogArgs;
 import cn.reghao.autodop.common.dagent.app.api.data.log.LogFile;
 import cn.reghao.autodop.common.dockerc.exception.DockerException;
 import lombok.extern.slf4j.Slf4j;
 
-import java.util.ArrayList;
 import java.util.List;
 
 /**
@@ -17,7 +16,7 @@ import java.util.List;
 @Slf4j
 public class ZipAppServiceImpl implements AppService {
     @Override
-    public AppStatus deploy(AppDeployArgs appDeployArgs) {
+    public DeployedAppStatus deploy(AppDeployArgs appDeployArgs) {
         log.info("zip 部署...");
         return null;
     }
@@ -33,22 +32,22 @@ public class ZipAppServiceImpl implements AppService {
     }
 
     @Override
-    public AppStatus status(String appId) {
+    public DeployedAppStatus status(String appId) {
         return null;
     }
 
     @Override
-    public AppStatus restart(String appId)  {
+    public DeployedAppStatus restart(String appId)  {
         return null;
     }
 
     @Override
-    public AppStatus stop(String appId)  {
+    public DeployedAppStatus stop(String appId)  {
         return null;
     }
 
     @Override
-    public AppStatus start(String appId) {
+    public DeployedAppStatus start(String appId) {
         return null;
     }
 }

+ 1 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/controller/StatusController.java

@@ -30,7 +30,7 @@ public class StatusController {
     }
 
     @ApiOperation(value = "刷新已部署应用列表")
-    @GetMapping("/refresh}")
+    @PostMapping("/refresh}")
     public ResponseEntity<String> refreshDeployedApps(@RequestParam("env") String env) {
         statusService.refreshDeployedApps(env);
         return ResponseEntity.ok().body(WebResult.success("ok"));

+ 1 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/entity/log/DeployLog.java

@@ -19,7 +19,7 @@ import java.time.LocalDateTime;
 public class DeployLog extends BaseDocument {
     private BuildLog buildLog;
     private String machineId;
-    private String machineAddress;
+    private String machineIpv4;
     private LocalDateTime deployTime;
     private long deployTotalTime;
     // 0 - 成功 1 - 失败

+ 2 - 0
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/entity/status/DeployedAppStatus.java

@@ -21,6 +21,8 @@ import java.time.LocalDateTime;
 public class DeployedAppStatus extends BaseEntity {
     @Column(nullable = false)
     private String machineId;
+    @Column(nullable = false)
+    private String machineIpv4;
     @Column(nullable = false, unique = true)
     private String appId;
     @Column(nullable = false)

+ 2 - 0
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/AppStatusService.java

@@ -248,6 +248,8 @@ public class AppStatusService {
                 map.put(machineId, deployedAppStatus);
             }
         }
+
+        // TODO 持久化到数据库
         return map;
     }
 }

+ 20 - 6
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/RemoteCallService.java

@@ -38,6 +38,13 @@ public class RemoteCallService {
         this.machineInfoRepository = machineInfoRepository;
     }
 
+    /**
+     * 单个调用
+     *
+     * @param
+     * @return
+     * @date 2021-03-05 下午7:54
+     */
     public RpcResult call(MqMessage mqMessage) {
         Future<RpcResult> future = threadPool.submit(new RemoteCallTask(mqMessage));
         while (!future.isDone() && !future.isCancelled()) {
@@ -53,14 +60,21 @@ public class RemoteCallService {
         try {
             RpcResult rpcResult = future.get();
             if (rpcResult == null) {
-                return RpcResult.fail(machineAddress(machineId) + " RPC 调用失败");
+                return RpcResult.fail(machineIpv4(machineId) + " RPC 调用失败");
             }
             return rpcResult;
         } catch (InterruptedException | ExecutionException e) {
-            return RpcResult.error(machineAddress(machineId) + " " + ExceptionUtil.errorMsg(e));
+            return RpcResult.error(machineIpv4(machineId) + " " + ExceptionUtil.errorMsg(e));
         }
     }
 
+    /**
+     * 多个调用
+     *
+     * @param
+     * @return
+     * @date 2021-03-05 下午7:54
+     */
     public Map<String, RpcResult> call(List<MqMessage> messages) {
         Map<String, Future<RpcResult>> futureMap = new HashMap<>(messages.size());
         messages.forEach(mqMessage ->
@@ -78,24 +92,24 @@ public class RemoteCallService {
                 }
             }
 
-            log.info("RPC 调用返回...");
             String machineId = entry.getKey();
+            log.info(machineIpv4(machineId) + " -> RPC 调用返回");
             try {
                 RpcResult rpcResult = future.get();
                 if (rpcResult != null) {
                     resultMap.put(machineId, rpcResult);
                 } else {
-                    resultMap.put(machineId, RpcResult.fail(machineAddress(machineId) + " RPC 调用失败"));
+                    resultMap.put(machineId, RpcResult.fail(machineIpv4(machineId) + " RPC 调用失败"));
                 }
             } catch (InterruptedException | ExecutionException e) {
                 resultMap.put(machineId,
-                        RpcResult.error(machineAddress(machineId) + " " + ExceptionUtil.errorMsg(e)));
+                        RpcResult.error(machineIpv4(machineId) + " " + ExceptionUtil.errorMsg(e)));
             }
         }
         return resultMap;
     }
 
-    private String machineAddress(String machineId) {
+    private String machineIpv4(String machineId) {
         MachineInfo machineInfo = machineInfoRepository.findByMachineId(machineId);
         List<NetworkInfo> networkInfos = machineInfo.getNetworkInfo();
         return networkInfos.get(0).getIpv4();

+ 1 - 0
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/build/tools/packer/DockerPack.java

@@ -27,6 +27,7 @@ public class DockerPack implements CodePacker {
             String image = repo + ":" + commitId;
 
             docker.build(image, appCompileHome, dockerfile);
+            // TODO push 没有报错,但有时 pull 会提示 image 不存在
             docker.push(image);
             log.info("{} 已 push 到仓库...", image);
             return image;

+ 4 - 2
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/deploy/AppDeployer.java

@@ -79,7 +79,8 @@ public class AppDeployer {
         rpcResultMap.forEach((machineId, rpcResult) -> {
             DeployLog deployLog = DeployLog.of(buildLog);
             deployLog.setMachineId(machineId);
-            deployLog.setMachineAddress(machineAddress(machineId));
+            String machineIpv4 = machineIpv4(machineId);
+            deployLog.setMachineIpv4(machineIpv4);
             deployLog.setDeployTime(deployTime);
 
             deployLog.setStatusCode(rpcResult.getStatusCode());
@@ -89,6 +90,7 @@ public class AppDeployer {
                 deployedAppStatus.setAppType(buildLog.getAppType());
                 deployedAppStatus.setEnv(buildLog.getEnv());
                 deployedAppStatus.setCommitId(buildLog.getCommitId());
+                deployedAppStatus.setMachineIpv4(machineIpv4);
 
                 logConsumer.addAppStatus(deployedAppStatus);
                 deployLog.setResult(BuildDeployResult.deployDone.getName());
@@ -101,7 +103,7 @@ public class AppDeployer {
         return deployLogs;
     }
 
-    private String machineAddress(String machineId) {
+    private String machineIpv4(String machineId) {
         MachineInfo machineInfo = machineInfoRepository.findByMachineId(machineId);
         List<NetworkInfo> networkInfos = machineInfo.getNetworkInfo();
         if (!networkInfos.isEmpty()) {

+ 1 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/deploy/DeployNotifyMsg.java

@@ -41,7 +41,7 @@ public class DeployNotifyMsg {
                 .append(System.lineSeparator());
         sb.append("# 部署结果: ").append(System.lineSeparator());
         for (int i = 0; i < deployLogs.size(); i++) {
-            sb.append(i+1).append(".").append(deployLogs.get(i).getMachineAddress()).append(" -> ")
+            sb.append(i+1).append(".").append(deployLogs.get(i).getMachineIpv4()).append(" -> ")
                     .append(deployLogs.get(i).getResult()).append(System.lineSeparator());
         }
         deployNotifyMsg.text = sb.toString();

+ 11 - 2
pom.xml

@@ -30,11 +30,20 @@
         <java.version>1.8</java.version>
     </properties>
 
+    <!--<repositories>
+        <repository>
+            <id>Eclipse Paho Repo</id>
+            <url>https://repo.eclipse.org/content/repositories/paho-releases/</url>
+        </repository>
+    </repositories>-->
+
     <dependencies>
         <!--<dependency>
-            <groupId>cn.reghao</groupId>
-            <artifactId>commons</artifactId>
+            <groupId>org.eclipse.paho</groupId>
+            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
+            <version>1.2.0</version>
         </dependency>-->
+
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-starter-amqp</artifactId>