Sfoglia il codice sorgente

TailReader 中添加 websocket 发送失败时的处理

reghao 2 mesi fa
parent
commit
7f1b2749ec

+ 24 - 21
agent/src/main/java/cn/reghao/bnt/agent/service/TailReader.java

@@ -11,6 +11,8 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
 
 /**
  * @author reghao
@@ -42,26 +44,8 @@ public class TailReader implements Runnable {
                 errLogFile.createNewFile();
             }
 
-            //raf.seek(length);
             while (!Thread.interrupted()) {
-                if (!wsClient.isConnected()) {
-                    log.info("websocket disconnected, wait 10s...");
-                    Thread.sleep(10_000);
-                    continue;
-                }
-
                 try {
-                    /*String line = raf.readLine();
-                    while (line == null) {
-                        Thread.sleep(3_000);
-                        line = raf.readLine();
-                    }
-
-                    while (line != null) {
-                        parseAndPersist(line);
-                        line = raf.readLine();
-                    }*/
-
                     long length = raf.length();
                     if (length > pointer) {
                         raf.seek(pointer);
@@ -84,10 +68,11 @@ public class TailReader implements Runnable {
         }
     }
 
-    private void parseAndPersist(String line, File errLogFile) {
+    Queue<NginxLog> queue = new LinkedBlockingQueue<>();
+    private void parseAndPersist(String line, File errLogFile) throws InterruptedException {
+        NginxLog nginxLog = null;
         try {
-            NginxLog nginxLog = JsonConverter.jsonToObject(line, NginxLog.class);
-            wsClient.send("", nginxLog);
+            nginxLog = JsonConverter.jsonToObject(line, NginxLog.class);
         } catch (Exception e) {
             e.printStackTrace();
             try {
@@ -96,5 +81,23 @@ public class TailReader implements Runnable {
                 ex.printStackTrace();
             }
         }
+
+        if (nginxLog != null) {
+            if (!wsClient.isConnected()) {
+                log.info("websocket disconnected, wait 10s...");
+                Thread.sleep(10_000);
+            }
+
+            try {
+                NginxLog nginxLog1 = queue.poll();
+                while (nginxLog1 != null) {
+                    wsClient.send("", nginxLog);
+                    nginxLog1 = queue.poll();
+                }
+                wsClient.send("", nginxLog);
+            } catch (Exception e) {
+                queue.add(nginxLog);
+            }
+        }
     }
 }