Browse Source

使用 mqtt 替换 amqp

reghao 4 years ago
parent
commit
5b4da008bd
20 changed files with 275 additions and 208 deletions
  1. 2 0
      common/src/main/java/cn/reghao/autodop/common/amqp/RabbitProducer.java
  2. 47 10
      common/src/main/java/cn/reghao/autodop/common/mqtt/MqttPub.java
  3. 40 7
      common/src/main/java/cn/reghao/autodop/common/mqtt/MqttSub.java
  4. 0 71
      common/src/main/java/cn/reghao/autodop/common/mqtt/Mqttc.java
  5. 1 12
      common/src/main/java/cn/reghao/autodop/common/shell/ShellExecutor.java
  6. 39 0
      dagent/src/main/java/cn/reghao/autodop/dagent/dispatcher/DmasterMessageDispatcher.java
  7. 29 7
      dagent/src/main/java/cn/reghao/autodop/dagent/utils/DagentLifecycle.java
  8. 1 1
      dagent/src/main/java/cn/reghao/autodop/dagent/utils/log/LoggerConfig.java
  9. 1 1
      dagent/src/main/java/cn/reghao/autodop/dagent/utils/log/LoggingInitializer.java
  10. 0 35
      dagent/src/test/java/cn/reghao/autodop/dagent/MqttTest.java
  11. 9 4
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/controller/BuildDeployController.java
  12. 1 8
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/repository/log/BuildLogRepository.java
  13. 23 6
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/BuildDeployDispatcher.java
  14. 0 38
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/BuildService.java
  15. 20 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/bd/AppDeployer.java
  16. 30 0
      dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/DagentMessageDispatcher.java
  17. 12 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/WebBody.java
  18. 17 5
      dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/lifecycle/AfterAppStart.java
  19. 1 1
      dmaster/src/main/resources/application-dev.yml
  20. 2 0
      dmaster/src/test/java/cn/reghao/autodop/dmaster/app/service/RefreshServiceTest.java

+ 2 - 0
common/src/main/java/cn/reghao/autodop/common/amqp/RabbitProducer.java

@@ -42,6 +42,8 @@ public class RabbitProducer {
     }
 
     /**
+     * 发送消息
+     *
      * @param
      * @return
      * @date 2021-02-23 下午3:28

+ 47 - 10
common/src/main/java/cn/reghao/autodop/common/mqtt/MqttPub.java

@@ -1,21 +1,58 @@
 package cn.reghao.autodop.common.mqtt;
 
+import cn.reghao.autodop.common.utils.MachineId;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.*;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.stereotype.Component;
+
+import java.util.UUID;
 
 /**
  * @author reghao
- * @date 2021-05-21 07:15:27
+ * @date 2021-05-21 08:27:41
  */
 @Slf4j
-public class MqttPub {
-    private static String topic = "test";
-
-    public static void main(String[] args) throws MqttException {
-        try (Mqttc mqttc = new Mqttc()) {
-            String payload = "hello from dmaster...";
-            mqttc.pub(topic, payload);
-            log.info("消息已发送...");
-        }
+@Component
+public class MqttPub implements AutoCloseable {
+    private String broker = "tcp://localhost:1883";
+    private String username = "dev";
+    private String password = "Dev@123456";
+    private String clientId = UUID.randomUUID().toString();
+    private MqttClient client;
+
+    public MqttPub() throws MqttException {
+        this.client = new MqttClient(broker, clientId, new MemoryPersistence());
+    }
+
+    public MqttPub(String clientId) throws MqttException {
+        this.client = new MqttClient(broker, clientId, new MemoryPersistence());
+    }
+
+    private MqttConnectOptions connectOptions() {
+        MqttConnectOptions options = new MqttConnectOptions();
+        options.setCleanSession(true);
+        options.setUserName(username);
+        options.setPassword(password.toCharArray());
+        options.setConnectionTimeout(10);
+        options.setKeepAliveInterval(20);
+        return options;
+    }
+
+    @Override
+    public void close() throws MqttException {
+        client.close();
+    }
+
+    public void pub(String topic, String payload) throws MqttException {
+        client.connect(connectOptions());
+        MqttTopic mqttTopic = client.getTopic(topic);
+        MqttMessage message = new MqttMessage();
+        message.setQos(1);
+        message.setRetained(false);
+        message.setPayload(payload.getBytes());
+        MqttDeliveryToken token = mqttTopic.publish(message);
+        token.waitForCompletion();
+        client.disconnect();
     }
 }

+ 40 - 7
common/src/main/java/cn/reghao/autodop/common/mqtt/MqttSub.java

@@ -1,19 +1,52 @@
 package cn.reghao.autodop.common.mqtt;
 
+import cn.reghao.autodop.common.utils.MachineId;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.*;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+import org.springframework.stereotype.Component;
+
+import java.util.UUID;
 
 /**
  * @author reghao
- * @date 2021-05-21 07:15:27
+ * @date 2021-05-21 08:27:41
  */
 @Slf4j
-public class MqttSub {
-    private static String topic = "test";
+@Component
+public class MqttSub implements AutoCloseable {
+    private String broker = "tcp://localhost:1883";
+    private String username = "dev";
+    private String password = "Dev@123456";
+    private String clientId = UUID.randomUUID().toString();
+    private MqttClient client;
+
+    public MqttSub() throws MqttException {
+        this.client = new MqttClient(broker, clientId, new MemoryPersistence());
+    }
+
+    public MqttSub(String clientId) throws MqttException {
+        this.client = new MqttClient(broker, clientId, new MemoryPersistence());
+    }
+
+    private MqttConnectOptions connectOptions() {
+        MqttConnectOptions options = new MqttConnectOptions();
+        options.setCleanSession(true);
+        options.setUserName(username);
+        options.setPassword(password.toCharArray());
+        options.setConnectionTimeout(10);
+        options.setKeepAliveInterval(20);
+        return options;
+    }
+
+    @Override
+    public void close() throws MqttException {
+        client.close();
+    }
 
-    public static void main(String[] args) throws MqttException, InterruptedException {
-        Mqttc mqttc = new Mqttc();
-        mqttc.sub(topic);
-        log.info("已订阅来自 {} 的消息...", topic);
+    public void sub(String topic, MqttCallback callback) throws MqttException {
+        client.connect(connectOptions());
+        client.subscribe(topic);
+        client.setCallback(callback);
     }
 }

+ 0 - 71
common/src/main/java/cn/reghao/autodop/common/mqtt/Mqttc.java

@@ -1,71 +0,0 @@
-package cn.reghao.autodop.common.mqtt;
-
-import lombok.extern.slf4j.Slf4j;
-import org.eclipse.paho.client.mqttv3.*;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-
-/**
- * @author reghao
- * @date 2021-05-21 08:27:41
- */
-@Slf4j
-public class Mqttc implements AutoCloseable {
-    private String broker = "tcp://localhost:1883";
-    private String username = "dev";
-    private String password = "Dev@123456";
-    private String clientId = "pub-client";
-    private MqttClient client;
-
-    public Mqttc() throws MqttException {
-        this.client = new MqttClient(broker, clientId, new MemoryPersistence());
-    }
-
-    private MqttConnectOptions connectOptions() {
-        MqttConnectOptions options = new MqttConnectOptions();
-        options.setCleanSession(true);
-        options.setUserName(username);
-        options.setPassword(password.toCharArray());
-        options.setConnectionTimeout(10);
-        options.setKeepAliveInterval(20);
-        return options;
-    }
-
-    @Override
-    public void close() throws MqttException {
-        client.close();
-    }
-
-    public void pub(String topic, String payload) throws MqttException {
-        client.connect(connectOptions());
-        MqttTopic mqttTopic = client.getTopic(topic);
-        MqttMessage message = new MqttMessage();
-        message.setQos(1);
-        message.setRetained(false);
-        message.setPayload(payload.getBytes());
-        MqttDeliveryToken token = mqttTopic.publish(message);
-        token.waitForCompletion();
-        client.disconnect();
-    }
-
-    public void sub(String topic) throws MqttException {
-        client.connect(connectOptions());
-        client.subscribe(topic);
-        client.setCallback(new MqttCallback() {
-            @Override
-            public void connectionLost(Throwable cause) {
-                log.error("conn lost {}", cause.getMessage());
-            }
-
-            @Override
-            public void messageArrived(String topic, MqttMessage message) {
-                log.info("thread {} - topic {}", Thread.currentThread().getName(), topic);
-                log.info("msg -> {}", message.toString());
-            }
-
-            @Override
-            public void deliveryComplete(IMqttDeliveryToken token) {
-                log.info("token -> {}", token);
-            }
-        });
-    }
-}

+ 1 - 12
common/src/main/java/cn/reghao/autodop/common/shell/ShellExecutor.java

@@ -45,8 +45,8 @@ public class ShellExecutor {
     private ShellResult exec(ProcessBuilder pb, File ofile) throws IOException, InterruptedException {
         Process newProcess = pb.start();
         ProcessHandle handle = newProcess.toHandle();
+        // 子进程 PID
         long pid = handle.pid();
-        System.out.println(pid);
 
         // 父进程等待子进程结束
         int exitCode = newProcess.waitFor();
@@ -66,15 +66,4 @@ public class ShellExecutor {
         in.close();
         return sb.toString();
     }
-
-    public static void main(String[] args) throws Exception {
-        String execBin = "/home/reghao/dev/env/dotnet/dotnet-sdk-5.0.100-linux-x64/dotnet";
-        String cmd = execBin + " publish";
-        String dir = "/home/reghao/autodop/compile-dir/content/IQuizoo.ContentService";
-
-        //cmd = "sleep 10";
-        ShellExecutor shellExecutor = new ShellExecutor();
-        ShellResult shellResult = shellExecutor.exec(cmd, dir);
-        System.out.println();
-    }
 }

+ 39 - 0
dagent/src/main/java/cn/reghao/autodop/dagent/dispatcher/DmasterMessageDispatcher.java

@@ -0,0 +1,39 @@
+package cn.reghao.autodop.dagent.dispatcher;
+
+import cn.reghao.autodop.common.mqtt.MqttPub;
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+/**
+ * 分发 dmaster 发送的消息
+ *
+ * @author reghao
+ * @date 2021-05-24 09:24:03
+ */
+@Slf4j
+public class DmasterMessageDispatcher implements MqttCallback {
+    private MqttPub mqttPub;
+
+    public DmasterMessageDispatcher(MqttPub mqttPub) {
+        this.mqttPub = mqttPub;
+    }
+
+    @Override
+    public void connectionLost(Throwable cause) {
+        log.error("conn lost {}", cause.getMessage());
+    }
+
+    @Override
+    public void messageArrived(String topic, MqttMessage message) throws MqttException {
+        log.info("{} -> {}", topic, message.toString());
+        mqttPub.pub("dmaster", "应用部署完成");
+    }
+
+    @Override
+    public void deliveryComplete(IMqttDeliveryToken token) {
+        log.info("token -> {}", token);
+    }
+}

+ 29 - 7
dagent/src/main/java/cn/reghao/autodop/dagent/utils/DagentLifecycle.java

@@ -5,11 +5,15 @@ import cn.reghao.autodop.common.amqp.MqMessage;
 import cn.reghao.autodop.common.amqp.RabbitProducer;
 import cn.reghao.autodop.common.dagent.machine.Machine;
 import cn.reghao.autodop.common.dagent.machine.api.MachineOps;
+import cn.reghao.autodop.common.mqtt.MqttPub;
+import cn.reghao.autodop.common.mqtt.MqttSub;
 import cn.reghao.autodop.common.utils.serializer.JsonConverter;
+import cn.reghao.autodop.dagent.dispatcher.DmasterMessageDispatcher;
 import cn.reghao.autodop.dagent.machine.timer.HeartbeatJob;
 import cn.reghao.autodop.dagent.machine.timer.MachineScheduler;
 import cn.reghao.autodop.dagent.utils.log.LoggerConfig;
 import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttException;
 import org.quartz.SchedulerException;
 import org.springframework.beans.factory.DisposableBean;
 import org.springframework.boot.ApplicationArguments;
@@ -26,15 +30,22 @@ import org.springframework.stereotype.Component;
 @Slf4j
 @Component
 public class DagentLifecycle implements ApplicationRunner, DisposableBean {
+    private MqttPub mqttPub;
+    private MqttSub mqttSub;
     private RabbitProperties rabbitProperties;
     private MachineScheduler machineScheduler;
     private RabbitProducer rabbitProducer;
     private Machine machine;
+    private String topic = "dagent@" + Machine.machineId();
 
-    public DagentLifecycle(RabbitProperties rabbitProperties,
-                         MachineScheduler machineScheduler,
-                         RabbitProducer rabbitProducer,
-                         Machine machine) {
+    public DagentLifecycle(MqttPub mqttPub,
+                           MqttSub mqttSub,
+                           RabbitProperties rabbitProperties,
+                           MachineScheduler machineScheduler,
+                           RabbitProducer rabbitProducer,
+                           Machine machine) {
+        this.mqttPub = mqttPub;
+        this.mqttSub = mqttSub;
         this.rabbitProperties = rabbitProperties;
         this.machineScheduler = machineScheduler;
         this.rabbitProducer = rabbitProducer;
@@ -48,7 +59,8 @@ public class DagentLifecycle implements ApplicationRunner, DisposableBean {
     }
 
     @Override
-    public void destroy() {
+    public void destroy() throws MqttException {
+        mqttPub.pub("dmaster", "Dagent 停止...");
         log.info("Dagent 停止...");
     }
 
@@ -56,8 +68,9 @@ public class DagentLifecycle implements ApplicationRunner, DisposableBean {
      * @date 2019-09-26 下午5:23
      */
     private void activate() throws Exception {
-        registry();
-        scheduledJobs();
+        subAndPub();
+        //registry();
+        //scheduledJobs();
         initLogger();
         log.info("dagent 应用已启动...");
     }
@@ -70,6 +83,15 @@ public class DagentLifecycle implements ApplicationRunner, DisposableBean {
         log.info("发送机器注册信息...");
     }
 
+    private void subAndPub() throws MqttException {
+        mqttSub.sub(topic, new DmasterMessageDispatcher(mqttPub));
+        String payload = JsonConverter.objectToJson(machine.registry());
+        MqMessage mqMessage = MqMessage.mqMessage(MessageType.machineType.name(),
+                MachineOps.machineRegistryOps.name(), false, payload);
+        mqttPub.pub("dmaster", JsonConverter.objectToJson(mqMessage));
+        log.info("发送机器注册信息...");
+    }
+
     private void scheduledJobs() throws SchedulerException {
         machineScheduler.add(HeartbeatJob.class, "machine-heartbeat", "0/10 * * * * ?");
         machineScheduler.start();

+ 1 - 1
dagent/src/main/java/cn/reghao/autodop/dagent/utils/log/LoggerConfig.java

@@ -25,7 +25,7 @@ public class LoggerConfig {
         Logger rootLogger = loggerContext.getLogger("ROOT");
         rootLogger.setAdditive(false);
         rootLogger.setLevel(Level.INFO);
-        rootLogger.addAppender(amqpAppender(rabbitProperties));
+        //rootLogger.addAppender(amqpAppender(rabbitProperties));
     }
 
     private Appender<ILoggingEvent> amqpAppender(RabbitProperties rabbitProperties) {

+ 1 - 1
dagent/src/main/java/cn/reghao/autodop/dagent/utils/log/LoggingInitializer.java

@@ -28,7 +28,7 @@ public class LoggingInitializer implements ApplicationContextInitializer<Configu
         Logger rootLogger = loggerContext.getLogger("ROOT");
         rootLogger.setAdditive(false);
         rootLogger.setLevel(Level.INFO);
-        rootLogger.addAppender(amqpAppender(loggerContext));
+        //rootLogger.addAppender(amqpAppender(loggerContext));
     }
 
     private Appender<ILoggingEvent> amqpAppender(LoggerContext loggerContext) {

+ 0 - 35
dagent/src/test/java/cn/reghao/autodop/dagent/MqttTest.java

@@ -1,35 +0,0 @@
-package cn.reghao.autodop.dagent;
-
-import org.eclipse.paho.client.mqttv3.*;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
-
-/**
- * @author reghao
- * @date 2021-04-14 12:59:22
- */
-public class MqttTest {
-    static String broker = "tcp://localhost:1883";
-    static String clientId = "";
-    static String topic = "test";
-    static String username = "";
-    static String password = "";
-
-    static void publish() {
-        MqttMessage  mqttMessage = new MqttMessage();
-    }
-
-    static void subscribe() {
-    }
-
-    public static void main(String[] args) throws MqttException {
-        MqttClient mqttClient = new MqttClient(broker, clientId, new MemoryPersistence());
-        MqttConnectOptions options = new MqttConnectOptions();
-        options.setCleanSession(true);
-        options.setUserName(username);
-        options.setPassword(password.toCharArray());
-        options.setConnectionTimeout(10);
-        options.setKeepAliveInterval(20);
-
-        MqttTopic mqttTopic = mqttClient.getTopic(topic);
-    }
-}

+ 9 - 4
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/controller/BuildDeployController.java

@@ -28,16 +28,16 @@ public class BuildDeployController {
     @ApiImplicitParams(@ApiImplicitParam(name="appId", value="应用 ID", paramType="query", dataType = "String"))
     @PostMapping("/update")
     public String buildAndDeploy(@RequestParam("appId") String appId) throws Exception {
-        buildDeployDispatcher.buildAndDeploy(appId, true);
-        return WebBody.success();
+        String ret = buildDeployDispatcher.buildAndDeploy(appId, true);
+        return WebBody.successMsg(ret);
     }
 
     @ApiOperation(value = "构建应用")
     @ApiImplicitParams(@ApiImplicitParam(name="appId", value="应用 ID", paramType="query", dataType = "String"))
     @PostMapping("/build")
     public String build(@RequestParam("appId") String appId) throws Exception {
-        buildDeployDispatcher.buildAndDeploy(appId, false);
-        return WebBody.success();
+        String ret = buildDeployDispatcher.buildAndDeploy(appId, false);
+        return WebBody.successMsg(ret);
     }
 
     @ApiOperation(value = "部署应用")
@@ -50,4 +50,9 @@ public class BuildDeployController {
         buildDeployDispatcher.deploy(appId, commitId);
         return WebBody.success();
     }
+
+    @GetMapping("/list")
+    public String buildList() {
+        return WebBody.success("build list");
+    }
 }

+ 1 - 8
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/repository/log/BuildLogRepository.java

@@ -9,12 +9,5 @@ import org.springframework.data.mongodb.repository.MongoRepository;
  * @author reghao
  * @date 2020-01-21 14:53:03
  */
-public interface BuildLogRepository extends MongoRepository<BuildLog, Long> {
-    // 唯一值
-    BuildLog findByAppIdAndRepoAndCompilerAndPackerAndPackScriptAndCommitIdAndStatusCode(
-            String appId, String repo, String compiler, String packer, String packScript,
-            String commitId, int statusCode);
-    Page<BuildLog> findByEnv(String env, Pageable pageable);
-    Page<BuildLog> findByEnvAndAppId(String env, String appId, Pageable pageable);
-    Page<BuildLog> findByEnvAndAppIdAndStatusCode(String env, String appId, int statusCode, Pageable pageable);
+public interface BuildLogRepository extends MongoRepository<BuildLog, String> {
 }

+ 23 - 6
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/BuildDeployDispatcher.java

@@ -1,5 +1,6 @@
 package cn.reghao.autodop.dmaster.app.service;
 
+import cn.reghao.autodop.common.result.ResultCode;
 import cn.reghao.autodop.dmaster.app.cache.BuildDeployCache;
 import cn.reghao.autodop.dmaster.app.entity.config.deploy.DeployConfig;
 import cn.reghao.autodop.dmaster.app.entity.log.*;
@@ -11,6 +12,7 @@ import cn.reghao.autodop.dmaster.app.entity.config.AppOrchestration;
 import cn.reghao.autodop.dmaster.utils.notifier.NotifyService;
 import cn.reghao.autodop.dmaster.common.thread.ThreadPoolWrapper;
 import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttException;
 import org.springframework.stereotype.Service;
 
 import java.util.*;
@@ -54,9 +56,9 @@ public class BuildDeployDispatcher {
      * @return
      * @date 2021-02-06 上午1:46
      */
-    public void buildAndDeploy(String appId, boolean isDeploy) throws Exception {
+    public String buildAndDeploy(String appId, boolean isDeploy) throws Exception {
         if (!onBuilding.add(appId)) {
-            //
+            return appId + " 正在构建中";
         }
 
         AppIntegrate appIntegrate = integrateMap.get(appId);
@@ -76,15 +78,30 @@ public class BuildDeployDispatcher {
         }
 
         BuildSupplier supplier = new BuildSupplier(appIntegrate);
+        log.info("开始异步构建 {}...", appId);
         CompletableFuture.supplyAsync(supplier, threadPool)
-                .thenApply(buildLog -> {
+                .whenComplete((buildLog, throwable) -> {
+                    if (buildLog.getBuildResult().getCode() == ResultCode.FAIL.getCode()) {
+                        log.info("{} 构建失败...", appId);
+                        // 发送通知...
+                    }
+
                     buildLogRepository.save(buildLog);
                     onBuilding.remove(appId);
+                    log.info("{} 构建完成...", appId);
+                })
+                .thenAccept(buildLog -> {
                     if (isDeploy) {
+                        log.info("开始部署 {}...", appId);
+                        try {
+                            appDeployer.deploy(buildLog);
+                        } catch (MqttException e) {
+                            e.printStackTrace();
+                        }
                     }
-                    return true;
-                })
-                .complete(true);
+                });
+
+        return appId + " 已开始构建";
     }
 
     /**

+ 0 - 38
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/BuildService.java

@@ -49,40 +49,6 @@ public class BuildService {
         this.buildDeployAppRepository = buildDeployAppRepository;
     }
 
-    public PageList<BuildDeployApp> buildList(int page, int size, String env) {
-        PageList<BuildDeployApp> pageList = buildDeployAppCrudService.getByPage(page, size, env);
-        PageList<BuildDeployApp> vos = new PageList<>();
-        vos.setTotalPages(pageList.getTotalPages());
-        vos.setTotalSize(pageList.getTotalSize());
-        vos.setPageSize(vos.getPageSize());
-        vos.setHasNext(pageList.isHasNext());
-        vos.setList(pageList.getList().stream().map(BuildDeployApp::vo).collect(Collectors.toList()));
-        return vos;
-    }
-
-    public void refreshBuildList(String env) {
-
-    }
-
-    public PageList<SuccessfullyBuildVO> successfullyBuilds(String appId, PageRequest pageRequest) {
-        PageList<SuccessfullyBuildVO> pageList = new PageList<>();
-        Page<BuildLog> buildLogs =
-                buildLogRepository.findByEnvAndAppIdAndStatusCode("test", appId, 0, pageRequest);
-        pageList.setTotalPages(buildLogs.getTotalPages());
-        pageList.setTotalSize(buildLogs.getTotalElements());
-        pageList.setHasNext(buildLogs.hasNext());
-        /*pageList.setList(buildLogs.stream()
-                .map(buildLog -> {
-                    String commitId = "buildLog.getCommitLog().getCommitId()";
-                    String commitMsg = "buildLog.getCommitLog().getCommitMsg()";
-                    String appPath = "buildLog.getAppPath()";
-                    String buildTime = DatetimeConverter.format(buildLog.getBuildTime());
-                    return new SuccessfullyBuildVO(commitId, commitMsg, appPath, buildTime);
-                })
-                .collect(Collectors.toList()));*/
-        return pageList;
-    }
-
     public PageList<CommitLog> commitLogs(int page, int size, String env) {
         PageRequest pageRequest =
                 PageRequest.of(page-1, size, Sort.by(Sort.Direction.DESC, "createTime"));
@@ -160,8 +126,4 @@ public class BuildService {
         pageList.setList(deployLogs.stream().map(DeployLog::vo).collect(Collectors.toList()));
         return pageList;
     }
-
-    public PageList<CurrentRunningCommit> deployedApp(String appId, int page, int size) {
-        return new PageList<>();
-    }
 }

+ 20 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/bd/AppDeployer.java

@@ -6,8 +6,11 @@ import cn.reghao.autodop.common.amqp.RpcResult;
 import cn.reghao.autodop.common.dagent.app.api.AppOps;
 import cn.reghao.autodop.common.dagent.app.api.data.DeployedAppStatus;
 import cn.reghao.autodop.common.dagent.app.api.data.deploy.AppDeployArgs;
+import cn.reghao.autodop.common.dagent.machine.Machine;
 import cn.reghao.autodop.common.docker.pojo.Config;
+import cn.reghao.autodop.common.mqtt.MqttPub;
 import cn.reghao.autodop.common.utils.serializer.JsonConverter;
+import cn.reghao.autodop.dmaster.app.cache.BuildDeployCache;
 import cn.reghao.autodop.dmaster.app.constant.BuildDeployResult;
 import cn.reghao.autodop.dmaster.app.entity.config.deploy.DeployConfig;
 import cn.reghao.autodop.dmaster.app.entity.log.BuildLog;
@@ -18,6 +21,7 @@ import cn.reghao.autodop.dmaster.machine.entity.MachineInfo;
 import cn.reghao.autodop.dmaster.machine.entity.NetworkInfo;
 import cn.reghao.autodop.dmaster.machine.repository.MachineInfoRepository;
 import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttException;
 import org.springframework.stereotype.Service;
 
 import java.time.LocalDateTime;
@@ -32,18 +36,33 @@ import java.util.*;
 @Slf4j
 @Service
 public class AppDeployer {
+    private MqttPub mqttPub;
+    private BuildDeployCache cache;
     private RemoteCallService callService;
     private BuildDeployLogConsumer logConsumer;
     private MachineInfoRepository machineInfoRepository;
 
-    public AppDeployer(RemoteCallService callService,
+    public AppDeployer(MqttPub mqttPub,
+                       BuildDeployCache cache,
+                       RemoteCallService callService,
                        BuildDeployLogConsumer logConsumer,
                        MachineInfoRepository machineInfoRepository) {
+        this.mqttPub = mqttPub;
+        this.cache = cache;
         this.callService = callService;
         this.logConsumer = logConsumer;
         this.machineInfoRepository = machineInfoRepository;
     }
 
+    public void deploy(BuildLog buildLog) throws MqttException {
+        String appId = buildLog.getAppId();
+        List<DeployConfig> deployConfigs = cache.findByAppId(buildLog.getAppId()).getDeployConfigs();
+        log.info("{} 正在部署...", appId);
+
+        String topic = "dagent@" + Machine.machineId();
+        mqttPub.pub(topic, "部署应用");
+    }
+
     /**
      * 部署应用
      *

+ 30 - 0
dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/DagentMessageDispatcher.java

@@ -0,0 +1,30 @@
+package cn.reghao.autodop.dmaster.utils;
+
+import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttCallback;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+
+/**
+ * 分发 dagent 发送的消息
+ *
+ * @author reghao
+ * @date 2021-05-24 09:24:03
+ */
+@Slf4j
+public class DagentMessageDispatcher implements MqttCallback {
+    @Override
+    public void connectionLost(Throwable cause) {
+        log.error("conn lost -> {}", cause.getMessage());
+    }
+
+    @Override
+    public void messageArrived(String topic, MqttMessage message) {
+        log.info("{} -> {}", topic, message.toString());
+    }
+
+    @Override
+    public void deliveryComplete(IMqttDeliveryToken token) {
+        log.info("token -> {}", token);
+    }
+}

+ 12 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/WebBody.java

@@ -45,7 +45,18 @@ public class WebBody {
     }
 
     public static String success() {
-        WebBody webBody = new WebBody(SUCCESS.getCode(), "ok");
+        WebBody webBody = new WebBody(SUCCESS.getCode(), SUCCESS.getMsg());
+        webBody.setTimestamp(DateTimeConverter.now());
+        try {
+            return objectMapper.writeValueAsString(webBody);
+        } catch (JsonProcessingException e) {
+            e.printStackTrace();
+        }
+        return null;
+    }
+
+    public static String successMsg(String msg) {
+        WebBody webBody = new WebBody(SUCCESS.getCode(), msg);
         webBody.setTimestamp(DateTimeConverter.now());
         try {
             return objectMapper.writeValueAsString(webBody);

+ 17 - 5
dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/lifecycle/AfterAppStart.java

@@ -2,15 +2,18 @@ package cn.reghao.autodop.dmaster.utils.lifecycle;
 
 import cn.reghao.autodop.common.dagent.machine.hardware.disk.Disk;
 import cn.reghao.autodop.common.dagent.machine.hardware.disk.DiskInfo;
+import cn.reghao.autodop.common.mqtt.MqttSub;
 import cn.reghao.autodop.common.utils.FileOps;
-import cn.reghao.autodop.common.utils.text.TextFile;
+import cn.reghao.autodop.common.utils.MachineId;
 import cn.reghao.autodop.dmaster.app.entity.config.build.BuildDir;
 import cn.reghao.autodop.dmaster.app.repository.config.build.BuildDirRepository;
 import cn.reghao.autodop.dmaster.auth.service.UserService;
 import cn.reghao.autodop.dmaster.common.config.SysConfig;
 import cn.reghao.autodop.dmaster.monitor.HealthCheckJob;
 import cn.reghao.autodop.dmaster.monitor.MonitorScheduler;
+import cn.reghao.autodop.dmaster.utils.DagentMessageDispatcher;
 import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttException;
 import org.quartz.SchedulerException;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
@@ -29,21 +32,24 @@ import java.io.File;
 @Slf4j
 @Component
 public class AfterAppStart implements ApplicationRunner {
+    private MqttSub mqttSub;
     private MonitorScheduler monitorScheduler;
     private BuildDirRepository buildDirRepository;
     private UserService userService;
 
-    public AfterAppStart(MonitorScheduler monitorScheduler,
+    public AfterAppStart(MqttSub mqttSub,
+                         MonitorScheduler monitorScheduler,
                          BuildDirRepository buildDirRepository,
                          UserService userService) {
+        this.mqttSub = mqttSub;
         this.monitorScheduler = monitorScheduler;
         this.buildDirRepository = buildDirRepository;
         this.userService = userService;
     }
 
     @Override
-    public void run(ApplicationArguments args) {
-        String machineId = new TextFile().read("/etc/machine-id").get(0);
+    public void run(ApplicationArguments args) throws MqttException {
+        String machineId = MachineId.id();
         BuildDir buildDir = buildDirRepository.findByMachineId(machineId);
         if (buildDir == null) {
             String home = System.getProperty("user.home");
@@ -71,13 +77,14 @@ public class AfterAppStart implements ApplicationRunner {
         }
     }
 
-    public void initialize(BuildDir buildDir) {
+    public void initialize(BuildDir buildDir) throws MqttException {
         // TODO 放入缓存
         SysConfig.localRepo = buildDir.getLocalDir() + "/local-repo";
         SysConfig.compileDir = buildDir.getLocalDir() + "/compile-dir";
         SysConfig.packDir = buildDir.getLocalDir() + "/pack-dir";
         checkAndSetLocalDir();
         //userService.checkOrSetAdmin();
+        sub();
     }
 
     /**
@@ -109,4 +116,9 @@ public class AfterAppStart implements ApplicationRunner {
         monitorScheduler.add(HealthCheckJob.class, "machine-heartbeat", "0/10 * * * * ?");
         monitorScheduler.start();
     }
+
+    private void sub() throws MqttException {
+        String topic = "dmaster";
+        mqttSub.sub(topic, new DagentMessageDispatcher());
+    }
 }

+ 1 - 1
dmaster/src/main/resources/application-dev.yml

@@ -5,7 +5,7 @@ spring:
     password: Dev@123456
   data:
     mongodb:
-      uri: mongodb://localhost/reghao_devops_rdb1
+      uri: mongodb://localhost/reghao_devops_rdb
   rabbitmq:
     host: localhost
     port: 5672

+ 2 - 0
dmaster/src/test/java/cn/reghao/autodop/dmaster/app/service/RefreshServiceTest.java

@@ -17,6 +17,8 @@ import org.springframework.test.context.ActiveProfiles;
 import org.springframework.test.context.junit4.SpringRunner;
 
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 @Slf4j