فهرست منبع

agent 模块中添加 logstash 功能, 通过 devopsagent.json 配置文件中的 logstash 参数来指定是否启用日志收集功能

reghao 2 ماه پیش
والد
کامیت
ba859902a9

+ 8 - 6
agent/src/main/java/cn/reghao/bnt/agent/AgentApp.java

@@ -4,7 +4,8 @@ import ch.qos.logback.classic.Level;
 import ch.qos.logback.classic.Logger;
 import ch.qos.logback.classic.LoggerContext;
 import cn.reghao.bnt.agent.config.ConfigFile;
-import cn.reghao.bnt.agent.config.DagentConfig;
+import cn.reghao.bnt.agent.config.AgentConfig;
+import cn.reghao.bnt.agent.service.FileReader;
 import cn.reghao.bnt.agent.ws.WsClient;
 import cn.reghao.bnt.common.agent.app.iface.AppDeploy;
 import cn.reghao.bnt.common.agent.app.iface.AppStat;
@@ -62,13 +63,14 @@ public class AgentApp {
 		return false;
 	}
 
-	static MessageSender getMessageSender(DagentConfig dagentConfig) {
+	static MessageSender getMessageSender(AgentConfig agentConfig) {
 		DockerApp dockerApp = new DockerApp(docker);
 		AppDeploy appDeploy = new AppDeployImpl(dockerApp);
 		AppStat appStat = new AppStatImpl(dockerApp);
+		FileReader fileReader = new FileReader(agentConfig.getLogFiles());
 
-		if (tryConnect(dagentConfig.getHost(), dagentConfig.getPort())) {
-			messageSender = new WsClient(dagentConfig, appDeploy, appStat);
+		if (tryConnect(agentConfig.getHost(), agentConfig.getPort())) {
+			messageSender = new WsClient(agentConfig, fileReader, appDeploy, appStat);
 			return messageSender;
 		}
 
@@ -95,8 +97,8 @@ public class AgentApp {
 		}
 
 		String configFilePath = ConfigFile.configFilePath(args[0], AgentApp.class);
-		DagentConfig dagentConfig = JsonConverter.jsonFileToObject(new File(configFilePath), DagentConfig.class);
-		messageSender = getMessageSender(dagentConfig);
+		AgentConfig agentConfig = JsonConverter.jsonFileToObject(new File(configFilePath), AgentConfig.class);
+		messageSender = getMessageSender(agentConfig);
 		if (messageSender == null) {
 			log.error("没有可用的 MessageSender, Agent 结束运行, 具体信息请查看日志");
 			System.exit(-1);

+ 7 - 1
agent/src/main/java/cn/reghao/bnt/agent/config/DagentConfig.java → agent/src/main/java/cn/reghao/bnt/agent/config/AgentConfig.java

@@ -2,15 +2,21 @@ package cn.reghao.bnt.agent.config;
 
 import lombok.AllArgsConstructor;
 import lombok.Getter;
+import lombok.Setter;
+
+import java.util.List;
 
 /**
  * @author reghao
  * @date 2023-02-23 11:37:32
  */
 @AllArgsConstructor
+@Setter
 @Getter
-public class DagentConfig {
+public class AgentConfig {
     private String protocol;
     private String host;
     private int port;
+    private boolean logstash;
+    private List<LogFile> logFiles;
 }

+ 29 - 0
agent/src/main/java/cn/reghao/bnt/agent/config/LogFile.java

@@ -0,0 +1,29 @@
+package cn.reghao.bnt.agent.config;
+
+import lombok.AllArgsConstructor;
+
+/**
+ * @author reghao
+ * @date 2022-05-20 18:28:21
+ */
+@AllArgsConstructor
+public class LogFile {
+    private String domain;
+    private String filePath;
+
+    public void setDomain(String domain) {
+        this.domain = domain;
+    }
+
+    public String getDomain() {
+        return domain;
+    }
+
+    public void setFilePath(String filePath) {
+        this.filePath = filePath;
+    }
+
+    public String getFilePath() {
+        return filePath;
+    }
+}

+ 3 - 3
agent/src/main/java/cn/reghao/bnt/agent/machine/MachineEvent.java

@@ -31,15 +31,13 @@ public class MachineEvent {
     private final Machine machine;
     private final AppStat appStat;
     private final MessageSender messageSender;
-    private final int heartbeatInterval;
     private final Docker docker;
     
-    public MachineEvent(MessageSender messageSender, Machine machine, AppStat appStat, int heartbeatInterval) {
+    public MachineEvent(MessageSender messageSender, Machine machine, AppStat appStat) {
         this.messageSender = messageSender;
         this.machine = machine;
         this.appStat = appStat;
         this.scheduler = AgentApp.scheduler;
-        this.heartbeatInterval = heartbeatInterval;
         this.docker = AgentApp.docker;
     }
 
@@ -70,6 +68,8 @@ public class MachineEvent {
     }
 
     public void agentHeartbeat() {
+        // 每 60s 发送一次心跳
+        int heartbeatInterval = 60;
         heartbeatFuture = scheduler.scheduleAtFixedRate(new Heartbeat(), 5, heartbeatInterval, TimeUnit.SECONDS);
     }
 

+ 33 - 0
agent/src/main/java/cn/reghao/bnt/agent/service/FileReader.java

@@ -0,0 +1,33 @@
+package cn.reghao.bnt.agent.service;
+
+import cn.reghao.bnt.agent.config.LogFile;
+import cn.reghao.bnt.agent.ws.WsClient;
+import cn.reghao.jutil.jdk.thread.ThreadPoolWrapper;
+
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * @author reghao
+ * @date 2022-05-20 16:23:34
+ */
+public class FileReader {
+    private final ExecutorService threadPool = ThreadPoolWrapper.threadPool("logstash");
+    private final List<LogFile> logFiles;
+
+    public FileReader(List<LogFile> logFiles) {
+        this.logFiles = logFiles;
+    }
+
+    public void start(WsClient wsClient) {
+        for (LogFile logFile: logFiles) {
+            String filePath = logFile.getFilePath();
+            try {
+                TailReader tailReader = new TailReader(filePath, wsClient);
+                threadPool.submit(tailReader);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}

+ 100 - 0
agent/src/main/java/cn/reghao/bnt/agent/service/TailReader.java

@@ -0,0 +1,100 @@
+package cn.reghao.bnt.agent.service;
+
+import cn.reghao.bnt.agent.ws.WsClient;
+import cn.reghao.jutil.jdk.io.TextFile;
+import cn.reghao.jutil.jdk.serializer.JsonConverter;
+import cn.reghao.jutil.jdk.web.log.NginxLog;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.List;
+
+/**
+ * @author reghao
+ * @date 2022-05-20 11:41:08
+ */
+@Slf4j
+public class TailReader implements Runnable {
+    private final String filePath;
+    private final RandomAccessFile raf;
+    private long pointer;
+    private final WsClient wsClient;
+    private final TextFile textFile;
+    private final String errorLogPath;
+
+    public TailReader(String filePath, WsClient wsClient) throws FileNotFoundException {
+        this.filePath = filePath;
+        this.raf  = new RandomAccessFile(filePath, "r");
+        this.pointer = 0;
+        this.wsClient = wsClient;
+        this.textFile = new TextFile();
+        this.errorLogPath = System.getProperty("user.dir") + "/error.log";
+    }
+
+    @Override
+    public void run() {
+        try {
+            File errLogFile = new File(errorLogPath);
+            if (!errLogFile.exists()) {
+                errLogFile.createNewFile();
+            }
+
+            //raf.seek(length);
+            while (!Thread.interrupted()) {
+                if (!wsClient.isConnected()) {
+                    log.info("websocket disconnected, wait 10s...");
+                    Thread.sleep(10_000);
+                    continue;
+                }
+
+                try {
+                    /*String line = raf.readLine();
+                    while (line == null) {
+                        Thread.sleep(3_000);
+                        line = raf.readLine();
+                    }
+
+                    while (line != null) {
+                        parseAndPersist(line);
+                        line = raf.readLine();
+                    }*/
+
+                    long length = raf.length();
+                    if (length > pointer) {
+                        raf.seek(pointer);
+                        String line = raf.readLine();
+                        while (line != null) {
+                            parseAndPersist(line, errLogFile);
+                            line = raf.readLine();
+                        }
+                        pointer = length;
+                    } else {
+                        log.info("已读取到 {} 文件末尾, 休眠 10s 后再尝试读取...", filePath);
+                        Thread.sleep(10_000);
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        } catch (Exception e ) {
+            e.printStackTrace();
+        }
+    }
+
+    private void parseAndPersist(String line, File errLogFile) {
+        try {
+            NginxLog nginxLog = JsonConverter.jsonToObject(line, NginxLog.class);
+            wsClient.send("", nginxLog);
+        } catch (Exception e) {
+            e.printStackTrace();
+            try {
+                textFile.append(errLogFile.getAbsolutePath(), List.of(line));
+            } catch (IOException ex) {
+                ex.printStackTrace();
+            }
+        }
+    }
+}

+ 14 - 7
agent/src/main/java/cn/reghao/bnt/agent/ws/WebSocketListenerImpl.java

@@ -16,13 +16,15 @@ import okio.ByteString;
  */
 @Slf4j
 public class WebSocketListenerImpl extends WebSocketListener {
-    private final EventCenter eventCenter;
     private final WsClient wsClient;
+    private final EventCenter eventCenter;
     private final MachineEvent machineEvent;
+    private final boolean logstash;
 
-    public WebSocketListenerImpl(WsClient wsClient, EventCenter eventCenter, MachineEvent machineEvent) {
-        this.eventCenter = eventCenter;
+    public WebSocketListenerImpl(WsClient wsClient, boolean logstash, EventCenter eventCenter, MachineEvent machineEvent) {
         this.wsClient = wsClient;
+        this.logstash = logstash;
+        this.eventCenter = eventCenter;
         this.machineEvent = machineEvent;
     }
 
@@ -32,15 +34,20 @@ public class WebSocketListenerImpl extends WebSocketListener {
         wsClient.setConnected(true);
         wsClient.resetRetryCount();
 
-        machineEvent.agentStart();
+        if (!logstash) {
+            machineEvent.agentStart();
+        }
     }
 
     @Override
     public void onClosing(WebSocket webSocket, int code, String reason) {
-        log.error("WebSocket 连接被动断开 -> {} - {}", code, reason);
+        log.error("WebSocket 连接正在关闭 -> {} - {}", code, reason);
         wsClient.setConnected(false);
 
-        machineEvent.pauseHeartbeat();
+        if (!logstash) {
+            machineEvent.pauseHeartbeat();
+        }
+
         if (wsClient.isRetry()) {
             reconnect();
         }
@@ -48,7 +55,7 @@ public class WebSocketListenerImpl extends WebSocketListener {
 
     @Override
     public void onClosed(WebSocket webSocket, int code, String reason) {
-        log.error("WebSocket 连接主动断开 -> {} - {}", code, reason);
+        log.error("WebSocket 连接关闭成功 -> {} - {}", code, reason);
         wsClient.setConnected(false);
     }
 

+ 14 - 13
agent/src/main/java/cn/reghao/bnt/agent/ws/WsClient.java

@@ -1,11 +1,11 @@
 package cn.reghao.bnt.agent.ws;
 
-import cn.reghao.bnt.agent.config.DagentConfig;
+import cn.reghao.bnt.agent.config.AgentConfig;
+import cn.reghao.bnt.agent.service.FileReader;
 import cn.reghao.bnt.agent.ws.event.EventCenter;
 import cn.reghao.bnt.common.agent.app.iface.AppDeploy;
 import cn.reghao.bnt.common.agent.app.iface.AppStat;
 import cn.reghao.bnt.agent.machine.MachineEvent;
-import cn.reghao.bnt.common.docker.Docker;
 import cn.reghao.bnt.common.machine.Machine;
 import cn.reghao.bnt.common.msg.MessageSender;
 import cn.reghao.jutil.jdk.serializer.JdkSerializer;
@@ -15,7 +15,6 @@ import okhttp3.WebSocket;
 import okhttp3.WebSocketListener;
 import okio.ByteString;
 
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.logging.Level;
 import java.util.logging.Logger;
@@ -32,19 +31,21 @@ public class WsClient implements MessageSender {
     private boolean retry;
     private int retryCount;
 
-    public WsClient(DagentConfig dagentConfig, AppDeploy appDeploy, AppStat appStat) {
-        String protocol = dagentConfig.getProtocol();
-        String host = dagentConfig.getHost();
-        int port = dagentConfig.getPort();
-        this.url = String.format("%s://%s:%s/bgws/agent?token=%s", protocol, host, port, Machine.ID);
+    public WsClient(AgentConfig agentConfig, FileReader fileReader, AppDeploy appDeploy, AppStat appStat) {
+        String protocol = agentConfig.getProtocol();
+        String host = agentConfig.getHost();
+        int port = agentConfig.getPort();
+        String app = "agent";
+        String host1 = "127.0.0.1";
+        this.url = String.format("%s://%s:%s/bgws/agent?token=%s&app=%s&host=%s", protocol, host, port, Machine.ID, app, host1);
+        boolean logstash = agentConfig.isLogstash();
 
         EventCenter eventCenter = new EventCenter(this, appDeploy, appStat);
-        // 每 60s 发送一次心跳
-        int heartbeatInterval = 60;
-        MachineEvent machineEvent = new MachineEvent(this, new Machine(), appStat, heartbeatInterval);
-        this.webSocketListener = new WebSocketListenerImpl(this, eventCenter, machineEvent);
+        MachineEvent machineEvent = new MachineEvent(this, new Machine(), appStat);
+        this.webSocketListener = new WebSocketListenerImpl(this, logstash, eventCenter, machineEvent);
         this.retry = true;
         this.retryCount = 0;
+        fileReader.start(this);
         Logger.getLogger(OkHttpClient.class.getName()).setLevel(Level.FINE);
     }
 
@@ -76,7 +77,7 @@ public class WsClient implements MessageSender {
                 .build();
 
         OkHttpClient okHttpClient = new OkHttpClient.Builder()
-                //.pingInterval(30, TimeUnit.SECONDS)
+                .pingInterval(10, TimeUnit.SECONDS)
                 .connectTimeout(60, TimeUnit.SECONDS)
                 .readTimeout(60, TimeUnit.SECONDS)
                 .writeTimeout(60, TimeUnit.SECONDS)

+ 8 - 1
bin/agent/devopsagent.json

@@ -1,5 +1,12 @@
 {
   "protocol": "ws",
   "host": "127.0.0.1",
-  "port": 4030
+  "port": 6007,
+  "logstash": true,
+  "logFiles": [
+    {
+      "domain": "reghao.cn",
+      "filePath": "/home/reghao/Downloads/access-20231107_073356-20240905_165944.log"
+    }
+  ]
 }