|
|
@@ -0,0 +1,89 @@
|
|
|
+package cn.reghao.devops.logstash.ws;
|
|
|
+
|
|
|
+import cn.reghao.jutil.jdk.serializer.JdkSerializer;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import okhttp3.Response;
|
|
|
+import okhttp3.WebSocket;
|
|
|
+import okhttp3.WebSocketListener;
|
|
|
+import okio.ByteString;
|
|
|
+
|
|
|
+import java.io.EOFException;
|
|
|
+import java.net.ConnectException;
|
|
|
+import java.net.ProtocolException;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author reghao
|
|
|
+ * @date 2023-02-23 09:26:50
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+public class WebSocketListenerImpl extends WebSocketListener {
|
|
|
+ private final WsClient wsClient;
|
|
|
+
|
|
|
+ public WebSocketListenerImpl(WsClient wsClient) {
|
|
|
+ this.wsClient = wsClient;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onOpen(WebSocket webSocket, Response response) {
|
|
|
+ log.info("WebSocket 连接成功");
|
|
|
+ wsClient.setConnected(true);
|
|
|
+ wsClient.resetRetryCount();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onClosing(WebSocket webSocket, int code, String reason) {
|
|
|
+ log.error("WebSocket 连接被动断开 -> {} - {}", code, reason);
|
|
|
+ wsClient.setConnected(false);
|
|
|
+ if (wsClient.isRetry()) {
|
|
|
+ reconnect();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onClosed(WebSocket webSocket, int code, String reason) {
|
|
|
+ log.error("WebSocket 连接主动断开 -> {} - {}", code, reason);
|
|
|
+ wsClient.setConnected(false);
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onFailure(WebSocket webSocket, Throwable throwable, Response response) {
|
|
|
+ log.info("WebSocket 异常事件: {}", throwable.toString());
|
|
|
+ if (throwable instanceof ConnectException
|
|
|
+ || throwable instanceof EOFException
|
|
|
+ || throwable instanceof ProtocolException) {
|
|
|
+ wsClient.setConnected(false);
|
|
|
+ if (wsClient.isRetry()) {
|
|
|
+ reconnect();
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ throwable.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void reconnect() {
|
|
|
+ log.info("WebSocket 重连");
|
|
|
+ try {
|
|
|
+ if (wsClient.getRetryCount() > 10) {
|
|
|
+ log.info("WebSocket 重连超过 10 次, 休眠 1 分钟后再尝试");
|
|
|
+ Thread.sleep(60_000);
|
|
|
+ wsClient.resetRetryCount();
|
|
|
+ } else {
|
|
|
+ log.info("休眠 10s 后再尝试重连");
|
|
|
+ Thread.sleep(10_000);
|
|
|
+ }
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ wsClient.retryCountIncr();
|
|
|
+ wsClient.connect();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onMessage(WebSocket webSocket, String text) {
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void onMessage(WebSocket webSocket, ByteString bytes) {
|
|
|
+ Object object = JdkSerializer.deserialize(bytes.toByteArray());
|
|
|
+ }
|
|
|
+}
|