|
|
@@ -1,17 +1,24 @@
|
|
|
package cn.reghao.devops.manager.ws.handler;
|
|
|
|
|
|
+import cn.reghao.devops.model.NginxLog;
|
|
|
import cn.reghao.devops.manager.log.service.LogService;
|
|
|
import cn.reghao.jutil.jdk.converter.DateTimeConverter;
|
|
|
import cn.reghao.jutil.jdk.result.AppLog;
|
|
|
import cn.reghao.jutil.jdk.serializer.JdkSerializer;
|
|
|
import cn.reghao.jutil.jdk.serializer.JsonConverter;
|
|
|
+import cn.reghao.jutil.jdk.thread.ThreadPoolWrapper;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
import org.springframework.web.socket.*;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.time.ZoneId;
|
|
|
+import java.time.ZoneOffset;
|
|
|
+import java.time.format.DateTimeFormatter;
|
|
|
import java.util.*;
|
|
|
import java.util.concurrent.ConcurrentHashMap;
|
|
|
+import java.util.concurrent.ExecutorService;
|
|
|
|
|
|
/**
|
|
|
* @author reghao
|
|
|
@@ -20,11 +27,13 @@ import java.util.concurrent.ConcurrentHashMap;
|
|
|
@Slf4j
|
|
|
@Component
|
|
|
public class LogHandler implements WebSocketHandler {
|
|
|
- private LogService logService;
|
|
|
+ private final ExecutorService threadPool = ThreadPoolWrapper.threadPool("push-task", 1);
|
|
|
+ private final LogService logService;
|
|
|
private final Map<String, WebSocketSession> pullSessions = new ConcurrentHashMap<>();
|
|
|
|
|
|
public LogHandler(LogService logService) {
|
|
|
this.logService = logService;
|
|
|
+ threadPool.submit(new PushTask());
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
@@ -37,7 +46,7 @@ public class LogHandler implements WebSocketHandler {
|
|
|
String host = map.get("host");
|
|
|
String key = String.format("%s-%s", app, host);
|
|
|
//pushSessions.put(key, webSocketSession);
|
|
|
- } else {
|
|
|
+ } else if (path.equals("/ws/log/pull")) {
|
|
|
String query = webSocketSession.getUri().getQuery();
|
|
|
Map<String, String> map = parseParams(query);
|
|
|
String app = map.get("app");
|
|
|
@@ -60,6 +69,7 @@ public class LogHandler implements WebSocketHandler {
|
|
|
return map;
|
|
|
}
|
|
|
|
|
|
+ Map<Long, Integer> map = new TreeMap<>();
|
|
|
@Override
|
|
|
public void handleMessage(WebSocketSession webSocketSession, WebSocketMessage<?> webSocketMessage)
|
|
|
throws IOException {
|
|
|
@@ -87,6 +97,21 @@ public class LogHandler implements WebSocketHandler {
|
|
|
e.printStackTrace();
|
|
|
}
|
|
|
}
|
|
|
+ } else if (object instanceof NginxLog) {
|
|
|
+ NginxLog nginxLog = (NginxLog) object;
|
|
|
+ String date = nginxLog.getTimeIso8601();
|
|
|
+ DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss+08:00")
|
|
|
+ .withZone(ZoneId.of("UTC"));
|
|
|
+ LocalDateTime localDateTime = LocalDateTime.parse(date, formatter);
|
|
|
+ Long timestamp = localDateTime.toEpochSecond(ZoneOffset.of("+8"));
|
|
|
+ Long key = timestamp;
|
|
|
+ Integer count = map.get(key);
|
|
|
+ if (count == null) {
|
|
|
+ map.put(key, 1);
|
|
|
+ } else {
|
|
|
+ int count1 = map.get(key) + 1;
|
|
|
+ map.put(key, count1);
|
|
|
+ }
|
|
|
}
|
|
|
} else if (webSocketMessage instanceof PingMessage) {
|
|
|
log.info("接收到 WebSocket PingMessage");
|
|
|
@@ -145,4 +170,45 @@ public class LogHandler implements WebSocketHandler {
|
|
|
public boolean supportsPartialMessages() {
|
|
|
return false;
|
|
|
}
|
|
|
+
|
|
|
+ class PushTask implements Runnable {
|
|
|
+ @Override
|
|
|
+ public void run() {
|
|
|
+ while (!Thread.interrupted()) {
|
|
|
+ try {
|
|
|
+ if (map.size() < 3) {
|
|
|
+ Thread.sleep(10_000);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ LocalDateTime localDateTime = LocalDateTime.now();
|
|
|
+ Long baseKey = localDateTime.toEpochSecond(ZoneOffset.of("+8"));
|
|
|
+
|
|
|
+ List<String> xList = new ArrayList<>();
|
|
|
+ List<Integer> yList = new ArrayList<>();
|
|
|
+ Set<Long> keys = new HashSet<>();
|
|
|
+ for (Long key : map.keySet()) {
|
|
|
+ if (key < baseKey) {
|
|
|
+ xList.add(DateTimeConverter.format(key*1000).split(" ")[1]);
|
|
|
+ yList.add(map.get(key));
|
|
|
+ keys.add(key);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ keys.forEach(map::remove);
|
|
|
+ keys.clear();
|
|
|
+ List results = new ArrayList();
|
|
|
+ results.add(xList.toArray());
|
|
|
+ results.add(yList.toArray());
|
|
|
+
|
|
|
+ WebSocketSession webSocketSession = getPullSession("admin-service", "172.16.90.200");
|
|
|
+ TextMessage textMessage = new TextMessage(JsonConverter.objectToJson(results));
|
|
|
+ webSocketSession.sendMessage(textMessage);
|
|
|
+ Thread.sleep(10_000);
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|