Browse Source

优化构建部署模块

reghao 4 years ago
parent
commit
7b0f2ad4ee
21 changed files with 360 additions and 425 deletions
  1. 0 12
      common/pom.xml
  2. 1 16
      common/src/main/java/cn/reghao/autodop/common/log/Appenders.java
  3. 3 13
      common/src/main/java/cn/reghao/autodop/common/log/MqttAppender.java
  4. 46 4
      common/src/main/java/cn/reghao/autodop/common/mqtt/AsyncMqttClient.java
  5. 1 0
      common/src/main/java/cn/reghao/autodop/common/mqtt/DefaultMqttClient.java
  6. 2 0
      common/src/main/java/cn/reghao/autodop/common/msg/rpc/RpcMsg.java
  7. 2 2
      dagent/src/main/java/cn/reghao/autodop/dagent/DagentApp.java
  8. 0 40
      dagent/src/main/java/cn/reghao/autodop/dagent/spring/DagentLifecycle.java
  9. 44 27
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/controller/BuildDeployController.java
  10. 0 37
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/controller/WebhookController.java
  11. 5 5
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/AppDeployer.java
  12. 0 194
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/BuildDeployDispatcher.java
  13. 6 2
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/BuildDeployService.java
  14. 3 3
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/impl/AppStatusServiceImpl.java
  15. 167 38
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/impl/BuildDeployServiceImpl.java
  16. 44 0
      dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/DmasterConnActionListener.java
  17. 4 5
      dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/DmasterTopicListener.java
  18. 3 3
      dmaster/src/main/java/cn/reghao/autodop/dmaster/spring/BeansConfig.java
  19. 16 24
      dmaster/src/main/java/cn/reghao/autodop/dmaster/spring/DmasterLifecycle.java
  20. 1 0
      dmaster/src/main/resources/templates/app/bd/index.html
  21. 12 0
      pom.xml

+ 0 - 12
common/pom.xml

@@ -65,17 +65,5 @@
             <artifactId>oshi-core</artifactId>
             <version>5.8.2</version>
         </dependency>
-
-        <dependency>
-            <groupId>ch.qos.logback</groupId>
-            <artifactId>logback-core</artifactId>
-            <version>1.2.3</version>
-        </dependency>
-
-        <dependency>
-            <groupId>ch.qos.logback</groupId>
-            <artifactId>logback-classic</artifactId>
-            <version>1.2.3</version>
-        </dependency>
     </dependencies>
 </project>

+ 1 - 16
common/src/main/java/cn/reghao/autodop/common/log/Appenders.java

@@ -7,7 +7,6 @@ import ch.qos.logback.core.Appender;
 import ch.qos.logback.core.ConsoleAppender;
 import ch.qos.logback.core.FileAppender;
 import cn.reghao.autodop.common.mqtt.AsyncMqttClient;
-import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import org.slf4j.LoggerFactory;
 
 /**
@@ -17,21 +16,7 @@ import org.slf4j.LoggerFactory;
 public class Appenders {
     private static final LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
 
-    public static Appender<ILoggingEvent> mqttAppender1(String appId, AsyncMqttClient asyncMqttClient) {
-        PatternLayoutEncoder layoutEncoder = new PatternLayoutEncoder();
-        layoutEncoder.setPattern("%date %level [%thread] %logger{10} [%file:%line] %msg%n");
-        layoutEncoder.setContext(loggerContext);
-        layoutEncoder.start();
-
-        MqttAppender mqttAppender = new MqttAppender(appId, null);
-        mqttAppender.setAsyncMqttClient(asyncMqttClient);
-
-        mqttAppender.setContext(loggerContext);
-        mqttAppender.start();
-        return mqttAppender;
-    }
-
-    public static Appender<ILoggingEvent> mqttAppender(String appId, DefaultMqttClient mqttClient) {
+    public static Appender<ILoggingEvent> mqttAppender1(String appId, AsyncMqttClient mqttClient) {
         PatternLayoutEncoder layoutEncoder = new PatternLayoutEncoder();
         layoutEncoder.setPattern("%date %level [%thread] %logger{10} [%file:%line] %msg%n");
         layoutEncoder.setContext(loggerContext);

+ 3 - 13
common/src/main/java/cn/reghao/autodop/common/log/MqttAppender.java

@@ -4,7 +4,6 @@ import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.UnsynchronizedAppenderBase;
 import cn.reghao.autodop.common.machine.Machine;
 import cn.reghao.autodop.common.mqtt.AsyncMqttClient;
-import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import cn.reghao.autodop.common.msg.Message;
 import cn.reghao.autodop.common.msg.MsgQueue;
 import cn.reghao.autodop.common.msg.pub.PubMsg;
@@ -22,19 +21,14 @@ import org.eclipse.paho.client.mqttv3.MqttException;
 public class MqttAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
     private final String appId;
     private final String topic;
-    private final DefaultMqttClient mqttClient;
-    private AsyncMqttClient asyncMqttClient;
+    private final AsyncMqttClient mqttClient;
 
-    public MqttAppender(String appId, DefaultMqttClient mqttClient) {
+    public MqttAppender(String appId, AsyncMqttClient mqttClient) {
         this.appId = appId;
         this.topic = MsgQueue.dmasterTopic();
         this.mqttClient = mqttClient;
     }
 
-    public void setAsyncMqttClient(AsyncMqttClient asyncMqttClient) {
-        this.asyncMqttClient = asyncMqttClient;
-    }
-
     @Override
     public void start() {
         super.start();
@@ -52,11 +46,7 @@ public class MqttAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
         PubMsg pubMsg = PubMsg.pubMsg(NodeEventPubClazz.class.getSimpleName(), NodeEventPubClazz.log.name(), jsonPayload);
         Message message = Message.pubMsg(pubMsg);
         try {
-            if (asyncMqttClient != null) {
-                asyncMqttClient.pub(topic, 1, JsonConverter.objectToJson(message));
-            } else {
-                mqttClient.pub(topic, 1, JsonConverter.objectToJson(message));
-            }
+            mqttClient.pub(topic, 1, JsonConverter.objectToJson(message));
         } catch (MqttException e) {
             e.printStackTrace();
             LogCache.put(message);

+ 46 - 4
common/src/main/java/cn/reghao/autodop/common/mqtt/AsyncMqttClient.java

@@ -6,6 +6,7 @@ import cn.reghao.autodop.common.msg.Message;
 import cn.reghao.autodop.common.msg.MsgQueue;
 import cn.reghao.autodop.common.msg.pub.PubMsg;
 import cn.reghao.autodop.common.msg.pub.constant.NodeEventPubClazz;
+import cn.reghao.autodop.common.msg.rpc.RpcMsg;
 import cn.reghao.jdkutil.serializer.JsonConverter;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.*;
@@ -14,8 +15,11 @@ import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
+ * TODO 采用 Builder 模式初始化参数
+ *
  * @author reghao
  * @date 2021-10-28 20:52:17
  */
@@ -24,6 +28,7 @@ public class AsyncMqttClient {
     private final MqttProperties properties;
     private final MqttAsyncClient mqttClient;
     private final Map<String, IMqttMessageListener> msgListeners = new HashMap<>();
+    private final Map<String, RpcMsg> rpcRecorder = new ConcurrentHashMap<>();
 
     public AsyncMqttClient(MqttProperties properties) throws MqttException {
         this.properties = properties;
@@ -46,6 +51,13 @@ public class AsyncMqttClient {
         return options;
     }
 
+    /**
+     * LWT 消息
+     *
+     * @param
+     * @return
+     * @date 2021-11-04 下午3:57
+     */
     private byte[] lastWillMessage() {
         String jsonPayload = String.format("{\"machineId\":\"%s\"}", Machine.ID);
         PubMsg pubMsg = PubMsg.pubMsg(NodeEventPubClazz.class.getSimpleName(),
@@ -62,9 +74,24 @@ public class AsyncMqttClient {
         return msgListeners;
     }
 
-    public void connect(IMqttActionListener mqttActionListener) throws MqttException {
+    public RpcMsg getAndRemoveRecord(String msgId) {
+        return rpcRecorder.remove(msgId);
+    }
+
+    /**
+     * 没有返回结果的 RPC 调用
+     *
+     * @param
+     * @return
+     * @date 2021-11-04 上午11:33
+     */
+    public Map<String, RpcMsg> unreturnedRpc() {
+        return rpcRecorder;
+    }
+
+    public void connect(IMqttActionListener actionListener) throws MqttException {
         mqttClient.setCallback(new ConnectionCallback());
-        mqttClient.connect(connectOptions(), null, mqttActionListener);
+        mqttClient.connect(connectOptions(), null, actionListener);
     }
 
     public boolean isConnected() {
@@ -86,6 +113,22 @@ public class AsyncMqttClient {
         mqttClient.publish(topic, message);
     }
 
+    /**
+     * pub 消息后,等 sub 端处理后在 pub 结果
+     *
+     * @param
+     * @return
+     * @date 2021-06-30 上午11:22
+     */
+    public void pubWithResult(String topic, int qos, Message message) throws MqttException {
+        MqttMessage mqttMessage = new MqttMessage();
+        mqttMessage.setQos(qos);
+        mqttMessage.setPayload(JsonConverter.objectToJson(message).getBytes());
+
+        mqttClient.publish(topic, mqttMessage);
+        rpcRecorder.put(message.getMsgId(), message.getRpcMsg());
+    }
+
     public void sub(String topic, IMqttMessageListener messageListener) throws MqttException {
         mqttClient.subscribe(topic, 2, messageListener);
     }
@@ -136,8 +179,7 @@ public class AsyncMqttClient {
         }
 
         @Override
-        public void messageArrived(String topic, MqttMessage message) throws Exception {
-            System.out.println();
+        public void messageArrived(String topic, MqttMessage message) {
         }
 
         @Override

+ 1 - 0
common/src/main/java/cn/reghao/autodop/common/mqtt/DefaultMqttClient.java

@@ -15,6 +15,7 @@ import java.util.Map;
  * @author reghao
  * @date 2021-05-21 08:27:41
  */
+@Deprecated
 @Slf4j
 public class DefaultMqttClient implements AutoCloseable {
     private final MqttProperties properties;

+ 2 - 0
common/src/main/java/cn/reghao/autodop/common/msg/rpc/RpcMsg.java

@@ -14,10 +14,12 @@ import static cn.reghao.jdkutil.result.ResultStatus.*;
 public class RpcMsg implements Serializable {
     private static final long serialVersionUID = 1L;
 
+    // 调用参数
     private String clazz;
     private String method;
     private String jsonParam;
 
+    // 返回结果
     // 0 - 成功 1 - 失败
     private int code;
     private String msg;

+ 2 - 2
dagent/src/main/java/cn/reghao/autodop/dagent/DagentApp.java

@@ -87,12 +87,12 @@ public class DagentApp {
 		IAppRpcClazz appRpcClazz = new AppRpcClazzImpl();
 		AppRpcClazzDispatcher appRpcClazzDispatcher = new AppRpcClazzDispatcher(appRpcClazz);
 		dagentTopicListener = new DagentTopicListener(mqttClient, appRpcClazzDispatcher);
+		mqttClient.add(MsgQueue.dagentTopic(Machine.ID), dagentTopicListener);
 
 		Machine machine = new Machine(new WebClient(), new SystemInfo());
 		nodeEventClazzPub = new NodeEventClazzPubImpl(mqttClient, machine);
-
-		mqttClient.add(MsgQueue.dagentTopic(Machine.ID), dagentTopicListener);
 		DagentConnActionListener connActionListener = new DagentConnActionListener(mqttClient, nodeEventClazzPub);
+
 		mqttClient.connect(connActionListener);
 		shutdownGracefully();
 	}

+ 0 - 40
dagent/src/main/java/cn/reghao/autodop/dagent/spring/DagentLifecycle.java

@@ -1,40 +0,0 @@
-package cn.reghao.autodop.dagent.spring;
-
-import cn.reghao.autodop.common.machine.Machine;
-import cn.reghao.autodop.common.mqtt.AsyncMqttClient;
-import cn.reghao.autodop.common.msg.MsgQueue;
-import cn.reghao.autodop.dagent.machine.NodeEventClazzPubImpl;
-import cn.reghao.autodop.dagent.mqttsub.DagentConnActionListener;
-import cn.reghao.autodop.dagent.mqttsub.DagentTopicListener;
-import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.MqttException;
-
-/**
- * TODO 无法处理 kill -9 和机器断电的情况
- *
- * @author reghao
- * @date 2021-04-01 11:36:51
- */
-@Slf4j
-public class DagentLifecycle {
-    private final AsyncMqttClient mqttClient;
-    private final DagentTopicListener dagentTopicListener;
-    private final NodeEventClazzPubImpl nodeEventClazzPub;
-
-    public DagentLifecycle(AsyncMqttClient mqttClient, DagentTopicListener dagentTopicListener, Machine machine) {
-        this.mqttClient = mqttClient;
-        this.dagentTopicListener = dagentTopicListener;
-        this.nodeEventClazzPub = new NodeEventClazzPubImpl(mqttClient, machine);
-    }
-
-    private void startMqtt() throws MqttException {
-        mqttClient.add(MsgQueue.dagentTopic(Machine.ID), dagentTopicListener);
-        DagentConnActionListener connActionListener = new DagentConnActionListener(mqttClient, nodeEventClazzPub);
-        mqttClient.connect(connActionListener);
-    }
-
-    private void pubDagentShutdown() {
-        nodeEventClazzPub.nodeShutdown();
-        log.info("Dagent 停止");
-    }
-}

+ 44 - 27
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/controller/BuildDeployController.java

@@ -4,13 +4,15 @@ import cn.reghao.autodop.dmaster.app.model.po.config.build.LocalBuildDir;
 import cn.reghao.autodop.dmaster.app.service.BuildDeployService;
 import cn.reghao.autodop.dmaster.util.UploadDownload;
 import cn.reghao.jdkutil.result.WebBody;
-import cn.reghao.autodop.dmaster.app.service.BuildDeployDispatcher;
+import cn.reghao.jdkutil.serializer.JsonConverter;
+import com.google.gson.JsonObject;
 import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiImplicitParam;
 import io.swagger.annotations.ApiImplicitParams;
 import io.swagger.annotations.ApiOperation;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.*;
 
 import javax.servlet.http.HttpServletResponse;
@@ -28,12 +30,11 @@ import java.io.IOException;
 @RestController
 @RequestMapping("/api/app/bd")
 public class BuildDeployController {
-    private BuildDeployService buildDeployService;
-    private final BuildDeployDispatcher buildDeployDispatcher;
+    private final BuildDeployService buildDeployService;
     private final UploadDownload uploadDownload;
 
-    public BuildDeployController(BuildDeployDispatcher buildDeployDispatcher, UploadDownload uploadDownload) {
-        this.buildDeployDispatcher = buildDeployDispatcher;
+    public BuildDeployController(BuildDeployService buildDeployService, UploadDownload uploadDownload) {
+        this.buildDeployService = buildDeployService;
         this.uploadDownload = uploadDownload;
     }
 
@@ -41,8 +42,7 @@ public class BuildDeployController {
     @ApiImplicitParams(@ApiImplicitParam(name="appId", value="应用 ID", paramType="query", dataType = "String"))
     @PostMapping(value = "/update", produces = MediaType.APPLICATION_JSON_VALUE)
     public String buildAndDeploy(@RequestParam("appId") String appId) throws Exception {
-        //buildDeployService.buildAndDeploy(appId);
-        String ret = buildDeployDispatcher.buildAndDeploy(appId, true);
+        String ret = buildDeployService.buildAndDeploy(appId);
         return WebBody.successWithMsg(ret);
     }
 
@@ -50,23 +50,30 @@ public class BuildDeployController {
     @ApiImplicitParams(@ApiImplicitParam(name="appId", value="应用 ID", paramType="query", dataType = "String"))
     @PostMapping(value = "/build", produces = MediaType.APPLICATION_JSON_VALUE)
     public String build(@RequestParam("appId") String appId) throws Exception {
-        //buildDeployService.build(appId);
-        String ret = buildDeployDispatcher.buildAndDeploy(appId, false);
+        String ret = buildDeployService.build(appId);
         return WebBody.successWithMsg(ret);
     }
 
     @ApiOperation(value = "部署应用")
     @ApiImplicitParams({
-            @ApiImplicitParam(name="appId", value="应用 ID", paramType="query", dataType = "String"),
-            @ApiImplicitParam(name="buildLogId", value="构建 ID", paramType="query", dataType = "String")
+            @ApiImplicitParam(name="buildLogId", value="构建 ID", paramType="path", dataType = "String")
     })
-    @PostMapping(value = "/deploy", produces = MediaType.APPLICATION_JSON_VALUE)
-    public String deploy(@RequestParam("appId") String appId, @RequestParam("buildLogId") String buildLogId) {
-        //buildDeployService.deploy(appId, commitId);
-        //buildDeployDispatcher.deploy(appId, commitId);
+    @PostMapping(value = "/deploy/{buildLogId}", produces = MediaType.APPLICATION_JSON_VALUE)
+    public String deploy(@PathVariable("buildLogId") String buildLogId) {
+        buildDeployService.deploy(buildLogId);
         return WebBody.success();
     }
 
+    @ApiOperation(value = "webhook 自动构建部署")
+    @PostMapping("/hook")
+    public String hook(@RequestBody String body) {
+        JsonObject jsonObject = JsonConverter.jsonToJsonElement(body).getAsJsonObject();
+        String repo = jsonObject.get("repository").getAsJsonObject().get("url").getAsString();
+        String ref = jsonObject.get("ref").getAsString();
+        String branch = ref.substring(ref.lastIndexOf("/")+1);
+        return WebBody.success("暂未实现");
+    }
+
     @Deprecated
     @ApiOperation(value = "刷新应用构建, 部署和运行列表")
     @PostMapping(value = "/refresh", produces = MediaType.APPLICATION_JSON_VALUE)
@@ -74,25 +81,35 @@ public class BuildDeployController {
         return WebBody.success();
     }
 
-    @ApiOperation(value = "当前正在构建和部署的应用")
-    @GetMapping(value = "/current", produces = MediaType.APPLICATION_JSON_VALUE)
-    public String currentBuildingOrDeploy() {
-        return WebBody.success(buildDeployDispatcher.current());
-    }
-
     @ApiOperation(value = "清除当前正在构建和部署的应用")
     @PostMapping(value = "/clear", produces = MediaType.APPLICATION_JSON_VALUE)
     public String clearCurrentBuildingOrDeploy() {
-        buildDeployDispatcher.clear();
-        return WebBody.success();
+        return WebBody.success("暂未实现");
     }
 
     @ApiOperation(value = "构建包下载接口")
-    @GetMapping("/dl/{filename}")
-    public void dlPackage(@PathVariable("filename") String filename, HttpServletResponse response) throws IOException {
-        String filePath = LocalBuildDir.packDir + File.separator + filename;
+    @GetMapping("/dl/{buildLogId}")
+    public ResponseEntity dlPackage(@PathVariable("buildLogId") String buildLogId, HttpServletResponse response)
+            throws IOException {
+        buildDeployService.downloadPackage(buildLogId);
+        String filePath = LocalBuildDir.packDir + File.separator + "filename";
         FileInputStream fileInputStream = new FileInputStream(filePath);
         BufferedInputStream bis = new BufferedInputStream(fileInputStream);
-        uploadDownload.download(filename, bis, response);
+        uploadDownload.download("filename", bis, response);
+        return null;
+    }
+
+    @ApiOperation(value = "当前正在构建的应用")
+    @GetMapping(value = "/building", produces = MediaType.APPLICATION_JSON_VALUE)
+    public String currentlyBuilding() {
+        buildDeployService.currentlyBuilding();
+        return WebBody.success("暂未实现");
+    }
+
+    @ApiOperation(value = "当前正在部署的应用")
+    @GetMapping(value = "/deploying", produces = MediaType.APPLICATION_JSON_VALUE)
+    public String currentlyDeploying() {
+        buildDeployService.currentlyDeploying();
+        return WebBody.success("暂未实现");
     }
 }

+ 0 - 37
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/controller/WebhookController.java

@@ -1,37 +0,0 @@
-package cn.reghao.autodop.dmaster.app.controller;
-
-import cn.reghao.jdkutil.result.WebBody;
-import cn.reghao.jdkutil.serializer.JsonConverter;
-import cn.reghao.autodop.dmaster.app.service.BuildDeployDispatcher;
-import com.google.gson.JsonObject;
-import io.swagger.annotations.Api;
-import io.swagger.annotations.ApiOperation;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.web.bind.annotation.*;
-
-/**
- * @author reghao
- * @date 2019-08-30 18:49:15
- */
-@Slf4j
-@RestController
-@Api(tags = "Webhook 接口")
-@RequestMapping("/api/hook")
-public class WebhookController {
-    private BuildDeployDispatcher buildDeployDispatcher;
-
-    public WebhookController(BuildDeployDispatcher buildDeployDispatcher) {
-        this.buildDeployDispatcher = buildDeployDispatcher;
-    }
-
-    @ApiOperation(value = "通过 webhook 自动部署应用")
-    @PostMapping("/hook")
-    public String hook(@RequestBody String body) {
-        JsonObject jsonObject = JsonConverter.jsonToJsonElement(body).getAsJsonObject();
-        String repo = jsonObject.get("repository").getAsJsonObject().get("url").getAsString();
-        String ref = jsonObject.get("ref").getAsString();
-        String branch = ref.substring(ref.lastIndexOf("/")+1);
-
-        return WebBody.success("buildDeployResult");
-    }
-}

+ 5 - 5
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/AppDeployer.java

@@ -1,6 +1,6 @@
 package cn.reghao.autodop.dmaster.app.service;
 
-import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
+import cn.reghao.autodop.common.mqtt.AsyncMqttClient;
 import cn.reghao.autodop.common.msg.Message;
 import cn.reghao.autodop.common.msg.MsgQueue;
 import cn.reghao.autodop.common.msg.rpc.RpcMsg;
@@ -30,13 +30,13 @@ import java.util.*;
 @Slf4j
 @Service
 public class AppDeployer {
-    private final DefaultMqttClient mqttClient;
+    private final AsyncMqttClient mqttClient;
     private final AppDeployConfigQuery deployConfigQuery;
-    private AppDeployingQuery deployingQuery;
-    private AppDeployingCrud deployingCrud;
+    private final AppDeployingQuery deployingQuery;
+    private final AppDeployingCrud deployingCrud;
     private final DeployLogCrud logCrud;
 
-    public AppDeployer(DefaultMqttClient mqttClient, AppDeployConfigQuery deployConfigQuery,
+    public AppDeployer(AsyncMqttClient mqttClient, AppDeployConfigQuery deployConfigQuery,
                        AppDeployingQuery deployingQuery, AppDeployingCrud deployingCrud,
                        DeployLogCrud logCrud) {
         this.mqttClient = mqttClient;

+ 0 - 194
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/BuildDeployDispatcher.java

@@ -1,194 +0,0 @@
-package cn.reghao.autodop.dmaster.app.service;
-
-import cn.reghao.autodop.dmaster.app.db.crud.AppBuildingCrud;
-import cn.reghao.autodop.dmaster.app.db.crud.AppDeployingCrud;
-import cn.reghao.autodop.dmaster.app.db.query.AppBuildingQuery;
-import cn.reghao.autodop.dmaster.app.db.query.AppDeployingQuery;
-import cn.reghao.jdkutil.result.ResultStatus;
-import cn.reghao.autodop.dmaster.app.db.query.config.AppConfigQuery;
-import cn.reghao.autodop.dmaster.app.model.po.AppBuilding;
-import cn.reghao.autodop.dmaster.app.model.po.AppDeploying;
-import cn.reghao.autodop.dmaster.app.model.po.log.*;
-import cn.reghao.autodop.dmaster.spring.interceptor.AppIntegrateReinitInterceptor;
-import cn.reghao.autodop.dmaster.app.db.repository.log.BuildLogRepository;
-import cn.reghao.autodop.dmaster.app.model.po.config.AppConfig;
-import cn.reghao.autodop.dmaster.notification.model.vo.BuildNotifyMsg;
-import cn.reghao.autodop.dmaster.notification.model.po.NotifyGroup;
-import cn.reghao.autodop.dmaster.notification.model.po.NotifyType;
-import cn.reghao.autodop.dmaster.notification.service.NotifyService;
-import cn.reghao.autodop.common.util.thread.ThreadPoolWrapper;
-import cn.reghao.autodop.dmaster.notification.service.notifier.ding.DingMsg;
-import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.springframework.stereotype.Service;
-
-import java.util.*;
-import java.util.concurrent.*;
-
-/**
- * 应用构建部署分发器
- * 多个线程访问同一个 BuildDeployDispatcher 对象
- *
- * @author reghao
- * @date 2019-11-12 17:20:27
- */
-@Slf4j
-@Service
-public class BuildDeployDispatcher {
-    private final AppBuildingQuery appBuildingQuery;
-    private final AppBuildingCrud appBuildingCrud;
-    private final AppDeployingQuery appDeployingQuery;
-    private final AppDeployingCrud appDeployingCrud;
-    private final BuildLogRepository buildLogRepository;
-    private AppIntegrateReinitInterceptor reinitInterceptor;
-    // TODO 多线程访问的集合
-    private Set<String> onBuilding = new ConcurrentSkipListSet<>();
-    private Set<String> onDeploying = new ConcurrentSkipListSet<>();
-    private Map<String, AppBuilder> integrateMap = new ConcurrentHashMap<>();
-    private final AppConfigQuery appConfigQuery;
-    private final ExecutorService threadPool;
-    private final AppDeployer appDeployer;
-    private final NotifyService notifyService;
-
-    public BuildDeployDispatcher(AppBuildingQuery appBuildingQuery, AppBuildingCrud appBuildingCrud,
-                                 AppDeployingQuery appDeployingQuery, AppDeployingCrud appDeployingCrud,
-                                 AppIntegrateReinitInterceptor reinitInterceptor,
-                                 AppConfigQuery appConfigQuery,
-                                 BuildLogRepository buildLogRepository,
-                                 NotifyService notify,
-                                 AppDeployer appDeployer) {
-        this.appBuildingQuery = appBuildingQuery;
-        this.appBuildingCrud = appBuildingCrud;
-        this.appDeployingQuery = appDeployingQuery;
-        this.appDeployingCrud = appDeployingCrud;
-        this.reinitInterceptor = reinitInterceptor;
-        this.appConfigQuery = appConfigQuery;
-        this.buildLogRepository = buildLogRepository;
-        this.notifyService = notify;
-        this.appDeployer = appDeployer;
-        this.threadPool = ThreadPoolWrapper.threadPool("build-deploy");
-    }
-
-    /**
-     * 构建部署应用
-     *
-     * @param
-     * @return
-     * @date 2021-02-06 上午1:46
-     */
-    public String buildAndDeploy(String appId, boolean isDeploy) throws Exception {
-        if (!onBuilding.add(appId)) {
-            return appId + " 正在构建中";
-        }
-
-        /* TODO 后面再使用缓存
-        AppIntegrate appIntegrate = integrateMap.get(appId);
-        if (appIntegrate != null) {
-            if (reinitInterceptor.isAppChanged(appId)) {
-                AppOrchestration app = appQuery.findByAppId(appId);
-                appIntegrate.reInit(app);
-            }
-        } else {
-            integrateMap.put(appId, appIntegrate);
-        }*/
-
-        AppConfig app = appConfigQuery.findByAppId(appId);
-        if (app == null) {
-            onBuilding.remove(appId);
-            throw new Exception(appId + " 不存在");
-        }
-        AppBuilder appIntegrate = new AppBuilder(app);
-
-        AppBuildSupplier supplier = new AppBuildSupplier(appIntegrate);
-        //List<NotifyGroup> notifyGroups = app.getNotifyGroups();
-        log.info("开始异步构建 {}", appId);
-        CompletableFuture.supplyAsync(supplier, threadPool)
-                .whenComplete((buildLog, throwable) -> {
-                    onBuilding.remove(appId);
-                    if (throwable != null) {
-                        String msg = throwable.getMessage();
-                        return;
-                    }
-
-                    buildLog = buildLogRepository.save(buildLog);
-                    AppBuilding appBuilding = appBuildingQuery.findByAppId(appId);
-                    if (appBuilding != null) {
-                        appBuilding.update(buildLog);
-                        appBuildingCrud.save(appBuilding);
-                    }
-
-                    List<AppDeploying> appDeployings = appDeployingQuery.findByAppId(appId);
-                    if (!appDeployings.isEmpty()) {
-                        for (AppDeploying appDeploying : appDeployings) {
-                            appDeploying.setBuildLogId(buildLog.getId());
-                        }
-                        appDeployingCrud.saveAll(appDeployings);
-                    }
-                    log.info("{} 构建完成", appId);
-                })
-                .thenAccept(buildLog -> {
-                    if (!isDeploy) {
-                        // TODO 抛出异常时不会有任何错误
-                        //notifyGroups.forEach(notifyGroup -> buildNotify(notifyGroup, buildLog));
-                    }
-
-                    if (buildLog.getResult().getCode() == ResultStatus.SUCCESS.getCode() && isDeploy) {
-                        log.info("开始部署 {}", appId);
-                        try {
-                            appDeployer.deploy(buildLog);
-                        } catch (MqttException e) {
-                            e.printStackTrace();
-                        }
-                    }
-                });
-        return appId + " 已开始构建";
-    }
-
-    private void buildNotify(NotifyGroup notifyGroup, BuildLog buildLog) {
-        String notifyType = notifyGroup.getNotifyType();
-        switch (NotifyType.valueOf(notifyType)) {
-            case ding:
-                DingMsg dingMsg = new BuildNotifyMsg(buildLog).dingMsg();
-                notifyService.notify(notifyGroup, dingMsg);
-                break;
-            case email:
-                break;
-            case sms:
-                break;
-            default:
-                log.error("通知类型不存在");
-        }
-    }
-
-    /**
-     * 部署应用的最新版本
-     *
-     * @param
-     * @return
-     * @date 2021-02-06 上午2:02
-     */
-    private void deployApp(BuildLog buildLog) {
-    }
-
-    /**
-     * 部署应用
-     *
-     * @param
-     * @return
-     * @date 2021-02-25 下午10:35
-     */
-    public void deploy(String appId, String commitId) {
-    }
-
-    public Map<String, List<String>> current() {
-        Map<String, List<String>> map = new HashMap<>();
-        map.put("正在构建", new ArrayList<>(onBuilding));
-        map.put("正在部署", new ArrayList<>(onDeploying));
-        return map;
-    }
-
-    public void clear() {
-        onBuilding.clear();
-        onDeploying.clear();
-    }
-}

+ 6 - 2
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/BuildDeployService.java

@@ -1,5 +1,7 @@
 package cn.reghao.autodop.dmaster.app.service;
 
+import java.util.List;
+
 /**
  * @author reghao
  * @date 2021-09-17 11:30:16
@@ -7,6 +9,8 @@ package cn.reghao.autodop.dmaster.app.service;
 public interface BuildDeployService {
     String buildAndDeploy(String appId);
     String build(String appId);
-    String build(String appId, String commitId);
-    boolean deploy(String appId, String commitId);
+    boolean deploy(String buildLogId);
+    void downloadPackage(String buildLogId);
+    List<String> currentlyBuilding();
+    List<String> currentlyDeploying();
 }

+ 3 - 3
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/impl/AppStatusServiceImpl.java

@@ -1,6 +1,6 @@
 package cn.reghao.autodop.dmaster.app.service.impl;
 
-import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
+import cn.reghao.autodop.common.mqtt.AsyncMqttClient;
 import cn.reghao.autodop.common.msg.Message;
 import cn.reghao.autodop.common.msg.MsgQueue;
 import cn.reghao.autodop.common.msg.rpc.RpcMsg;
@@ -22,10 +22,10 @@ import org.springframework.stereotype.Service;
  */
 @Service
 public class AppStatusServiceImpl implements AppStatusService {
-    private final DefaultMqttClient mqttClient;
+    private final AsyncMqttClient mqttClient;
     private final AppConfigQuery appConfigQuery;
 
-    public AppStatusServiceImpl(DefaultMqttClient mqttClient, AppConfigQuery appConfigQuery) {
+    public AppStatusServiceImpl(AsyncMqttClient mqttClient, AppConfigQuery appConfigQuery) {
         this.mqttClient = mqttClient;
         this.appConfigQuery = appConfigQuery;
     }

+ 167 - 38
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/impl/BuildDeployServiceImpl.java

@@ -1,76 +1,205 @@
 package cn.reghao.autodop.dmaster.app.service.impl;
 
+import cn.reghao.autodop.common.mqtt.AsyncMqttClient;
+import cn.reghao.autodop.common.msg.rpc.constant.AppRpcClazz;
+import cn.reghao.autodop.common.msg.rpc.dto.app.DeployParam;
 import cn.reghao.autodop.common.util.thread.ThreadPoolWrapper;
+import cn.reghao.autodop.dmaster.app.db.crud.AppBuildingCrud;
+import cn.reghao.autodop.dmaster.app.db.crud.AppDeployingCrud;
+import cn.reghao.autodop.dmaster.app.db.crud.log.BuildLogCrud;
+import cn.reghao.autodop.dmaster.app.db.query.AppBuildingQuery;
+import cn.reghao.autodop.dmaster.app.db.query.AppDeployingQuery;
 import cn.reghao.autodop.dmaster.app.db.query.config.AppConfigQuery;
-import cn.reghao.autodop.dmaster.app.db.repository.AppBuildingRepository;
-import cn.reghao.autodop.dmaster.app.db.repository.AppDeployingRepository;
-import cn.reghao.autodop.dmaster.app.db.repository.log.BuildLogRepository;
+import cn.reghao.autodop.dmaster.app.db.query.log.BuildLogQuery;
+import cn.reghao.autodop.dmaster.app.model.po.AppBuilding;
+import cn.reghao.autodop.dmaster.app.model.po.AppDeploying;
+import cn.reghao.autodop.dmaster.app.model.po.config.AppConfig;
+import cn.reghao.autodop.dmaster.app.model.po.log.BuildLog;
+import cn.reghao.autodop.dmaster.app.service.AppBuildSupplier;
 import cn.reghao.autodop.dmaster.app.service.BuildDeployService;
 import cn.reghao.autodop.dmaster.app.service.AppDeployer;
 import cn.reghao.autodop.dmaster.app.service.AppBuilder;
 import cn.reghao.autodop.dmaster.notification.service.NotifyService;
 import cn.reghao.autodop.dmaster.spring.interceptor.AppIntegrateReinitInterceptor;
+import cn.reghao.jdkutil.result.ResultStatus;
+import cn.reghao.jdkutil.serializer.JsonConverter;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttException;
 import org.springframework.stereotype.Service;
 
-import java.util.Map;
+import java.util.Collections;
+import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
 
 /**
  * @author reghao
  * @date 2021-09-17 11:30:16
  */
+@Slf4j
 @Service
 public class BuildDeployServiceImpl implements BuildDeployService {
-    private AppBuildingRepository appBuildingRepository;
-    private AppDeployingRepository appDeployingRepository;
-    private BuildLogRepository buildLogRepository;
-    private AppIntegrateReinitInterceptor reinitInterceptor;
-    // TODO 多线程访问的集合
-    private Set<String> onBuilding = new ConcurrentSkipListSet<>();
-    private Set<String> onDeploying = new ConcurrentSkipListSet<>();
-    private Map<String, AppBuilder> integrateMap = new ConcurrentHashMap<>();
-    private AppConfigQuery appConfigQuery;
-    private ExecutorService threadPool;
-    private AppDeployer appDeployer;
-    private NotifyService notifyService;
-
-    public BuildDeployServiceImpl(AppBuildingRepository appBuildingRepository,
-                                 AppDeployingRepository appDeployingRepository,
-                                 AppIntegrateReinitInterceptor reinitInterceptor,
-                                 AppConfigQuery appConfigQuery,
-                                 BuildLogRepository buildLogRepository,
-                                 NotifyService notify,
-                                 AppDeployer appDeployer) {
-        this.appBuildingRepository = appBuildingRepository;
-        this.appDeployingRepository = appDeployingRepository;
-        this.reinitInterceptor = reinitInterceptor;
-        this.appConfigQuery = appConfigQuery;
-        this.buildLogRepository = buildLogRepository;
-        this.notifyService = notify;
-        this.appDeployer = appDeployer;
+    private final Set<String> onBuilding = new ConcurrentSkipListSet<>();
+    private final ExecutorService threadPool;
+    private final AsyncMqttClient mqttClient;
+    private final AppDeployer appDeployer;
+    private final AppConfigQuery appConfigQuery;
+    private final AppBuildingQuery buildingQuery;
+    private final AppBuildingCrud buildingCrud;
+    private final AppDeployingQuery deployingQuery;
+    private final AppDeployingCrud deployingCrud;
+    private final BuildLogQuery buildLogQuery;
+    private final BuildLogCrud buildLogCrud;
+
+    public BuildDeployServiceImpl(AsyncMqttClient mqttClient, AppDeployer appDeployer, AppConfigQuery appConfigQuery,
+                                  AppBuildingQuery buildingQuery, AppBuildingCrud buildingCrud,
+                                  AppDeployingQuery deployingQuery, AppDeployingCrud deployingCrud,
+                                  BuildLogQuery buildLogQuery, BuildLogCrud buildLogCrud) {
         this.threadPool = ThreadPoolWrapper.threadPool("build-deploy");
+        this.mqttClient = mqttClient;
+        this.appDeployer = appDeployer;
+        this.appConfigQuery = appConfigQuery;
+        this.buildingQuery = buildingQuery;
+        this.buildingCrud = buildingCrud;
+        this.deployingQuery = deployingQuery;
+        this.deployingCrud = deployingCrud;
+        this.buildLogQuery = buildLogQuery;
+        this.buildLogCrud = buildLogCrud;
     }
 
     @Override
     public String buildAndDeploy(String appId) {
-        return "";
+        build(appId);
+
+        if (!onBuilding.add(appId)) {
+            return appId + " 正在构建中";
+        }
+
+        AppConfig app = appConfigQuery.findByAppId(appId);
+        if (app == null) {
+            onBuilding.remove(appId);
+            //throw new Exception(appId + " 不存在");
+        }
+        AppBuilder appIntegrate = new AppBuilder(app);
+
+        AppBuildSupplier supplier = new AppBuildSupplier(appIntegrate);
+        log.info("开始异步构建 {}", appId);
+        CompletableFuture.supplyAsync(supplier, threadPool)
+                .whenComplete((buildLog, throwable) -> {
+                    onBuilding.remove(appId);
+                    if (throwable != null) {
+                        String msg = throwable.getMessage();
+                        return;
+                    }
+
+                    buildLog = buildLogCrud.save(buildLog);
+                    AppBuilding appBuilding = buildingQuery.findByAppId(appId);
+                    if (appBuilding != null) {
+                        appBuilding.update(buildLog);
+                        buildingCrud.save(appBuilding);
+                    }
+
+                    List<AppDeploying> appDeployings = deployingQuery.findByAppId(appId);
+                    if (!appDeployings.isEmpty()) {
+                        for (AppDeploying appDeploying : appDeployings) {
+                            appDeploying.setBuildLogId(buildLog.getId());
+                        }
+                        deployingCrud.saveAll(appDeployings);
+                    }
+                    log.info("{} 构建完成", appId);
+                })
+                .thenAccept(buildLog -> {
+                    if (buildLog.getResult().getCode() == ResultStatus.SUCCESS.getCode()) {
+                        log.info("开始部署 {}", appId);
+                        try {
+                            appDeployer.deploy(buildLog);
+                        } catch (MqttException e) {
+                            e.printStackTrace();
+                        }
+                    }
+                });
+        return appId + " 已开始构建";
     }
 
     @Override
     public String build(String appId) {
-        return "";
+        if (!onBuilding.add(appId)) {
+            return appId + " 正在构建中";
+        }
+
+        AppConfig app = appConfigQuery.findByAppId(appId);
+        if (app == null) {
+            onBuilding.remove(appId);
+            //throw new Exception(appId + " 不存在");
+        }
+
+        AppBuilder appIntegrate = new AppBuilder(app);
+        AppBuildSupplier supplier = new AppBuildSupplier(appIntegrate);
+        log.info("开始异步构建 {}", appId);
+        CompletableFuture.supplyAsync(supplier, threadPool)
+                .whenComplete((buildLog, throwable) -> {
+                    onBuilding.remove(appId);
+                    if (throwable != null) {
+                        String msg = throwable.getMessage();
+                        return;
+                    }
+
+                    buildLog = buildLogCrud.save(buildLog);
+                    AppBuilding appBuilding = buildingQuery.findByAppId(appId);
+                    if (appBuilding != null) {
+                        appBuilding.update(buildLog);
+                        buildingCrud.save(appBuilding);
+                    }
+
+                    List<AppDeploying> appDeployings = deployingQuery.findByAppId(appId);
+                    if (!appDeployings.isEmpty()) {
+                        for (AppDeploying appDeploying : appDeployings) {
+                            appDeploying.setBuildLogId(buildLog.getId());
+                        }
+                        deployingCrud.saveAll(appDeployings);
+                    }
+                    log.info("{} 构建完成", appId);
+                })
+                .thenAccept(buildLog -> {
+                });
+        return appId + " 已开始构建";
     }
 
     @Override
-    public String build(String appId, String commitId) {
-        return "";
+    public boolean deploy(String buildLogId) {
+        return false;
     }
 
     @Override
-    public boolean deploy(String appId, String commitId) {
-        return false;
+    public void downloadPackage(String buildLogId) {
+        BuildLog buildLog = buildLogQuery.findById(buildLogId);
+        if (buildLog == null) {
+            String msg = "构建不存在";
+            return;
+        }
+
+        String packagePath = buildLog.getPackagePath();
+    }
+
+    @Override
+    public List<String> currentlyBuilding() {
+        return Collections.emptyList();
+    }
+
+    @Override
+    public List<String> currentlyDeploying() {
+        List<DeployParam> list = mqttClient.unreturnedRpc().values().stream()
+                .filter(rpcMsg -> {
+                    String clazz = rpcMsg.getClazz();
+                    String method = rpcMsg.getMethod();
+                    return clazz.equals(AppRpcClazz.class.getSimpleName()) && method.equals(AppRpcClazz.deploy.name());
+                })
+                .map(rpcMsg -> JsonConverter.jsonToObject(rpcMsg.getJsonParam(), DeployParam.class))
+                .collect(Collectors.toList());
+
+        return list.stream().map(DeployParam::getAppId).collect(Collectors.toList());
     }
 }

+ 44 - 0
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/DmasterConnActionListener.java

@@ -0,0 +1,44 @@
+package cn.reghao.autodop.dmaster.mqttsub;
+
+import cn.reghao.autodop.common.mqtt.AsyncMqttClient;
+import cn.reghao.autodop.common.util.ExceptionUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.IMqttActionListener;
+import org.eclipse.paho.client.mqttv3.IMqttToken;
+import org.eclipse.paho.client.mqttv3.MqttException;
+
+/**
+ * MQTT 建立连接时的状态监听器
+ *
+ * @author reghao
+ * @date 2021-10-29 16:36:49
+ */
+@Slf4j
+public class DmasterConnActionListener implements IMqttActionListener {
+    private final AsyncMqttClient mqttClient;
+
+    public DmasterConnActionListener(AsyncMqttClient mqttClient) {
+        this.mqttClient = mqttClient;
+    }
+
+    @Override
+    public void onSuccess(IMqttToken asyncActionToken) {
+        log.info("MQTT 连接建立成功,开始订阅 topic");
+        mqttClient.getMsgListeners().forEach((topic, msgListener) -> {
+            try {
+                // 确保能订阅到离线消息的设置:
+                // - qos=1
+                // - clientId 确定且唯一,不能使用随机的 UUID
+                // - CleanSession=false
+                mqttClient.sub(topic, msgListener);
+            } catch (MqttException e) {
+                log.error("topic {} 订阅失败, 原因 {}", topic, ExceptionUtil.errorMsg(e));
+            }
+        });
+    }
+
+    @Override
+    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
+        log.error("MQTT 连接建立失败,原因 {}", exception.getMessage());
+    }
+}

+ 4 - 5
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/DmasterTopicListener.java

@@ -1,6 +1,6 @@
 package cn.reghao.autodop.dmaster.mqttsub;
 
-import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
+import cn.reghao.autodop.common.mqtt.AsyncMqttClient;
 import cn.reghao.autodop.common.msg.Message;
 import cn.reghao.autodop.common.msg.MsgType;
 import cn.reghao.autodop.common.msg.pub.PubMsg;
@@ -26,11 +26,11 @@ import org.springframework.stereotype.Component;
 public class DmasterTopicListener implements IMqttMessageListener {
     private final NodeEventPubClazzDispatcher nodeEventPubClazzDispatcher;
     private final AppRpcClazzResultDispatcher appRpcClazzResultDispatcher;
-    private final DefaultMqttClient mqttClient;
+    private final AsyncMqttClient mqttClient;
 
     public DmasterTopicListener(NodeEventPubClazzDispatcher nodeEventPubClazzDispatcher,
                                 AppRpcClazzResultDispatcher appRpcClazzResultDispatcher,
-                                DefaultMqttClient mqttClient) {
+                                AsyncMqttClient mqttClient) {
         this.nodeEventPubClazzDispatcher = nodeEventPubClazzDispatcher;
         this.appRpcClazzResultDispatcher = appRpcClazzResultDispatcher;
         this.mqttClient = mqttClient;
@@ -75,7 +75,7 @@ public class DmasterTopicListener implements IMqttMessageListener {
     }
 
     private void dispatchRpcResult(String msgId, RpcMsg rpcMsg) {
-        RpcMsg rpcMsg1 = mqttClient.getRecord(msgId);
+        RpcMsg rpcMsg1 = mqttClient.getAndRemoveRecord(msgId);
         if (rpcMsg1 == null) {
             log.error("RPC 调用结果 {} 找不到 RPC 调用参数", rpcMsg);
             return;
@@ -89,6 +89,5 @@ public class DmasterTopicListener implements IMqttMessageListener {
             default:
                 ;
         }
-        mqttClient.removeRecord(msgId);
     }
 }

+ 3 - 3
dmaster/src/main/java/cn/reghao/autodop/dmaster/spring/BeansConfig.java

@@ -1,6 +1,6 @@
 package cn.reghao.autodop.dmaster.spring;
 
-import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
+import cn.reghao.autodop.common.mqtt.AsyncMqttClient;
 import cn.reghao.autodop.common.mqtt.MqttProperties;
 import cn.reghao.jdkutil.converter.ByteConverter;
 import cn.reghao.jdkutil.http.WebClient;
@@ -19,13 +19,13 @@ import java.io.IOException;
 @Configuration
 public class BeansConfig {
     @Bean
-    public DefaultMqttClient mqttClient(MosquittoProperties mosquittoProperties) throws MqttException, IOException {
+    public AsyncMqttClient mqttClient(MosquittoProperties mosquittoProperties) throws MqttException {
         MqttProperties mqttProperties = new MqttProperties();
         mqttProperties.setBroker(mosquittoProperties.getBroker());
         mqttProperties.setClientId(mosquittoProperties.getClientId());
         mqttProperties.setUsername(mosquittoProperties.getUsername());
         mqttProperties.setPassword(mosquittoProperties.getPassword());
-        return new DefaultMqttClient(mqttProperties);
+        return new AsyncMqttClient(mqttProperties);
     }
 
     @Bean

+ 16 - 24
dmaster/src/main/java/cn/reghao/autodop/dmaster/spring/DmasterLifecycle.java

@@ -4,12 +4,12 @@ import ch.qos.logback.classic.spi.ILoggingEvent;
 import ch.qos.logback.core.Appender;
 import cn.reghao.autodop.common.log.LoggerConfig;
 import cn.reghao.autodop.common.machine.Machine;
-import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
-import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
+import cn.reghao.autodop.common.mqtt.AsyncMqttClient;
 import cn.reghao.autodop.common.msg.MsgQueue;
 import cn.reghao.autodop.dmaster.app.model.constant.EnvList;
 import cn.reghao.autodop.dmaster.app.model.po.config.build.BuildDir;
 import cn.reghao.autodop.dmaster.app.service.config.BuildDirService;
+import cn.reghao.autodop.dmaster.mqttsub.DmasterConnActionListener;
 import cn.reghao.autodop.dmaster.mqttsub.DmasterTopicListener;
 import cn.reghao.autodop.dmaster.machine.db.crud.NodeLogCrud;
 import cn.reghao.autodop.dmaster.mqttsub.SysTopicListener;
@@ -34,24 +34,19 @@ import java.util.List;
 @Slf4j
 @Component
 public class DmasterLifecycle implements ApplicationRunner, DisposableBean {
-    private final DmasterTopicListener messageListener;
-    private final DefaultMqttClient mqttClient;
+    private final AsyncMqttClient mqttClient;
+    private final DmasterTopicListener dmasterTopicListener;
+    private final SysTopicListener sysTopicListener;
     private final BuildDirService buildDirService;
-    private SysEnvQuery sysEnvQuery;
-    private final SysEnvCrud sysEnvCrud;
-    private SysTopicListener sysTopicSubListener;
 
-    public DmasterLifecycle(NodeLogCrud nodeLogCrud, DmasterTopicListener messageListener, BuildDirService buildDirService,
-                            DefaultMqttClient mqttClient, SysEnvQuery sysEnvQuery, SysEnvCrud sysEnvCrud,
-                            SysTopicListener sysTopicSubListener) {
-        this.messageListener = messageListener;
-        this.buildDirService = buildDirService;
+    public DmasterLifecycle(AsyncMqttClient mqttClient, DmasterTopicListener dmasterTopicListener,
+                            SysTopicListener sysTopicListener, BuildDirService buildDirService,
+                            NodeLogCrud nodeLogCrud) {
         this.mqttClient = mqttClient;
-        this.sysEnvQuery = sysEnvQuery;
-        this.sysEnvCrud = sysEnvCrud;
-        this.sysTopicSubListener = sysTopicSubListener;
+        this.dmasterTopicListener = dmasterTopicListener;
+        this.sysTopicListener = sysTopicListener;
+        this.buildDirService = buildDirService;
         initLogger(nodeLogCrud);
-        initSysEnv();
     }
 
     private void initLogger(NodeLogCrud nodeLogCrud) {
@@ -73,12 +68,14 @@ public class DmasterLifecycle implements ApplicationRunner, DisposableBean {
     }
 
     private void subTopic() throws MqttException {
-        mqttClient.sub(MsgQueue.dmasterTopic(), messageListener);
-        mqttClient.sub(MsgQueue.sysTopic(), sysTopicSubListener);
+        mqttClient.add(MsgQueue.dmasterTopic(), dmasterTopicListener);
+        mqttClient.add(MsgQueue.sysTopic(), sysTopicListener);
+        DmasterConnActionListener connActionListener = new DmasterConnActionListener(mqttClient);
+        mqttClient.connect(connActionListener);
     }
 
     private void initBuildDir() {
-        log.info("初始化构建目录");
+        log.info("初始化本地构建目录");
         BuildDir buildDir = buildDirService.get(Machine.ID);
         if (buildDir == null) {
             buildDirService.createAndSave();
@@ -90,11 +87,6 @@ public class DmasterLifecycle implements ApplicationRunner, DisposableBean {
     private void initSysEnv() {
         for (EnvList env : EnvList.values()) {
             String name = env.name();
-            SysEnv sysEnv = sysEnvQuery.findByEnv(name);
-            if (sysEnv == null) {
-                sysEnv = new SysEnv(name, "", false);
-                sysEnvCrud.save(sysEnv);
-            }
         }
     }
 }

+ 1 - 0
dmaster/src/main/resources/templates/app/bd/index.html

@@ -75,6 +75,7 @@
                     <td>
                         <a class="ajax-post" th:href="@{'/api/app/bd/update?appId='+${item.appId}}">更新</a>
                         <a class="ajax-post" th:href="@{'/api/app/bd/build?appId='+${item.appId}}">构建</a>
+                        <a class="ajax-get" th:href="@{'/api/app/bd/dl/'+${item.buildLogId}}">下载</a>
                         <a class="open-popup" data-title="应用部署状态" th:attr="data-url=@{'/app/bd/deploy/'+${item.appId}}"
                            data-size="1000,500" href="#">部署状态</a>
                     </td>

+ 12 - 0
pom.xml

@@ -50,5 +50,17 @@
             <version>1.18.0</version>
             <scope>provided</scope>
         </dependency>
+
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-core</artifactId>
+            <version>1.2.3</version>
+        </dependency>
+
+        <dependency>
+            <groupId>ch.qos.logback</groupId>
+            <artifactId>logback-classic</artifactId>
+            <version>1.2.3</version>
+        </dependency>
     </dependencies>
 </project>