Forráskód Böngészése

添加处理来自 /ws/device 连接的 DeviceHandler

reghao 5 hónapja
szülő
commit
88d88f8dc8

+ 93 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/device/DeviceHandler.java

@@ -0,0 +1,93 @@
+package cn.reghao.tnb.message.app.ws.device;
+
+import cn.reghao.jutil.jdk.serializer.JsonConverter;
+import cn.reghao.tnb.message.api.dto.MediaProgress;
+import cn.reghao.tnb.message.app.ws.device.msg.DeviceEvent;
+import cn.reghao.tnb.message.app.ws.device.msg.DeviceEventType;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+import org.springframework.web.socket.*;
+
+import java.io.IOException;
+
+/**
+ * @author reghao
+ * @date 2025-10-12 14:52:42
+ */
+@Slf4j
+@Component
+public class DeviceHandler implements WebSocketHandler {
+    private final DeviceService deviceService;
+
+    public DeviceHandler(DeviceService deviceService) {
+        this.deviceService = deviceService;
+    }
+
+    @Override
+    public void afterConnectionEstablished(WebSocketSession webSocketSession) throws IOException {
+        boolean ret = deviceService.addSession(webSocketSession);
+        if (ret) {
+            log.info("WebSocket 建立连接");
+        } else {
+            log.info("没有认证信息, 关闭 WebSocket 连接");
+            webSocketSession.close(CloseStatus.NO_STATUS_CODE);
+        }
+    }
+
+    @Override
+    public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage)
+            throws IOException {
+        if (webSocketMessage instanceof TextMessage) {
+            dispatchTextMessage(webSocketSession, (TextMessage) webSocketMessage);
+        } else if (webSocketMessage instanceof BinaryMessage) {
+            log.info("接收到 WebSocket 二进制消息");
+        } else if (webSocketMessage instanceof PingMessage) {
+            log.info("接收到 WebSocket PingMessage");
+        } else if (webSocketMessage instanceof PongMessage) {
+            log.info("接收到 WebSocket PongMessage");
+        } else {
+            log.error("接收到未知类型的 WebSocket 消息");
+        }
+    }
+
+    @Override
+    public void handleTransportError(WebSocketSession webSocketSession, Throwable throwable) {
+        log.error("WebSocket 数据传输错误: {}", throwable.getMessage());
+        deviceService.removeSession(webSocketSession);
+    }
+
+    @Override
+    public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) {
+        log.info("WebSocket 断开连接: {} -> {}", closeStatus.getCode(), closeStatus.getReason());
+        deviceService.removeSession(webSocketSession);
+    }
+
+    @Override
+    public boolean supportsPartialMessages() {
+        return false;
+    }
+
+    private void dispatchTextMessage(WebSocketSession webSocketSession, TextMessage textMessage) {
+        String sessionId = webSocketSession.getId();
+        String payload = textMessage.getPayload();
+        try {
+            DeviceEvent deviceEvent = JsonConverter.jsonToObject(payload, DeviceEvent.class);
+            String type = deviceEvent.getType();
+            DeviceEventType deviceEventType = DeviceEventType.valueOf(type);
+            switch (deviceEventType) {
+                case heartbeat:
+                    deviceService.sendHeartbeatPong(webSocketSession);
+                    break;
+                case start_cam:
+                    String jsonStr = deviceEvent.getData().toString();
+                    MediaProgress mediaProgress = JsonConverter.jsonToObject(jsonStr, MediaProgress.class);
+                    break;
+                case shutdown_cam:
+                    break;
+                default:
+            }
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+    }
+}

+ 51 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/device/DeviceService.java

@@ -0,0 +1,51 @@
+package cn.reghao.tnb.message.app.ws.device;
+
+import cn.reghao.jutil.jdk.serializer.JsonConverter;
+import cn.reghao.tnb.message.api.model.EventType;
+import cn.reghao.tnb.message.api.model.resp.EventMessageResp;
+import cn.reghao.tnb.message.app.rabbit.RabbitProducer;
+import cn.reghao.tnb.message.app.redis.ds.RedisHash;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import org.springframework.web.socket.TextMessage;
+import org.springframework.web.socket.WebSocketSession;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+/**
+ * @author reghao
+ * @date 2025-10-12 14:53:07
+ */
+@Slf4j
+@Service
+public class DeviceService {
+    // sessionId -> deviceId
+    private final Map<String, String> sessionDevices = new HashMap<>();
+
+    public DeviceService(RabbitProducer rabbitProducer, RedisHash<HashSet<Long>> redisHash) {
+    }
+
+    public boolean addSession(WebSocketSession webSocketSession) {
+        String sessionId = webSocketSession.getId();
+        String deviceId = (String) webSocketSession.getAttributes().get("deviceId");
+        sessionDevices.put(sessionId, deviceId);
+        log.info("device {} connected", deviceId);
+        return true;
+    }
+
+    public void removeSession(WebSocketSession webSocketSession) {
+        String sessionId = webSocketSession.getId();
+        String deviceId = sessionDevices.get(sessionId);
+        sessionDevices.remove(sessionId);
+        log.info("device {} connected", deviceId);
+    }
+
+    public void sendHeartbeatPong(WebSocketSession session) throws IOException {
+        EventMessageResp<String> resp = new EventMessageResp<>(EventType.heartbeat, "pong");
+        TextMessage textMessage = new TextMessage(JsonConverter.objectToJson(resp));
+        session.sendMessage(textMessage);
+    }
+}

+ 16 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/device/msg/DeviceEvent.java

@@ -0,0 +1,16 @@
+package cn.reghao.tnb.message.app.ws.device.msg;
+
+import lombok.Getter;
+import lombok.Setter;
+
+/**
+ * @author reghao
+ * @date 2025-10-12 14:58:18
+ */
+@Setter
+@Getter
+public class DeviceEvent {
+    private String type;
+    private String direction;
+    private Object data;
+}

+ 9 - 0
message/message-service/src/main/java/cn/reghao/tnb/message/app/ws/device/msg/DeviceEventType.java

@@ -0,0 +1,9 @@
+package cn.reghao.tnb.message.app.ws.device.msg;
+
+/**
+ * @author reghao
+ * @date 2025-10-12 14:57:02
+ */
+public enum DeviceEventType {
+    heartbeat, start_cam, shutdown_cam
+}