reghao 4 rokov pred
rodič
commit
19cb7e61c0

+ 0 - 8
common/src/main/java/cn/reghao/autodop/common/dagent/machine/hardware/disk/Disk.java

@@ -89,12 +89,4 @@ public class Disk {
             return candidates.get(0);
         }
     }
-
-    public static void main(String[] args) {
-        Disk disk = new Disk();
-        List<DiskInfo> diskInfos = disk.info();
-        String home = System.getProperty("user.home");
-        DiskInfo diskInfo = disk.diskInfo("/var/log/nginx");
-        System.out.println();
-    }
 }

+ 10 - 13
common/src/main/java/cn/reghao/autodop/common/dagent/machine/hardware/memory/Memory.java

@@ -1,8 +1,6 @@
 package cn.reghao.autodop.common.dagent.machine.hardware.memory;
 
 import cn.reghao.autodop.common.dagent.machine.hardware.LinuxProc;
-import cn.reghao.autodop.common.utils.ByteConverter;
-import cn.reghao.autodop.common.utils.ByteType;
 import cn.reghao.autodop.common.utils.text.TextFile;
 
 import java.util.LinkedHashMap;
@@ -16,7 +14,6 @@ import java.util.Map;
  */
 public class Memory {
     private TextFile textFile = new TextFile();
-    private ByteConverter convert = new ByteConverter();
 
     public MemoryInfo info() {
         Map<String, String> map = parseMemInfo();
@@ -37,16 +34,16 @@ public class Memory {
         long swapUsed = swapTotal - swapUnused;
 
         MemoryInfo memoryInfo = new MemoryInfo();
-        memoryInfo.setTotal(convert.convert(ByteType.KiB, total));
-        memoryInfo.setUsed(convert.convert(ByteType.KiB, used));
-        memoryInfo.setFree(convert.convert(ByteType.KiB, free));
-        memoryInfo.setShared(convert.convert(ByteType.KiB, shared));
-        memoryInfo.setBuffCache(convert.convert(ByteType.KiB, buffCache));
-        memoryInfo.setAvailable(convert.convert(ByteType.KiB, available));
-
-        memoryInfo.setSwapTotal(convert.convert(ByteType.KiB, swapTotal));
-        memoryInfo.setSwapUsed(convert.convert(ByteType.KiB, swapUsed));
-        memoryInfo.setSwapFree(convert.convert(ByteType.KiB, swapUnused));
+        memoryInfo.setTotal(total);
+        memoryInfo.setUsed(used);
+        memoryInfo.setFree(free);
+        memoryInfo.setShared(shared);
+        memoryInfo.setBuffCache(buffCache);
+        memoryInfo.setAvailable(available);
+
+        memoryInfo.setSwapTotal(total);
+        memoryInfo.setSwapUsed(swapUsed);
+        memoryInfo.setSwapFree(swapUnused);
         return memoryInfo;
     }
 

+ 10 - 9
common/src/main/java/cn/reghao/autodop/common/dagent/machine/hardware/memory/MemoryInfo.java

@@ -10,13 +10,14 @@ import lombok.Data;
  */
 @Data
 public class MemoryInfo {
-    private String total;
-    private String used;
-    private String free;
-    private String shared;
-    private String buffCache;
-    private String available;
-    private String swapTotal;
-    private String swapUsed;
-    private String swapFree;
+    // KiB
+    private long total;
+    private long used;
+    private long free;
+    private long shared;
+    private long buffCache;
+    private long available;
+    private long swapTotal;
+    private long swapUsed;
+    private long swapFree;
 }

+ 1 - 1
common/src/main/java/cn/reghao/autodop/common/message/ops/DagentOps.java

@@ -5,5 +5,5 @@ package cn.reghao.autodop.common.message.ops;
  * @date 2020-12-25 19:15:00
  */
 public enum DagentOps {
-    dagentLog
+    dagentStart, dagentHeartbeat, dagentLog, dagnetShutdown
 }

+ 0 - 1
common/src/main/java/cn/reghao/autodop/common/message/ops/MachineOps.java

@@ -5,7 +5,6 @@ package cn.reghao.autodop.common.message.ops;
  * @date 2020-12-25 19:15:00
  */
 public enum MachineOps {
-    machineRegistry, machineHeartbeat,
     machineShell, machineState,
     machineShellResult, machineStateResult,
 }

+ 8 - 0
common/src/main/java/cn/reghao/autodop/common/utils/ByteConverter.java

@@ -36,4 +36,12 @@ public class ByteConverter {
             return "data too large...";
         }
     }
+
+    public long convert(ByteType src, ByteType dest, long value) {
+        for (int i = src.ordinal(); i < dest.ordinal(); i++) {
+            value = value >> 10;
+        }
+
+        return value;
+    }
 }

+ 2 - 2
dagent/src/main/java/cn/reghao/autodop/dagent/machine/HeartbeatJob.java

@@ -3,7 +3,7 @@ package cn.reghao.autodop.dagent.machine;
 import cn.reghao.autodop.common.message.AsyncMsg;
 import cn.reghao.autodop.common.message.MessageType;
 import cn.reghao.autodop.common.dagent.machine.Machine;
-import cn.reghao.autodop.common.message.ops.MachineOps;
+import cn.reghao.autodop.common.message.ops.DagentOps;
 import cn.reghao.autodop.common.mqtt.MqttPub;
 import cn.reghao.autodop.common.utils.serializer.JsonConverter;
 import lombok.extern.slf4j.Slf4j;
@@ -28,7 +28,7 @@ public class HeartbeatJob implements Job {
 
         String payload = JsonConverter.objectToJson(machine.heartbeat());
         AsyncMsg asyncMsg = AsyncMsg.asyncMsg(MessageType.machineType.name(),
-                MachineOps.machineHeartbeat.name(), payload);
+                DagentOps.dagentHeartbeat.name(), payload);
         try {
             mqttPub.pub("dmaster", JsonConverter.objectToJson(asyncMsg));
         } catch (MqttException e) {

+ 2 - 1
dagent/src/main/java/cn/reghao/autodop/dagent/utils/DagentLifecycle.java

@@ -4,6 +4,7 @@ import cn.reghao.autodop.common.log.Appenders;
 import cn.reghao.autodop.common.message.AsyncMsg;
 import cn.reghao.autodop.common.message.MessageType;
 import cn.reghao.autodop.common.dagent.machine.Machine;
+import cn.reghao.autodop.common.message.ops.DagentOps;
 import cn.reghao.autodop.common.message.ops.MachineOps;
 import cn.reghao.autodop.common.mqtt.MosquittoProperties;
 import cn.reghao.autodop.common.mqtt.MqttPub;
@@ -89,7 +90,7 @@ public class DagentLifecycle implements ApplicationRunner, DisposableBean {
 
         String payload = JsonConverter.objectToJson(machine.registry());
         AsyncMsg asyncMsg = AsyncMsg.asyncMsg(MessageType.machineType.name(),
-                MachineOps.machineRegistry.name(), payload);
+                DagentOps.dagentStart.name(), payload);
         mqttPub.pub("dmaster", JsonConverter.objectToJson(asyncMsg));
         log.info("发送机器注册信息...");
     }

+ 70 - 1
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/processor/DagentOpsProcessor.java

@@ -1,14 +1,26 @@
 package cn.reghao.autodop.dmaster.mqttsub.processor;
 
+import cn.reghao.autodop.common.dagent.machine.api.data.MachineHeartbeat;
+import cn.reghao.autodop.common.dagent.machine.hardware.disk.DiskInfo;
 import cn.reghao.autodop.common.log.RunningLog;
 import cn.reghao.autodop.common.message.ops.DagentOps;
+import cn.reghao.autodop.common.utils.ByteConverter;
+import cn.reghao.autodop.common.utils.ByteType;
+import cn.reghao.autodop.common.utils.PercentCalculator;
 import cn.reghao.autodop.common.utils.serializer.JsonConverter;
 import cn.reghao.autodop.common.message.ops.OpsProcessor;
+import cn.reghao.autodop.dmaster.machine.db.MachineCrudService;
+import cn.reghao.autodop.dmaster.machine.entity.MachineInfo;
+import cn.reghao.autodop.dmaster.machine.entity.MachineStatus;
+import cn.reghao.autodop.dmaster.machine.repository.MachineStatusRepository;
 import cn.reghao.autodop.dmaster.sys.entity.AppRunningLog;
 import cn.reghao.autodop.dmaster.sys.repository.AppRunningLogRepository;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 
+import java.time.LocalDateTime;
+import java.util.List;
+
 /**
  * 分发处理 Dagent 相关的消息
  *
@@ -18,26 +30,83 @@ import org.springframework.stereotype.Component;
 @Slf4j
 @Component
 public class DagentOpsProcessor implements OpsProcessor {
+    private ByteConverter convert = new ByteConverter();
+    private MachineCrudService machineCrudService;
+    private MachineStatusRepository statusRepository;
     private AppRunningLogRepository runtimeLogRepository;
 
-    public DagentOpsProcessor(AppRunningLogRepository runtimeLogRepository) {
+    public DagentOpsProcessor(MachineCrudService machineCrudService,
+                              MachineStatusRepository statusRepository,
+                              AppRunningLogRepository runtimeLogRepository) {
+        this.machineCrudService = machineCrudService;
+        this.statusRepository = statusRepository;
         this.runtimeLogRepository = runtimeLogRepository;
     }
 
     @Override
     public void process(String ops, String payload) {
         switch (DagentOps.valueOf(ops)) {
+            case dagentStart:
+                processDagentStart(payload);
+                break;
+            case dagentHeartbeat:
+                processDagentHeartbeat(payload);
+                break;
             case dagentLog:
                 processDagentLog(payload);
                 break;
+            case dagnetShutdown:
+                processDagentShutdown(payload);
+                break;
             default:
                 log.error("DagentOps 中没有相应类型...");
                 break;
         }
     }
 
+    private void processDagentStart(String payload) {
+        MachineInfo machineInfo = JsonConverter.jsonToObject(payload, MachineInfo.class);
+        machineCrudService.insertOrUpdate(machineInfo);
+    }
+
+    private void processDagentHeartbeat(String payload) {
+        MachineHeartbeat heartbeat = JsonConverter.jsonToObject(payload, MachineHeartbeat.class);
+        String machineIpv4 = heartbeat.getMachineIpv4();
+        String diskAvail = calculateDiskUsage(machineIpv4, heartbeat.getDiskInfo());
+        MachineStatus status = statusRepository.findByMachineId(heartbeat.getMachineId());
+        if (status != null) {
+            status.setMachineIpv4(machineIpv4);
+            status.setDiskAvail(diskAvail);
+            status.setLastCheck(LocalDateTime.now());
+        } else {
+            status = new MachineStatus(heartbeat, diskAvail);
+        }
+        statusRepository.save(status);
+    }
+
+    private String calculateDiskUsage(String machineIpv4, List<DiskInfo> diskInfos) {
+        long size = 0, avail = 0;
+        for (DiskInfo diskInfo : diskInfos) {
+            size += diskInfo.getSize();
+            avail += diskInfo.getAvail();
+        }
+
+        String availSize = convert.convert(ByteType.Bytes, ByteType.MiB, avail) + ByteType.MiB.name();
+        double value = PercentCalculator.percentValue(avail, size);
+        String percent = PercentCalculator.percent(value);
+        double minimalPercent = 0.20;
+        if (value < minimalPercent) {
+            // TODO 发出告警通知
+            log.info("{} 上可用的磁盘空间仅占总磁盘空间的 {},共计 {}", machineIpv4, percent, availSize);
+        }
+        return availSize;
+    }
+
     private void processDagentLog(String payload) {
         RunningLog runningLog = JsonConverter.jsonToObject(payload, RunningLog.class);
         runtimeLogRepository.save(AppRunningLog.from(runningLog));
     }
+
+    private void processDagentShutdown(String payload) {
+    }
 }

+ 0 - 66
dmaster/src/main/java/cn/reghao/autodop/dmaster/mqttsub/processor/MachineOpsProcessor.java

@@ -1,23 +1,10 @@
 package cn.reghao.autodop.dmaster.mqttsub.processor;
 
-import cn.reghao.autodop.common.dagent.machine.api.data.MachineHeartbeat;
-import cn.reghao.autodop.common.dagent.machine.hardware.disk.DiskInfo;
 import cn.reghao.autodop.common.message.ops.MachineOps;
-import cn.reghao.autodop.common.utils.ByteConverter;
-import cn.reghao.autodop.common.utils.ByteType;
-import cn.reghao.autodop.common.utils.PercentCalculator;
-import cn.reghao.autodop.common.utils.serializer.JsonConverter;
-import cn.reghao.autodop.dmaster.machine.entity.MachineInfo;
-import cn.reghao.autodop.dmaster.machine.entity.MachineStatus;
-import cn.reghao.autodop.dmaster.machine.repository.MachineStatusRepository;
-import cn.reghao.autodop.dmaster.machine.db.MachineCrudService;
 import cn.reghao.autodop.common.message.ops.OpsProcessor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
 
-import java.time.LocalDateTime;
-import java.util.List;
-
 /**
  * 分发 Machine 相关的消息
  *
@@ -27,25 +14,10 @@ import java.util.List;
 @Slf4j
 @Component
 public class MachineOpsProcessor implements OpsProcessor {
-    private ByteConverter convert = new ByteConverter();
-    private MachineCrudService machineCrudService;
-    private MachineStatusRepository statusRepository;
-
-    public MachineOpsProcessor(MachineCrudService machineCrudService, MachineStatusRepository statusRepository) {
-        this.machineCrudService = machineCrudService;
-        this.statusRepository = statusRepository;
-    }
-
     @Override
     public void process(String ops, String payload) {
         try {
             switch (MachineOps.valueOf(ops)) {
-                case machineRegistry:
-                    processMachineRegistry(payload);
-                    break;
-                case machineHeartbeat:
-                    processMachineHeartbeat(payload);
-                    break;
                 case machineShellResult:
                 case machineStateResult:
                     break;
@@ -56,42 +28,4 @@ public class MachineOpsProcessor implements OpsProcessor {
             e.printStackTrace();
         }
     }
-
-    private void processMachineRegistry(String payload) {
-        MachineInfo machineInfo = JsonConverter.jsonToObject(payload, MachineInfo.class);
-        machineCrudService.insertOrUpdate(machineInfo);
-    }
-
-    private void processMachineHeartbeat(String payload) {
-        MachineHeartbeat heartbeat = JsonConverter.jsonToObject(payload, MachineHeartbeat.class);
-        String machineIpv4 = heartbeat.getMachineIpv4();
-        String diskAvail = calculateDiskUsage(machineIpv4, heartbeat.getDiskInfo());
-        MachineStatus status = statusRepository.findByMachineId(heartbeat.getMachineId());
-        if (status != null) {
-            status.setMachineIpv4(machineIpv4);
-            status.setDiskAvail(diskAvail);
-            status.setLastCheck(LocalDateTime.now());
-        } else {
-            status = new MachineStatus(heartbeat, diskAvail);
-        }
-        statusRepository.save(status);
-    }
-
-    private String calculateDiskUsage(String machineIpv4, List<DiskInfo> diskInfos) {
-        long size = 0, avail = 0;
-        for (DiskInfo diskInfo : diskInfos) {
-            size += diskInfo.getSize();
-            avail += diskInfo.getAvail();
-        }
-
-        String availSize = convert.convert(ByteType.Bytes, avail);
-        double value = PercentCalculator.percentValue(avail, size);
-        String percent = PercentCalculator.percent(value);
-        double minimalPercent = 0.20;
-        if (value < minimalPercent) {
-            // TODO 发出告警通知
-            log.info("{} 上可用的磁盘空间仅占总磁盘空间的 {},共计 {}", machineIpv4, percent, availSize);
-        }
-        return availSize;
-    }
 }

+ 25 - 0
dmaster/src/test/java/cn/reghao/autodop/common/dagent/machine/hardware/disk/DiskTest.java

@@ -0,0 +1,25 @@
+package cn.reghao.autodop.common.dagent.machine.hardware.disk;
+
+import cn.reghao.autodop.common.utils.ByteConverter;
+import cn.reghao.autodop.common.utils.ByteType;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+class DiskTest {
+    private ByteConverter convert = new ByteConverter();
+
+    @Test
+    void info() {
+        Disk disk = new Disk();
+        for (DiskInfo info : disk.info()) {
+            long size = info.getSize();
+            long mibSize = convert.convert(ByteType.Bytes, ByteType.MiB, size);
+            System.out.println();
+        }
+    }
+
+    @Test
+    void diskInfo() {
+    }
+}