Browse Source

agent 添加一个 ProcScanTask 任务, 定时扫描机器上 listen socket 的进程, 即机器对外提供的服务

reghao 3 tháng trước cách đây
mục cha
commit
a0ec27421d

+ 5 - 0
agent/src/main/java/cn/reghao/bnt/agent/AgentApp.java

@@ -6,6 +6,7 @@ import ch.qos.logback.classic.LoggerContext;
 import cn.reghao.bnt.agent.config.ConfigFile;
 import cn.reghao.bnt.agent.config.DagentConfig;
 import cn.reghao.bnt.agent.task.ImageCleanTask;
+import cn.reghao.bnt.agent.task.ProcScanTask;
 import cn.reghao.bnt.agent.ws.WsClient;
 import cn.reghao.bnt.common.agent.app.iface.AppDeploy;
 import cn.reghao.bnt.common.agent.app.iface.AppStat;
@@ -108,6 +109,10 @@ public class AgentApp {
 
 		ImageCleanTask cleanTask = new ImageCleanTask(messageSender, docker);
 		scheduler.scheduleAtFixedRate(cleanTask, 1, 8, TimeUnit.HOURS);
+
+		ProcScanTask procScanTask = new ProcScanTask(messageSender, docker);
+		scheduler.scheduleAtFixedRate(procScanTask, 1, 8, TimeUnit.HOURS);
+
 		shutdownGracefully();
 		SingleInstance.onlyOne(60001);
 	}

+ 1 - 1
agent/src/main/java/cn/reghao/bnt/agent/task/ImageCleanTask.java

@@ -30,7 +30,7 @@ public class ImageCleanTask implements Runnable {
 
     @Override
     public void run() {
-        String result = "exec docker image clean task";
+        String result = "exec ImageCleanTask";
         log.info("{}", result);
         EvtTaskResult evtTaskResult = new EvtTaskResult(Machine.ID, result);
         try {

+ 129 - 0
agent/src/main/java/cn/reghao/bnt/agent/task/ProcScanTask.java

@@ -0,0 +1,129 @@
+package cn.reghao.bnt.agent.task;
+
+import cn.reghao.bnt.common.docker.Docker;
+import cn.reghao.bnt.common.machine.Machine;
+import cn.reghao.bnt.common.machine.model.SysProcess;
+import cn.reghao.bnt.common.msg.MessageSender;
+import cn.reghao.bnt.common.msg.event.EvtTaskResult;
+import cn.reghao.jutil.jdk.event.message.EventMessage;
+import com.github.dockerjava.api.command.InspectContainerResponse;
+import lombok.extern.slf4j.Slf4j;
+import oshi.SystemInfo;
+import oshi.software.os.InternetProtocolStats;
+import oshi.software.os.OSThread;
+import oshi.software.os.OperatingSystem;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * @author reghao
+ * @date 2025-12-17 14:30:36
+ */
+@Slf4j
+public class ProcScanTask implements Runnable {
+    private final MessageSender messageSender;
+    private final Docker docker;
+    private final OperatingSystem os;
+
+    public ProcScanTask(MessageSender messageSender, Docker docker) {
+        this.messageSender = messageSender;
+        this.docker = docker;
+        SystemInfo si = new SystemInfo();
+        this.os = si.getOperatingSystem();
+    }
+
+    @Override
+    public void run() {
+        String result = "exec ProcScanTask";
+        log.info("{}", result);
+        EvtTaskResult evtTaskResult = new EvtTaskResult(Machine.ID, result);
+        try {
+            EventMessage evtMsg = EventMessage.evt(evtTaskResult);
+            messageSender.send("", evtMsg);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+
+        Map<Integer, SysProcess> procMap = os.getProcesses().stream()
+                .map(osProcess -> {
+                    int pid = osProcess.getProcessID();
+                    String name = osProcess.getName();
+                    int ppid = osProcess.getParentProcessID();
+                    String state = osProcess.getState().name();
+                    List<OSThread> osThreadList = osProcess.getThreadDetails();
+                    String cmdLine = osProcess.getCommandLine();
+                    return new SysProcess(pid, name, ppid, cmdLine);
+                })
+                .collect(Collectors.toMap(SysProcess::getPid, sp -> sp));
+
+        String state = InternetProtocolStats.TcpState.LISTEN.name();
+        os.getInternetProtocolStats().getConnections().forEach(ipConnection -> {
+            if (!ipConnection.getState().name().equals(state)) {
+                return;
+            }
+
+            String connType = ipConnection.getType();
+            int pid = ipConnection.getowningProcessId();
+            byte[] localAddress = ipConnection.getLocalAddress();
+            String localAddressStr = byte2Str(localAddress);
+            int localPort = ipConnection.getLocalPort();
+            byte[] remoteAddress = ipConnection.getForeignAddress();
+            int remotePort = ipConnection.getForeignPort();
+            String hostPort = String.format("%s:%s", localAddressStr, localPort);
+
+            SysProcess sysProcess = procMap.get(pid);
+            if (sysProcess != null) {
+                sysProcess.getHostPorts().add(hostPort);
+            }
+        });
+
+        List<InspectContainerResponse> inspectContainerResponseList = docker.psAll();
+        for (InspectContainerResponse response : inspectContainerResponseList) {
+            Boolean running = response.getState().getRunning();
+            if (running != null && running) {
+                Integer pid = response.getState().getPid();
+                if (pid == null) {
+                    continue;
+                }
+
+                String containerId = response.getId();
+                String appId = response.getName().replace("/", "");
+                SysProcess sysProcess = procMap.get(pid);
+                if (sysProcess != null) {
+                    sysProcess.setContainerId(containerId);
+                    sysProcess.setAppId(appId);
+                } else {
+                    log.error("docker process {} not exist", pid);
+                }
+            }
+        }
+
+        List<SysProcess> listenProcessList = procMap.values().stream()
+                .filter(sysProcess -> !sysProcess.getHostPorts().isEmpty())
+                .collect(Collectors.toList());
+        evtTaskResult = new EvtTaskResult(Machine.ID, listenProcessList);
+        try {
+            EventMessage evtMsg = EventMessage.evt(evtTaskResult);
+            messageSender.send("", evtMsg);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    private String byte2Str(byte[] buffer) {
+        String address = "-1:-1:-1:-1";
+        try {
+            InetAddress inetAddress = InetAddress.getByAddress(buffer);
+            address = inetAddress.getHostAddress();
+        } catch (UnknownHostException e) {
+            e.printStackTrace();
+        }
+
+        return address;
+    }
+}

+ 1 - 1
agent/src/main/java/cn/reghao/bnt/agent/ws/event/handler/EvtDockerHandler.java

@@ -65,7 +65,7 @@ public class EvtDockerHandler extends Handler {
             e.printStackTrace();
         }
 
-        int maxSize = 10;
+        int maxSize = 100;
         List<Object> list0 = new ArrayList<>();
         if (list.size() > maxSize) {
             list0.addAll(list.subList(0, maxSize));

+ 2 - 97
agent/src/test/java/AgentTest.java

@@ -1,16 +1,12 @@
 import ch.qos.logback.classic.Level;
 import ch.qos.logback.classic.Logger;
 import ch.qos.logback.classic.LoggerContext;
-import cn.reghao.bnt.common.docker.DockerImpl;
 import cn.reghao.bnt.common.machine.model.NetworkCard;
 import cn.reghao.jutil.jdk.converter.ByteConverter;
 import cn.reghao.jutil.jdk.converter.DateTimeConverter;
-import cn.reghao.jutil.jdk.converter.IpAddressConverter;
 import cn.reghao.jutil.jdk.thread.ThreadPoolWrapper;
-import com.github.dockerjava.api.command.InspectContainerResponse;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
-import lombok.Setter;
 import org.slf4j.LoggerFactory;
 import oshi.SystemInfo;
 import oshi.hardware.*;
@@ -273,48 +269,6 @@ public class AgentTest {
         rootLogger.setLevel(Level.INFO);
     }
 
-    static List<ListenProcess> getListenProcesses() {
-        IpAddressConverter ipAddressConverter = new IpAddressConverter();
-        List<SysProcess> sysProcessList = new ArrayList<>();
-        os.getProcesses().forEach(osProcess -> {
-            int pid = osProcess.getProcessID();
-            String name = osProcess.getName();
-            int ppid = osProcess.getParentProcessID();
-            String state = osProcess.getState().name();
-            List<OSThread> osThreadList = osProcess.getThreadDetails();
-            sysProcessList.add(new SysProcess(pid, name, ppid));
-        });
-
-        String state = InternetProtocolStats.TcpState.LISTEN.name();
-        List<ListenProcess> list = new ArrayList<>();
-        os.getInternetProtocolStats().getConnections().forEach(ipConnection -> {
-            if (!ipConnection.getState().name().equals(state)) {
-                return;
-            }
-
-            String type = ipConnection.getType();
-            int pid = ipConnection.getowningProcessId();
-            byte[] localAddress = ipConnection.getLocalAddress();
-            int localPort = ipConnection.getLocalPort();
-            byte[] remoteAddress = ipConnection.getForeignAddress();
-            int remotePort = ipConnection.getForeignPort();
-            System.out.printf("%s %s %s %s\n", pid, type, state, localPort);
-            list.add(new ListenProcess(pid, type, localPort));
-        });
-
-        Map<Integer, List<ListenProcess>> groupMap = list.stream().collect(Collectors.groupingBy(ListenProcess::getPid));
-        os.getProcesses().forEach(osProcess -> {
-            int pid = osProcess.getProcessID();
-            List<ListenProcess> list1 = groupMap.get(pid);
-            if (list1 != null && !list1.isEmpty()) {
-                String name = osProcess.getName();
-                list1.get(0).setName(name);
-            }
-        });
-
-        return list;
-    }
-
     static OSProcess getProcess(int pid) {
         List<OSProcess> list = os.getProcesses().stream()
                 .filter(osProcess -> osProcess.getProcessID() == pid)
@@ -337,47 +291,6 @@ public class AgentTest {
                 .collect(Collectors.toList());
     }
 
-    static void dockerListTest() {
-        List<ListenProcess> listenProcessList = getListenProcesses();
-        Map<Integer, List<ListenProcess>> groupMap = listenProcessList.stream().collect(Collectors.groupingBy(ListenProcess::getPid));
-
-        DockerImpl docker = new DockerImpl();
-        List<InspectContainerResponse> list = docker.psAll();
-        List<DockerProcess> dockerProcessList = new ArrayList<>();
-        for (InspectContainerResponse response : list) {
-            String imageId = response.getImageId();
-            Boolean running = response.getState().getRunning();
-            if (running != null && running) {
-                Integer pid = response.getState().getPid();
-                if (pid == null) {
-                    pid = -1;
-                }
-
-                String containerId = response.getId();
-                String appId = response.getName().replace("/", "");
-                System.out.printf("%s: %s -> %s\n", pid, containerId, appId);
-                dockerProcessList.add(new DockerProcess(pid, containerId, appId));
-            }
-        }
-
-        System.out.println();
-    }
-
-    @Setter
-    @Getter
-    static class ListenProcess {
-        private int pid;
-        private String name;
-        private String type;
-        private int port;
-
-        public ListenProcess(int pid, String type, int port) {
-            this.pid = pid;
-            this.type = type;
-            this.port = port;
-        }
-    }
-
     @AllArgsConstructor
     @Getter
     static class DockerProcess {
@@ -386,14 +299,6 @@ public class AgentTest {
         private String appId;
     }
 
-    @AllArgsConstructor
-    @Getter
-    static class SysProcess {
-        private int pid;
-        private String name;
-        private int ppid;
-    }
-
     static SystemInfo si = new SystemInfo();
     static HardwareAbstractionLayer hal = si.getHardware();
     static OperatingSystem os = si.getOperatingSystem();
@@ -422,8 +327,8 @@ public class AgentTest {
 
     public static void main(String[] args) throws Exception {
         //setLogLevel();
-        int pid = 1483161;
-        getProcessInfo(pid);
+        //int pid = 1483161;
+        //getProcessInfo(pid);
         //List<InternetProtocolStats.IPConnection> processConnections = getProcessConnections(pid, tcpState);
 
         /*getListenProcesses();

+ 38 - 0
common/src/main/java/cn/reghao/bnt/common/machine/model/SysProcess.java

@@ -0,0 +1,38 @@
+package cn.reghao.bnt.common.machine.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * @author reghao
+ * @date 2025-12-17 14:34:09
+ */
+@AllArgsConstructor
+@NoArgsConstructor
+@Setter
+@Getter
+public class SysProcess implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    private int pid;
+    private String name;
+    private int ppid;
+    private String cmdLine;
+    private Set<String> hostPorts;
+    private String containerId;
+    private String appId;
+
+    public SysProcess(int pid, String name, int ppid, String cmdLine) {
+        this.pid = pid;
+        this.name = name;
+        this.ppid = ppid;
+        this.cmdLine = cmdLine.length() > 255 ? cmdLine.substring(0, 255) : cmdLine;
+        this.hostPorts = new HashSet<>();
+    }
+}

+ 21 - 2
common/src/main/java/cn/reghao/bnt/common/msg/event/EvtTaskResult.java

@@ -1,16 +1,35 @@
 package cn.reghao.bnt.common.msg.event;
 
+import cn.reghao.bnt.common.machine.model.SysProcess;
 import cn.reghao.jutil.jdk.event.message.Event;
-import lombok.AllArgsConstructor;
 import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+import java.util.List;
 
 /**
  * @author reghao
  * @date 2025-12-16 10:45:44
  */
-@AllArgsConstructor
+@NoArgsConstructor
+@Setter
 @Getter
 public class EvtTaskResult extends Event {
     private String machineId;
+    private int resultType;
     private String result;
+    private List<SysProcess> listenProcessList;
+
+    public EvtTaskResult(String machineId, String result) {
+        this.machineId = machineId;
+        this.resultType = 1;
+        this.result = result;
+    }
+
+    public EvtTaskResult(String machineId, List<SysProcess> listenProcessList) {
+        this.machineId = machineId;
+        this.resultType = 2;
+        this.listenProcessList = listenProcessList;
+    }
 }