Prechádzať zdrojové kódy

剔除 rabbitmq

其他项目若需要 rabbitmq 模块,在这个版本的前一个版本查找
reghao 4 rokov pred
rodič
commit
0fd976de66
29 zmenil súbory, kde vykonal 97 pridanie a 864 odobranie
  1. 0 80
      common/src/main/java/cn/reghao/autodop/common/amqp/RabbitProducer.java
  2. 0 22
      common/src/main/java/cn/reghao/autodop/common/amqp/RabbitProperties.java
  3. 0 33
      common/src/main/java/cn/reghao/autodop/common/amqp/RpcResult.java
  4. 9 20
      common/src/main/java/cn/reghao/autodop/common/dagent/machine/Machine.java
  5. 0 18
      common/src/main/java/cn/reghao/autodop/common/dagent/machine/MachineService.java
  6. 1 0
      common/src/main/java/cn/reghao/autodop/common/message/AsyncMsg.java
  7. 10 9
      common/src/main/java/cn/reghao/autodop/common/message/CallResult.java
  8. 12 0
      common/src/main/java/cn/reghao/autodop/common/utils/serializer/JsonConverter.java
  9. 5 5
      dagent/src/main/java/cn/reghao/autodop/dagent/app/App.java
  10. 14 22
      dagent/src/main/java/cn/reghao/autodop/dagent/dispatcher/AppOpsDispatcher.java
  11. 7 8
      dagent/src/main/java/cn/reghao/autodop/dagent/dispatcher/DmasterMessageDispatcher.java
  12. 18 6
      dagent/src/main/java/cn/reghao/autodop/dagent/dispatcher/MachineOpsDispatcher.java
  13. 1 2
      dagent/src/main/java/cn/reghao/autodop/dagent/machine/timer/HeartbeatJob.java
  14. 0 1
      dagent/src/main/java/cn/reghao/autodop/dagent/machine/timer/MachineScheduler.java
  15. 1 0
      dagent/src/main/java/cn/reghao/autodop/dagent/utils/DagentLifecycle.java
  16. 0 103
      dagent/src/main/java/cn/reghao/autodop/dagent/utils/amqp/DagentConsumer.java
  17. 0 59
      dagent/src/main/java/cn/reghao/autodop/dagent/utils/amqp/DagentConsumerConfig.java
  18. 1 32
      dagent/src/main/java/cn/reghao/autodop/dagent/utils/log/LoggerConfig.java
  19. 0 87
      dagent/src/main/java/cn/reghao/autodop/dagent/utils/log/LoggingInitializer.java
  20. 1 19
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/AppStatusService.java
  21. 0 131
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/RemoteCallService.java
  22. 1 35
      dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/bd/AppDeployer.java
  23. 2 0
      dmaster/src/main/java/cn/reghao/autodop/dmaster/machine/entity/MachineLog.java
  24. 0 67
      dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/amqp/DmasterConsumer.java
  25. 6 5
      dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/dispatcher/AppOpsDispatcher.java
  26. 1 5
      dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/dispatcher/DagentMessageDispatcher.java
  27. 7 1
      dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/dispatcher/MachineOpsDispatcher.java
  28. 0 71
      dmaster/src/test/java/cn/reghao/autodop/common/amqp/RabbitProducerTest.java
  29. 0 23
      pom.xml

+ 0 - 80
common/src/main/java/cn/reghao/autodop/common/amqp/RabbitProducer.java

@@ -1,80 +0,0 @@
-package cn.reghao.autodop.common.amqp;
-
-import cn.reghao.autodop.common.message.AsyncMsg;
-import cn.reghao.autodop.common.utils.serializer.JsonConverter;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.amqp.core.MessageDeliveryMode;
-import org.springframework.amqp.rabbit.connection.CorrelationData;
-import org.springframework.amqp.rabbit.core.RabbitTemplate;
-import org.springframework.stereotype.Component;
-
-import java.util.UUID;
-
-/**
- * MQ 消息生产者
- *
- * @author reghao
- * @date 2020-12-30 10:21:23
- */
-@Slf4j
-@Component
-public class RabbitProducer {
-    private RabbitTemplate rabbitTemplate;
-
-    public RabbitProducer(RabbitTemplate rabbitTemplate) {
-        this.rabbitTemplate = rabbitTemplate;
-        config();
-    }
-
-    private void config() {
-        // 开启可靠投递
-        rabbitTemplate.setConfirmCallback((data, ack, cause) -> {
-            if (!ack) {
-                log.error("消息发送失败 -> {}", cause + (data != null ? data.toString() : null));
-            } else {
-                log.info("消息发送成功 -> {}", cause + (data != null ? data.getId() : null));
-            }
-        });
-
-        rabbitTemplate.setMandatory(true);
-        rabbitTemplate.setReturnCallback(((message, replyCode, replyText, exchange, routingKey) ->
-                log.info("消息发送 ReturnCallback: {}, {}, {}, {}, {}",
-                        message, replyCode, replyText, exchange, routingKey)));
-    }
-
-    /**
-     * 发送消息
-     *
-     * @param
-     * @return
-     * @date 2021-02-23 下午3:28
-     */
-    public void send(String exchange, String routeKey, AsyncMsg mqAsyncMsg) {
-        String msg = JsonConverter.objectToJson(mqAsyncMsg);
-        // 默认的 exchange 是 amq.direct
-        rabbitTemplate.convertAndSend(exchange, routeKey, msg, message -> {
-            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
-            return message;
-        }, new CorrelationData(UUID.randomUUID().toString()));
-    }
-
-    /**
-     * RPC 调用
-     *
-     * @param
-     * @return
-     * @date 2021-02-23 下午3:57
-     */
-    public RpcResult callRemote(String exchange, String routeKey, AsyncMsg asyncMsg) {
-        // TODO 处理 RPC 调用的写操作在超时后才返回的情况
-        long timeout = 180_000;
-        rabbitTemplate.setReplyTimeout(timeout);
-        String msg = JsonConverter.objectToJson(asyncMsg);
-        String result = (String) rabbitTemplate.convertSendAndReceive(exchange, routeKey, msg);
-        if (result == null) {
-            return RpcResult.fail("RPC 调用等待 " + timeout/1000 + "s 后超时");
-        } else {
-            return JsonConverter.jsonToObject(result, RpcResult.class);
-        }
-    }
-}

+ 0 - 22
common/src/main/java/cn/reghao/autodop/common/amqp/RabbitProperties.java

@@ -1,22 +0,0 @@
-package cn.reghao.autodop.common.amqp;
-
-import lombok.Data;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-import org.springframework.stereotype.Component;
-
-/**
- * @author reghao
- * @date 2020-09-04 10:57:56
- */
-/*@Data
-@Component
-@ConfigurationProperties(prefix = "rabbit")*/
-public class RabbitProperties {
-    /*@Value("${rabbit.exchange}")
-    private String exchange;
-    @Value("${rabbit.routeKey}")
-    private String routeKey;
-    @Value("${rabbit.queue}")
-    private String queue;*/
-}

+ 0 - 33
common/src/main/java/cn/reghao/autodop/common/amqp/RpcResult.java

@@ -1,33 +0,0 @@
-package cn.reghao.autodop.common.amqp;
-
-import lombok.AllArgsConstructor;
-import lombok.Data;
-
-import static cn.reghao.autodop.common.result.ResultCode.FAIL;
-import static cn.reghao.autodop.common.result.ResultCode.SUCCESS;
-
-/**
- * RPC 调用结果
- *
- * @author reghao
- * @date 2020-09-07 23:04:16
- */
-@Data
-@AllArgsConstructor
-public class RpcResult {
-    // 0 - 成功 1 - 失败 -1 - 超时
-    private Integer statusCode;
-    private String result;
-
-    public static RpcResult success(String data) {
-        return new RpcResult(SUCCESS.getCode(), data);
-    }
-
-    public static RpcResult fail(String data) {
-        return new RpcResult(FAIL.getCode(), data);
-    }
-
-    public static RpcResult error(String data) {
-        return new RpcResult(FAIL.getCode(), data);
-    }
-}

+ 9 - 20
common/src/main/java/cn/reghao/autodop/common/dagent/machine/Machine.java

@@ -1,15 +1,13 @@
 package cn.reghao.autodop.common.dagent.machine;
 
-import cn.reghao.autodop.common.amqp.RpcResult;
-import cn.reghao.autodop.common.dagent.machine.api.data.MachineHeartbeat;
-import cn.reghao.autodop.common.dagent.machine.api.data.MachineLog;
-import cn.reghao.autodop.common.dagent.machine.api.data.MachineInfo;
+import cn.reghao.autodop.common.dagent.machine.api.data.*;
 import cn.reghao.autodop.common.dagent.machine.hardware.cpu.CPU;
 import cn.reghao.autodop.common.dagent.machine.hardware.disk.Disk;
 import cn.reghao.autodop.common.dagent.machine.hardware.memory.Memory;
 import cn.reghao.autodop.common.dagent.machine.hardware.network.Network;
 import cn.reghao.autodop.common.dagent.machine.system.os.OS;
-import cn.reghao.autodop.common.utils.text.TextFile;
+import cn.reghao.autodop.common.message.CallResult;
+import cn.reghao.autodop.common.utils.MachineId;
 import org.springframework.stereotype.Component;
 
 /**
@@ -17,18 +15,14 @@ import org.springframework.stereotype.Component;
  * @date 2020-10-22 15:47:58
  */
 @Component
-public class Machine implements MachineService {
-    private final static String machineId;
+public class Machine {
+    private final static String machineId = MachineId.id();
     private CPU cpu;
     private Memory memory;
     private Disk disk;
     private Network network;
     private OS os;
 
-    static {
-        machineId = new TextFile().readFile("/etc/machine-id");
-    }
-
     public Machine(CPU cpu, Memory memory, Disk disk, Network network, OS os) {
         this.cpu = cpu;
         this.memory = memory;
@@ -41,7 +35,6 @@ public class Machine implements MachineService {
         return machineId;
     }
 
-    @Override
     public MachineInfo registry() {
         MachineInfo machineInfo = new MachineInfo();
         machineInfo.setMachineId(machineId);
@@ -53,7 +46,6 @@ public class Machine implements MachineService {
         return machineInfo;
     }
 
-    @Override
     public MachineHeartbeat heartbeat() {
         MachineHeartbeat machineHeartbeat = new MachineHeartbeat();
         machineHeartbeat.setMachineId(machineId);
@@ -62,18 +54,15 @@ public class Machine implements MachineService {
         return machineHeartbeat;
     }
 
-    @Override
     public MachineLog log() {
         return null;
     }
 
-    @Override
-    public RpcResult shell(String script) {
-        return RpcResult.success("");
+    public CallResult<MachineShell> shell(String script) {
+        return CallResult.success(null);
     }
 
-    @Override
-    public RpcResult state() {
-        return RpcResult.success("");
+    public CallResult<MachineState> state() {
+        return CallResult.success(null);
     }
 }

+ 0 - 18
common/src/main/java/cn/reghao/autodop/common/dagent/machine/MachineService.java

@@ -1,18 +0,0 @@
-package cn.reghao.autodop.common.dagent.machine;
-
-import cn.reghao.autodop.common.amqp.RpcResult;
-import cn.reghao.autodop.common.dagent.machine.api.data.MachineHeartbeat;
-import cn.reghao.autodop.common.dagent.machine.api.data.MachineLog;
-import cn.reghao.autodop.common.dagent.machine.api.data.MachineInfo;
-
-/**
- * @author reghao
- * @date 2021-02-22 17:25:23
- */
-public interface MachineService {
-    MachineInfo registry();
-    MachineHeartbeat heartbeat();
-    MachineLog log();
-    RpcResult shell(String payload);
-    RpcResult state();
-}

+ 1 - 0
common/src/main/java/cn/reghao/autodop/common/message/AsyncMsg.java

@@ -5,6 +5,7 @@ import lombok.Data;
 
 /**
  * 异步消息
+ * 执行两次序列化,对 payload 执行一次序列化,然后对 AsyncMsg 执行一次序列化
  *
  * @author reghao
  * @date 2020-12-25 17:49:26

+ 10 - 9
common/src/main/java/cn/reghao/autodop/common/message/CallResult.java

@@ -13,20 +13,21 @@ import static cn.reghao.autodop.common.result.ResultCode.*;
  */
 @Data
 @AllArgsConstructor
-public class CallResult {
+public class CallResult<T> {
     // 0 - 成功 1 - 失败
-    private Integer code;
-    private Object data;
+    private int code;
+    private String msg;
+    private T data;
 
-    public static CallResult success(Object data) {
-        return new CallResult(SUCCESS.getCode(), data);
+    public static <T> CallResult<T> success(T data) {
+        return new CallResult<>(SUCCESS.getCode(), SUCCESS.getMsg(), data);
     }
 
-    public static CallResult fail(Object data) {
-        return new CallResult(FAIL.getCode(), data);
+    public static <T> CallResult<T> fail(String msg) {
+        return new CallResult<>(FAIL.getCode(), msg, null);
     }
 
-    public static CallResult error(Object data) {
-        return new CallResult(ERROR.getCode(), data);
+    public static <T>CallResult<T> error(String msg) {
+        return new CallResult<>(ERROR.getCode(), msg, null);
     }
 }

+ 12 - 0
common/src/main/java/cn/reghao/autodop/common/utils/serializer/JsonConverter.java

@@ -4,6 +4,7 @@ import cn.reghao.autodop.common.utils.text.TextFile;
 import com.google.gson.*;
 
 import java.io.File;
+import java.lang.reflect.Type;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
 import java.util.List;
@@ -42,6 +43,17 @@ public class JsonConverter {
         return gson.fromJson(json, clazz);
     }
 
+    /**
+     * 对泛型数据的反序列化
+     *
+     * @param
+     * @return
+     * @date 2021-05-26 下午3:38
+     */
+    public static <T> T jsonToObject(String json, Type type) {
+        return gson.fromJson(json, type);
+    }
+
     public static <T> List<T> jsonToObjects(String json, Class<T> clazz) {
         JsonParser parser = new JsonParser();
         List<T> list = new ArrayList<>();

+ 5 - 5
dagent/src/main/java/cn/reghao/autodop/dagent/app/App.java

@@ -31,7 +31,7 @@ public class App {
         this.zipAppServiceImpl = new ZipAppServiceImpl();
     }
 
-    public CallResult deploy(String payload) {
+    public CallResult<AppStatus> deploy(String payload) {
         AppDeployArgs appDeployArgs = JsonConverter.jsonToObject(payload, AppDeployArgs.class);
         String packerType = appDeployArgs.getPackerType();
 
@@ -84,7 +84,7 @@ public class App {
         }
     }
 
-    public CallResult status(String payload) {
+    public CallResult<AppStatus> status(String payload) {
         AppIdArgs appIdArgs = JsonConverter.jsonToObject(payload, AppIdArgs.class);
         String appId = appIdArgs.getAppId();
         String packerType = appIdArgs.getPackerType();
@@ -107,7 +107,7 @@ public class App {
         return CallResult.success(appStatus);
     }
 
-    public CallResult restart(String payload) {
+    public CallResult<AppStatus> restart(String payload) {
         AppIdArgs appIdArgs = JsonConverter.jsonToObject(payload, AppIdArgs.class);
         String appId = appIdArgs.getAppId();
         String packerType = appIdArgs.getPackerType();
@@ -130,7 +130,7 @@ public class App {
         return CallResult.success(appStatus);
     }
 
-    public CallResult stop(String payload) {
+    public CallResult<AppStatus> stop(String payload) {
         AppIdArgs appIdArgs = JsonConverter.jsonToObject(payload, AppIdArgs.class);
         String appId = appIdArgs.getAppId();
         String packerType = appIdArgs.getPackerType();
@@ -153,7 +153,7 @@ public class App {
         return CallResult.success(appStatus);
     }
 
-    public CallResult start(String payload) {
+    public CallResult<AppStatus> start(String payload) {
         AppIdArgs appIdArgs = JsonConverter.jsonToObject(payload, AppIdArgs.class);
         String appId = appIdArgs.getAppId();
         String packerType = appIdArgs.getPackerType();

+ 14 - 22
dagent/src/main/java/cn/reghao/autodop/dagent/dispatcher/AppOpsDispatcher.java

@@ -30,44 +30,36 @@ public class AppOpsDispatcher {
 
     public void dispatch(String ops, String payload) throws MqttException {
         AsyncMsg asyncMsg;
-        CallResult callResult;
         switch (AppOps.valueOf(ops)) {
             case appDeploy:
-                log.info("部署应用...");
-                callResult = app.deploy(payload);
-                asyncMsg = AsyncMsg.asyncMsg(MessageType.appType.name(), AppOps.appDeploy.name(),
-                        JsonConverter.objectToJson(callResult));
+                asyncMsg = AsyncMsg.asyncMsg(MessageType.appType.name(), AppOps.appDeployResult.name(),
+                        JsonConverter.objectToJson(app.deploy(payload)));
+                log.info("部署应用完成...");
                 break;
             case appStatus:
-                callResult = app.status(payload);
-                asyncMsg = AsyncMsg.asyncMsg(MessageType.appType.name(), AppOps.appStatus.name(),
-                        JsonConverter.objectToJson(callResult));
+                asyncMsg = AsyncMsg.asyncMsg(MessageType.appType.name(), AppOps.appStatusResult.name(),
+                        JsonConverter.objectToJson(app.status(payload)));
                 break;
             case appRestart:
-                callResult = app.restart(payload);
-                asyncMsg = AsyncMsg.asyncMsg(MessageType.appType.name(), AppOps.appRestart.name(),
-                        JsonConverter.objectToJson(callResult));
+                asyncMsg = AsyncMsg.asyncMsg(MessageType.appType.name(), AppOps.appRestartResult.name(),
+                        JsonConverter.objectToJson(app.restart(payload)));
                 break;
             case appStop:
-                callResult = app.stop(payload);
-                asyncMsg = AsyncMsg.asyncMsg(MessageType.appType.name(), AppOps.appStop.name(),
-                        JsonConverter.objectToJson(callResult));
+                asyncMsg = AsyncMsg.asyncMsg(MessageType.appType.name(), AppOps.appStopResult.name(),
+                        JsonConverter.objectToJson(app.stop(payload)));
                 break;
             case appStart:
-                callResult = app.start(payload);
-                asyncMsg = AsyncMsg.asyncMsg(MessageType.appType.name(), AppOps.appStart.name(),
-                        JsonConverter.objectToJson(callResult));
+                asyncMsg = AsyncMsg.asyncMsg(MessageType.appType.name(), AppOps.appStartResult.name(),
+                        JsonConverter.objectToJson(app.start(payload)));
                 break;
             case appLog:
-                callResult = app.log(payload);
-                asyncMsg = AsyncMsg.asyncMsg(MessageType.appType.name(), AppOps.appLog.name(),
-                        JsonConverter.objectToJson(callResult));
+                asyncMsg = AsyncMsg.asyncMsg(MessageType.appType.name(), AppOps.appLogResult.name(),
+                        JsonConverter.objectToJson(app.log(payload)));
                 break;
             default:
                 log.error("AppOps 中没有相应类型...");
-                callResult = CallResult.error("AppOps 中没有相应类型...");
                 asyncMsg = AsyncMsg.asyncMsg(MessageType.appType.name(), AppOps.appDeployResult.name(),
-                        JsonConverter.objectToJson(callResult));
+                        JsonConverter.objectToJson(CallResult.error("AppOps 中没有相应类型...")));
                 break;
         }
 

+ 7 - 8
dagent/src/main/java/cn/reghao/autodop/dagent/dispatcher/DmasterMessageDispatcher.java

@@ -2,7 +2,7 @@ package cn.reghao.autodop.dagent.dispatcher;
 
 import cn.reghao.autodop.common.message.AsyncMsg;
 import cn.reghao.autodop.common.message.MessageType;
-import cn.reghao.autodop.common.dagent.machine.Machine;
+import cn.reghao.autodop.common.utils.MachineId;
 import cn.reghao.autodop.common.utils.serializer.JsonConverter;
 import lombok.extern.slf4j.Slf4j;
 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
@@ -21,11 +21,13 @@ import java.lang.management.ManagementFactory;
 @Slf4j
 @Component
 public class DmasterMessageDispatcher implements MqttCallback {
+    private String machineId;
     private long startTime;
     private MachineOpsDispatcher machineOpsDispatcher;
     private AppOpsDispatcher appOpsDispatcher;
 
     public DmasterMessageDispatcher(MachineOpsDispatcher machineOpsDispatcher, AppOpsDispatcher appOpsDispatcher) {
+        this.machineId = MachineId.id();
         this.startTime = ManagementFactory.getRuntimeMXBean().getStartTime();
         this.machineOpsDispatcher = machineOpsDispatcher;
         this.appOpsDispatcher = appOpsDispatcher;
@@ -41,25 +43,22 @@ public class DmasterMessageDispatcher implements MqttCallback {
         String msg = message.toString();
         try {
             AsyncMsg mqAsyncMsg = JsonConverter.jsonToObject(msg, AsyncMsg.class);
-            String machineId = mqAsyncMsg.getMachineId();
-            long sendTime = mqAsyncMsg.getSendTime();
             String type = mqAsyncMsg.getType();
             String ops = mqAsyncMsg.getOps();
             String payload = mqAsyncMsg.getPayload();
 
-            // TODO 使用 machineId 作为 routeKey 来确定 dagent 节点
-            if (!machineId.equals(Machine.machineId()) || sendTime < startTime) {
-                log.info("忽略不是发送到本机或 dagent 启动前发送到 MQ 中的所有消息...");
+            if (!mqAsyncMsg.getMachineId().equals(machineId) || mqAsyncMsg.getSendTime() < startTime) {
+                log.info("忽略不是发送到本节点或节点启动前发送的消息...");
                 return;
             }
 
             switch (MessageType.valueOf(type)) {
                 case machineType:
-                    log.info("发送给 machine 的消息...");
+                    log.info("执行 machine 相关操作...");
                     machineOpsDispatcher.dispatch(ops, payload);
                     break;
                 case appType:
-                    log.info("发送给 app 的消息...");
+                    log.info("执行 app 相关操作...");
                     appOpsDispatcher.dispatch(ops, payload);
                     break;
                 default:

+ 18 - 6
dagent/src/main/java/cn/reghao/autodop/dagent/dispatcher/MachineOpsDispatcher.java

@@ -1,10 +1,11 @@
 package cn.reghao.autodop.dagent.dispatcher;
 
-import cn.reghao.autodop.common.amqp.RpcResult;
 import cn.reghao.autodop.common.dagent.machine.Machine;
-import cn.reghao.autodop.common.message.MachineOps;
+import cn.reghao.autodop.common.message.*;
 import cn.reghao.autodop.common.mqtt.MqttPub;
+import cn.reghao.autodop.common.utils.serializer.JsonConverter;
 import lombok.extern.slf4j.Slf4j;
+import org.eclipse.paho.client.mqttv3.MqttException;
 import org.springframework.stereotype.Component;
 
 /**
@@ -15,24 +16,35 @@ import org.springframework.stereotype.Component;
 @Component
 public class MachineOpsDispatcher {
     private MqttPub mqttPub;
+    private String topic;
     private Machine machine;
 
     public MachineOpsDispatcher(MqttPub mqttPub, Machine machine) {
         this.mqttPub = mqttPub;
+        this.topic = "dmaster";
         this.machine = machine;
     }
 
-    public void dispatch(String ops, String payload) {
+    public void dispatch(String ops, String payload) throws MqttException {
+        AsyncMsg asyncMsg;
         switch (MachineOps.valueOf(ops)) {
             case machineState:
                 log.info("机器状态...");
-                machine.state();
+                asyncMsg = AsyncMsg.asyncMsg(MessageType.machineType.name(), MachineOps.machineStateResult.name(),
+                        JsonConverter.objectToJson(machine.state()));
+                break;
             case machineShell:
                 log.info("执行脚本...");
-                machine.shell(payload);
+                asyncMsg = AsyncMsg.asyncMsg(MessageType.machineType.name(), MachineOps.machineShellResult.name(),
+                        JsonConverter.objectToJson(machine.shell(payload)));
+                break;
             default:
                 log.error("MachineOps 中没有相应类型...");
-                RpcResult.error("MachineOps 中没有相应类型...");
+                asyncMsg = AsyncMsg.asyncMsg(MessageType.machineType.name(), MachineOps.machineShellResult.name(),
+                        JsonConverter.objectToJson(CallResult.error("MachineOps 中没有相应类型...")));
+                break;
         }
+
+        mqttPub.pub(topic, JsonConverter.objectToJson(asyncMsg));
     }
 }

+ 1 - 2
dagent/src/main/java/cn/reghao/autodop/dagent/machine/timer/HeartbeatJob.java

@@ -2,7 +2,6 @@ package cn.reghao.autodop.dagent.machine.timer;
 
 import cn.reghao.autodop.common.message.AsyncMsg;
 import cn.reghao.autodop.common.message.MessageType;
-import cn.reghao.autodop.common.amqp.RabbitProducer;
 import cn.reghao.autodop.common.dagent.machine.Machine;
 import cn.reghao.autodop.common.message.MachineOps;
 import cn.reghao.autodop.common.mqtt.MqttPub;
@@ -20,7 +19,7 @@ import org.quartz.*;
 @Slf4j
 public class HeartbeatJob implements Job {
     @Override
-    public void execute(JobExecutionContext context) throws JobExecutionException {
+    public void execute(JobExecutionContext context) {
         JobKey jobKey = context.getJobDetail().getKey();
         JobDataMap jobDataMap = context.getJobDetail().getJobDataMap();
 

+ 0 - 1
dagent/src/main/java/cn/reghao/autodop/dagent/machine/timer/MachineScheduler.java

@@ -1,6 +1,5 @@
 package cn.reghao.autodop.dagent.machine.timer;
 
-import cn.reghao.autodop.common.amqp.RabbitProducer;
 import cn.reghao.autodop.common.dagent.machine.Machine;
 import cn.reghao.autodop.common.mqtt.MqttPub;
 import lombok.extern.slf4j.Slf4j;

+ 1 - 0
dagent/src/main/java/cn/reghao/autodop/dagent/utils/DagentLifecycle.java

@@ -86,5 +86,6 @@ public class DagentLifecycle implements ApplicationRunner, DisposableBean {
 
     private void initLogger() {
         LoggerConfig loggerConfig = new LoggerConfig();
+        loggerConfig.initialize();
     }
 }

+ 0 - 103
dagent/src/main/java/cn/reghao/autodop/dagent/utils/amqp/DagentConsumer.java

@@ -1,103 +0,0 @@
-package cn.reghao.autodop.dagent.utils.amqp;
-
-import cn.reghao.autodop.common.message.AsyncMsg;
-import cn.reghao.autodop.common.message.MessageType;
-import cn.reghao.autodop.common.amqp.RpcResult;
-import cn.reghao.autodop.common.dagent.machine.Machine;
-import cn.reghao.autodop.common.utils.serializer.JsonConverter;
-import cn.reghao.autodop.dagent.dispatcher.AppOpsDispatcher;
-import cn.reghao.autodop.dagent.dispatcher.MachineOpsDispatcher;
-import com.rabbitmq.client.Channel;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.amqp.core.MessageProperties;
-import org.springframework.amqp.rabbit.core.RabbitTemplate;
-import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
-
-import java.lang.management.ManagementFactory;
-
-/**
- * dagent 消息分发
- *
- * @author reghao
- * @date 2020-09-04 11:00:22
- */
-@Slf4j
-//@Component
-public class DagentConsumer implements ChannelAwareMessageListener {
-    private RabbitTemplate rabbitTemplate;
-    private MachineOpsDispatcher machineOpsDispatcher;
-    private AppOpsDispatcher appOpsDispatcher;
-
-    public DagentConsumer(RabbitTemplate rabbitTemplate,
-                          MachineOpsDispatcher machineOpsDispatcher,
-                          AppOpsDispatcher appOpsDispatcher) {
-        this.rabbitTemplate = rabbitTemplate;
-        this.machineOpsDispatcher = machineOpsDispatcher;
-        this.appOpsDispatcher = appOpsDispatcher;
-    }
-
-    @Override
-    public void onMessage(org.springframework.amqp.core.Message msg, Channel channel) {
-        MessageProperties messageProperties = msg.getMessageProperties();
-        long startTime = ManagementFactory.getRuntimeMXBean().getStartTime();
-        String body = new String(msg.getBody());
-        try {
-            AsyncMsg asyncMsg = JsonConverter.jsonToObject(body, AsyncMsg.class);
-            String machineId = asyncMsg.getMachineId();
-            long sendTime = asyncMsg.getSendTime();
-            boolean isRpc = true;
-            String type = asyncMsg.getType();
-            String ops = asyncMsg.getOps();
-            String payload = asyncMsg.getPayload();
-
-            // TODO 使用 machineId 作为 routeKey 来确定 dagent 节点
-            if (!machineId.equals(Machine.machineId()) || sendTime < startTime) {
-                log.info("忽略不是发送到本机或 dagent 启动前发送到 MQ 中的所有消息...");
-                return;
-            }
-
-            log.info("新消息...");
-            switch (MessageType.valueOf(type)) {
-                case machineType:
-                    if (isRpc) {
-                        /*RpcResult rpcResult = machineDispatcher.rpcDispatch(ops, payload);
-                        rpcReply(messageProperties, rpcResult);*/
-                    } else {
-                        machineOpsDispatcher.dispatch(ops, payload);
-                        channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
-                    }
-                    break;
-                case appType:
-                    if (isRpc) {
-                        /*RpcResult rpcResult = appDispatcher.rpcDispatch(ops, payload);
-                        rpcReply(messageProperties, rpcResult);*/
-                    } else {
-                        appOpsDispatcher.dispatch(ops, payload);
-                        channel.basicAck(msg.getMessageProperties().getDeliveryTag(), false);
-                    }
-                    break;
-                default:
-            }
-        } catch (Exception e) {
-            log.error(e.getMessage());
-        }
-    }
-
-    /**
-     * RPC 调用响应
-     * RPC 调用不需要返回 ACK
-     *
-     * @param
-     * @return
-     * @date 2021-02-22 下午5:43
-     */
-    private void rpcReply(MessageProperties messageProperties, RpcResult result) {
-        log.info("RPC 调用完成,返回结果...");
-        String replyTo = messageProperties.getReplyTo();
-        String correlationId = messageProperties.getCorrelationId();
-        rabbitTemplate.convertAndSend(replyTo, JsonConverter.objectToJson(result), message -> {
-            message.getMessageProperties().setCorrelationId(correlationId);
-            return message;
-        });
-    }
-}

+ 0 - 59
dagent/src/main/java/cn/reghao/autodop/dagent/utils/amqp/DagentConsumerConfig.java

@@ -1,59 +0,0 @@
-package cn.reghao.autodop.dagent.utils.amqp;
-
-import cn.reghao.autodop.common.dagent.machine.Machine;
-import org.springframework.amqp.core.Binding;
-import org.springframework.amqp.core.BindingBuilder;
-import org.springframework.amqp.core.DirectExchange;
-import org.springframework.amqp.core.Queue;
-import org.springframework.amqp.rabbit.connection.ConnectionFactory;
-import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * dagent 队列消费者
- *
- * @author reghao
- * @date 2020-09-04 10:57:56
- */
-//@Configuration
-public class DagentConsumerConfig {
-    private DagentConsumer dagentConsumer;
-
-    public DagentConsumerConfig(DagentConsumer dagentConsumer) {
-        this.dagentConsumer = dagentConsumer;
-    }
-
-    @Bean
-    DirectExchange directExchange(){
-        return new DirectExchange("amq.direct", true, false);
-    }
-
-    @Bean
-    public Queue dagentQueue() {
-        String queueName = "dagent@" + Machine.machineId();
-        Map<String, Object> args = new HashMap<>();
-        // 队列 TTL
-        args.put("x-message-ttl", 6000);
-        // 创建一个队列
-        return new Queue(queueName, true, false, false, args);
-    }
-
-    @Bean
-    Binding bindingQueueToExchange(Queue queue, DirectExchange directExchange) {
-        String routeKey = Machine.machineId();
-        return BindingBuilder.bind(queue).to(directExchange).with(routeKey);
-    }
-
-    @Bean
-    public SimpleMessageListenerContainer messageListenerContainer(ConnectionFactory connectionFactory) {
-        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
-        container.setConnectionFactory(connectionFactory);
-        container.addQueues(dagentQueue());
-        container.setMessageListener(dagentConsumer);
-        return container;
-    }
-}

+ 1 - 32
dagent/src/main/java/cn/reghao/autodop/dagent/utils/log/LoggerConfig.java

@@ -9,8 +9,6 @@ import ch.qos.logback.core.Appender;
 import ch.qos.logback.core.ConsoleAppender;
 import ch.qos.logback.core.FileAppender;
 import org.slf4j.LoggerFactory;
-import org.springframework.amqp.rabbit.logback.AmqpAppender;
-import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
 
 /**
  * 编程方式配置 logback,相当于 logback-spring-bak.xml 配置
@@ -21,42 +19,13 @@ import org.springframework.boot.autoconfigure.amqp.RabbitProperties;
 public class LoggerConfig {
     private LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
 
-    public void initialize(RabbitProperties rabbitProperties) {
+    public void initialize() {
         Logger rootLogger = loggerContext.getLogger("ROOT");
         rootLogger.setAdditive(false);
         rootLogger.setLevel(Level.INFO);
         //rootLogger.addAppender(amqpAppender(rabbitProperties));
     }
 
-    private Appender<ILoggingEvent> amqpAppender(RabbitProperties rabbitProperties) {
-        PatternLayoutEncoder layoutEncoder = new PatternLayoutEncoder();
-        //layoutEncoder.setPattern("%date %level [%thread] %logger{10} [%file:%line] %msg%n");
-        layoutEncoder.setPattern("%d{yyyy-MM-dd HH:mm:ss} [%c] [%p] %m%n");
-        layoutEncoder.setContext(loggerContext);
-        layoutEncoder.start();
-
-        AmqpAppender amqpAppender = new AmqpAppender();
-        amqpAppender.setName("mqAppender");
-        amqpAppender.setHost(rabbitProperties.getHost());
-        amqpAppender.setPort(rabbitProperties.getPort());
-        amqpAppender.setUsername(rabbitProperties.getUsername());
-        amqpAppender.setPassword(rabbitProperties.getPassword());
-        amqpAppender.setDeclareExchange(true);
-        amqpAppender.setExchangeType("direct");
-        amqpAppender.setExchangeName("rabbit.dagent.log");
-        amqpAppender.setRoutingKeyPattern("info");
-        amqpAppender.setDurable(true);
-        amqpAppender.setAutoDelete(false);
-        amqpAppender.setDeliveryMode("NON_PERSISTENT");
-        amqpAppender.setGenerateId(true);
-        amqpAppender.setCharset("UTF-8");
-        amqpAppender.setContext(loggerContext);
-        amqpAppender.setEncoder(layoutEncoder);
-        amqpAppender.start();
-
-        return amqpAppender;
-    }
-
     private Appender<ILoggingEvent> fileAppender() {
         PatternLayoutEncoder layoutEncoder = new PatternLayoutEncoder();
         layoutEncoder.setPattern("%date %level [%thread] %logger{10} [%file:%line] %msg%n");

+ 0 - 87
dagent/src/main/java/cn/reghao/autodop/dagent/utils/log/LoggingInitializer.java

@@ -1,87 +0,0 @@
-package cn.reghao.autodop.dagent.utils.log;
-
-import ch.qos.logback.classic.Level;
-import ch.qos.logback.classic.Logger;
-import ch.qos.logback.classic.LoggerContext;
-import ch.qos.logback.classic.encoder.PatternLayoutEncoder;
-import ch.qos.logback.classic.spi.ILoggingEvent;
-import ch.qos.logback.core.Appender;
-import ch.qos.logback.core.ConsoleAppender;
-import ch.qos.logback.core.FileAppender;
-import org.slf4j.LoggerFactory;
-import org.springframework.amqp.rabbit.logback.AmqpAppender;
-import org.springframework.context.ApplicationContextInitializer;
-import org.springframework.context.ConfigurableApplicationContext;
-import org.springframework.core.annotation.Order;
-
-/**
- * 编程方式配置 logback,相当于 logback-spring-bak.xml 配置
- *
- * @author reghao
- * @date 2021-02-23 23:53:35
- */
-@Order
-public class LoggingInitializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
-    @Override
-    public void initialize(ConfigurableApplicationContext applicationContext) {
-        LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
-        Logger rootLogger = loggerContext.getLogger("ROOT");
-        rootLogger.setAdditive(false);
-        rootLogger.setLevel(Level.INFO);
-        //rootLogger.addAppender(amqpAppender(loggerContext));
-    }
-
-    private Appender<ILoggingEvent> amqpAppender(LoggerContext loggerContext) {
-        PatternLayoutEncoder layoutEncoder = new PatternLayoutEncoder();
-        layoutEncoder.setPattern("%date %level [%thread] %logger{10} [%file:%line] %msg%n");
-        layoutEncoder.setContext(loggerContext);
-        layoutEncoder.start();
-
-        AmqpAppender amqpAppender = new AmqpAppender();
-        amqpAppender.setName("mqAppender");
-        amqpAppender.setAddresses("localhost:5672");
-        amqpAppender.setUsername("dev");
-        amqpAppender.setPassword("Dev@12345678");
-        amqpAppender.setDeclareExchange(true);
-        amqpAppender.setExchangeType("direct");
-        amqpAppender.setExchangeName("rabbit.dagent.log");
-        amqpAppender.setRoutingKeyPattern("info");
-        amqpAppender.setDurable(true);
-        amqpAppender.setAutoDelete(false);
-        amqpAppender.setDeliveryMode("NON_PERSISTENT");
-        amqpAppender.setGenerateId(true);
-        amqpAppender.setCharset("UTF-8");
-        amqpAppender.setContext(loggerContext);
-        amqpAppender.setEncoder(layoutEncoder);
-        amqpAppender.start();
-
-        return amqpAppender;
-    }
-
-    private Appender<ILoggingEvent> fileAppender(LoggerContext loggerContext) {
-        PatternLayoutEncoder layoutEncoder = new PatternLayoutEncoder();
-        layoutEncoder.setPattern("%date %level [%thread] %logger{10} [%file:%line] %msg%n");
-        layoutEncoder.setContext(loggerContext);
-        layoutEncoder.start();
-
-        FileAppender<ILoggingEvent> fileAppender = new FileAppender<ILoggingEvent>();
-        fileAppender.setFile("file");
-        fileAppender.setEncoder(layoutEncoder);
-        fileAppender.setContext(loggerContext);
-        fileAppender.start();
-
-        return fileAppender;
-    }
-
-    private Appender<ILoggingEvent> consoleAppender(LoggerContext loggerContext) {
-        PatternLayoutEncoder layoutEncoder = new PatternLayoutEncoder();
-        layoutEncoder.setPattern("%d{HH:mm:ss.SSS} [%thread] %-5level %c %M %L - %msg%n");
-        layoutEncoder.setContext(loggerContext);
-        layoutEncoder.start();
-
-        ConsoleAppender<ILoggingEvent> consoleAppender = new ConsoleAppender<>();
-        consoleAppender.start();
-
-        return consoleAppender;
-    }
-}

+ 1 - 19
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/AppStatusService.java

@@ -2,7 +2,6 @@ package cn.reghao.autodop.dmaster.app.service;
 
 import cn.reghao.autodop.common.message.AsyncMsg;
 import cn.reghao.autodop.common.message.MessageType;
-import cn.reghao.autodop.common.amqp.RpcResult;
 import cn.reghao.autodop.common.message.AppOps;
 import cn.reghao.autodop.common.dagent.app.api.data.AppIdArgs;
 import cn.reghao.autodop.common.dagent.app.api.data.AppStatus;
@@ -11,7 +10,6 @@ import cn.reghao.autodop.dmaster.app.cache.BuildDeployCache;
 import cn.reghao.autodop.dmaster.app.entity.config.AppOrchestration;
 import cn.reghao.autodop.dmaster.app.entity.config.deploy.DeployConfig;
 import cn.reghao.autodop.common.dagent.app.api.data.log.LogFile;
-//import cn.reghao.autodop.dmaster.app.db.log.AppStatusCrudService;
 import cn.reghao.autodop.dmaster.common.db.PageList;
 import org.springframework.stereotype.Service;
 
@@ -28,14 +26,10 @@ import java.util.Map;
  */
 @Service
 public class AppStatusService {
-    private RemoteCallService callService;
     private BuildDeployCache cache;
-    //private AppStatusCrudService statusCrudService;
 
-    public AppStatusService(RemoteCallService callService, BuildDeployCache cache/*, AppStatusCrudService crudService*/) {
-        this.callService = callService;
+    public AppStatusService(BuildDeployCache cache) {
         this.cache = cache;
-        //this.statusCrudService = crudService;
     }
 
     /**
@@ -232,18 +226,6 @@ public class AppStatusService {
         }
 
         Map<String, AppStatus> map = new HashMap<>();
-        Map<String, RpcResult> rpcResultMap = callService.call(asyncMsgs);
-        for (Map.Entry<String, RpcResult> entry : rpcResultMap.entrySet()) {
-            String machineId = entry.getKey();
-            RpcResult rpcResult = entry.getValue();
-            if (rpcResult.getStatusCode() == 0) {
-                AppStatus appStatus =
-                        JsonConverter.jsonToObject(rpcResult.getResult(), AppStatus.class);
-                appStatus.setEnv(app.getEnv());
-                map.put(machineId, appStatus);
-            }
-        }
-
         // TODO 持久化到数据库
         return map;
     }

+ 0 - 131
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/RemoteCallService.java

@@ -1,131 +0,0 @@
-package cn.reghao.autodop.dmaster.app.service;
-
-import cn.reghao.autodop.common.message.AsyncMsg;
-import cn.reghao.autodop.common.amqp.RabbitProducer;
-import cn.reghao.autodop.common.amqp.RpcResult;
-import cn.reghao.autodop.common.utils.ExceptionUtil;
-import cn.reghao.autodop.dmaster.machine.entity.MachineInfo;
-import cn.reghao.autodop.dmaster.machine.entity.NetworkInfo;
-import cn.reghao.autodop.dmaster.machine.repository.MachineInfoRepository;
-import cn.reghao.autodop.dmaster.common.thread.ThreadPoolWrapper;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Service;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-
-/**
- * 远程调用服务
- *
- * @author reghao
- * @date 2021-02-10 02:31:12
- */
-@Slf4j
-@Service
-public class RemoteCallService {
-    private ExecutorService threadPool = ThreadPoolWrapper.threadPool("remote-call-service");
-    private final String exchange = "amq.direct";
-    private RabbitProducer rabbitProducer;
-    private MachineInfoRepository machineInfoRepository;
-
-    public RemoteCallService(RabbitProducer rabbitProducer, MachineInfoRepository machineInfoRepository) {
-        this.rabbitProducer = rabbitProducer;
-        this.machineInfoRepository = machineInfoRepository;
-    }
-
-    /**
-     * 单个调用
-     *
-     * @param
-     * @return
-     * @date 2021-03-05 下午7:54
-     */
-    public RpcResult call(AsyncMsg asyncMsg) {
-        Future<RpcResult> future = threadPool.submit(new RemoteCallTask(asyncMsg));
-        while (!future.isDone() && !future.isCancelled()) {
-            // 休眠等待异步任务结束
-            try {
-                Thread.sleep(1_000);
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-
-        String machineId = asyncMsg.getMachineId();
-        try {
-            RpcResult rpcResult = future.get();
-            if (rpcResult == null) {
-                return RpcResult.fail(machineIpv4(machineId) + " RPC 调用失败");
-            }
-            return rpcResult;
-        } catch (InterruptedException | ExecutionException e) {
-            return RpcResult.error(machineIpv4(machineId) + " " + ExceptionUtil.errorMsg(e));
-        }
-    }
-
-    /**
-     * 多个调用
-     *
-     * @param
-     * @return
-     * @date 2021-03-05 下午7:54
-     */
-    public Map<String, RpcResult> call(List<AsyncMsg> asyncMsgs) {
-        Map<String, Future<RpcResult>> futureMap = new HashMap<>(asyncMsgs.size());
-        asyncMsgs.forEach(mqMessage ->
-                futureMap.put(mqMessage.getMachineId(), threadPool.submit(new RemoteCallTask(mqMessage))));
-
-        Map<String, RpcResult> resultMap = new HashMap<>(asyncMsgs.size());
-        for (Map.Entry<String, Future<RpcResult>> entry : futureMap.entrySet()) {
-            Future<RpcResult> future = entry.getValue();
-            while (!future.isDone() && !future.isCancelled()) {
-                // TODO 休眠等待异步任务结束
-                try {
-                    Thread.sleep(1_000);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-            }
-
-            String machineId = entry.getKey();
-            log.info(machineIpv4(machineId) + " -> RPC 调用返回");
-            try {
-                RpcResult rpcResult = future.get();
-                if (rpcResult != null) {
-                    resultMap.put(machineId, rpcResult);
-                } else {
-                    resultMap.put(machineId, RpcResult.fail(machineIpv4(machineId) + " RPC 调用失败"));
-                }
-            } catch (InterruptedException | ExecutionException e) {
-                resultMap.put(machineId,
-                        RpcResult.error(machineIpv4(machineId) + " " + ExceptionUtil.errorMsg(e)));
-            }
-        }
-        return resultMap;
-    }
-
-    private String machineIpv4(String machineId) {
-        MachineInfo machineInfo = machineInfoRepository.findByMachineId(machineId);
-        List<NetworkInfo> networkInfos = machineInfo.getNetworkInfo();
-        return networkInfos.get(0).getIpv4();
-    }
-
-    class RemoteCallTask implements Callable<RpcResult> {
-        private AsyncMsg asyncMsg;
-
-        public RemoteCallTask(AsyncMsg asyncMsg) {
-            this.asyncMsg = asyncMsg;
-        }
-
-        @Override
-        public RpcResult call() {
-            log.info("RPC 调用...");
-            return rabbitProducer.callRemote(exchange, asyncMsg.getMachineId(), asyncMsg);
-        }
-    }
-}

+ 1 - 35
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/bd/AppDeployer.java

@@ -2,19 +2,15 @@ package cn.reghao.autodop.dmaster.app.service.bd;
 
 import cn.reghao.autodop.common.message.AsyncMsg;
 import cn.reghao.autodop.common.message.MessageType;
-import cn.reghao.autodop.common.amqp.RpcResult;
 import cn.reghao.autodop.common.message.AppOps;
-import cn.reghao.autodop.common.dagent.app.api.data.AppStatus;
 import cn.reghao.autodop.common.dagent.app.api.data.deploy.AppDeployArgs;
 import cn.reghao.autodop.common.docker.pojo.Config;
 import cn.reghao.autodop.common.mqtt.MqttPub;
 import cn.reghao.autodop.common.utils.serializer.JsonConverter;
 import cn.reghao.autodop.dmaster.app.cache.BuildDeployCache;
-import cn.reghao.autodop.dmaster.app.constant.BuildDeployResult;
 import cn.reghao.autodop.dmaster.app.entity.config.deploy.DeployConfig;
 import cn.reghao.autodop.dmaster.app.entity.log.BuildLog;
 import cn.reghao.autodop.dmaster.app.entity.log.DeployLog;
-import cn.reghao.autodop.dmaster.app.service.RemoteCallService;
 import cn.reghao.autodop.dmaster.app.service.log.BuildDeployLogConsumer;
 import cn.reghao.autodop.dmaster.machine.entity.MachineInfo;
 import cn.reghao.autodop.dmaster.machine.entity.NetworkInfo;
@@ -37,18 +33,15 @@ import java.util.*;
 public class AppDeployer {
     private MqttPub mqttPub;
     private BuildDeployCache cache;
-    private RemoteCallService callService;
     private BuildDeployLogConsumer logConsumer;
     private MachineInfoRepository machineInfoRepository;
 
     public AppDeployer(MqttPub mqttPub,
                        BuildDeployCache cache,
-                       RemoteCallService callService,
                        BuildDeployLogConsumer logConsumer,
                        MachineInfoRepository machineInfoRepository) {
         this.mqttPub = mqttPub;
         this.cache = cache;
-        this.callService = callService;
         this.logConsumer = logConsumer;
         this.machineInfoRepository = machineInfoRepository;
     }
@@ -107,34 +100,7 @@ public class AppDeployer {
             asyncMsgs.add(asyncMsg);
         }
 
-        Map<String, RpcResult> rpcResultMap = callService.call(asyncMsgs);
-        LocalDateTime deployTime = LocalDateTime.now();
-        List<DeployLog> deployLogs = new ArrayList<>();
-        rpcResultMap.forEach((machineId, rpcResult) -> {
-            DeployLog deployLog = DeployLog.from(buildLog);
-            deployLog.setMachineId(machineId);
-            String machineIpv4 = machineIpv4(machineId);
-            deployLog.setMachineIpv4(machineIpv4);
-            deployLog.setDeployTime(deployTime);
-
-            deployLog.setStatusCode(rpcResult.getStatusCode());
-            if (rpcResult.getStatusCode() == 0) {
-                AppStatus appStatus =
-                        JsonConverter.jsonToObject(rpcResult.getResult(), AppStatus.class);
-                appStatus.setAppType(buildLog.getAppType());
-                appStatus.setEnv(buildLog.getEnv());
-                appStatus.setCommitId(buildLog.getCommitInfo().getCommitId());
-                appStatus.setMachineIpv4(machineIpv4);
-
-                logConsumer.addAppStatus(appStatus);
-                deployLog.setResult(BuildDeployResult.deployDone.getName());
-            } else {
-                deployLog.setResult(rpcResult.getResult());
-            }
-            deployLogs.add(deployLog);
-        });
-
-        return deployLogs;
+        return new ArrayList<>();
     }
 
     private String machineIpv4(String machineId) {

+ 2 - 0
dmaster/src/main/java/cn/reghao/autodop/dmaster/machine/entity/MachineLog.java

@@ -6,6 +6,8 @@ import lombok.EqualsAndHashCode;
 import org.springframework.data.mongodb.core.mapping.Document;
 
 /**
+ * dagent 日志
+ *
  * @author reghao
  * @date 2020-10-22 15:45:29
  */

+ 0 - 67
dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/amqp/DmasterConsumer.java

@@ -1,67 +0,0 @@
-package cn.reghao.autodop.dmaster.utils.amqp;
-
-import cn.reghao.autodop.common.message.AsyncMsg;
-import cn.reghao.autodop.common.message.MessageType;
-import cn.reghao.autodop.common.utils.serializer.JsonConverter;
-import cn.reghao.autodop.dmaster.utils.dispatcher.MachineOpsDispatcher;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.amqp.rabbit.annotation.Exchange;
-import org.springframework.amqp.rabbit.annotation.Queue;
-import org.springframework.amqp.rabbit.annotation.QueueBinding;
-import org.springframework.amqp.rabbit.annotation.RabbitListener;
-import org.springframework.messaging.handler.annotation.Payload;
-import org.springframework.stereotype.Component;
-
-/**
- * 消息分发
- *
- * @author reghao
- * @date 2020-09-04 11:00:22
- */
-@Slf4j
-@Component
-public class DmasterConsumer {
-    private MachineOpsDispatcher machineOpsDispatcher;
-
-    public DmasterConsumer(MachineOpsDispatcher machineOpsDispatcher) {
-        this.machineOpsDispatcher = machineOpsDispatcher;
-    }
-
-    // 将 dmaster 队列通过 routeKey dmaster 绑定到 amq.direct 交换机
-    @RabbitListener(bindings = @QueueBinding(
-            value = @Queue(value = "dmaster", durable = "true"),
-            key = "dmaster",
-            exchange = @Exchange(value = "amq.direct"))
-    )
-    public void dmasterQueueConsumer(@Payload String msg) {
-        try {
-            AsyncMsg asyncMsg = JsonConverter.jsonToObject(msg, AsyncMsg.class);
-            String machineId = asyncMsg.getMachineId();
-            long sendTime = asyncMsg.getSendTime();
-            //boolean isRpc = asyncMsg.isRpc();
-            String type = asyncMsg.getType();
-            String ops = asyncMsg.getOps();
-            String payload = asyncMsg.getPayload();
-
-            switch (MessageType.valueOf(type)) {
-                case machineType:
-                    machineOpsDispatcher.dispatch(ops, payload);
-                    break;
-                case appType:
-                    log.info("msg from app...");
-                    break;
-                default:
-            }
-        } catch (Exception e) {
-            log.error(e.getMessage());
-        }
-    }
-
-    @RabbitListener(bindings = @QueueBinding(
-            value = @Queue(value = "dagent.log", durable = "true"),
-            exchange = @Exchange(value = "rabbit.dagent.log"))
-    )
-    public void logQueueConsumer(@Payload String msg) {
-        //log.info("日志消息 -> {}", msg);
-    }
-}

+ 6 - 5
dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/dispatcher/AppOpsDispatcher.java

@@ -5,9 +5,12 @@ import cn.reghao.autodop.common.message.AppOps;
 import cn.reghao.autodop.common.message.CallResult;
 import cn.reghao.autodop.common.result.ResultCode;
 import cn.reghao.autodop.common.utils.serializer.JsonConverter;
+import com.google.gson.reflect.TypeToken;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 
+import java.lang.reflect.Type;
+
 /**
  * 分发对 App 的操作
  *
@@ -17,15 +20,13 @@ import org.springframework.stereotype.Component;
 @Slf4j
 @Component
 public class AppOpsDispatcher {
-    public AppOpsDispatcher() {
-    }
-
     public void dispatch(String ops, String payload) {
         switch (AppOps.valueOf(ops)) {
             case appDeployResult:
-                CallResult callResult = JsonConverter.jsonToObject(payload, CallResult.class);
+                Type type = new TypeToken<CallResult<AppStatus>>(){}.getType();
+                CallResult<AppStatus> callResult = JsonConverter.jsonToObject(payload, type);
                 if (callResult.getCode() == ResultCode.SUCCESS.getCode()) {
-                    AppStatus appStatus = (AppStatus) callResult.getData();
+                    AppStatus appStatus = callResult.getData();
                 }
                 log.info("部署应用...");
                 break;

+ 1 - 5
dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/dispatcher/DagentMessageDispatcher.java

@@ -19,8 +19,7 @@ public class DagentMessageDispatcher implements MqttCallback {
     private MachineOpsDispatcher machineOpsDispatcher;
     private AppOpsDispatcher appOpsDispatcher;
 
-    public DagentMessageDispatcher(MachineOpsDispatcher machineOpsDispatcher,
-                                   AppOpsDispatcher appOpsDispatcher) {
+    public DagentMessageDispatcher(MachineOpsDispatcher machineOpsDispatcher, AppOpsDispatcher appOpsDispatcher) {
         this.machineOpsDispatcher = machineOpsDispatcher;
         this.appOpsDispatcher = appOpsDispatcher;
     }
@@ -36,8 +35,6 @@ public class DagentMessageDispatcher implements MqttCallback {
         String msg = message.toString();
         try {
             AsyncMsg mqAsyncMsg = JsonConverter.jsonToObject(msg, AsyncMsg.class);
-            String machineId = mqAsyncMsg.getMachineId();
-            long sendTime = mqAsyncMsg.getSendTime();
             String type = mqAsyncMsg.getType();
             String ops = mqAsyncMsg.getOps();
             String payload = mqAsyncMsg.getPayload();
@@ -48,7 +45,6 @@ public class DagentMessageDispatcher implements MqttCallback {
                     break;
                 case appType:
                     appOpsDispatcher.dispatch(ops, payload);
-                    log.info("msg from app -> {}", payload);
                     break;
                 default:
             }

+ 7 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/utils/dispatcher/MachineOpsDispatcher.java

@@ -52,8 +52,14 @@ public class MachineOpsDispatcher {
                     MachineLog machineLog = JsonConverter.jsonToObject(payload, MachineLog.class);
                     //machineLogCrudService.insert(machineLog);
                     break;
+                case machineShellResult:
+                    log.info("机器 shell 结果消息...");
+                    break;
+                case machineStateResult:
+                    log.info("机器状态消息...");
+                    break;
                 default:
-                    //log.info("来自机器的消息...");
+                    log.info("来自机器的消息...");
             }
         } catch (Exception e) {
             e.printStackTrace();

+ 0 - 71
dmaster/src/test/java/cn/reghao/autodop/common/amqp/RabbitProducerTest.java

@@ -1,71 +0,0 @@
-package cn.reghao.autodop.common.amqp;
-
-import cn.reghao.autodop.common.dagent.app.api.data.deploy.AppDeployArgs;
-import cn.reghao.autodop.common.message.AsyncMsg;
-import cn.reghao.autodop.common.message.MachineOps;
-import cn.reghao.autodop.common.dagent.machine.api.data.MachineShell;
-import cn.reghao.autodop.common.dagent.machine.api.data.MachineShellArgs;
-import cn.reghao.autodop.common.message.MessageType;
-import cn.reghao.autodop.common.utils.serializer.JsonConverter;
-import cn.reghao.autodop.dmaster.DmasterApplication;
-import lombok.extern.slf4j.Slf4j;
-import org.junit.jupiter.api.Test;
-
-import org.junit.runner.RunWith;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.test.context.ActiveProfiles;
-import org.springframework.test.context.junit4.SpringRunner;
-
-@Slf4j
-@ActiveProfiles("dev")
-@SpringBootTest(classes = DmasterApplication.class)
-@RunWith(SpringRunner.class)
-class RabbitProducerTest {
-    @Autowired
-    private RabbitProducer rabbitProducer;
-
-    @Test
-    void send() throws InterruptedException {
-        String exchange = "amq.direct";
-        String routeKey = "dagent";
-        AsyncMsg asyncMsg = new AsyncMsg();
-        asyncMsg.setMachineId("5d1a727991f34d3a9c1220a1899e6ebd");
-        asyncMsg.setType(MessageType.machineType.name());
-        asyncMsg.setSendTime(System.currentTimeMillis());
-        asyncMsg.setPayload("test test");
-
-        while (true) {
-            Thread.sleep(1000);
-            rabbitProducer.send(exchange, routeKey, asyncMsg);
-            log.info("发送消息...");
-        }
-    }
-
-    @Test
-    void callRemote() {
-        MachineShellArgs machineShellArgs = new MachineShellArgs();
-        machineShellArgs.setType("command");
-        machineShellArgs.setContent("cat /etc/nginx/nginx.conf");
-
-        AppDeployArgs appDeployArgs = new AppDeployArgs();
-        /*appDeployArgs.setPackerType(PackerType.docker.name());
-        appDeployArgs.setAppId("admin-sys");
-        appDeployArgs.setAppPath("docker.alpha.iquizoo.com/iq3x/dnkt:56d47672");
-        appDeployArgs.setRunningDir("");*/
-
-        String exchange = "amq.direct";
-        String routeKey = "dagent";
-        String machineId = "5d1a727991f34d3a9c1220a1899e6ebd";
-        AsyncMsg asyncMsg = new AsyncMsg();
-        asyncMsg.setMachineId(machineId);
-        asyncMsg.setSendTime(System.currentTimeMillis());
-        asyncMsg.setType(MessageType.machineType.name());
-        asyncMsg.setOps(MachineOps.machineShell.name());
-        asyncMsg.setPayload(JsonConverter.objectToJson(machineShellArgs));
-
-        RpcResult rpcResult = rabbitProducer.callRemote(exchange, routeKey, asyncMsg);
-        MachineShell machineShell = JsonConverter.jsonToObject(rpcResult.getResult(), MachineShell.class);
-        System.out.println(rpcResult.getResult());
-    }
-}

+ 0 - 23
pom.xml

@@ -51,29 +51,6 @@
             <version>1.2.0</version>
         </dependency>
 
-        <!--<dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-integration</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.springframework.integration</groupId>
-            <artifactId>spring-integration-stream</artifactId>
-        </dependency>
-        <dependency>
-            <groupId>org.springframework.integration</groupId>
-            <artifactId>spring-integration-mqtt</artifactId>
-        </dependency>-->
-
-        <dependency>
-            <groupId>org.springframework.boot</groupId>
-            <artifactId>spring-boot-starter-amqp</artifactId>
-        </dependency>
-
-        <dependency>
-            <groupId>org.springframework.amqp</groupId>
-            <artifactId>spring-rabbit</artifactId>
-        </dependency>
-
         <dependency>
             <groupId>org.springframework.boot</groupId>
             <artifactId>spring-boot-configuration-processor</artifactId>