Ver Fonte

update WsClient in common module

reghao há 2 anos atrás
pai
commit
0d5b4511d0

+ 6 - 0
common/pom.xml

@@ -63,5 +63,11 @@
             <artifactId>maven-invoker</artifactId>
             <version>3.0.1</version>
         </dependency>
+
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>okhttp</artifactId>
+            <version>4.10.0</version>
+        </dependency>
     </dependencies>
 </project>

+ 49 - 0
common/src/main/java/cn/reghao/devops/common/log/WsAppender.java

@@ -0,0 +1,49 @@
+package cn.reghao.devops.common.log;
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.UnsynchronizedAppenderBase;
+import cn.reghao.devops.common.ws.WsClient;
+import cn.reghao.jutil.jdk.result.AppLog;
+
+/**
+ * @author reghao
+ * @date 2023-06-03 19:37:21
+ */
+public class WsAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
+    private final String app;
+    private final String host;
+    private final WsClient wsClient;
+
+    public WsAppender(String url, String app, String host) {
+        this.app = app;
+        this.host = host;
+        this.wsClient = new WsClient(url);
+        setName("wsAppender");
+    }
+
+    @Override
+    public void start() {
+        wsClient.connect();
+        super.start();
+    }
+
+    @Override
+    public void stop() {
+        super.stop();
+    }
+
+    @Override
+    protected void append(ILoggingEvent event) {
+        AppLog appLog = getAppLog(event);
+        wsClient.send(appLog);
+    }
+
+    private AppLog getAppLog(ILoggingEvent event) {
+        long timestamp = event.getTimeStamp();
+        String level = event.getLevel().toString();
+        String thread = event.getThreadName();
+        String logger = event.getLoggerName();
+        String message = event.getFormattedMessage();
+        return new AppLog(app, host, timestamp, level, thread, logger, message);
+    }
+}

+ 18 - 0
common/src/main/java/cn/reghao/devops/common/ws/MessageDispatcher.java

@@ -0,0 +1,18 @@
+package cn.reghao.devops.common.ws;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * @author reghao
+ * @date 2023-11-01 14:34:19
+ */
+@Slf4j
+public class MessageDispatcher {
+    public void dispatch(String text) {
+        log.info("text message from server");
+    }
+
+    public void dispatch(Object object) {
+        log.info("binary message from server");
+    }
+}

+ 93 - 0
common/src/main/java/cn/reghao/devops/common/ws/WebSocketListenerImpl.java

@@ -0,0 +1,93 @@
+package cn.reghao.devops.common.ws;
+
+import cn.reghao.jutil.jdk.serializer.JdkSerializer;
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.Response;
+import okhttp3.WebSocket;
+import okhttp3.WebSocketListener;
+import okio.ByteString;
+
+import java.io.EOFException;
+import java.net.ConnectException;
+import java.net.ProtocolException;
+
+/**
+ * @author reghao
+ * @date 2023-02-23 09:26:50
+ */
+@Slf4j
+public class WebSocketListenerImpl extends WebSocketListener {
+    private final WsClient wsClient;
+    private MessageDispatcher dispatcher;
+
+    public WebSocketListenerImpl(WsClient wsClient) {
+        this.wsClient = wsClient;
+    }
+
+    @Override
+    public void onOpen(WebSocket webSocket, Response response) {
+        log.info("WebSocket 连接成功");
+        wsClient.setConnected(true);
+        wsClient.resetRetryCount();
+    }
+
+    @Override
+    public void onClosing(WebSocket webSocket, int code, String reason) {
+        log.error("WebSocket 连接被动断开 -> {} - {}", code, reason);
+        wsClient.setConnected(false);
+
+        if (wsClient.isRetry()) {
+            reconnect();
+        }
+    }
+
+    @Override
+    public void onClosed(WebSocket webSocket, int code, String reason) {
+        log.error("WebSocket 连接主动断开 -> {} - {}", code, reason);
+        wsClient.setConnected(false);
+    }
+
+    @Override
+    public void onFailure(WebSocket webSocket, Throwable throwable, Response response) {
+        log.info("WebSocket 异常事件: {}", throwable.toString());
+        if (throwable instanceof ConnectException
+                || throwable instanceof EOFException
+                || throwable instanceof ProtocolException) {
+            wsClient.setConnected(false);
+            if (wsClient.isRetry()) {
+                reconnect();
+            }
+        } else {
+            throwable.printStackTrace();
+        }
+    }
+
+    private void reconnect() {
+        log.info("WebSocket 重连");
+        try {
+            if (wsClient.getRetryCount() > 10) {
+                log.info("WebSocket 重连超过 10 次, 休眠 1 分钟后再尝试");
+                Thread.sleep(60_000);
+                wsClient.resetRetryCount();
+            } else {
+                log.info("休眠 10s 后再尝试重连");
+                Thread.sleep(10_000);
+            }
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+        wsClient.retryCountIncr();
+        wsClient.connect();
+    }
+
+    @Override
+    public void onMessage(WebSocket webSocket, String text) {
+        dispatcher.dispatch(text);
+    }
+
+    @Override
+    public void onMessage(WebSocket webSocket, ByteString bytes) {
+        Object object = JdkSerializer.deserialize(bytes.toByteArray());
+        dispatcher.dispatch(object);
+    }
+}

+ 106 - 0
common/src/main/java/cn/reghao/devops/common/ws/WsClient.java

@@ -0,0 +1,106 @@
+package cn.reghao.devops.common.ws;
+
+import cn.reghao.jutil.jdk.serializer.JdkSerializer;
+import cn.reghao.jutil.jdk.thread.ThreadPoolWrapper;
+import okhttp3.OkHttpClient;
+import okhttp3.Request;
+import okhttp3.WebSocket;
+import okhttp3.WebSocketListener;
+import okio.ByteString;
+
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author reghao
+ * @date 2023-02-23 09:26:50
+ */
+public class WsClient {
+    private final ScheduledExecutorService scheduler = ThreadPoolWrapper.scheduledThreadPool("ws-heartbeat", 1);
+    private Future<?> heartbeatFuture;
+
+    private final String url;
+    private String token;
+    private WebSocket webSocket;
+    private boolean connected;
+    private final WebSocketListener webSocketListener;
+    private boolean retry;
+    private int retryCount;
+
+    public WsClient(String url) {
+        this.url = url;
+        //this.token = token;
+        this.webSocketListener = new WebSocketListenerImpl(this);
+        this.retry = true;
+        this.retryCount = 0;
+    }
+
+    public void setRetry(boolean retry) {
+        this.retry = retry;
+    }
+
+    public boolean isRetry() {
+        return retry;
+    }
+
+    public void retryCountIncr() {
+        this.retryCount += 1;
+    }
+
+    public void resetRetryCount() {
+        this.retryCount = 0;
+    }
+
+    public int getRetryCount() {
+        return retryCount;
+    }
+
+    public void connect() {
+        Request request = new Request.Builder()
+                .url(url)
+                //.header("Authorization", "Bearer " + token)
+                .build();
+
+        OkHttpClient okHttpClient = new OkHttpClient.Builder()
+                .connectTimeout(30, TimeUnit.SECONDS)
+                .readTimeout(30, TimeUnit.SECONDS)
+                .writeTimeout(30, TimeUnit.SECONDS)
+                .build();
+        this.webSocket = okHttpClient.newWebSocket(request, webSocketListener);
+    }
+
+    public void setConnected(boolean status) {
+        this.connected = status;
+        if (status) {
+            heartbeatFuture = scheduler.scheduleAtFixedRate(new HeartbeatTask(), 0, 10, TimeUnit.SECONDS);
+        } else {
+            if (heartbeatFuture != null && !heartbeatFuture.isCancelled()) {
+                heartbeatFuture.cancel(true);
+            }
+        }
+    }
+
+    public boolean isConnected() {
+        return connected;
+    }
+
+    public void send(Object message) {
+        if (isConnected()) {
+            byte[] bytes = JdkSerializer.serialize(message);
+            webSocket.send(ByteString.of(bytes));
+        }
+    }
+
+    public void close() {
+        setRetry(false);
+        webSocket.close(1000, "Client Close Connection");
+    }
+
+    class HeartbeatTask implements Runnable {
+        @Override
+        public void run() {
+            send("");
+        }
+    }
+}