Procházet zdrojové kódy

还原读取日志文件的 logstash 模块

reghao před 2 měsíci
rodič
revize
5cad41bb6d

+ 9 - 0
bin/logstash/devopslogstash.json

@@ -0,0 +1,9 @@
+{
+  "wsUrl": "ws://localhost:4030/bgws/log/push",
+  "logFiles": [
+    {
+      "domain": "reghao.cn",
+      "filePath": "/home/reghao/data/log/reghao.cn.access.log"
+    }
+  ]
+}

+ 11 - 0
bin/logstash/shutdown.sh

@@ -0,0 +1,11 @@
+#!/bin/bash
+
+app_name='devops-logstash.jar'
+pid=`ps -ef | grep ${app_name} | grep -v grep | awk '{print $2}'`
+if [[ -z ${pid} ]];
+then
+  echo ${app_name}" killed"
+else
+  echo "kill "${app_name}" with pid "${pid}
+  kill -15 ${pid}
+fi

+ 5 - 0
bin/logstash/start.sh

@@ -0,0 +1,5 @@
+#!/bin/bash
+
+app_dir=`pwd`
+app_name='devops-logstash.jar'
+java -jar ${app_dir}"/"${app_name} ${app_dir}/devopslogstash.json > console.log 2>&1 &

+ 68 - 0
logstash/pom.xml

@@ -0,0 +1,68 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>bnt</artifactId>
+        <groupId>cn.reghao.bnt</groupId>
+        <version>1.0.0</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>logstash</artifactId>
+
+    <properties>
+        <project.build.outputDir>${project.parent.basedir}/bin/logstash</project.build.outputDir>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>okhttp</artifactId>
+            <version>4.10.0</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <finalName>devops-${project.artifactId}</finalName>
+        <resources>
+            <resource>
+                <directory>src/main/resources</directory>
+                <filtering>true</filtering>
+                <includes>
+                    <include>logback.xml</include>
+                </includes>
+            </resource>
+        </resources>
+
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.2.0</version>
+                <configuration>
+                    <archive>
+                        <manifest>
+                            <mainClass>cn.reghao.bnt.logstash.LogStashApp</mainClass>
+                        </manifest>
+                    </archive>
+                    <descriptorRefs>
+                        <descriptorRef>jar-with-dependencies</descriptorRef>
+                    </descriptorRefs>
+                    <!-- 不设置此属性则生成的 jar 包名字会带有 jar-with-dependencies -->
+                    <appendAssemblyId>false</appendAssemblyId>
+                    <outputDirectory>${project.build.outputDir}</outputDirectory>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>make-assembly</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

+ 47 - 0
logstash/src/main/java/cn/reghao/bnt/logstash/LogStashApp.java

@@ -0,0 +1,47 @@
+package cn.reghao.bnt.logstash;
+
+import cn.reghao.bnt.logstash.config.ConfigFile;
+import cn.reghao.bnt.logstash.config.LogFile;
+import cn.reghao.bnt.logstash.config.LogstashConfig;
+import cn.reghao.bnt.logstash.service.FileReader;
+import cn.reghao.jutil.jdk.serializer.JsonConverter;
+import cn.reghao.jutil.jdk.util.SingleInstance;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+import java.util.List;
+
+/**
+ * @author reghao
+ * @date 2023-11-08 10:34:51
+ */
+@Slf4j
+public class LogStashApp {
+    static void shutdownGracefully() {
+        Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(), "main-shutdown-hook"));
+    }
+
+    static class ShutdownHook implements Runnable {
+        @Override
+        public void run() {
+            log.info("资源清理完成,结束 devops-logstash...");
+        }
+    }
+
+    public static void main(String[] args) {
+        if (args.length != 1) {
+            log.error("必须指定配置文件...");
+            return;
+        }
+
+        String configFilePath = ConfigFile.configFilePath(args[0], LogStashApp.class);
+        LogstashConfig logstashConfig = JsonConverter.jsonFileToObject(new File(configFilePath), LogstashConfig.class);
+        String wsUrl = logstashConfig.getWsUrl();
+        List<LogFile> logFiles = logstashConfig.getLogFiles();
+        FileReader fileReader = new FileReader(wsUrl, logFiles);
+        fileReader.start();
+
+        shutdownGracefully();
+        SingleInstance.onlyOne(60002);
+    }
+}

+ 53 - 0
logstash/src/main/java/cn/reghao/bnt/logstash/config/ConfigFile.java

@@ -0,0 +1,53 @@
+package cn.reghao.bnt.logstash.config;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+import java.net.URL;
+import java.net.URLDecoder;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * @author reghao
+ * @date 2021-03-03 18:47:01
+ */
+@Slf4j
+public class ConfigFile {
+    public static String configFilePath(String arg, Class<?> clazz) {
+        String configFilePath = null;
+        File configFile = new File(arg);
+        if (!configFile.exists()) {
+            if (arg.startsWith("./")) {
+                String filename = arg.replace(".", "")
+                        .replace("/", "");
+                configFilePath = runningHome(clazz) + "/" + filename;
+            } else if (!arg.contains("/")) {
+                configFilePath = runningHome(clazz) + "/" + arg;
+            } else {
+                log.error("相对路径的配置文件必须以 ./configFile 或 configFile 形式指定...");
+            }
+        } else {
+            // 绝对路径
+            configFilePath = arg;
+        }
+        return configFilePath;
+    }
+    
+    /**
+     * jar 文件运行目录
+     *
+     * @param
+     * @return
+     * @date 2021-03-03 下午6:33
+     */
+    public static String runningHome(Class<?> clazz) {
+        URL url = clazz.getProtectionDomain().getCodeSource().getLocation();
+        String jarFilePath = URLDecoder.decode(url.getPath(), StandardCharsets.UTF_8);
+        if (jarFilePath.endsWith(".jar")) {
+            jarFilePath = jarFilePath.substring(0, jarFilePath.lastIndexOf("/") + 1);
+        }
+
+        File file = new File(jarFilePath);
+        return file.getAbsolutePath();
+    }
+}

+ 29 - 0
logstash/src/main/java/cn/reghao/bnt/logstash/config/LogFile.java

@@ -0,0 +1,29 @@
+package cn.reghao.bnt.logstash.config;
+
+import lombok.AllArgsConstructor;
+
+/**
+ * @author reghao
+ * @date 2022-05-20 18:28:21
+ */
+@AllArgsConstructor
+public class LogFile {
+    private String domain;
+    private String filePath;
+
+    public void setDomain(String domain) {
+        this.domain = domain;
+    }
+
+    public String getDomain() {
+        return domain;
+    }
+
+    public void setFilePath(String filePath) {
+        this.filePath = filePath;
+    }
+
+    public String getFilePath() {
+        return filePath;
+    }
+}

+ 17 - 0
logstash/src/main/java/cn/reghao/bnt/logstash/config/LogstashConfig.java

@@ -0,0 +1,17 @@
+package cn.reghao.bnt.logstash.config;
+
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.List;
+
+/**
+ * @author reghao
+ * @date 2023-11-08 10:44:58
+ */
+@Setter
+@Getter
+public class LogstashConfig {
+    private String wsUrl;
+    private List<LogFile> logFiles;
+}

+ 41 - 0
logstash/src/main/java/cn/reghao/bnt/logstash/service/FileReader.java

@@ -0,0 +1,41 @@
+package cn.reghao.bnt.logstash.service;
+
+import cn.reghao.bnt.logstash.config.LogFile;
+import cn.reghao.bnt.logstash.ws.WsClient;
+import cn.reghao.jutil.jdk.thread.ThreadPoolWrapper;
+
+import java.util.List;
+import java.util.concurrent.*;
+
+/**
+ * @author reghao
+ * @date 2022-05-20 16:23:34
+ */
+public class FileReader {
+    private final ExecutorService threadPool = ThreadPoolWrapper.threadPool("logstash");
+    private final ScheduledExecutorService scheduledThreadPool = ThreadPoolWrapper.scheduledThreadPool("logstash1", 5);
+    private final String wsUrl;
+    private final List<LogFile> logFiles;
+
+    public FileReader(String wsUrl, List<LogFile> logFiles) {
+        this.wsUrl = wsUrl;
+        this.logFiles = logFiles;
+    }
+
+    public void start() {
+        String params = String.format("app=%s&host=%s", "nginx", "api.reghao.cn");
+        String url = String.format("%s?%s", wsUrl, params);
+        WsClient wsClient = new WsClient(url);
+        wsClient.connect();
+        for (LogFile logFile: logFiles) {
+            String filePath = logFile.getFilePath();
+            try {
+                TailReader tailReader = new TailReader(filePath, wsClient);
+                threadPool.submit(tailReader);
+                //scheduledThreadPool.scheduleAtFixedRate(tailReader, 0, 1, TimeUnit.SECONDS);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+}

+ 76 - 0
logstash/src/main/java/cn/reghao/bnt/logstash/service/TailReader.java

@@ -0,0 +1,76 @@
+package cn.reghao.bnt.logstash.service;
+
+import cn.reghao.bnt.logstash.ws.WsClient;
+import cn.reghao.jutil.jdk.serializer.JsonConverter;
+import cn.reghao.jutil.jdk.web.log.NginxLog;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.FileNotFoundException;
+import java.io.RandomAccessFile;
+
+/**
+ * @author reghao
+ * @date 2022-05-20 11:41:08
+ */
+@Slf4j
+public class TailReader implements Runnable {
+    private final String filePath;
+    private final RandomAccessFile raf;
+    private long pointer;
+    private final WsClient wsClient;
+
+    public TailReader(String filePath, WsClient wsClient) throws FileNotFoundException {
+        this.filePath = filePath;
+        this.raf  = new RandomAccessFile(filePath, "r");
+        this.pointer = 0;
+        this.wsClient = wsClient;
+    }
+
+    @Override
+    public void run() {
+        try {
+            //raf.seek(length);
+            while (!Thread.interrupted()) {
+                try {
+                    /*String line = raf.readLine();
+                    while (line == null) {
+                        Thread.sleep(3_000);
+                        line = raf.readLine();
+                    }
+
+                    while (line != null) {
+                        parseAndPersist(line);
+                        line = raf.readLine();
+                    }*/
+
+                    long length = raf.length();
+                    if (length > pointer) {
+                        raf.seek(pointer);
+                        String line = raf.readLine();
+                        while (line != null) {
+                            parseAndPersist(line);
+                            line = raf.readLine();
+                        }
+                        pointer = length;
+                    } else {
+                        log.info("已读取到 {} 文件末尾, 休眠 10s 后再尝试读取...", filePath);
+                        Thread.sleep(10_000);
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        } catch (Exception e ) {
+            e.printStackTrace();
+        }
+    }
+
+    private void parseAndPersist(String line) {
+        try {
+            NginxLog nginxLog = JsonConverter.jsonToObject(line, NginxLog.class);
+            wsClient.send("", nginxLog);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}

+ 89 - 0
logstash/src/main/java/cn/reghao/bnt/logstash/ws/WebSocketListenerImpl.java

@@ -0,0 +1,89 @@
+package cn.reghao.bnt.logstash.ws;
+
+import cn.reghao.jutil.jdk.serializer.JdkSerializer;
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.Response;
+import okhttp3.WebSocket;
+import okhttp3.WebSocketListener;
+import okio.ByteString;
+
+import java.io.EOFException;
+import java.net.ConnectException;
+import java.net.ProtocolException;
+
+/**
+ * @author reghao
+ * @date 2023-02-23 09:26:50
+ */
+@Slf4j
+public class WebSocketListenerImpl extends WebSocketListener {
+    private final WsClient wsClient;
+
+    public WebSocketListenerImpl(WsClient wsClient) {
+        this.wsClient = wsClient;
+    }
+
+    @Override
+    public void onOpen(WebSocket webSocket, Response response) {
+        log.info("WebSocket 连接成功");
+        wsClient.setConnected(true);
+        wsClient.resetRetryCount();
+    }
+
+    @Override
+    public void onClosing(WebSocket webSocket, int code, String reason) {
+        log.error("WebSocket 连接被动断开 -> {} - {}", code, reason);
+        wsClient.setConnected(false);
+        if (wsClient.isRetry()) {
+            reconnect();
+        }
+    }
+
+    @Override
+    public void onClosed(WebSocket webSocket, int code, String reason) {
+        log.error("WebSocket 连接主动断开 -> {} - {}", code, reason);
+        wsClient.setConnected(false);
+    }
+
+    @Override
+    public void onFailure(WebSocket webSocket, Throwable throwable, Response response) {
+        log.info("WebSocket 异常事件: {}", throwable.toString());
+        if (throwable instanceof ConnectException
+                || throwable instanceof EOFException
+                || throwable instanceof ProtocolException) {
+            wsClient.setConnected(false);
+            if (wsClient.isRetry()) {
+                reconnect();
+            }
+        } else {
+            throwable.printStackTrace();
+        }
+    }
+
+    private void reconnect() {
+        log.info("WebSocket 重连");
+        try {
+            if (wsClient.getRetryCount() > 10) {
+                log.info("WebSocket 重连超过 10 次, 休眠 1 分钟后再尝试");
+                Thread.sleep(60_000);
+                wsClient.resetRetryCount();
+            } else {
+                log.info("休眠 10s 后再尝试重连");
+                Thread.sleep(10_000);
+            }
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        wsClient.retryCountIncr();
+        wsClient.connect();
+    }
+
+    @Override
+    public void onMessage(WebSocket webSocket, String text) {
+    }
+
+    @Override
+    public void onMessage(WebSocket webSocket, ByteString bytes) {
+        Object object = JdkSerializer.deserialize(bytes.toByteArray());
+    }
+}

+ 81 - 0
logstash/src/main/java/cn/reghao/bnt/logstash/ws/WsClient.java

@@ -0,0 +1,81 @@
+package cn.reghao.bnt.logstash.ws;
+
+import cn.reghao.jutil.jdk.serializer.JdkSerializer;
+import okhttp3.*;
+import okio.ByteString;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author reghao
+ * @date 2023-02-23 09:26:50
+ */
+public class WsClient {
+    private final String url;
+    private WebSocket webSocket;
+    private boolean connected;
+    private final WebSocketListener webSocketListener;
+    private boolean retry;
+    private int retryCount;
+
+    public WsClient(String url) {
+        this.url = url;
+        this.webSocketListener = new WebSocketListenerImpl(this);
+        this.retry = true;
+        this.retryCount = 0;
+    }
+
+    public void setRetry(boolean retry) {
+        this.retry = retry;
+    }
+
+    public boolean isRetry() {
+        return retry;
+    }
+
+    public void retryCountIncr() {
+        this.retryCount += 1;
+    }
+
+    public void resetRetryCount() {
+        this.retryCount = 0;
+    }
+
+    public int getRetryCount() {
+        return retryCount;
+    }
+
+    public void connect() {
+        Request request = new Request.Builder()
+                .url(url)
+                .header("Authorization", "Bearer ")
+                .build();
+
+        OkHttpClient okHttpClient = new OkHttpClient.Builder()
+                .connectTimeout(30, TimeUnit.SECONDS)
+                .readTimeout(30, TimeUnit.SECONDS)
+                .writeTimeout(30, TimeUnit.SECONDS)
+                .build();
+        this.webSocket = okHttpClient.newWebSocket(request, webSocketListener);
+    }
+
+    public void setConnected(boolean status) {
+        this.connected = status;
+    }
+
+    public boolean isConnected() {
+        return connected;
+    }
+
+    public void send(String dest, Object message) {
+        if (isConnected()) {
+            byte[] bytes = JdkSerializer.serialize(message);
+            webSocket.send(ByteString.of(bytes));
+        }
+    }
+
+    public void close() {
+        setRetry(false);
+        webSocket.close(1000, "Client Close Connection");
+    }
+}

+ 1 - 0
pom.xml

@@ -12,6 +12,7 @@
         <module>common</module>
         <module>agent</module>
         <module>deployer</module>
+        <module>logstash</module>
     </modules>
     <packaging>pom</packaging>