فهرست منبع

删除 deployer 和 logstash 模块, deployer 模块的功能集成到 web.devops.deployer 包中, logstash 模块的功能集成到 agent 模块中

reghao 2 ماه پیش
والد
کامیت
99f27d23ee

+ 0 - 61
deployer/pom.xml

@@ -1,61 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>bnt</artifactId>
-        <groupId>cn.reghao.bnt</groupId>
-        <version>1.0.0</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>deployer</artifactId>
-
-    <properties>
-        <project.build.outputDir>${project.basedir}/bin</project.build.outputDir>
-    </properties>
-
-    <dependencies>
-        <dependency>
-            <groupId>com.github.mwiede</groupId>
-            <artifactId>jsch</artifactId>
-            <version>2.27.5</version>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <finalName>devops-${project.artifactId}</finalName>
-        <resources>
-            <resource>
-                <directory>src/main/resources</directory>
-                <filtering>true</filtering>
-                <includes>
-                    <include>logback.xml</include>
-                </includes>
-            </resource>
-        </resources>
-
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
-                <version>3.2.4</version>
-                <executions>
-                    <execution>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>shade</goal>
-                        </goals>
-                        <configuration>
-                            <transformers>
-                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                                    <mainClass>cn.reghao.bnt.deployer.DeployApp</mainClass>
-                                </transformer>
-                            </transformers>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>

+ 0 - 58
deployer/src/main/java/cn/reghao/bnt/deployer/DeployApp.java

@@ -1,58 +0,0 @@
-package cn.reghao.bnt.deployer;
-
-import cn.reghao.bnt.deployer.model.RemoteHost;
-import cn.reghao.bnt.deployer.util.Sftp;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.File;
-import java.util.List;
-
-/**
- * @author reghao
- * @date 2024-02-19 14:16:27
- */
-@Slf4j
-public class DeployApp {
-    public static void main(String[] args) {
-        /*if (args.length != 4) {
-            log.error("usage: java -jar devops-deployer ${app_name} ${local_dir} ${remote_dir} ${sever_file}");
-            return;
-        }
-        String appName = args[0];
-        String localDir = args[1];
-        String remoteDir = args[2];
-        String serverFile = args[3];*/
-
-        String appName = "devops-agent";
-        String localDir = "/home/reghao/code/devops/bnt/agent/bin";
-        String remoteDir = "/opt/app/devops-agent";
-        String serverFile = "/home/reghao/Downloads/servers.csv";
-        //String serverFile = "/home/reghao/Downloads/servers_prod.csv";
-
-        File file1 = new File(localDir);
-        if (!file1.exists() || file1.isFile()) {
-            log.error("local_dir {} is not exist or is a file", localDir);
-            return;
-        }
-
-        File file2 = new File(serverFile);
-        if (!file2.exists() || file2.isDirectory()) {
-            log.error("server_file {} is not exist or is a directory", serverFile);
-            return;
-        }
-
-        log.info("start deploy devops apps");
-        Sftp sftp = new Sftp();
-        List<RemoteHost> remoteHosts = sftp.getRemoteHost(serverFile);
-        for (RemoteHost remoteHost : remoteHosts) {
-            try {
-                String host = remoteHost.getHost();
-                sftp.deploy(localDir, remoteDir, remoteHost);
-                log.info("deploy {} on {} done", appName, host);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-        log.info("deploy devops done");
-    }
-}

+ 0 - 18
deployer/src/main/java/cn/reghao/bnt/deployer/model/RemoteHost.java

@@ -1,18 +0,0 @@
-package cn.reghao.bnt.deployer.model;
-
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-
-/**
- * @author reghao
- * @date 2024-02-20 09:12:10
- */
-@AllArgsConstructor
-@Getter
-public class RemoteHost {
-    private String host;
-    private int port;
-    private String username;
-    private String password;
-    private String prikeyPath;
-}

+ 0 - 54
deployer/src/main/java/cn/reghao/bnt/deployer/model/UserInfoImpl.java

@@ -1,54 +0,0 @@
-package cn.reghao.bnt.deployer.model;
-
-import com.jcraft.jsch.UserInfo;
-import lombok.extern.slf4j.Slf4j;
-
-/**
- * @author reghao
- * @date 2024-02-19 14:44:45
- */
-
-/**
- * ssh private key passphrase info
- */
-@Slf4j
-public class UserInfoImpl implements UserInfo {
-    /**
-     * ssh private key passphrase
-     */
-    private String passphrase;
-
-    public UserInfoImpl (String passphrase) {
-        this.passphrase = passphrase;
-    }
-
-    @Override
-    public String getPassphrase() {
-        return passphrase;
-    }
-
-    @Override
-    public String getPassword() {
-        return null;
-    }
-
-    @Override
-    public boolean promptPassphrase(String s) {
-        return true;
-    }
-
-    @Override
-    public boolean promptPassword(String s) {
-        return false;
-    }
-
-    @Override
-    public boolean promptYesNo(String s) {
-        return true;
-    }
-
-    @Override
-    public void showMessage(String message) {
-        log.info ("SSH Message:{}", message);
-    }
-}

+ 0 - 294
deployer/src/main/java/cn/reghao/bnt/deployer/util/Sftp.java

@@ -1,294 +0,0 @@
-package cn.reghao.bnt.deployer.util;
-
-import cn.reghao.bnt.deployer.model.RemoteHost;
-import cn.reghao.bnt.deployer.model.UserInfoImpl;
-import cn.reghao.jutil.jdk.shell.ShellResult;
-import cn.reghao.jutil.jdk.io.TextFile;
-import com.jcraft.jsch.*;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.*;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * @author reghao
- * @date 2024-02-19 11:38:54
- */
-@Slf4j
-public class Sftp {
-    private final TextFile textFile = new TextFile();
-    private final String bash = "/usr/bin/bash";
-
-    public Session getSession(RemoteHost remoteHost) throws Exception {
-        String host = remoteHost.getHost();
-        int port = remoteHost.getPort();
-        String username = remoteHost.getUsername();
-        String password = remoteHost.getPassword();
-        String prikeyPath = remoteHost.getPrikeyPath();
-
-        JSch jsch = new JSch();
-        Session session;
-        if (port <= 0) {
-            //连接服务器,采用默认端口
-            session = jsch.getSession(username, host);
-        } else {
-            //采用指定的端口连接服务器
-            session = jsch.getSession(username, host, port);
-        }
-
-        //如果服务器连接不上,则抛出异常
-        if (session == null) {
-            throw new Exception("session is null");
-        }
-
-        if (password != null) {
-            //设置登陆主机的密码
-            session.setPassword(password);
-            //设置第一次登陆的时候提示,可选值:(ask | yes | no)
-            session.setConfig("StrictHostKeyChecking", "no");
-        } else if (prikeyPath != null) {
-            session.setConfig("PreferredAuthentications", "publickey");
-            session.setConfig("userauth.gssapi-with-mic", "no");
-            session.setConfig("StrictHostKeyChecking", "ask");
-            session.setUserInfo(new UserInfoImpl(""));
-            jsch.addIdentity(prikeyPath);
-
-            session.setConfig("UseDNS", "no");
-            session.setConfig("kex", "diffie-hellman-group1-sha1,"
-                    + "diffie-hellman-group-exchange-sha1,"
-                    + "diffie-hellman-group-exchange-sha256");
-        } else {
-            throw new Exception("password and private key not exist either");
-        }
-
-        session.connect(30_000);
-        return session;
-    }
-
-    public void upload(Session session, String local, String destDir) throws JSchException, SftpException, IOException {
-        // 创建 sftp 通信通道
-        Channel channel = session.openChannel("sftp");
-        channel.connect(5_000);
-        ChannelSftp sftp = (ChannelSftp) channel;
-        sftp.cd(destDir);
-        File localFile = new File(local);
-        if (localFile.isFile()) {
-            putFile(localFile, destDir, sftp);
-        } else {
-            for (File file : Objects.requireNonNull(localFile.listFiles())) {
-                if (file.isFile()) {
-                    putFile(file, destDir, sftp);
-                }
-            }
-        }
-
-        if (channel.isConnected()) {
-            channel.disconnect();
-        }
-    }
-
-    private void putFile(File localFile, String destDir, ChannelSftp sftp) throws IOException, SftpException {
-        String filename = localFile.getName();
-        String remoteFilePath = String.format("%s/%s", destDir, filename);
-        OutputStream outstream = sftp.put(remoteFilePath);
-
-        InputStream instream = new FileInputStream(localFile);
-        byte[] bytes = new byte[1024];
-        int n;
-        while ((n = instream.read(bytes)) != -1) {
-            outstream.write(bytes, 0, n);
-        }
-
-        outstream.flush();
-        outstream.close();
-        instream.close();
-    }
-
-    /**
-     * 在远程机器上创建目录(等价于 mkdir -p)
-     *
-     * @param
-     * @return
-     * @date 2024-02-20 09:20:11
-     */
-    public boolean mkdir(Session session, String remotePath) throws SftpException, JSchException {
-        boolean exist = true;
-        // 创建 sftp 通信通道
-        Channel channel = session.openChannel("sftp");
-        channel.connect(5_000);
-        ChannelSftp sftp = (ChannelSftp) channel;
-
-        List<String> list = new ArrayList<>();
-        String[] arr = remotePath.split("/");
-        for (int i = 0; i < arr.length; i++) {
-            String path = "/" + arr[i];
-            if (i-1 > 0) {
-                list.add(list.get(i-1) + path);
-            } else {
-                list.add(path);
-            }
-        }
-
-        for (String path : list) {
-            try {
-                sftp.stat(path);
-            } catch (SftpException e) {
-                exist = false;
-                sftp.mkdir(path);
-            }
-        }
-
-        return exist;
-    }
-
-    public ShellResult exec(Session session, String command) throws JSchException, IOException {
-        StringBuilder sb = new StringBuilder();
-        // 创建 exec 通信通道
-        ChannelExec channel = (ChannelExec) session.openChannel("exec");
-        channel.setCommand(command);
-        channel.setInputStream(null);
-        channel.setErrStream(System.err);
-        InputStream input = channel.getInputStream();
-        InputStream error = channel.getErrStream();
-
-        // timeout 设置为 10 分钟
-        channel.connect(600_000);
-        /*byte[] tmp = new byte[1024];
-        int i = 0;
-        while (true) {
-            while (input.available() > 0) {
-                i = input.read(tmp, 0, 1024);
-                if (i < 0) {
-                    break;
-                }
-            }
-
-            if (channel.isClosed()) {
-                if (input.available() > 0) {
-                    continue;
-                }
-
-                log.info("channel closed with status: {}", channel.getExitStatus());
-                break;
-            }
-
-            try {
-                Thread.sleep(1000);
-            } catch(Exception e) {
-                e.printStackTrace();
-            }
-        }
-        sb.append(new String(tmp, 0, i));
-        */
-        // stdin
-        BufferedReader inputReader = new BufferedReader(new InputStreamReader(input));
-        String line;
-        while((line = inputReader.readLine()) != null) {
-            sb.append(line).append(System.lineSeparator());
-        }
-
-        // stderr
-        StringBuilder sb1 = new StringBuilder();
-        BufferedReader errorReader = new BufferedReader(new InputStreamReader(error));
-        String line1;
-        while((line1 = errorReader.readLine()) != null) {
-            sb1.append(line1).append(System.lineSeparator());
-        }
-
-        int statusCode = channel.getExitStatus();
-        ShellResult shellResult = new ShellResult(statusCode);
-        if (statusCode == 0) {
-            shellResult.setResult(sb.toString());
-        } else {
-            shellResult.setResult(sb1.toString());
-        }
-
-        if (channel.isConnected()) {
-            channel.disconnect();
-        }
-
-        return shellResult;
-    }
-
-    public List<RemoteHost> getRemoteHost(String filePath) {
-        List<String> list = textFile.read(filePath);
-        List<RemoteHost> remoteHosts = new ArrayList<>();
-        for (int i = 1; i < list.size(); i++) {
-            // 从 csv 文件的第二行开始读, 第一行为注释
-            String line = list.get(i);
-            String[] arr = line.split(",");
-            if (arr.length == 4) {
-                String host = arr[0];
-                int port = Integer.parseInt(arr[1]);
-                String username = arr[2];
-                String password = arr[3];
-                remoteHosts.add(new RemoteHost(host, port, username, password, null));
-            } else if (arr.length == 5) {
-                String host = arr[0];
-                int port = Integer.parseInt(arr[1]);
-                String username = arr[2];
-                String prikeyPath = arr[4];
-                remoteHosts.add(new RemoteHost(host, port, username, null, prikeyPath));
-            }
-        }
-
-        return remoteHosts;
-    }
-
-    public void deploy(String localDir, String remoteDir, RemoteHost remoteHost) throws Exception {
-        Session session = getSession(remoteHost);
-        boolean exist = mkdir(session, remoteDir);
-        if (exist) {
-            String command = String.format("cd %s && %s shutdown.sh", remoteDir, bash);
-            ShellResult shellResult = exec(session, command);
-            if (!shellResult.isSuccess()) {
-                log.info("shutdown application failed\nexitCode: {}\nresult:\n{}", shellResult.getExitCode(), shellResult.getResult());
-                System.exit(-1);
-            } else {
-                log.info("shutdown application successfully");
-            }
-        } else {
-            log.info("remote dir {} created", remoteDir);
-        }
-
-        upload(session, localDir, remoteDir);
-        log.info("files uploaded");
-
-        String command = String.format("cd %s && %s start.sh", remoteDir, bash);
-        ShellResult shellResult = exec(session, command);
-        if (!shellResult.isSuccess()) {
-            log.info("start application failed\nexitCode: {}\nresult:\n{}", shellResult.getExitCode(), shellResult.getResult());
-            System.exit(-1);
-        } else {
-            log.info("start application successfully");
-        }
-
-        if (session.isConnected()) {
-            session.disconnect();
-        }
-    }
-
-    public void exec(RemoteHost remoteHost, String command) throws Exception {
-        Session session = getSession(remoteHost);
-        ShellResult shellResult = exec(session, command);
-        log.info("\nexitCode: {}\nresult:\n{}", shellResult.getExitCode(), shellResult.getResult());
-
-        if (session.isConnected()) {
-            session.disconnect();
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        String host = "192.168.0.180";
-        int port = 22;
-        String username = "root";
-        String password = "gsh";
-        RemoteHost remoteHost = new RemoteHost(host, port, username, password, null);
-
-        String command = "docker ps -a";
-        Sftp sftp = new Sftp();
-        sftp.exec(remoteHost, command);
-    }
-}

+ 0 - 3
deployer/src/main/resources/servers.csv

@@ -1,3 +0,0 @@
-host,port,username,password,prikeyPath
-192.168.0.110,22,root,aka,
-192.168.0.77,22,root,,~/.ssh/id_rsa_node77

+ 0 - 68
logstash/pom.xml

@@ -1,68 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>bnt</artifactId>
-        <groupId>cn.reghao.bnt</groupId>
-        <version>1.0.0</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>logstash</artifactId>
-
-    <properties>
-        <project.build.outputDir>${project.parent.basedir}/bin/logstash</project.build.outputDir>
-    </properties>
-
-    <dependencies>
-        <dependency>
-            <groupId>com.squareup.okhttp3</groupId>
-            <artifactId>okhttp</artifactId>
-            <version>4.10.0</version>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <finalName>devops-${project.artifactId}</finalName>
-        <resources>
-            <resource>
-                <directory>src/main/resources</directory>
-                <filtering>true</filtering>
-                <includes>
-                    <include>logback.xml</include>
-                </includes>
-            </resource>
-        </resources>
-
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <version>3.2.0</version>
-                <configuration>
-                    <archive>
-                        <manifest>
-                            <mainClass>cn.reghao.bnt.logstash.LogStashApp</mainClass>
-                        </manifest>
-                    </archive>
-                    <descriptorRefs>
-                        <descriptorRef>jar-with-dependencies</descriptorRef>
-                    </descriptorRefs>
-                    <!-- 不设置此属性则生成的 jar 包名字会带有 jar-with-dependencies -->
-                    <appendAssemblyId>false</appendAssemblyId>
-                    <outputDirectory>${project.build.outputDir}</outputDirectory>
-                </configuration>
-                <executions>
-                    <execution>
-                        <id>make-assembly</id>
-                        <phase>package</phase>
-                        <goals>
-                            <goal>single</goal>
-                        </goals>
-                    </execution>
-                </executions>
-            </plugin>
-        </plugins>
-    </build>
-</project>

+ 0 - 47
logstash/src/main/java/cn/reghao/bnt/logstash/LogStashApp.java

@@ -1,47 +0,0 @@
-package cn.reghao.bnt.logstash;
-
-import cn.reghao.bnt.logstash.config.ConfigFile;
-import cn.reghao.bnt.logstash.config.LogFile;
-import cn.reghao.bnt.logstash.config.LogstashConfig;
-import cn.reghao.bnt.logstash.service.FileReader;
-import cn.reghao.jutil.jdk.serializer.JsonConverter;
-import cn.reghao.jutil.jdk.util.SingleInstance;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.File;
-import java.util.List;
-
-/**
- * @author reghao
- * @date 2023-11-08 10:34:51
- */
-@Slf4j
-public class LogStashApp {
-    static void shutdownGracefully() {
-        Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(), "main-shutdown-hook"));
-    }
-
-    static class ShutdownHook implements Runnable {
-        @Override
-        public void run() {
-            log.info("资源清理完成,结束 devops-logstash...");
-        }
-    }
-
-    public static void main(String[] args) {
-        if (args.length != 1) {
-            log.error("必须指定配置文件...");
-            return;
-        }
-
-        String configFilePath = ConfigFile.configFilePath(args[0], LogStashApp.class);
-        LogstashConfig logstashConfig = JsonConverter.jsonFileToObject(new File(configFilePath), LogstashConfig.class);
-        String wsUrl = logstashConfig.getWsUrl();
-        List<LogFile> logFiles = logstashConfig.getLogFiles();
-        FileReader fileReader = new FileReader(wsUrl, logFiles);
-        fileReader.start();
-
-        shutdownGracefully();
-        SingleInstance.onlyOne(60002);
-    }
-}

+ 0 - 53
logstash/src/main/java/cn/reghao/bnt/logstash/config/ConfigFile.java

@@ -1,53 +0,0 @@
-package cn.reghao.bnt.logstash.config;
-
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.File;
-import java.net.URL;
-import java.net.URLDecoder;
-import java.nio.charset.StandardCharsets;
-
-/**
- * @author reghao
- * @date 2021-03-03 18:47:01
- */
-@Slf4j
-public class ConfigFile {
-    public static String configFilePath(String arg, Class<?> clazz) {
-        String configFilePath = null;
-        File configFile = new File(arg);
-        if (!configFile.exists()) {
-            if (arg.startsWith("./")) {
-                String filename = arg.replace(".", "")
-                        .replace("/", "");
-                configFilePath = runningHome(clazz) + "/" + filename;
-            } else if (!arg.contains("/")) {
-                configFilePath = runningHome(clazz) + "/" + arg;
-            } else {
-                log.error("相对路径的配置文件必须以 ./configFile 或 configFile 形式指定...");
-            }
-        } else {
-            // 绝对路径
-            configFilePath = arg;
-        }
-        return configFilePath;
-    }
-    
-    /**
-     * jar 文件运行目录
-     *
-     * @param
-     * @return
-     * @date 2021-03-03 下午6:33
-     */
-    public static String runningHome(Class<?> clazz) {
-        URL url = clazz.getProtectionDomain().getCodeSource().getLocation();
-        String jarFilePath = URLDecoder.decode(url.getPath(), StandardCharsets.UTF_8);
-        if (jarFilePath.endsWith(".jar")) {
-            jarFilePath = jarFilePath.substring(0, jarFilePath.lastIndexOf("/") + 1);
-        }
-
-        File file = new File(jarFilePath);
-        return file.getAbsolutePath();
-    }
-}

+ 0 - 29
logstash/src/main/java/cn/reghao/bnt/logstash/config/LogFile.java

@@ -1,29 +0,0 @@
-package cn.reghao.bnt.logstash.config;
-
-import lombok.AllArgsConstructor;
-
-/**
- * @author reghao
- * @date 2022-05-20 18:28:21
- */
-@AllArgsConstructor
-public class LogFile {
-    private String domain;
-    private String filePath;
-
-    public void setDomain(String domain) {
-        this.domain = domain;
-    }
-
-    public String getDomain() {
-        return domain;
-    }
-
-    public void setFilePath(String filePath) {
-        this.filePath = filePath;
-    }
-
-    public String getFilePath() {
-        return filePath;
-    }
-}

+ 0 - 17
logstash/src/main/java/cn/reghao/bnt/logstash/config/LogstashConfig.java

@@ -1,17 +0,0 @@
-package cn.reghao.bnt.logstash.config;
-
-import lombok.Getter;
-import lombok.Setter;
-
-import java.util.List;
-
-/**
- * @author reghao
- * @date 2023-11-08 10:44:58
- */
-@Setter
-@Getter
-public class LogstashConfig {
-    private String wsUrl;
-    private List<LogFile> logFiles;
-}

+ 0 - 41
logstash/src/main/java/cn/reghao/bnt/logstash/service/FileReader.java

@@ -1,41 +0,0 @@
-package cn.reghao.bnt.logstash.service;
-
-import cn.reghao.bnt.logstash.config.LogFile;
-import cn.reghao.bnt.logstash.ws.WsClient;
-import cn.reghao.jutil.jdk.thread.ThreadPoolWrapper;
-
-import java.util.List;
-import java.util.concurrent.*;
-
-/**
- * @author reghao
- * @date 2022-05-20 16:23:34
- */
-public class FileReader {
-    private final ExecutorService threadPool = ThreadPoolWrapper.threadPool("logstash");
-    private final ScheduledExecutorService scheduledThreadPool = ThreadPoolWrapper.scheduledThreadPool("logstash1", 5);
-    private final String wsUrl;
-    private final List<LogFile> logFiles;
-
-    public FileReader(String wsUrl, List<LogFile> logFiles) {
-        this.wsUrl = wsUrl;
-        this.logFiles = logFiles;
-    }
-
-    public void start() {
-        String params = String.format("app=%s&host=%s", "nginx", "api.reghao.cn");
-        String url = String.format("%s?%s", wsUrl, params);
-        WsClient wsClient = new WsClient(url);
-        wsClient.connect();
-        for (LogFile logFile: logFiles) {
-            String filePath = logFile.getFilePath();
-            try {
-                TailReader tailReader = new TailReader(filePath, wsClient);
-                threadPool.submit(tailReader);
-                //scheduledThreadPool.scheduleAtFixedRate(tailReader, 0, 1, TimeUnit.SECONDS);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        }
-    }
-}

+ 0 - 94
logstash/src/main/java/cn/reghao/bnt/logstash/service/TailReader.java

@@ -1,94 +0,0 @@
-package cn.reghao.bnt.logstash.service;
-
-import cn.reghao.bnt.logstash.ws.WsClient;
-import cn.reghao.jutil.jdk.io.TextFile;
-import cn.reghao.jutil.jdk.serializer.JsonConverter;
-import cn.reghao.jutil.jdk.web.log.NginxLog;
-import lombok.extern.slf4j.Slf4j;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.util.List;
-
-/**
- * @author reghao
- * @date 2022-05-20 11:41:08
- */
-@Slf4j
-public class TailReader implements Runnable {
-    private final String filePath;
-    private final RandomAccessFile raf;
-    private long pointer;
-    private final WsClient wsClient;
-    private TextFile textFile;
-    private String errorLogPath;
-
-    public TailReader(String filePath, WsClient wsClient) throws FileNotFoundException {
-        this.filePath = filePath;
-        this.raf  = new RandomAccessFile(filePath, "r");
-        this.pointer = 0;
-        this.wsClient = wsClient;
-        this.textFile = new TextFile();
-        this.errorLogPath = System.getProperty("user.dir") + "/error.log";
-    }
-
-    @Override
-    public void run() {
-        try {
-            File errLogFile = new File(errorLogPath);
-            if (!errLogFile.exists()) {
-                errLogFile.createNewFile();
-            }
-
-            //raf.seek(length);
-            while (!Thread.interrupted()) {
-                try {
-                    /*String line = raf.readLine();
-                    while (line == null) {
-                        Thread.sleep(3_000);
-                        line = raf.readLine();
-                    }
-
-                    while (line != null) {
-                        parseAndPersist(line);
-                        line = raf.readLine();
-                    }*/
-
-                    long length = raf.length();
-                    if (length > pointer) {
-                        raf.seek(pointer);
-                        String line = raf.readLine();
-                        while (line != null) {
-                            parseAndPersist(line, errLogFile);
-                            line = raf.readLine();
-                        }
-                        pointer = length;
-                    } else {
-                        log.info("已读取到 {} 文件末尾, 休眠 10s 后再尝试读取...", filePath);
-                        Thread.sleep(10_000);
-                    }
-                } catch (Exception e) {
-                    e.printStackTrace();
-                }
-            }
-        } catch (Exception e ) {
-            e.printStackTrace();
-        }
-    }
-
-    private void parseAndPersist(String line, File errLogFile) {
-        try {
-            NginxLog nginxLog = JsonConverter.jsonToObject(line, NginxLog.class);
-            wsClient.send("", nginxLog);
-        } catch (Exception e) {
-            e.printStackTrace();
-            try {
-                textFile.append(errLogFile.getAbsolutePath(), List.of(line));
-            } catch (IOException ex) {
-                ex.printStackTrace();
-            }
-        }
-    }
-}

+ 0 - 89
logstash/src/main/java/cn/reghao/bnt/logstash/ws/WebSocketListenerImpl.java

@@ -1,89 +0,0 @@
-package cn.reghao.bnt.logstash.ws;
-
-import cn.reghao.jutil.jdk.serializer.JdkSerializer;
-import lombok.extern.slf4j.Slf4j;
-import okhttp3.Response;
-import okhttp3.WebSocket;
-import okhttp3.WebSocketListener;
-import okio.ByteString;
-
-import java.io.EOFException;
-import java.net.ConnectException;
-import java.net.ProtocolException;
-
-/**
- * @author reghao
- * @date 2023-02-23 09:26:50
- */
-@Slf4j
-public class WebSocketListenerImpl extends WebSocketListener {
-    private final WsClient wsClient;
-
-    public WebSocketListenerImpl(WsClient wsClient) {
-        this.wsClient = wsClient;
-    }
-
-    @Override
-    public void onOpen(WebSocket webSocket, Response response) {
-        log.info("WebSocket 连接成功");
-        wsClient.setConnected(true);
-        wsClient.resetRetryCount();
-    }
-
-    @Override
-    public void onClosing(WebSocket webSocket, int code, String reason) {
-        log.error("WebSocket 连接被动断开 -> {} - {}", code, reason);
-        wsClient.setConnected(false);
-        if (wsClient.isRetry()) {
-            reconnect();
-        }
-    }
-
-    @Override
-    public void onClosed(WebSocket webSocket, int code, String reason) {
-        log.error("WebSocket 连接主动断开 -> {} - {}", code, reason);
-        wsClient.setConnected(false);
-    }
-
-    @Override
-    public void onFailure(WebSocket webSocket, Throwable throwable, Response response) {
-        log.info("WebSocket 异常事件: {}", throwable.toString());
-        if (throwable instanceof ConnectException
-                || throwable instanceof EOFException
-                || throwable instanceof ProtocolException) {
-            wsClient.setConnected(false);
-            if (wsClient.isRetry()) {
-                reconnect();
-            }
-        } else {
-            throwable.printStackTrace();
-        }
-    }
-
-    private void reconnect() {
-        log.info("WebSocket 重连");
-        try {
-            if (wsClient.getRetryCount() > 10) {
-                log.info("WebSocket 重连超过 10 次, 休眠 1 分钟后再尝试");
-                Thread.sleep(60_000);
-                wsClient.resetRetryCount();
-            } else {
-                log.info("休眠 10s 后再尝试重连");
-                Thread.sleep(10_000);
-            }
-        } catch (InterruptedException e) {
-            e.printStackTrace();
-        }
-        wsClient.retryCountIncr();
-        wsClient.connect();
-    }
-
-    @Override
-    public void onMessage(WebSocket webSocket, String text) {
-    }
-
-    @Override
-    public void onMessage(WebSocket webSocket, ByteString bytes) {
-        Object object = JdkSerializer.deserialize(bytes.toByteArray());
-    }
-}

+ 0 - 81
logstash/src/main/java/cn/reghao/bnt/logstash/ws/WsClient.java

@@ -1,81 +0,0 @@
-package cn.reghao.bnt.logstash.ws;
-
-import cn.reghao.jutil.jdk.serializer.JdkSerializer;
-import okhttp3.*;
-import okio.ByteString;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * @author reghao
- * @date 2023-02-23 09:26:50
- */
-public class WsClient {
-    private final String url;
-    private WebSocket webSocket;
-    private boolean connected;
-    private final WebSocketListener webSocketListener;
-    private boolean retry;
-    private int retryCount;
-
-    public WsClient(String url) {
-        this.url = url;
-        this.webSocketListener = new WebSocketListenerImpl(this);
-        this.retry = true;
-        this.retryCount = 0;
-    }
-
-    public void setRetry(boolean retry) {
-        this.retry = retry;
-    }
-
-    public boolean isRetry() {
-        return retry;
-    }
-
-    public void retryCountIncr() {
-        this.retryCount += 1;
-    }
-
-    public void resetRetryCount() {
-        this.retryCount = 0;
-    }
-
-    public int getRetryCount() {
-        return retryCount;
-    }
-
-    public void connect() {
-        Request request = new Request.Builder()
-                .url(url)
-                .header("Authorization", "Bearer ")
-                .build();
-
-        OkHttpClient okHttpClient = new OkHttpClient.Builder()
-                .connectTimeout(30, TimeUnit.SECONDS)
-                .readTimeout(30, TimeUnit.SECONDS)
-                .writeTimeout(30, TimeUnit.SECONDS)
-                .build();
-        this.webSocket = okHttpClient.newWebSocket(request, webSocketListener);
-    }
-
-    public void setConnected(boolean status) {
-        this.connected = status;
-    }
-
-    public boolean isConnected() {
-        return connected;
-    }
-
-    public void send(String dest, Object message) {
-        if (isConnected()) {
-            byte[] bytes = JdkSerializer.serialize(message);
-            webSocket.send(ByteString.of(bytes));
-        }
-    }
-
-    public void close() {
-        setRetry(false);
-        webSocket.close(1000, "Client Close Connection");
-    }
-}

+ 0 - 2
pom.xml

@@ -11,8 +11,6 @@
         <module>web</module>
         <module>common</module>
         <module>agent</module>
-        <module>deployer</module>
-        <module>logstash</module>
     </modules>
     <packaging>pom</packaging>