Browse Source

处理通知消息格式和对 MQTT pub 的状态记录

1.处理 webhook, sms 和 email 等不同通知类型的通知消息格式
2.针对需要返回结果的 MQTT pub, 提供了一个记录器来保存 pub-sub 的状态
reghao 4 years ago
parent
commit
fd6eb4e387
27 changed files with 309 additions and 60 deletions
  1. 8 0
      common/src/main/java/cn/reghao/autodop/common/message/AsyncMsg.java
  2. 1 1
      common/src/main/java/cn/reghao/autodop/common/message/ops/OpsProcessor.java
  3. 12 0
      common/src/main/java/cn/reghao/autodop/common/mqtt/DefaultMqttClient.java
  4. 1 0
      common/src/main/java/cn/reghao/autodop/common/utils/serializer/JsonConverter.java
  5. 8 7
      dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/DmasterMsgDispatcher.java
  6. 2 1
      dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/processor/app/AppOpsProcessor.java
  7. 1 1
      dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/processor/docker/DockerOpsProcessor.java
  8. 2 1
      dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/processor/machine/MachineOpsProcessor.java
  9. 2 0
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/AppStatusService.java
  10. 23 4
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/BuildDeployDispatcher.java
  11. 1 0
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/bd/AppDeployer.java
  12. 22 0
      dmaster/src/main/java/cn/reghao/autodop/dmaster/monitor/entity/MonitorJob.java
  13. 6 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/monitor/service/MonitorScheduler.java
  14. 7 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/monitor/service/job/AppHealthCheckJob.java
  15. 1 0
      dmaster/src/main/java/cn/reghao/autodop/dmaster/monitor/service/job/MachineStatusMonitorJob.java
  16. 8 7
      dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/DagentMsgDispatcher.java
  17. 26 5
      dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/processor/AppOpsProcessor.java
  18. 1 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/processor/DagentOpsProcessor.java
  19. 9 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/processor/MachineOpsProcessor.java
  20. 23 0
      dmaster/src/main/java/cn/reghao/autodop/dmaster/notification/entity/NotifyGroup.java
  21. 1 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/notification/repository/EmailAccountRepository.java
  22. 1 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/notification/repository/SmsAccountRepository.java
  23. 109 23
      dmaster/src/main/java/cn/reghao/autodop/dmaster/notification/service/NotifyService.java
  24. 18 0
      dmaster/src/main/java/cn/reghao/autodop/dmaster/notification/service/notifier/NotifyMsg.java
  25. 12 0
      dmaster/src/main/java/cn/reghao/autodop/dmaster/notification/service/notifier/sms/SmsMsg.java
  26. 3 3
      dmaster/src/main/java/cn/reghao/autodop/dmaster/notification/service/notifier/sms/SmsNotify.java
  27. 1 1
      dmaster/src/main/resources/templates/monitor/machine.html

+ 8 - 0
common/src/main/java/cn/reghao/autodop/common/message/AsyncMsg.java

@@ -2,6 +2,8 @@ package cn.reghao.autodop.common.message;
 
 import lombok.Data;
 
+import java.util.UUID;
+
 /**
  * 异步消息
  * 执行两次序列化,对 payload 执行一次序列化,然后对 AsyncMsg 执行一次序列化
@@ -11,6 +13,7 @@ import lombok.Data;
  */
 @Data
 public class AsyncMsg {
+    private String msgId;
     private String machineId;
     private long sendTime;
     // MessageType
@@ -19,8 +22,13 @@ public class AsyncMsg {
     private String ops;
     private String payload;
 
+    public AsyncMsg() {
+        this.msgId = UUID.randomUUID().toString();
+    }
+
     public static AsyncMsg asyncMsg(String machineId, String type, String ops, String payload) {
         AsyncMsg asyncMsg = new AsyncMsg();
+        asyncMsg.setMsgId(UUID.randomUUID().toString());
         asyncMsg.setMachineId(machineId);
         asyncMsg.setSendTime(System.currentTimeMillis());
         asyncMsg.setType(type);

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/message/ops/OpsProcessor.java

@@ -5,5 +5,5 @@ package cn.reghao.autodop.common.message.ops;
  * @date 2021-06-09 13:43:16
  */
 public interface OpsProcessor {
-    void process(String ops, String payload) throws Exception;
+    void process(String msgId, String ops, String payload) throws Exception;
 }

+ 12 - 0
common/src/main/java/cn/reghao/autodop/common/mqtt/DefaultMqttClient.java

@@ -1,5 +1,6 @@
 package cn.reghao.autodop.common.mqtt;
 
+import cn.reghao.autodop.common.message.AsyncMsg;
 import cn.reghao.autodop.common.utils.MachineIdentity;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.*;
@@ -9,6 +10,7 @@ import org.springframework.stereotype.Component;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.UUID;
 
 /**
  * @author reghao
@@ -19,6 +21,8 @@ import java.util.Map;
 public class DefaultMqttClient implements AutoCloseable {
     private MosquittoProperties properties;
     private MqttClient client;
+    // 记录需要返回结果的 pub 消息, size 为 0 时表示所有 pub 消息的结果都已 sub
+    private Map<String, AsyncMsg> pubSubRecorder = new HashMap<>();
     private Map<String, IMqttMessageListener> subMap = new HashMap<>();
     private MqttCallback callback = new MqttClientCallback();
 
@@ -46,6 +50,14 @@ public class DefaultMqttClient implements AutoCloseable {
         client.close();
     }
 
+    public void putRecord(String msgId, AsyncMsg asyncMsg) {
+        this.pubSubRecorder.put(msgId, asyncMsg);
+    }
+
+    public void removeRecord(String msgId) {
+        this.pubSubRecorder.remove(msgId);
+    }
+
     public boolean isConnected() {
         return client.isConnected();
     }

+ 1 - 0
common/src/main/java/cn/reghao/autodop/common/utils/serializer/JsonConverter.java

@@ -34,6 +34,7 @@ public class JsonConverter {
 
     /**
      * JSON 转换为对象
+     * TODO <T> 的含义
      *
      * @param
      * @return

+ 8 - 7
dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/DmasterMsgDispatcher.java

@@ -36,22 +36,23 @@ public class DmasterMsgDispatcher implements IMqttMessageListener {
     public void messageArrived(String topic, MqttMessage message) {
         try {
             String msg = message.toString();
-            AsyncMsg mqAsyncMsg = JsonConverter.jsonToObject(msg, AsyncMsg.class);
-            String type = mqAsyncMsg.getType();
-            String ops = mqAsyncMsg.getOps();
-            String payload = mqAsyncMsg.getPayload();
+            AsyncMsg asyncMsg = JsonConverter.jsonToObject(msg, AsyncMsg.class);
+            String msgId = asyncMsg.getMsgId();
+            String type = asyncMsg.getType();
+            String ops = asyncMsg.getOps();
+            String payload = asyncMsg.getPayload();
 
-            if (!mqAsyncMsg.getMachineId().equals(DagentLifecycle.MACHINE_ID) || mqAsyncMsg.getSendTime() < startTime) {
+            if (!asyncMsg.getMachineId().equals(DagentLifecycle.MACHINE_ID) || asyncMsg.getSendTime() < startTime) {
                 log.info("忽略不是发送到本节点或节点启动前发送的消息...");
                 return;
             }
 
             switch (MessageType.valueOf(type)) {
                 case machineType:
-                    machineOpsProcessor.process(ops, payload);
+                    machineOpsProcessor.process(msgId, ops, payload);
                     break;
                 case appType:
-                    appOpsProcessor.process(ops, payload);
+                    appOpsProcessor.process(msgId, ops, payload);
                     break;
                 case dagentType:
                     break;

+ 2 - 1
dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/processor/app/AppOpsProcessor.java

@@ -29,7 +29,7 @@ public class AppOpsProcessor implements OpsProcessor {
     }
 
     @Override
-    public void process(String ops, String payload) throws MqttException {
+    public void process(String msgId, String ops, String payload) throws MqttException {
         AsyncMsg asyncMsg;
         switch (AppOps.valueOf(ops)) {
             case appDeploy:
@@ -65,6 +65,7 @@ public class AppOpsProcessor implements OpsProcessor {
                 break;
         }
 
+        asyncMsg.setMsgId(msgId);
         mqttClient.pub("dmaster", 1, JsonConverter.objectToJson(asyncMsg));
     }
 }

+ 1 - 1
dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/processor/docker/DockerOpsProcessor.java

@@ -12,6 +12,6 @@ import org.springframework.stereotype.Component;
 @Component
 public class DockerOpsProcessor implements OpsProcessor {
     @Override
-    public void process(String ops, String payload) {
+    public void process(String msgId, String ops, String payload) {
     }
 }

+ 2 - 1
dagent/src/main/java/cn/reghao/autodop/dagent/mqttsub/processor/machine/MachineOpsProcessor.java

@@ -27,7 +27,7 @@ public class MachineOpsProcessor implements OpsProcessor {
     }
 
     @Override
-    public void process(String ops, String payload) throws MqttException {
+    public void process(String msgId, String ops, String payload) throws MqttException {
         AsyncMsg asyncMsg;
         switch (MachineOps.valueOf(ops)) {
             case machineStat:
@@ -48,6 +48,7 @@ public class MachineOpsProcessor implements OpsProcessor {
                 break;
         }
 
+        asyncMsg.setMsgId(msgId);
         mqttClient.pub("dmaster", 1, JsonConverter.objectToJson(asyncMsg));
     }
 }

+ 2 - 0
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/AppStatusService.java

@@ -65,6 +65,7 @@ public class AppStatusService {
         String topic = "dagent/" + machineId;
         try {
             mqttClient.pub(topic, 1, JsonConverter.objectToJson(asyncMsg));
+            mqttClient.putRecord(asyncMsg.getMsgId(), asyncMsg);
         } catch (MqttException e) {
             e.printStackTrace();
         }
@@ -103,6 +104,7 @@ public class AppStatusService {
         String topic = "dagent/" + machineId;
         try {
             mqttClient.pub(topic, 1, JsonConverter.objectToJson(asyncMsg));
+            mqttClient.putRecord(asyncMsg.getMsgId(), asyncMsg);
         } catch (MqttException e) {
             e.printStackTrace();
         }

+ 23 - 4
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/BuildDeployDispatcher.java

@@ -12,8 +12,13 @@ import cn.reghao.autodop.dmaster.app.repository.log.BuildLogRepository;
 import cn.reghao.autodop.dmaster.app.service.bd.AppIntegrate;
 import cn.reghao.autodop.dmaster.app.service.bd.AppDeployer;
 import cn.reghao.autodop.dmaster.app.entity.config.AppOrchestration;
+import cn.reghao.autodop.dmaster.app.service.bd.BuildNotifyMsg;
+import cn.reghao.autodop.dmaster.notification.entity.NotifyGroup;
+import cn.reghao.autodop.dmaster.notification.entity.NotifyType;
 import cn.reghao.autodop.dmaster.notification.service.NotifyService;
 import cn.reghao.autodop.dmaster.common.thread.ThreadPoolWrapper;
+import cn.reghao.autodop.dmaster.notification.service.notifier.ding.DingMsg;
+import cn.reghao.autodop.dmaster.notification.service.notifier.email.EmailMsg;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.springframework.stereotype.Service;
@@ -91,7 +96,7 @@ public class BuildDeployDispatcher {
 
         BuildSupplier supplier = new BuildSupplier(appIntegrate);
         AppOrchestration app = appIntegrate.app();
-        //final List<NotifyReceiver> notifyReceivers = app.getNotifyReceivers();
+        List<NotifyGroup> notifyGroups = app.getNotifyGroups();
         log.info("开始异步构建 {}...", appId);
         CompletableFuture.supplyAsync(supplier, threadPool)
                 .whenComplete((buildLog, throwable) -> {
@@ -120,8 +125,7 @@ public class BuildDeployDispatcher {
                 .thenAccept(buildLog -> {
                     if (!isDeploy) {
                         // TODO 抛出异常时不会有任何错误
-                        /*DingMsg dingMsg = new BuildNotifyMsg(buildLog).dingMsg();
-                        notifyReceivers.forEach(notifyReceiver -> notifyService.notify(notifyReceiver, dingMsg));*/
+                        notifyGroups.forEach(notifyGroup -> buildNotify(notifyGroup, buildLog));
                     }
 
                     if (buildLog.getResult().getCode() == ResultStatus.SUCCESS.getCode() && isDeploy) {
@@ -133,10 +137,25 @@ public class BuildDeployDispatcher {
                         }
                     }
                 });
-
         return appId + " 已开始构建";
     }
 
+    private void buildNotify(NotifyGroup notifyGroup, BuildLog buildLog) {
+        String notifyType = notifyGroup.getNotifyType();
+        switch (NotifyType.valueOf(notifyType)) {
+            case ding:
+                DingMsg dingMsg = new BuildNotifyMsg(buildLog).dingMsg();
+                notifyService.notify(notifyGroup, dingMsg);
+                break;
+            case email:
+                break;
+            case sms:
+                break;
+            default:
+                log.error("通知类型不存在...");
+        }
+    }
+
     /**
      * 部署应用的最新版本
      *

+ 1 - 0
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/bd/AppDeployer.java

@@ -56,6 +56,7 @@ public class AppDeployer {
             // TODO 对于需要返回值的 pub,需要做一个记录,pub 和 sub 一一对应
             String topic = "dagent/" + machineId;
             mqttClient.pub(topic, 1, JsonConverter.objectToJson(asyncMsg));
+            mqttClient.putRecord(asyncMsg.getMsgId(), asyncMsg);
         }
     }
 

+ 22 - 0
dmaster/src/main/java/cn/reghao/autodop/dmaster/monitor/entity/MonitorJob.java

@@ -0,0 +1,22 @@
+package cn.reghao.autodop.dmaster.monitor.entity;
+
+import cn.reghao.autodop.dmaster.common.orm.BaseEntity;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+import javax.persistence.MappedSuperclass;
+import javax.validation.constraints.NotBlank;
+
+/**
+ * @author reghao
+ * @date 2021-06-23 20:50:41
+ */
+@MappedSuperclass
+@EqualsAndHashCode(callSuper = false)
+@Data
+public class MonitorJob extends BaseEntity<Integer> {
+    @NotBlank(message = "任务 ID 不能为空白字符串")
+    private String jobId;
+    @NotBlank(message = "CRON 表达式不能为空白字符串")
+    private String cron;
+}

+ 6 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/monitor/service/MonitorScheduler.java

@@ -6,6 +6,7 @@ import cn.reghao.autodop.dmaster.machine.db.crud.MachineStatusCrudService;
 import cn.reghao.autodop.dmaster.app.repository.AppRunningRepository;
 import cn.reghao.autodop.dmaster.monitor.service.job.AppHealthCheckJob;
 import cn.reghao.autodop.dmaster.monitor.service.job.MachineStatusMonitorJob;
+import cn.reghao.autodop.dmaster.notification.service.NotifyService;
 import lombok.extern.slf4j.Slf4j;
 import org.quartz.*;
 import org.quartz.impl.StdSchedulerFactory;
@@ -23,13 +24,15 @@ import javax.annotation.PostConstruct;
 @Service
 public class MonitorScheduler {
     private Scheduler scheduler;
+    private NotifyService notifyService;
     private AppRunningRepository runningRepository;
     private MachineStatusCrudService statusCrudService;
     private WebRequest webRequest;
 
-    public MonitorScheduler(AppRunningRepository runningRepository,
+    public MonitorScheduler(NotifyService notifyService, AppRunningRepository runningRepository,
                             MachineStatusCrudService statusCrudService) throws SchedulerException {
         this.scheduler = StdSchedulerFactory.getDefaultScheduler();
+        this.notifyService = notifyService;
         this.runningRepository = runningRepository;
         this.statusCrudService = statusCrudService;
         this.webRequest = new DefaultWebRequest();
@@ -48,6 +51,7 @@ public class MonitorScheduler {
 
     public void addAppHealthCheckJob(String jobId, String cronExp) throws SchedulerException {
         JobDataMap jobDataMap = new JobDataMap();
+        jobDataMap.put("notifyService", notifyService);
         jobDataMap.put("appId", "");
         jobDataMap.put("machineId", "");
         jobDataMap.put("webRequest", webRequest);
@@ -57,6 +61,7 @@ public class MonitorScheduler {
 
     public void addMachineStatusMonitorJob(String jobId, String cronExp) throws SchedulerException {
         JobDataMap jobDataMap = new JobDataMap();
+        jobDataMap.put("notifyService", notifyService);
         jobDataMap.put("machineId", "");
         jobDataMap.put("statusCrudService", statusCrudService);
         add(MachineStatusMonitorJob.class, jobId, cronExp, jobDataMap);

+ 7 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/monitor/service/job/AppHealthCheckJob.java

@@ -3,6 +3,8 @@ package cn.reghao.autodop.dmaster.monitor.service.job;
 import cn.reghao.autodop.common.http.WebRequest;
 import cn.reghao.autodop.dmaster.app.entity.AppRunning;
 import cn.reghao.autodop.dmaster.app.repository.AppRunningRepository;
+import cn.reghao.autodop.dmaster.notification.entity.NotifyGroup;
+import cn.reghao.autodop.dmaster.notification.service.NotifyService;
 import lombok.extern.slf4j.Slf4j;
 import org.quartz.Job;
 import org.quartz.JobDataMap;
@@ -25,6 +27,9 @@ public class AppHealthCheckJob implements Job {
         JobDetail jobDetail = context.getJobDetail();
         JobDataMap jobDataMap = jobDetail.getJobDataMap();
 
+        NotifyService notifyService = (NotifyService) jobDataMap.get("notifyService");
+        NotifyGroup notifyGroup = (NotifyGroup) jobDataMap.get("notifyGroup");
+
         WebRequest webRequest = (WebRequest) jobDataMap.get("webRequest");
         AppRunningRepository runningRepository = (AppRunningRepository) jobDataMap.get("runningRepository");
 
@@ -46,7 +51,8 @@ public class AppHealthCheckJob implements Job {
             String url = "http://" + machineIpv4 + ":" + httpPort + healthCheck;
             int statusCode = webRequest.head(url);
             if (statusCode != 200) {
-                log.error("{} 机器上的 {} 应用健康检查失败", machineIpv4, appId);
+                String msg = String.format("%s 机器上的 %s 应用健康检查失败", machineIpv4, appId);
+                notifyService.notify(notifyGroup, msg);
             } else {
                 appRunning.setLastCheck(LocalDateTime.now());
                 runningRepository.save(appRunning);

+ 1 - 0
dmaster/src/main/java/cn/reghao/autodop/dmaster/monitor/service/job/MachineStatusMonitorJob.java

@@ -21,6 +21,7 @@ public class MachineStatusMonitorJob implements Job {
     public void execute(JobExecutionContext context) {
         JobDetail jobDetail = context.getJobDetail();
         JobDataMap jobDataMap = jobDetail.getJobDataMap();
+
         MachineStatusCrudService statusCrudService = (MachineStatusCrudService) jobDataMap.get("statusCrudService");
 
         String machineId = jobDataMap.getString("machineId");

+ 8 - 7
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/DagentMsgDispatcher.java

@@ -40,20 +40,21 @@ public class DagentMsgDispatcher implements IMqttMessageListener {
             }
 
             String msg = message.toString();
-            AsyncMsg mqAsyncMsg = JsonConverter.jsonToObject(msg, AsyncMsg.class);
-            String type = mqAsyncMsg.getType();
-            String ops = mqAsyncMsg.getOps();
-            String payload = mqAsyncMsg.getPayload();
+            AsyncMsg asyncMsg = JsonConverter.jsonToObject(msg, AsyncMsg.class);
+            String msgId = asyncMsg.getMsgId();
+            String type = asyncMsg.getType();
+            String ops = asyncMsg.getOps();
+            String payload = asyncMsg.getPayload();
 
             switch (MessageType.valueOf(type)) {
                 case dagentType:
-                    dagentOpsProcessor.process(ops, payload);
+                    dagentOpsProcessor.process(msgId, ops, payload);
                     break;
                 case machineType:
-                    machineOpsProcessor.process(ops, payload);
+                    machineOpsProcessor.process(msgId, ops, payload);
                     break;
                 case appType:
-                    appOpsProcessor.process(ops, payload);
+                    appOpsProcessor.process(msgId, ops, payload);
                     break;
                 default:
             }

+ 26 - 5
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/processor/AppOpsProcessor.java

@@ -4,6 +4,7 @@ import cn.reghao.autodop.common.dagent.app.api.data.AppStatus;
 import cn.reghao.autodop.common.dagent.app.api.data.DeployResult;
 import cn.reghao.autodop.common.message.CallResult;
 import cn.reghao.autodop.common.message.ops.AppOps;
+import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import cn.reghao.autodop.common.utils.serializer.JsonConverter;
 import cn.reghao.autodop.dmaster.app.db.query.config.AppQuery;
 import cn.reghao.autodop.dmaster.app.entity.AppDeploying;
@@ -17,6 +18,8 @@ import cn.reghao.autodop.dmaster.app.repository.log.BuildLogRepository;
 import cn.reghao.autodop.dmaster.app.repository.log.DeployLogRepository;
 import cn.reghao.autodop.dmaster.app.service.bd.DeployNotifyMsg;
 import cn.reghao.autodop.common.message.ops.OpsProcessor;
+import cn.reghao.autodop.dmaster.notification.entity.NotifyGroup;
+import cn.reghao.autodop.dmaster.notification.entity.NotifyType;
 import cn.reghao.autodop.dmaster.notification.service.notifier.ding.DingMsg;
 import cn.reghao.autodop.dmaster.notification.service.NotifyService;
 import com.google.gson.reflect.TypeToken;
@@ -24,6 +27,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 
 import java.lang.reflect.Type;
+import java.util.List;
 import java.util.Optional;
 
 /**
@@ -41,23 +45,27 @@ public class AppOpsProcessor implements OpsProcessor {
     private AppRunningRepository runningRepository;
     private AppQuery appQuery;
     private NotifyService notifyService;
+    private DefaultMqttClient mqttClient;
 
     public AppOpsProcessor(BuildLogRepository buildLogRepository,
                            DeployLogRepository deployLogRepository,
                            AppDeployingRepository deployingRepository,
                            AppRunningRepository runningRepository,
                            AppQuery appQuery,
-                           NotifyService notifyService) {
+                           NotifyService notifyService,
+                           DefaultMqttClient mqttClient) {
         this.buildLogRepository = buildLogRepository;
         this.deployLogRepository = deployLogRepository;
         this.deployingRepository = deployingRepository;
         this.runningRepository = runningRepository;
         this.appQuery = appQuery;
         this.notifyService = notifyService;
+        this.mqttClient = mqttClient;
     }
 
     @Override
-    public void process(String ops, String payload) {
+    public void process(String msgId, String ops, String payload) {
+        mqttClient.removeRecord(msgId);
         switch (AppOps.valueOf(ops)) {
             case appDeployResult:
                 processAppDeployResult(payload);
@@ -120,9 +128,22 @@ public class AppOpsProcessor implements OpsProcessor {
 
     private void deployNotify(BuildLog buildLog, DeployResult deployResult) {
         AppOrchestration app = appQuery.findByAppId(buildLog.getAppId());
-        DingMsg dingMsg = new DeployNotifyMsg(buildLog, deployResult).dingMsg();
-        /*List<NotifyReceiver> notifyReceivers = app.getNotifyReceivers();
-        notifyReceivers.forEach(notifyReceiver -> notifyService.notify(notifyReceiver, dingMsg));*/
+        List<NotifyGroup> notifyGroups = app.getNotifyGroups();
+        notifyGroups.forEach(notifyGroup -> {
+            String notifyType = notifyGroup.getNotifyType();
+            switch (NotifyType.valueOf(notifyType)) {
+                case ding:
+                    DingMsg dingMsg = new DeployNotifyMsg(buildLog, deployResult).dingMsg();
+                    notifyService.notify(notifyGroup, dingMsg);
+                    break;
+                case email:
+                    break;
+                case sms:
+                    break;
+                default:
+                    log.error("通知类型不存在...");
+            }
+        });
     }
 
     private void processAppStatus(String payload) {

+ 1 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/processor/DagentOpsProcessor.java

@@ -50,7 +50,7 @@ public class DagentOpsProcessor implements OpsProcessor {
     }
 
     @Override
-    public void process(String ops, String payload) {
+    public void process(String msgId, String ops, String payload) {
         switch (DagentOps.valueOf(ops)) {
             case dagentStart:
                 processDagentStart(payload);

+ 9 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/processor/MachineOpsProcessor.java

@@ -2,6 +2,7 @@ package cn.reghao.autodop.dmaster.mqttsub.processor;
 
 import cn.reghao.autodop.common.message.ops.MachineOps;
 import cn.reghao.autodop.common.message.ops.OpsProcessor;
+import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 
@@ -14,8 +15,15 @@ import org.springframework.stereotype.Component;
 @Slf4j
 @Component
 public class MachineOpsProcessor implements OpsProcessor {
+    private DefaultMqttClient mqttClient;
+
+    public MachineOpsProcessor(DefaultMqttClient mqttClient) {
+        this.mqttClient = mqttClient;
+    }
+
     @Override
-    public void process(String ops, String payload) {
+    public void process(String msgId, String ops, String payload) {
+        mqttClient.removeRecord(msgId);
         try {
             switch (MachineOps.valueOf(ops)) {
                 case machineStatResult:

+ 23 - 0
dmaster/src/main/java/cn/reghao/autodop/dmaster/notification/entity/NotifyGroup.java

@@ -27,10 +27,33 @@ public class NotifyGroup extends BaseEntity<Integer> {
     @ValidEnum(value = NotifyType.class, message = "请选择正确的通知类型")
     private String notifyType;
     private String notifyAccountId;
+    // TODO 同时会作为 email 的 subject
     @NotBlank(message = "接收组标识不能为空白字符串")
     private String groupId;
     // TODO 对 String 集合的内容进行校验
     @ElementCollection(targetClass = String.class)
     @LazyCollection(LazyCollectionOption.FALSE)
     private List<String> receivers;
+
+    @Override
+    public int hashCode() {
+        int result = 17;
+        result = result * 31 + notifyType.hashCode();
+        result = result * 31 + notifyAccountId.hashCode();
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (other == this) {
+            return true;
+        }
+
+        if (other instanceof NotifyGroup) {
+            NotifyGroup o = (NotifyGroup) other;
+            return o.notifyType.equals(notifyType) && o.notifyAccountId.equals(notifyAccountId);
+        } else {
+            return false;
+        }
+    }
 }

+ 1 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/notification/repository/EmailAccountRepository.java

@@ -8,5 +8,5 @@ import org.springframework.data.jpa.repository.JpaRepository;
  * @date 2021-05-24 15:20:24
  */
 public interface EmailAccountRepository extends JpaRepository<EmailAccount, Integer> {
-    EmailAccount findEmailAccountByIsDefaultTrue();
+    EmailAccount findEmailAccountByNotifyAccountId(String notifyAccountId);
 }

+ 1 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/notification/repository/SmsAccountRepository.java

@@ -8,5 +8,5 @@ import org.springframework.data.jpa.repository.JpaRepository;
  * @date 2021-05-24 15:20:24
  */
 public interface SmsAccountRepository extends JpaRepository<SmsAccount, Integer> {
-    SmsAccount findSmsAccountByIsDefaultTrue();
+    SmsAccount findSmsAccountByNotifyAccountId(String notifyAccountId);
 }

+ 109 - 23
dmaster/src/main/java/cn/reghao/autodop/dmaster/notification/service/NotifyService.java

@@ -2,10 +2,13 @@ package cn.reghao.autodop.dmaster.notification.service;
 
 import cn.reghao.autodop.common.http.DefaultWebRequest;
 import cn.reghao.autodop.common.http.WebRequest;
+import cn.reghao.autodop.dmaster.notification.entity.EmailAccount;
 import cn.reghao.autodop.dmaster.notification.entity.NotifyType;
 import cn.reghao.autodop.dmaster.notification.entity.NotifyGroup;
 import cn.reghao.autodop.dmaster.common.thread.ThreadPoolWrapper;
+import cn.reghao.autodop.dmaster.notification.entity.SmsAccount;
 import cn.reghao.autodop.dmaster.notification.repository.EmailAccountRepository;
+import cn.reghao.autodop.dmaster.notification.repository.NotifyGroupRepository;
 import cn.reghao.autodop.dmaster.notification.repository.SmsAccountRepository;
 import cn.reghao.autodop.dmaster.notification.service.notifier.Notify;
 import cn.reghao.autodop.dmaster.notification.service.notifier.ding.DingMsg;
@@ -16,6 +19,7 @@ import cn.reghao.autodop.dmaster.notification.service.notifier.sms.SmsNotify;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Service;
 
+import javax.annotation.PostConstruct;
 import java.io.UnsupportedEncodingException;
 import java.util.HashMap;
 import java.util.Map;
@@ -30,46 +34,128 @@ import java.util.concurrent.ExecutorService;
 @Slf4j
 @Service
 public class NotifyService {
-    private Map<String, Notify> notifyMap = new HashMap<>();
     private ExecutorService threadPool = ThreadPoolWrapper.threadPool("notify-service");
-    private DingNotify dingNotify;
-    private SmsNotify smsNotify;
-    private EmailNotify emailNotify;
+    private Map<NotifyGroup, Notify> notifierMap = new HashMap<>();
+    private WebRequest webRequest;
+    private NotifyGroupRepository groupRepository;
+    private EmailAccountRepository emailRepository;
+    private SmsAccountRepository smsRepository;
 
-    public NotifyService(SmsAccountRepository smsRepository, EmailAccountRepository emailRepository)
-            throws UnsupportedEncodingException {
-        WebRequest webRequest = new DefaultWebRequest();
-        this.dingNotify = new DingNotify(webRequest);
-        /*this.smsNotify = new SmsNotify(webRequest, new SmsAccount());
-        this.emailNotify = new EmailNotify(new EmailAccount());*/
+    public NotifyService(NotifyGroupRepository groupRepository,
+                         EmailAccountRepository emailRepository,
+                         SmsAccountRepository smsRepository) {
+        this.webRequest = new DefaultWebRequest();
+        this.groupRepository = groupRepository;
+        this.emailRepository = emailRepository;
+        this.smsRepository = smsRepository;
     }
 
-    public void notify(NotifyGroup notifyGroup, Object msg) {
-        // TODO 调用 NotifyType.valueOf() 没有指定类型时会抛出异常
-        switch (NotifyType.valueOf(notifyGroup.getNotifyType())) {
+    /**
+     * TODO 系统启动时初始化所有的通知组,这应该推迟到通知组第一次使用时才初始化
+     *
+     * @param
+     * @return
+     * @date 2021-06-23 下午9:47
+     */
+    @PostConstruct
+    private void initNotifier() throws UnsupportedEncodingException {
+        for (NotifyGroup group : groupRepository.findAll()) {
+            addNotifier(group);
+        }
+    }
+
+    public void addNotifier(NotifyGroup notifyGroup) throws UnsupportedEncodingException {
+        Notify notify = notifierMap.get(notifyGroup);
+        if (notify != null) {
+            return;
+        }
+
+        String notifyType = notifyGroup.getNotifyType();
+        String notifyAccountId = notifyGroup.getNotifyAccountId();
+        switch (NotifyType.valueOf(notifyType)) {
             case ding:
-                if (msg instanceof DingMsg) {
-                    DingMsg dingMsg = (DingMsg) msg;
-                    //threadPool.execute(new NotifyTask<>(dingNotify, notifyReceiver.getReceiver(), dingMsg));
+                notifierMap.put(notifyGroup, new DingNotify(webRequest));
+                break;
+            case email:
+                EmailAccount emailAccount = emailRepository.findEmailAccountByNotifyAccountId(notifyAccountId);
+                if (emailAccount != null) {
+                    notifierMap.put(notifyGroup, new EmailNotify(emailAccount));
                 }
                 break;
+            case sms:
+                SmsAccount smsAccount = smsRepository.findSmsAccountByNotifyAccountId(notifyAccountId);
+                if (smsAccount != null) {
+                    notifierMap.put(notifyGroup, new SmsNotify(webRequest, smsAccount));
+                }
+                break;
+            default:
+                log.error("通知类型不存在...");
+        }
+    }
+
+    public <T> void notify(NotifyGroup notifyGroup, T msg) {
+        Notify notify = notifierMap.get(notifyGroup);
+        if (notify == null) {
+            log.error("类型为 {}, 账户为 {} 的通知发送器不存在...",
+                    notifyGroup.getNotifyType(), notifyGroup.getNotifyAccountId());
+            return;
+        }
+
+        String notifyType = notifyGroup.getNotifyType();
+        switch (NotifyType.valueOf(notifyType)) {
+            case ding:
+                if (!(msg instanceof DingMsg)) {
+                    log.error("{} 消息格式不正确, 不是 DingMsg...", msg);
+                }
+                notifyGroup.getReceivers()
+                        .forEach(receiver -> threadPool.execute(new NotifyTask(notify, receiver, msg)));
             case email:
-                if (msg instanceof EmailMsg) {
-                    EmailMsg emailMsg = (EmailMsg) msg;
-                    //threadPool.execute(new NotifyTask<>(emailNotify, notifyReceiver.getReceiver(), emailMsg));
+                if (!(msg instanceof EmailMsg)) {
+                    log.error("{} 消息格式不正确, 不是 EmailMsg...", msg);
                 }
+                notifyGroup.getReceivers()
+                        .forEach(receiver -> threadPool.execute(new NotifyTask(notify, receiver, msg)));
                 break;
             case sms:
-                if (msg instanceof String) {
-                    String smsMsg = (String) msg;
-                    //threadPool.execute(new NotifyTask<>(smsNotify, notifyReceiver.getReceiver(), smsMsg));
+                if (!(msg instanceof String)) {
+                    log.error("{} 消息格式不正确, 不是 SMS 消息...", msg);
                 }
+                notifyGroup.getReceivers()
+                        .forEach(receiver -> threadPool.execute(new NotifyTask(notify, receiver, msg)));
                 break;
             default:
                 log.error("通知类型不存在...");
         }
     }
 
+    /*public void notify(NotifyGroup notifyGroup, String msg) {
+        Notify notify = notifierMap.get(notifyGroup);
+        if (notify == null) {
+            log.error("类型为 {}, 账户为 {} 的通知发送器不存在...",
+                    notifyGroup.getNotifyType(), notifyGroup.getNotifyAccountId());
+            return;
+        }
+
+        String notifyType = notifyGroup.getNotifyType();
+        switch (NotifyType.valueOf(notifyType)) {
+            case ding:
+                DingMsg dingMsg = new DingMsg(msg);
+                notifyGroup.getReceivers()
+                        .forEach(receiver -> threadPool.execute(new NotifyTask(notify, receiver, dingMsg)));
+            case email:
+                EmailMsg emailMsg = new EmailMsg(notifyGroup.getGroupId(), msg);
+                notifyGroup.getReceivers()
+                        .forEach(receiver -> threadPool.execute(new NotifyTask(notify, receiver, emailMsg)));
+                break;
+            case sms:
+                notifyGroup.getReceivers()
+                        .forEach(receiver -> threadPool.execute(new NotifyTask(notify, receiver, msg)));
+                break;
+            default:
+                log.error("通知类型不存在...");
+        }
+    }*/
+
     /**
      * TODO 添加通知日志
      *
@@ -93,7 +179,7 @@ public class NotifyService {
             try {
                 notify.send(receiver, msg);
             } catch (Exception e) {
-                e.printStackTrace();
+                log.error("发送给 {} 的通知失败 -> {}", receiver, e.getMessage());
             }
         }
     }

+ 18 - 0
dmaster/src/main/java/cn/reghao/autodop/dmaster/notification/service/notifier/NotifyMsg.java

@@ -0,0 +1,18 @@
+package cn.reghao.autodop.dmaster.notification.service.notifier;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * 通知消息
+ *
+ * @author reghao
+ * @date 2021-06-24 08:56:58
+ */
+@NoArgsConstructor
+@AllArgsConstructor
+@Data
+public class NotifyMsg {
+    private String content;
+}

+ 12 - 0
dmaster/src/main/java/cn/reghao/autodop/dmaster/notification/service/notifier/sms/SmsMsg.java

@@ -0,0 +1,12 @@
+package cn.reghao.autodop.dmaster.notification.service.notifier.sms;
+
+import lombok.Data;
+
+/**
+ * @author reghao
+ * @date 2021-06-24 09:08:24
+ */
+@Data
+public class SmsMsg {
+    private String content;
+}

+ 3 - 3
dmaster/src/main/java/cn/reghao/autodop/dmaster/notification/service/notifier/sms/SmsNotify.java

@@ -19,7 +19,7 @@ import java.util.Map;
  * @date 2020-05-07 01:30:43
  */
 @Slf4j
-public class SmsNotify implements Notify<String> {
+public class SmsNotify implements Notify<SmsMsg> {
     private WebRequest webRequest;
     private Map<String, String> params = new HashMap<>();
 
@@ -39,9 +39,9 @@ public class SmsNotify implements Notify<String> {
      * @date 2021-06-23 上午11:50
      */
     @Override
-    public synchronized void send(String receiver, String msg) throws Exception {
+    public synchronized void send(String receiver, SmsMsg msg) throws Exception {
         params.put("sdst", receiver);
-        params.put("smsg", msg);
+        params.put("smsg", msg.getContent());
         String url = params.remove("url");
 
         WebResponse webResponse = webRequest.postFormData(url, params);

+ 1 - 1
dmaster/src/main/resources/templates/monitor/machine.html

@@ -55,7 +55,7 @@
                     <td th:text="${item.machineId}">机器 ID</td>
                     <td th:text="${item.machineIpv4}">机器地址</td>
                     <td>
-                        <a class="open-popup" data-title="机器当前状态" th:attr="data-url=@{'/machine/host/status/'+${item.machineId}}"
+                        <a class="open-popup" data-title="监控任务设置" th:attr="data-url=@{'/machine/host/status/'+${item.machineId}}"
                            data-size="1200,600" href="#">启用监控</a>
                     </td>
                 </tr>