Przeglądaj źródła

dmaster 和 dagent 之间的通信

reghao 4 lat temu
rodzic
commit
af86d47aac

+ 3 - 3
common/src/main/java/cn/reghao/autodop/common/msg/rpc/RpcMsg.java

@@ -12,14 +12,14 @@ import java.util.UUID;
 public class RpcMsg {
     private String msgId;
     private RpcPayload rpcPayload;
-    private RpcResult<?> rpcResult;
+    private RpcResult rpcResult;
 
     public RpcMsg(RpcPayload rpcPayload) {
         this.msgId = UUID.randomUUID().toString();
         this.rpcPayload = rpcPayload;
     }
 
-    public RpcMsg(String msgId, RpcPayload rpcPayload, RpcResult<?> rpcResult) {
+    public RpcMsg(String msgId, RpcPayload rpcPayload, RpcResult rpcResult) {
         this.msgId = msgId;
         this.rpcPayload = rpcPayload;
         this.rpcResult = rpcResult;
@@ -29,7 +29,7 @@ public class RpcMsg {
         return new RpcMsg(rpcPayload);
     }
 
-    public static RpcMsg resultMsg(RpcMsg callMsg, RpcResult<?> rpcResult) {
+    public static RpcMsg resultMsg(RpcMsg callMsg, RpcResult rpcResult) {
         return new RpcMsg(callMsg.getMsgId(), callMsg.getRpcPayload(), rpcResult);
     }
 }

+ 3 - 3
common/src/main/java/cn/reghao/autodop/common/msg/rpc/RpcPayload.java

@@ -14,7 +14,7 @@ public class RpcPayload implements Serializable {
 
     private String clazz;
     private String method;
-    private Object param;
+    private String jsonParam;
 
     public static RpcPayload rpcPayload(String clazz, String method) {
         RpcPayload nodeMsg = new RpcPayload();
@@ -23,11 +23,11 @@ public class RpcPayload implements Serializable {
         return nodeMsg;
     }
 
-    public static RpcPayload rpcPayload(String clazz, String method, Object param) {
+    public static RpcPayload rpcPayload(String clazz, String method, String jsonParam) {
         RpcPayload nodeMsg = new RpcPayload();
         nodeMsg.setClazz(clazz);
         nodeMsg.setMethod(method);
-        nodeMsg.setParam(param);
+        nodeMsg.setJsonParam(jsonParam);
         return nodeMsg;
     }
 }

+ 8 - 8
common/src/main/java/cn/reghao/autodop/common/msg/rpc/RpcResult.java

@@ -13,21 +13,21 @@ import static cn.reghao.jdkutil.result.ResultStatus.*;
  */
 @Data
 @AllArgsConstructor
-public class RpcResult<T> {
+public class RpcResult {
     // 0 - 成功 1 - 失败
     private int code;
     private String msg;
-    private T data;
+    private String jsonData;
 
-    public static <T> RpcResult<T> success(T data) {
-        return new RpcResult<>(SUCCESS.getCode(), SUCCESS.getMsg(), data);
+    public static  RpcResult success(String jsonData) {
+        return new RpcResult(SUCCESS.getCode(), SUCCESS.getMsg(), jsonData);
     }
 
-    public static <T> RpcResult<T> fail(String msg) {
-        return new RpcResult<>(FAIL.getCode(), msg, null);
+    public static  RpcResult fail(String msg) {
+        return new RpcResult(FAIL.getCode(), msg, null);
     }
 
-    public static <T> RpcResult<T> error(String msg) {
-        return new RpcResult<>(ERROR.getCode(), msg, null);
+    public static  RpcResult error(String msg) {
+        return new RpcResult(ERROR.getCode(), msg, null);
     }
 }

+ 1 - 1
dagent/src/main/java/cn/reghao/autodop/dagent/rpc/RpcListener.java

@@ -55,7 +55,7 @@ public class RpcListener implements MqttCallback {
 
             RpcPayload rpcPayload = callMsg.getRpcPayload();
             String clazz = rpcPayload.getClazz();
-            RpcResult<?> rpcResult;
+            RpcResult rpcResult;
             switch (RpcClazz.valueOf(clazz)) {
                 case AppRpcClazz:
                     rpcResult = appClazz.process(rpcPayload);

+ 24 - 32
dagent/src/main/java/cn/reghao/autodop/dagent/rpc/impl/AppClazzRpcImpl.java

@@ -14,6 +14,7 @@ import cn.reghao.autodop.dagent.app.ZipAppServiceImpl;
 import cn.reghao.jdkutil.MachineId;
 import cn.reghao.jdkutil.result.Result;
 import cn.reghao.jdkutil.result.ResultStatus;
+import cn.reghao.jdkutil.serializer.JsonConverter;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 
@@ -34,34 +35,25 @@ public class AppClazzRpcImpl {
         this.zipAppServiceImpl = new ZipAppServiceImpl();
     }
     
-    public RpcResult<?> process(RpcPayload rpcPayload) {
+    public RpcResult process(RpcPayload rpcPayload) {
         String method = rpcPayload.getMethod();
-        Object param = rpcPayload.getParam();
+        String jsonParam = rpcPayload.getJsonParam();
+        Object param;
         switch (AppRpcClazz.valueOf(method)) {
             case deploy:
-                if (!(param instanceof DeployParam)) {
-                    return RpcResult.error("不是 DeployParam 对象");
-                }
+                param = JsonConverter.jsonToObject(jsonParam, DeployParam.class);
                 return deploy(param);
             case status:
-                if (!(param instanceof AppIdParam)) {
-                    return RpcResult.error("不是 AppIdParam 对象");
-                }
+                param = JsonConverter.jsonToObject(jsonParam, AppIdParam.class);
                 return status(param);
             case start:
-                if (!(param instanceof AppIdParam)) {
-                    return RpcResult.error("不是 AppIdParam 对象");
-                }
+                param = JsonConverter.jsonToObject(jsonParam, AppIdParam.class);
                 return start(param);
             case stop:
-                if (!(param instanceof AppIdParam)) {
-                    return RpcResult.error("不是 AppIdParam 对象");
-                }
+                param = JsonConverter.jsonToObject(jsonParam, AppIdParam.class);
                 return stop(param);
             case restart:
-                if (!(param instanceof AppIdParam)) {
-                    return RpcResult.error("不是 AppIdParam 对象");
-                }
+                param = JsonConverter.jsonToObject(jsonParam, AppIdParam.class);
                 return restart(param);
             default:
                 String err = String.format("找不到 %s 方法", method);
@@ -69,7 +61,7 @@ public class AppClazzRpcImpl {
         }
     }
     
-    private RpcResult<?> deploy(Object param) {
+    private RpcResult deploy(Object param) {
         DeployParam deployParam = (DeployParam) param;
         String buildLogId = deployParam.getBuildLogId();
         String packType = deployParam.getPackType();
@@ -96,10 +88,10 @@ public class AppClazzRpcImpl {
             deployResult.setResult(Result.result(ResultStatus.FAIL, ExceptionUtil.errorMsg(e)));
         }
         
-        return RpcResult.success(deployResult);
+        return RpcResult.success(JsonConverter.objectToJson(deployResult));
     }
 
-    public RpcResult<?> status(Object param) {
+    public RpcResult status(Object param) {
         AppIdParam appIdParam = (AppIdParam) param;
         String appId = appIdParam.getAppId();
         String packerType = appIdParam.getPackerType();
@@ -120,10 +112,10 @@ public class AppClazzRpcImpl {
             return RpcResult.fail(ExceptionUtil.errorMsg(e));
         }
         
-        return RpcResult.success(appStatus);
+        return RpcResult.success(JsonConverter.objectToJson(appStatus));
     }
 
-    public RpcResult<AppStatus> restart(Object param) {
+    public RpcResult restart(Object param) {
         AppIdParam appIdParam = (AppIdParam) param;
         String appId = appIdParam.getAppId();
         String packerType = appIdParam.getPackerType();
@@ -144,10 +136,10 @@ public class AppClazzRpcImpl {
             return RpcResult.fail(ExceptionUtil.errorMsg(e));
         }
         
-        return RpcResult.success(appStatus);
+        return RpcResult.success(JsonConverter.objectToJson(appStatus));
     }
 
-    public RpcResult<AppStatus> stop(Object param) {
+    public RpcResult stop(Object param) {
         AppIdParam appIdParam = (AppIdParam) param;
         String appId = appIdParam.getAppId();
         String packerType = appIdParam.getPackerType();
@@ -168,10 +160,10 @@ public class AppClazzRpcImpl {
             return RpcResult.fail(ExceptionUtil.errorMsg(e));
         }
 
-        return RpcResult.success(appStatus);
+        return RpcResult.success(JsonConverter.objectToJson(appStatus));
     }
 
-    public RpcResult<AppStatus> start(Object param) {
+    public RpcResult start(Object param) {
         AppIdParam appIdParam = (AppIdParam) param;
         String appId = appIdParam.getAppId();
         String packerType = appIdParam.getPackerType();
@@ -192,10 +184,10 @@ public class AppClazzRpcImpl {
             return RpcResult.fail(ExceptionUtil.errorMsg(e));
         }
 
-        return RpcResult.success(appStatus);
+        return RpcResult.success(JsonConverter.objectToJson(appStatus));
     }
 
-    public RpcResult<?> log(Object param) {
+    public RpcResult log(Object param) {
         AppLogArgs appLogArgs = (AppLogArgs) param;
         String packerType = appLogArgs.getPackerType();
         long count = appLogArgs.getLogConfigs().stream().filter(LogConfig::getIsDir).count();
@@ -204,18 +196,18 @@ public class AppClazzRpcImpl {
                 case docker:
                     if (count > 0) {
                         List<LogFile> logFiles = dockerAppServiceImpl.logFiles(appLogArgs);
-                        return RpcResult.success(logFiles);
+                        return RpcResult.success(JsonConverter.objectToJson(logFiles));
                     } else {
                         List<String> logContent = dockerAppServiceImpl.logContent(appLogArgs);
-                        return RpcResult.success(logContent);
+                        return RpcResult.success(JsonConverter.objectToJson(logContent));
                     }
                 case zip:
                     if (count > 0) {
                         List<LogFile> logFiles = zipAppServiceImpl.logFiles(appLogArgs);
-                        return RpcResult.success(logFiles);
+                        return RpcResult.success(JsonConverter.objectToJson(logFiles));
                     } else {
                         List<String> logContent = zipAppServiceImpl.logContent(appLogArgs);
-                        return RpcResult.success(logContent);
+                        return RpcResult.success(JsonConverter.objectToJson(logContent));
                     }
                 default:
                     return RpcResult.error("打包类型 " + appLogArgs.getPackerType() + " 不存在");

+ 2 - 2
dagent/src/main/java/cn/reghao/autodop/dagent/rpc/impl/DockerClazzRpcImpl.java

@@ -20,9 +20,9 @@ public class DockerClazzRpcImpl {
         //this.docker = new Docker();
     }
     
-    public RpcResult<?> process(RpcPayload rpcPayload) {
+    public RpcResult process(RpcPayload rpcPayload) {
         String method = rpcPayload.getMethod();
-        Object param = rpcPayload.getParam();
+        String jsonParam = rpcPayload.getJsonParam();
         switch (DockerRpcClazz.valueOf(method)) {
             case dockerImage:
             default:

+ 7 - 6
dagent/src/main/java/cn/reghao/autodop/dagent/rpc/impl/MachineClazzRpcImpl.java

@@ -6,6 +6,7 @@ import cn.reghao.autodop.common.machine.shell.MachineShell;
 import cn.reghao.autodop.common.msg.rpc.RpcPayload;
 import cn.reghao.autodop.common.msg.rpc.RpcResult;
 import cn.reghao.autodop.common.msg.rpc.clazz.MachineRpcClazz;
+import cn.reghao.jdkutil.serializer.JsonConverter;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 
@@ -22,9 +23,9 @@ public class MachineClazzRpcImpl {
         this.machine = machine;
     }
     
-    public RpcResult<?> process(RpcPayload rpcPayload) {
+    public RpcResult process(RpcPayload rpcPayload) {
         String method = rpcPayload.getMethod();
-        Object param = rpcPayload.getParam();
+        String jsonParam = rpcPayload.getJsonParam();
         switch (MachineRpcClazz.valueOf(method)) {
             case stat:
                 break;
@@ -36,11 +37,11 @@ public class MachineClazzRpcImpl {
     }
 
 
-    public RpcResult<MachineStat> stat() {
-        return RpcResult.success(machine.stat());
+    public RpcResult stat() {
+        return RpcResult.success(JsonConverter.objectToJson(machine.stat()));
     }
 
-    public RpcResult<MachineShell> shell(String script) {
-        return RpcResult.success(null);
+    public RpcResult shell(String script) {
+        return RpcResult.success("");
     }
 }

+ 2 - 2
dmaster/src/main/java/cn/reghao/autodop/dmaster/app/service/bd/AppDeployer.java

@@ -59,8 +59,8 @@ public class AppDeployer {
             asyncMsg.setOps(AppOps.appDeploy.name());
             asyncMsg.setPayload(JsonConverter.objectToJson(deployParam));
 
-            RpcPayload rpcPayload = RpcPayload.rpcPayload(
-                    AppRpcClazz.class.getSimpleName(), AppRpcClazz.deploy.name(), deployParam);
+            RpcPayload rpcPayload = RpcPayload.rpcPayload(AppRpcClazz.class.getSimpleName(), AppRpcClazz.deploy.name(),
+                    JsonConverter.objectToJson(deployParam));
             RpcMsg rpcMsg = RpcMsg.callMsg(rpcPayload);
             Message message = Message.rpcMessage("", "", rpcMsg);
 

+ 1 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/MessageListener.java

@@ -85,7 +85,7 @@ public class MessageListener implements MqttCallback {
         RpcPayload rpcPayload = rpcMsg.getRpcPayload();
         String clazz = rpcPayload.getClazz();
         String method = rpcPayload.getMethod();
-        RpcResult<?> rpcResult = rpcMsg.getRpcResult();
+        RpcResult rpcResult = rpcMsg.getRpcResult();
         switch (RpcClazz.valueOf(clazz)) {
             case AppRpcClazz:
                 appClazzRpcResult.process(msgId, method, rpcResult);

+ 15 - 39
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/rpcresult/AppClazzRpcResult.java

@@ -4,16 +4,13 @@ import cn.reghao.autodop.common.msg.rpc.RpcResult;
 import cn.reghao.autodop.common.msg.rpc.clazz.AppRpcClazz;
 import cn.reghao.autodop.common.msg.rpc.dto.app.AppStatus;
 import cn.reghao.autodop.common.msg.rpc.dto.app.DeployResult;
-import cn.reghao.autodop.common.message.CallResult;
 import cn.reghao.autodop.common.mqtt.DefaultMqttClient;
+import cn.reghao.jdkutil.result.ResultStatus;
 import cn.reghao.jdkutil.serializer.JsonConverter;
 import cn.reghao.autodop.dmaster.app.service.AppOpsResultService;
-import com.google.gson.reflect.TypeToken;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 
-import java.lang.reflect.Type;
-
 /**
  * 分发 App Result 相关的消息
  *
@@ -31,18 +28,30 @@ public class AppClazzRpcResult {
         this.mqttClient = mqttClient;
     }
 
-    public void process(String msgId, String method, RpcResult<?> rpcResult) {
+    public void process(String msgId, String method, RpcResult rpcResult) {
         mqttClient.removeRecord(msgId);
+        int code = rpcResult.getCode();
+        if (code != ResultStatus.SUCCESS.getCode()) {
+            String msg = rpcResult.getMsg();
+            log.error("error msg -> {}", msg);
+            return;
+        }
+
         switch (AppRpcClazz.valueOf(method)) {
             case deploy:
+                processAppDeployResult(rpcResult.getJsonData());
                 break;
             case start:
+                processAppStatusResult(rpcResult.getJsonData());
                 break;
             case stop:
+                processAppStatusResult(rpcResult.getJsonData());
                 break;
             case restart:
+                processAppStatusResult(rpcResult.getJsonData());
                 break;
             case status:
+                processAppStatusResult(rpcResult.getJsonData());
                 break;
             case log:
                 break;
@@ -57,42 +66,9 @@ public class AppClazzRpcResult {
     }
 
     private void processAppStatusResult(String payload) {
-        AppStatus appStatus = processCallResult(payload);
+        AppStatus appStatus = JsonConverter.jsonToObject(payload, AppStatus.class);
         if (appStatus != null) {
             appOpsResultService.statusResult(appStatus);
         }
     }
-
-    private AppStatus processCallResult(String payload) {
-        Type type = new TypeToken<CallResult<AppStatus>>(){}.getType();
-        CallResult<AppStatus> callResult = JsonConverter.jsonToObject(payload, type);
-        if (callResult.getCode() != 0) {
-            // TODO 处理调用失败的情况
-            log.error("调用失败, 原因: {}", callResult.getMsg());
-            return null;
-        } else {
-            return callResult.getData();
-        }
-    }
-
-    private void processAppRestartResult(String payload) {
-        AppStatus appStatus = processCallResult(payload);
-        if (appStatus != null) {
-            appOpsResultService.restartResult(appStatus);
-        }
-    }
-
-    private void processAppStopResult(String payload) {
-        AppStatus appStatus = processCallResult(payload);
-        if (appStatus != null) {
-            appOpsResultService.stopResult(appStatus);
-        }
-    }
-
-    private void processAppStartResult(String payload) {
-        AppStatus appStatus = processCallResult(payload);
-        if (appStatus != null) {
-            appOpsResultService.startResult(appStatus);
-        }
-    }
 }

+ 1 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/rpcresult/DockerClazzRpcResult.java

@@ -17,7 +17,7 @@ public class DockerClazzRpcResult {
         this.mqttClient = mqttClient;
     }
 
-    public void process(String msgId, String method, RpcResult<?> rpcResult) {
+    public void process(String msgId, String method, RpcResult rpcResult) {
         mqttClient.removeRecord(msgId);
         switch (DockerRpcClazz.valueOf(method)) {
             case dockerImage:

+ 1 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/rpcresult/MachineClazzRpcResult.java

@@ -19,7 +19,7 @@ public class MachineClazzRpcResult {
         this.mqttClient = mqttClient;
     }
 
-    public void process(String msgId, String method, RpcResult<?> rpcResult) {
+    public void process(String msgId, String method, RpcResult rpcResult) {
         mqttClient.removeRecord(msgId);
         switch (MachineRpcClazz.valueOf(method)) {
             case stat: