|
|
@@ -7,11 +7,15 @@ import cn.reghao.bnt.web.ws.EventCenter;
|
|
|
import cn.reghao.bnt.web.ws.WsSender;
|
|
|
import cn.reghao.jutil.jdk.event.message.EventMessage;
|
|
|
import cn.reghao.jutil.jdk.serializer.JdkSerializer;
|
|
|
+import cn.reghao.jutil.jdk.web.log.NginxLog;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
import org.springframework.web.socket.*;
|
|
|
|
|
|
+import java.io.IOException;
|
|
|
import java.nio.ByteBuffer;
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
|
|
|
/**
|
|
|
* @author reghao
|
|
|
@@ -30,16 +34,35 @@ public class AgentWebSocketHandler implements WebSocketHandler {
|
|
|
this.machineService = machineService;
|
|
|
}
|
|
|
|
|
|
- @Override
|
|
|
- public void afterConnectionEstablished(WebSocketSession webSocketSession) {
|
|
|
- String machineId = getId(webSocketSession);
|
|
|
- log.info("节点 {} 建立连接", machineId);
|
|
|
- wsSender.addSession(machineId, webSocketSession);
|
|
|
- machineService.setAgentStatus(machineId, NodeStatus.Online);
|
|
|
+ private Map<String, String> parseParams(String query) {
|
|
|
+ String[] params = query.split("&");
|
|
|
+ Map<String, String> map = new HashMap<>();
|
|
|
+ for (String param : params) {
|
|
|
+ String[] arr = param.split("=");
|
|
|
+ map.put(arr[0], arr[1]);
|
|
|
+ }
|
|
|
+ return map;
|
|
|
}
|
|
|
|
|
|
- private String getId(WebSocketSession webSocketSession) {
|
|
|
- return (String) webSocketSession.getAttributes().get(Keys.USER_UUID);
|
|
|
+ private String getMachineId(String query) {
|
|
|
+ Map<String, String> map = parseParams(query);
|
|
|
+ //String machineId = map.get("machineId");
|
|
|
+ String machineId = map.get("token");
|
|
|
+ return machineId;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void afterConnectionEstablished(WebSocketSession webSocketSession) throws IOException {
|
|
|
+ String query = webSocketSession.getUri().getQuery();
|
|
|
+ String machineId = getMachineId(query);
|
|
|
+ if (machineId != null) {
|
|
|
+ log.info("节点 {} 建立连接", machineId);
|
|
|
+ wsSender.addSession(machineId, webSocketSession);
|
|
|
+ machineService.setAgentStatus(machineId, NodeStatus.Online);
|
|
|
+ } else {
|
|
|
+ log.info("MachineId not exist");
|
|
|
+ webSocketSession.close();
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
@@ -59,6 +82,8 @@ public class AgentWebSocketHandler implements WebSocketHandler {
|
|
|
EventMessage eventMessage = (EventMessage) obj;
|
|
|
eventCenter.dispatch(eventMessage);
|
|
|
}
|
|
|
+ } else if (webSocketMessage instanceof NginxLog) {
|
|
|
+ log.info("收到来自 {} 的 NginxLog 消息", webSocketSession.getId());
|
|
|
} else if (webSocketMessage instanceof PingMessage) {
|
|
|
log.info("收到来自 {} 的 PingMessage 消息", webSocketSession.getId());
|
|
|
webSocketSession.sendMessage(new PongMessage());
|
|
|
@@ -79,10 +104,12 @@ public class AgentWebSocketHandler implements WebSocketHandler {
|
|
|
|
|
|
@Override
|
|
|
public void afterConnectionClosed(WebSocketSession webSocketSession, CloseStatus closeStatus) {
|
|
|
- String machineId = getId(webSocketSession);
|
|
|
- machineService.setAgentStatus(machineId, NodeStatus.Offline);
|
|
|
- wsSender.removeSession(machineId);
|
|
|
- log.info("节点 {} 断开连接", machineId);
|
|
|
+ String machineId = getMachineId(webSocketSession.getUri().getQuery());
|
|
|
+ if (machineId != null) {
|
|
|
+ machineService.setAgentStatus(machineId, NodeStatus.Offline);
|
|
|
+ wsSender.removeSession(machineId);
|
|
|
+ log.info("节点 {} 断开连接", machineId);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
@Override
|