|
|
@@ -1,140 +0,0 @@
|
|
|
-package cn.reghao.tnb.log.ws;
|
|
|
-
|
|
|
-import cn.reghao.jutil.jdk.converter.DateTimeConverter;
|
|
|
-import cn.reghao.jutil.web.log.AppLog;
|
|
|
-import cn.reghao.jutil.jdk.serializer.JdkSerializer;
|
|
|
-import cn.reghao.jutil.jdk.thread.ThreadPoolWrapper;
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
-import okhttp3.Response;
|
|
|
-import okhttp3.WebSocket;
|
|
|
-import okhttp3.WebSocketListener;
|
|
|
-import okio.ByteString;
|
|
|
-
|
|
|
-import java.io.EOFException;
|
|
|
-import java.net.ConnectException;
|
|
|
-import java.net.ProtocolException;
|
|
|
-import java.util.concurrent.Future;
|
|
|
-import java.util.concurrent.ScheduledExecutorService;
|
|
|
-import java.util.concurrent.TimeUnit;
|
|
|
-
|
|
|
-/**
|
|
|
- * @author reghao
|
|
|
- * @date 2023-02-23 09:26:50
|
|
|
- */
|
|
|
-@Slf4j
|
|
|
-public class WebSocketListenerImpl extends WebSocketListener {
|
|
|
- private final ScheduledExecutorService scheduler = ThreadPoolWrapper.scheduledThreadPool("ws-heartbeat", 1);
|
|
|
- private Future<?> heartbeatFuture;
|
|
|
- private final WsClient wsClient;
|
|
|
-
|
|
|
- public WebSocketListenerImpl(WsClient wsClient) {
|
|
|
- this.wsClient = wsClient;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onOpen(WebSocket webSocket, Response response) {
|
|
|
- log.info("WebSocket 连接成功");
|
|
|
- heartbeatFuture = scheduler.scheduleAtFixedRate(new HeartbeatTask(wsClient), 0, 10, TimeUnit.SECONDS);
|
|
|
-
|
|
|
- wsClient.setConnected(true);
|
|
|
- wsClient.resetRetryCount();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onClosing(WebSocket webSocket, int code, String reason) {
|
|
|
- log.error("WebSocket 连接被动断开 -> {} - {}", code, reason);
|
|
|
- if (heartbeatFuture != null && !heartbeatFuture.isCancelled()) {
|
|
|
- heartbeatFuture.cancel(true);
|
|
|
- }
|
|
|
-
|
|
|
- wsClient.setConnected(false);
|
|
|
-
|
|
|
- if (wsClient.isRetry()) {
|
|
|
- reconnect();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onClosed(WebSocket webSocket, int code, String reason) {
|
|
|
- log.error("WebSocket 连接主动断开 -> {} - {}", code, reason);
|
|
|
- if (heartbeatFuture != null && !heartbeatFuture.isCancelled()) {
|
|
|
- heartbeatFuture.cancel(true);
|
|
|
- }
|
|
|
-
|
|
|
- wsClient.setConnected(false);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onFailure(WebSocket webSocket, Throwable throwable, Response response) {
|
|
|
- log.info("WebSocket 异常事件: {}", throwable.toString());
|
|
|
- if (heartbeatFuture != null && !heartbeatFuture.isCancelled()) {
|
|
|
- heartbeatFuture.cancel(true);
|
|
|
- }
|
|
|
-
|
|
|
- if (throwable instanceof ConnectException
|
|
|
- || throwable instanceof EOFException
|
|
|
- || throwable instanceof ProtocolException) {
|
|
|
- wsClient.setConnected(false);
|
|
|
- if (wsClient.isRetry()) {
|
|
|
- reconnect();
|
|
|
- }
|
|
|
- } else {
|
|
|
- throwable.printStackTrace();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void reconnect() {
|
|
|
- log.info("WebSocket 重连");
|
|
|
- try {
|
|
|
- if (wsClient.getRetryCount() > 10) {
|
|
|
- log.info("WebSocket 重连超过 10 次, 休眠 1 分钟后再尝试");
|
|
|
- Thread.sleep(60_000);
|
|
|
- wsClient.resetRetryCount();
|
|
|
- } else {
|
|
|
- log.info("休眠 10s 后再尝试重连");
|
|
|
- Thread.sleep(10_000);
|
|
|
- }
|
|
|
- } catch (InterruptedException e) {
|
|
|
- e.printStackTrace();
|
|
|
- }
|
|
|
- wsClient.retryCountIncr();
|
|
|
- wsClient.connect();
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onMessage(WebSocket webSocket, String text) {
|
|
|
- log.info("text message from server");
|
|
|
- log.info(text);
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void onMessage(WebSocket webSocket, ByteString bytes) {
|
|
|
- log.info("binary message from server");
|
|
|
- Object object = JdkSerializer.deserialize(bytes.toByteArray());
|
|
|
- if (object instanceof AppLog) {
|
|
|
- AppLog appLog = (AppLog) object;
|
|
|
- String app = appLog.getApp();
|
|
|
- String host = appLog.getHost();
|
|
|
- long timestamp = appLog.getTimestamp();
|
|
|
- String dateTime = DateTimeConverter.format(timestamp);
|
|
|
- String thread = appLog.getThread();
|
|
|
- String level = appLog.getLevel();
|
|
|
- String logger = appLog.getLogger();
|
|
|
- String message = appLog.getMessage();
|
|
|
- log.info("{} {} {} {} {} {} {}", app, host, dateTime, thread, level, logger, message);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- static class HeartbeatTask implements Runnable {
|
|
|
- private final WsClient wsClient;
|
|
|
-
|
|
|
- public HeartbeatTask(WsClient wsClient) {
|
|
|
- this.wsClient = wsClient;
|
|
|
- }
|
|
|
-
|
|
|
- @Override
|
|
|
- public void run() {
|
|
|
- wsClient.send("ws heartbeat");
|
|
|
- }
|
|
|
- }
|
|
|
-}
|