浏览代码

删除 logstash 模块

reghao 3 月之前
父节点
当前提交
231b24197a

+ 0 - 68
logstash/pom.xml

@@ -1,68 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>bnt</artifactId>
-        <groupId>cn.reghao.bnt</groupId>
-        <version>1.0.0</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>logstash</artifactId>
-
-    <properties>
-        <project.build.outputDir>${project.basedir}/bin</project.build.outputDir>
-    </properties>
-
-    <dependencies>
-        <dependency>
-            <groupId>com.squareup.okhttp3</groupId>
-            <artifactId>okhttp</artifactId>
-            <version>4.10.0</version>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <finalName>devops-${project.artifactId}</finalName>
-        <resources>
-            <resource>
-                <directory>src/main/resources</directory>
-                <filtering>true</filtering>
-                <includes>
-                    <include>logback.xml</include>
-                </includes>
-            </resource>
-        </resources>
-
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <version>3.2.0</version>
-                <configuration>
-                    <archive>
-                        <manifest>
-                            <mainClass>cn.reghao.bnt.logstash.LogStashApp</mainClass>
-                        </manifest>
-                    </archive>
-                    <descriptorRefs>
-                        <descriptorRef>jar-with-dependencies</descriptorRef>
-                    </descriptorRefs>
-                    <!-- 不设置此属性则生成的 jar 包名字会带有 jar-with-dependencies -->
-                    <appendAssemblyId>false</appendAssemblyId>
-                    <outputDirectory>${project.build.outputDir}</outputDirectory>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>make-assembly</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>single</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>

+ 0 - 47
logstash/src/main/java/cn/reghao/bnt/logstash/LogStashApp.java

@@ -1,47 +0,0 @@
-package cn.reghao.bnt.logstash;
-
-import cn.reghao.bnt.logstash.config.ConfigFile;
-import cn.reghao.bnt.logstash.config.LogFile;
-import cn.reghao.bnt.logstash.config.LogstashConfig;
-import cn.reghao.bnt.logstash.service.FileReader;
-import cn.reghao.jutil.jdk.serializer.JsonConverter;
-import cn.reghao.jutil.jdk.util.SingleInstance;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.File;
-import java.util.List;
-
-/**
- * @author reghao
- * @date 2023-11-08 10:34:51
- */
-@Slf4j
-public class LogStashApp {
-    static void shutdownGracefully() {
-        Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(), "main-shutdown-hook"));
-    }
-
-    static class ShutdownHook implements Runnable {
-        @Override
-        public void run() {
-            log.info("资源清理完成,结束 devops-logstash...");
-        }
-    }
-
-    public static void main(String[] args) {
-        if (args.length != 1) {
-            log.error("必须指定配置文件...");
-            return;
-        }
-
-        String configFilePath = ConfigFile.configFilePath(args[0], LogStashApp.class);
-        LogstashConfig logstashConfig = JsonConverter.jsonFileToObject(new File(configFilePath), LogstashConfig.class);
-        String wsUrl = logstashConfig.getWsUrl();
-        List<LogFile> logFiles = logstashConfig.getLogFiles();
-        FileReader fileReader = new FileReader(wsUrl, logFiles);
-        fileReader.start();
-
-        shutdownGracefully();
-        SingleInstance.onlyOne(60002);
-    }
-}

+ 0 - 53
logstash/src/main/java/cn/reghao/bnt/logstash/config/ConfigFile.java

@@ -1,53 +0,0 @@
-package cn.reghao.bnt.logstash.config;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.File;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.nio.charset.StandardCharsets;
-
-/**
- * @author reghao
- * @date 2021-03-03 18:47:01
- */
-@Slf4j
-public class ConfigFile {
-    public static String configFilePath(String arg, Class<?> clazz) {
-        String configFilePath = null;
-        File configFile = new File(arg);
-        if (!configFile.exists()) {
-            if (arg.startsWith("./")) {
-                String filename = arg.replace(".", "")
-                        .replace("/", "");
-                configFilePath = runningHome(clazz) + "/" + filename;
-            } else if (!arg.contains("/")) {
-                configFilePath = runningHome(clazz) + "/" + arg;
-            } else {
-                log.error("相对路径的配置文件必须以 ./configFile 或 configFile 形式指定...");
-            }
-        } else {
-            // 绝对路径
-            configFilePath = arg;
-        }
-        return configFilePath;
-    }
-    
-    /**
-     * jar 文件运行目录
-     *
-     * @param
-     * @return
-     * @date 2021-03-03 下午6:33
-     */
-    public static String runningHome(Class<?> clazz) {
-        URL url = clazz.getProtectionDomain().getCodeSource().getLocation();
-        String jarFilePath = URLDecoder.decode(url.getPath(), StandardCharsets.UTF_8);
-        if (jarFilePath.endsWith(".jar")) {
-            jarFilePath = jarFilePath.substring(0, jarFilePath.lastIndexOf("/") + 1);
-        }
-
-        File file = new File(jarFilePath);
-        return file.getAbsolutePath();
-    }
-}

+ 0 - 29
logstash/src/main/java/cn/reghao/bnt/logstash/config/LogFile.java

@@ -1,29 +0,0 @@
-package cn.reghao.bnt.logstash.config;
-
-import lombok.AllArgsConstructor;
-
-/**
- * @author reghao
- * @date 2022-05-20 18:28:21
- */
-@AllArgsConstructor
-public class LogFile {
-    private String domain;
-    private String filePath;
-
-    public void setDomain(String domain) {
-        this.domain = domain;
-    }
-
-    public String getDomain() {
-        return domain;
-    }
-
-    public void setFilePath(String filePath) {
-        this.filePath = filePath;
-    }
-
-    public String getFilePath() {
-        return filePath;
-    }
-}

+ 0 - 15
logstash/src/main/java/cn/reghao/bnt/logstash/config/LogstashConfig.java

@@ -1,15 +0,0 @@
-package cn.reghao.bnt.logstash.config;
-
-import lombok.Getter;
-
-import java.util.List;
-
-/**
- * @author reghao
- * @date 2023-11-08 10:44:58
- */
-@Getter
-public class LogstashConfig {
-    private String wsUrl;
-    private List<LogFile> logFiles;
-}

+ 0 - 30
logstash/src/main/java/cn/reghao/bnt/logstash/model/NginxLog.java

@@ -1,30 +0,0 @@
-package cn.reghao.bnt.logstash.model;
-
-import com.google.gson.annotations.SerializedName;
-import lombok.Getter;
-
-import java.io.Serializable;
-
-/**
- * @author reghao
- * @date 2023-11-07 14:58:07
- */
-@Getter
-public class NginxLog implements Serializable {
-    private static final long serialVersionUID = 1L;
-
-    @SerializedName("time_iso8601") private String timeIso8601;
-    @SerializedName("remote_addr") private String remoteAddr;
-    private String request;
-    private Integer status;
-    @SerializedName("request_method") private String requestMethod;
-    @SerializedName("body_bytes_sent") private Integer bodyBytesSent;
-    @SerializedName("request_time") private Double requestTime;
-    @SerializedName("upstream_response_time") private Double upstreamResponseTime;
-    @SerializedName("upstream_addr") private String upstreamAddr;
-    private String host;
-    private String url;
-    @SerializedName("http_x_forwarded_for") private String httpXForwardedFor;
-    @SerializedName("http_referer") private String httpReferer;
-    @SerializedName("http_user_agent") private String httpUserAgent;
-}

+ 0 - 44
logstash/src/main/java/cn/reghao/bnt/logstash/service/FileReader.java

@@ -1,44 +0,0 @@
-package cn.reghao.bnt.logstash.service;
-
-import cn.reghao.bnt.logstash.config.LogFile;
-import cn.reghao.bnt.logstash.ws.WsClient;
-import cn.reghao.jutil.jdk.thread.ThreadPoolWrapper;
-
-import java.util.List;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-
-/**
- * @author reghao
- * @date 2022-05-20 16:23:34
- */
-public class FileReader {
-    private final ExecutorService threadPool = ThreadPoolWrapper.threadPool("logstash");
-    private final ScheduledExecutorService scheduledThreadPool = ThreadPoolWrapper.scheduledThreadPool("logstash1", 5);
-    private final String wsUrl;
-    private final List<LogFile> logFiles;
-
-    public FileReader(String wsUrl, List<LogFile> logFiles) {
-        this.wsUrl = wsUrl;
-        this.logFiles = logFiles;
-    }
-
-    public void start() {
-        String app = "nginx";
-        String host = "nginx";
-        String params = String.format("app=%s&host=%s", app, host);
-        String url = String.format("%s?%s", wsUrl, params);
-        WsClient wsClient = new WsClient(url);
-        wsClient.connect();
-        for (LogFile logFile: logFiles) {
-            String filePath = logFile.getFilePath();
-            try {
-                TailReader tailReader = new TailReader(filePath, wsClient);
-                threadPool.submit(tailReader);
-                //scheduledThreadPool.scheduleAtFixedRate(tailReader, 0, 1, TimeUnit.SECONDS);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-    }
-}

+ 0 - 73
logstash/src/main/java/cn/reghao/bnt/logstash/service/TailReader.java

@@ -1,73 +0,0 @@
-package cn.reghao.bnt.logstash.service;
-
-import cn.reghao.bnt.logstash.model.NginxLog;
-import cn.reghao.bnt.logstash.ws.WsClient;
-import cn.reghao.jutil.jdk.serializer.JsonConverter;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.FileNotFoundException;
-import java.io.RandomAccessFile;
-
-/**
- * @author reghao
- * @date 2022-05-20 11:41:08
- */
-@Slf4j
-public class TailReader implements Runnable {
-    private final String filePath;
-    private final RandomAccessFile raf;
-    private long pointer;
-    private final WsClient wsClient;
-
-    public TailReader(String filePath, WsClient wsClient) throws FileNotFoundException {
-        this.filePath = filePath;
-        this.raf  = new RandomAccessFile(filePath, "r");
-        this.pointer = 0;
-        this.wsClient = wsClient;
-    }
-
-    @Override
-    public void run() {
-        try {
-            long length = raf.length();
-            raf.seek(length);
-            while (!Thread.interrupted()) {
-                try {
-                    String line = raf.readLine();
-                    while (line == null) {
-                        Thread.sleep(3_000);
-                        line = raf.readLine();
-                    }
-
-                    while (line != null) {
-                        parseAndPersist(line);
-                        line = raf.readLine();
-                    }
-                /*if (length > pointer) {
-                    raf.seek(pointer);
-                    String line = raf.readLine();
-                    while (line != null) {
-                        parseAndPersist(line);
-                        line = raf.readLine();
-                    }
-                    pointer = length;
-                } else {
-                    log.info("已读取到 {} 文件末尾, 休眠 10s 后再尝试读取", filePath);
-                    Thread.sleep(10_000);
-                }*/
-                } catch (Exception e) {
-                }
-            }
-        } catch (Exception e ) {
-
-        }
-    }
-
-    private void parseAndPersist(String line) {
-        try {
-            NginxLog nginxLog = JsonConverter.jsonToObject(line, NginxLog.class);
-            wsClient.send("", nginxLog);
-        } catch (Exception ignore) {
-        }
-    }
-}

+ 0 - 89
logstash/src/main/java/cn/reghao/bnt/logstash/ws/WebSocketListenerImpl.java

@@ -1,89 +0,0 @@
-package cn.reghao.bnt.logstash.ws;
-
-import cn.reghao.jutil.jdk.serializer.JdkSerializer;
-import lombok.extern.slf4j.Slf4j;
-import okhttp3.Response;
-import okhttp3.WebSocket;
-import okhttp3.WebSocketListener;
-import okio.ByteString;
-
-import java.io.EOFException;
-import java.net.ConnectException;
-import java.net.ProtocolException;
-
-/**
- * @author reghao
- * @date 2023-02-23 09:26:50
- */
-@Slf4j
-public class WebSocketListenerImpl extends WebSocketListener {
-    private final WsClient wsClient;
-
-    public WebSocketListenerImpl(WsClient wsClient) {
-        this.wsClient = wsClient;
-    }
-
-    @Override
-    public void onOpen(WebSocket webSocket, Response response) {
-        log.info("WebSocket 连接成功");
-        wsClient.setConnected(true);
-        wsClient.resetRetryCount();
-    }
-
-    @Override
-    public void onClosing(WebSocket webSocket, int code, String reason) {
-        log.error("WebSocket 连接被动断开 -> {} - {}", code, reason);
-        wsClient.setConnected(false);
-        if (wsClient.isRetry()) {
-            reconnect();
-        }
-    }
-
-    @Override
-    public void onClosed(WebSocket webSocket, int code, String reason) {
-        log.error("WebSocket 连接主动断开 -> {} - {}", code, reason);
-        wsClient.setConnected(false);
-    }
-
-    @Override
-    public void onFailure(WebSocket webSocket, Throwable throwable, Response response) {
-        log.info("WebSocket 异常事件: {}", throwable.toString());
-        if (throwable instanceof ConnectException
-                || throwable instanceof EOFException
-                || throwable instanceof ProtocolException) {
-            wsClient.setConnected(false);
-            if (wsClient.isRetry()) {
-                reconnect();
-            }
-        } else {
-            throwable.printStackTrace();
-        }
-    }
-
-    private void reconnect() {
-        log.info("WebSocket 重连");
-        try {
-            if (wsClient.getRetryCount() > 10) {
-                log.info("WebSocket 重连超过 10 次, 休眠 1 分钟后再尝试");
-                Thread.sleep(60_000);
-                wsClient.resetRetryCount();
-            } else {
-                log.info("休眠 10s 后再尝试重连");
-                Thread.sleep(10_000);
-            }
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-        wsClient.retryCountIncr();
-        wsClient.connect();
-    }
-
-    @Override
-    public void onMessage(WebSocket webSocket, String text) {
-    }
-
-    @Override
-    public void onMessage(WebSocket webSocket, ByteString bytes) {
-        Object object = JdkSerializer.deserialize(bytes.toByteArray());
-    }
-}

+ 0 - 84
logstash/src/main/java/cn/reghao/bnt/logstash/ws/WsClient.java

@@ -1,84 +0,0 @@
-package cn.reghao.bnt.logstash.ws;
-
-import cn.reghao.jutil.jdk.serializer.JdkSerializer;
-import okhttp3.OkHttpClient;
-import okhttp3.Request;
-import okhttp3.WebSocket;
-import okhttp3.WebSocketListener;
-import okio.ByteString;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author reghao
- * @date 2023-02-23 09:26:50
- */
-public class WsClient {
-    private final String url;
-    private WebSocket webSocket;
-    private boolean connected;
-    private final WebSocketListener webSocketListener;
-    private boolean retry;
-    private int retryCount;
-
-    public WsClient(String url) {
-        this.url = url;
-        this.webSocketListener = new WebSocketListenerImpl(this);
-        this.retry = true;
-        this.retryCount = 0;
-    }
-
-    public void setRetry(boolean retry) {
-        this.retry = retry;
-    }
-
-    public boolean isRetry() {
-        return retry;
-    }
-
-    public void retryCountIncr() {
-        this.retryCount += 1;
-    }
-
-    public void resetRetryCount() {
-        this.retryCount = 0;
-    }
-
-    public int getRetryCount() {
-        return retryCount;
-    }
-
-    public void connect() {
-        Request request = new Request.Builder()
-                .url(url)
-                .header("Authorization", "Bearer ")
-                .build();
-
-        OkHttpClient okHttpClient = new OkHttpClient.Builder()
-                .connectTimeout(30, TimeUnit.SECONDS)
-                .readTimeout(30, TimeUnit.SECONDS)
-                .writeTimeout(30, TimeUnit.SECONDS)
-                .build();
-        this.webSocket = okHttpClient.newWebSocket(request, webSocketListener);
-    }
-
-    public void setConnected(boolean status) {
-        this.connected = status;
-    }
-
-    public boolean isConnected() {
-        return connected;
-    }
-
-    public void send(String dest, Object message) {
-        if (isConnected()) {
-            byte[] bytes = JdkSerializer.serialize(message);
-            webSocket.send(ByteString.of(bytes));
-        }
-    }
-
-    public void close() {
-        setRetry(false);
-        webSocket.close(1000, "Client Close Connection");
-    }
-}

+ 0 - 1
pom.xml

@@ -12,7 +12,6 @@
         <module>common</module>
         <module>agent</module>
         <module>deployer</module>
-        <module>logstash</module>
     </modules>
     <packaging>pom</packaging>