|
|
@@ -0,0 +1,384 @@
|
|
|
+package cn.reghao.tnb.search.app.soa.service;
|
|
|
+
|
|
|
+import cn.reghao.jutil.jdk.serializer.JsonConverter;
|
|
|
+import cn.reghao.tnb.search.app.soa.model.po.DubboSrv;
|
|
|
+import cn.reghao.tnb.search.app.soa.model.po.SpringCloudService;
|
|
|
+import cn.reghao.tnb.search.app.soa.model.vo.ServiceInfo;
|
|
|
+import com.google.gson.JsonNull;
|
|
|
+import com.google.gson.JsonObject;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.curator.framework.CuratorFramework;
|
|
|
+import org.apache.curator.framework.CuratorFrameworkFactory;
|
|
|
+import org.apache.curator.framework.recipes.cache.ChildData;
|
|
|
+import org.apache.curator.framework.recipes.cache.TreeCache;
|
|
|
+import org.apache.curator.framework.recipes.cache.TreeCacheEvent;
|
|
|
+import org.apache.curator.framework.recipes.cache.TreeCacheListener;
|
|
|
+import org.apache.curator.retry.ExponentialBackoffRetry;
|
|
|
+import org.springframework.core.env.Environment;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import java.lang.reflect.Field;
|
|
|
+import java.net.URI;
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
|
+import java.util.*;
|
|
|
+import java.util.regex.Matcher;
|
|
|
+import java.util.regex.Pattern;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author reghao
|
|
|
+ * @date 2024-11-11 17:17:41
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Service
|
|
|
+public class TnbZkService {
|
|
|
+ private final Pattern pattern = Pattern.compile(".*://");
|
|
|
+ private final String dubbo = "/dubbo";
|
|
|
+ private final Map<String, DubboSrv> dubboSrvMap = new HashMap<>();
|
|
|
+
|
|
|
+ private final String springcloud = "/services";
|
|
|
+ private final Map<String, SpringCloudService> springCloudMap = new HashMap<>();
|
|
|
+
|
|
|
+ private final String connectString;
|
|
|
+ private final CuratorFramework client;
|
|
|
+
|
|
|
+ public TnbZkService(Environment environment) {
|
|
|
+ String address = environment.getProperty("dubbo.registry.address");
|
|
|
+ this.connectString = address.replace("zookeeper://", "");
|
|
|
+ client = getZkClient();
|
|
|
+
|
|
|
+ listenNode(dubbo, "dubbo");
|
|
|
+ listenNode(springcloud, "springcloud");
|
|
|
+ }
|
|
|
+
|
|
|
+ public CuratorFramework getZkClient() {
|
|
|
+ CuratorFramework client = CuratorFrameworkFactory.builder()
|
|
|
+ .connectString(connectString)
|
|
|
+ .sessionTimeoutMs(5000)
|
|
|
+ .connectionTimeoutMs(3000)
|
|
|
+ .retryPolicy(new ExponentialBackoffRetry(1000, 3))
|
|
|
+ .build();
|
|
|
+ client.start();
|
|
|
+ return client;
|
|
|
+ }
|
|
|
+
|
|
|
+ public List<String> getZkService(String zkPath) throws Exception {
|
|
|
+ List<String> list = new ArrayList<>();
|
|
|
+ List<String> list1 = client.getChildren().forPath(zkPath);
|
|
|
+ if (list1.isEmpty()) {
|
|
|
+ byte[] zkData = client.getData().forPath(zkPath);
|
|
|
+ String zkDataStr = new String(zkData);
|
|
|
+ System.out.printf("%s -> %s", zkPath, zkDataStr);
|
|
|
+ } else {
|
|
|
+ return list1;
|
|
|
+ }
|
|
|
+
|
|
|
+ return list;
|
|
|
+ }
|
|
|
+
|
|
|
+ public List<ServiceInfo> getDubboService() throws Exception {
|
|
|
+ List<ServiceInfo> list = new ArrayList<>();
|
|
|
+ //List<DubboSrv> dubboSrvList = getDubboServices();
|
|
|
+ List<DubboSrv> dubboSrvList = new ArrayList<>(dubboSrvMap.values());
|
|
|
+ Map<String, List<DubboSrv>> applicationMap = dubboSrvList.stream().collect(Collectors.groupingBy(DubboSrv::getApplication));
|
|
|
+ applicationMap.forEach((app, appServices) -> {
|
|
|
+ Map<String, List<DubboSrv>> sideMap = appServices.stream().collect(Collectors.groupingBy(DubboSrv::getSide));
|
|
|
+ String roleStr = sideMap.keySet().toString().replace("[", "").replace("]", "");
|
|
|
+ ServiceInfo serviceInfo = new ServiceInfo(app, roleStr);
|
|
|
+
|
|
|
+ List<DubboSrv> providers = sideMap.get("provider");
|
|
|
+ if (providers != null) {
|
|
|
+ Map<String, List<DubboSrv>> providerHosts = providers.stream().collect(Collectors.groupingBy(DubboSrv::getHostPort));
|
|
|
+ Set<String> hostPorts = providerHosts.keySet();
|
|
|
+ String hostPortsStr = hostPorts.toString().replace("[", "").replace("]", "");
|
|
|
+ serviceInfo.setHostPorts(hostPortsStr);
|
|
|
+ serviceInfo.setTotal(hostPorts.size());
|
|
|
+ }
|
|
|
+ list.add(serviceInfo);
|
|
|
+ });
|
|
|
+
|
|
|
+ /*Map<String, List<DubboSrv>> map = dubboSrvList.stream().collect(Collectors.groupingBy(DubboSrv::getSide));
|
|
|
+ List<DubboSrv> providers = map.get("provider");
|
|
|
+ if (providers != null) {
|
|
|
+ Map<String, List<DubboSrv>> map1 = providers.stream()
|
|
|
+ .collect(Collectors.groupingBy(DubboSrv::getApplication));
|
|
|
+ map1.forEach((serviceName, serviceList) -> {
|
|
|
+ Map<String, List<DubboSrv>> map2 = serviceList.stream().collect(Collectors.groupingBy(DubboSrv::getHostPort));
|
|
|
+ map2.forEach((hostPort, serviceList1) -> {
|
|
|
+ System.out.println();
|
|
|
+ });
|
|
|
+ });
|
|
|
+ }*/
|
|
|
+
|
|
|
+ return list;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Map<String, List<DubboSrv>> getDubboServiceMap() throws Exception {
|
|
|
+ List<DubboSrv> dubboSrvList = new ArrayList<>(dubboSrvMap.values());
|
|
|
+ Map<String, List<DubboSrv>> serviceMap = dubboSrvList.stream()
|
|
|
+ .collect(Collectors.groupingBy(DubboSrv::getApplication));
|
|
|
+ return serviceMap;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Map<String, List<DubboSrv>> getDubboServiceMap1() throws Exception {
|
|
|
+ List<String> list = client.getChildren().forPath(dubbo);
|
|
|
+ List<String> serviceNames = list.stream()
|
|
|
+ .filter(name -> !name.equals("metadata") && !name.equals("config"))
|
|
|
+ .collect(Collectors.toList());
|
|
|
+
|
|
|
+ List<DubboSrv> dubboSrvList = new ArrayList<>();
|
|
|
+ for (String serviceName : serviceNames) {
|
|
|
+ String path1 = String.format("%s/%s/providers", dubbo, serviceName);
|
|
|
+ List<String> providers = client.getChildren().forPath(path1);
|
|
|
+ List<String> dubboUrl = new ArrayList<>(providers);
|
|
|
+
|
|
|
+ String path2 = String.format("%s/%s/consumers", dubbo, serviceName);
|
|
|
+ List<String> consumers = client.getChildren().forPath(path2);
|
|
|
+ dubboUrl.addAll(consumers);
|
|
|
+
|
|
|
+ List<DubboSrv> dubboSrvs = dubboUrl.stream()
|
|
|
+ .map(this::getDubboSrv)
|
|
|
+ .filter(Objects::nonNull)
|
|
|
+ .collect(Collectors.toList());
|
|
|
+ dubboSrvList.addAll(dubboSrvs);
|
|
|
+ }
|
|
|
+
|
|
|
+ Map<String, List<DubboSrv>> serviceMap = dubboSrvList.stream().collect(Collectors.groupingBy(DubboSrv::getIface));
|
|
|
+ for (String serviceName : serviceNames) {
|
|
|
+ serviceMap.computeIfAbsent(serviceName, v -> new ArrayList<>());
|
|
|
+ }
|
|
|
+
|
|
|
+ return serviceMap;
|
|
|
+ }
|
|
|
+
|
|
|
+ private DubboSrv getDubboSrv(String zkPath) {
|
|
|
+ String url = URI.create(zkPath).getPath();
|
|
|
+ Matcher matcher = pattern.matcher(url);
|
|
|
+ if (matcher.find()) {
|
|
|
+ String protocolPrefix = matcher.group();
|
|
|
+ String url1 = url.replace(protocolPrefix, "");
|
|
|
+ String[] arr = url1.split("\\?");
|
|
|
+ int idx = arr[0].indexOf("/");
|
|
|
+ if (idx == -1) {
|
|
|
+ //throw new Exception("dubbo url illegal");
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ String hostPort = arr[0].substring(0, idx);
|
|
|
+ String clazzName = arr[0].substring(idx+1);
|
|
|
+ Map<String, String> map = new HashMap<>();
|
|
|
+ for (String kv : arr[1].split("&")) {
|
|
|
+ String[] kvs = kv.split("=");
|
|
|
+ map.put(kvs[0], kvs[1]);
|
|
|
+ }
|
|
|
+
|
|
|
+ Class<?> clazz = DubboSrv.class;
|
|
|
+ Field[] fields = clazz.getDeclaredFields();
|
|
|
+ try {
|
|
|
+ Object object = clazz.getDeclaredConstructor().newInstance();
|
|
|
+ for (Field field : fields) {
|
|
|
+ String name = field.getName();
|
|
|
+ String value = map.get(name);
|
|
|
+ if (value == null) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ Class<?> clazzType = field.getType();
|
|
|
+ field.setAccessible(true);
|
|
|
+ if (clazzType.equals(String.class)) {
|
|
|
+ field.set(object, value);
|
|
|
+ } else if (clazzType.equals(Long.class)) {
|
|
|
+ field.set(object, Long.parseLong(value));
|
|
|
+ } else if (clazzType.equals(Integer.class)) {
|
|
|
+ field.set(object, Integer.parseInt(value));
|
|
|
+ } else if (clazzType.equals(Boolean.class)){
|
|
|
+ field.set(object, Boolean.valueOf(value));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ DubboSrv dubboSrv = (DubboSrv) object;
|
|
|
+ dubboSrv.setIface(map.get("interface"));
|
|
|
+ dubboSrv.setHostPort(hostPort);
|
|
|
+ return dubboSrv;
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return null;
|
|
|
+ }
|
|
|
+
|
|
|
+ public List<ServiceInfo> getCloudService() throws Exception {
|
|
|
+ Map<String, List<SpringCloudService>> listMap = getSpringCloudServices();
|
|
|
+ List<ServiceInfo> serviceInfos = new ArrayList<>();
|
|
|
+ listMap.forEach((app, appList) -> {
|
|
|
+ List<String> hostPorts = appList.stream().map(springCloudService -> {
|
|
|
+ String address = springCloudService.getAddress();
|
|
|
+ int port = springCloudService.getPort();
|
|
|
+ return String.format("%s:%s", address, port);
|
|
|
+ }).collect(Collectors.toList());
|
|
|
+ String hostPortsStr = hostPorts.toString().replace("[", "").replace("]", "");
|
|
|
+ serviceInfos.add(new ServiceInfo(app, appList.size(), hostPortsStr));
|
|
|
+ });
|
|
|
+ return serviceInfos;
|
|
|
+ }
|
|
|
+
|
|
|
+ public Map<String, List<SpringCloudService>> getSpringCloudServices() throws Exception {
|
|
|
+ //List<SpringCloudService> serviceList = new ArrayList<>();
|
|
|
+ List<SpringCloudService> serviceList = new ArrayList<>(springCloudMap.values());
|
|
|
+ List<String> list = client.getChildren().forPath(springcloud);
|
|
|
+ /*for (String service : list) {
|
|
|
+ List<String> list1 = client.getChildren().forPath(springcloud + "/" + service);
|
|
|
+ for (String serviceId : list1) {
|
|
|
+ String dataPath = springcloud + "/" + service + "/" + serviceId;
|
|
|
+ byte[] bytes = client.getData().forPath(dataPath);
|
|
|
+ SpringCloudService springCloudService = getSpringCloudService(bytes);
|
|
|
+ serviceList.add(springCloudService);
|
|
|
+ }
|
|
|
+ }*/
|
|
|
+
|
|
|
+ Map<String, List<SpringCloudService>> listMap = serviceList.stream()
|
|
|
+ .collect(Collectors.groupingBy(SpringCloudService::getName));
|
|
|
+ for (String serviceName : list) {
|
|
|
+ listMap.computeIfAbsent(serviceName, v -> new ArrayList<>());
|
|
|
+ }
|
|
|
+
|
|
|
+ return listMap;
|
|
|
+ }
|
|
|
+
|
|
|
+ private SpringCloudService getSpringCloudService(byte[] zkData) throws Exception {
|
|
|
+ String json = new String(zkData);
|
|
|
+ JsonObject jsonObject = JsonConverter.jsonToJsonElement(json).getAsJsonObject();
|
|
|
+ Map<String, String> map = new HashMap<>();
|
|
|
+ jsonObject.entrySet().forEach(entry -> {
|
|
|
+ String key = entry.getKey();
|
|
|
+ if (key.equals("payload") || key.equals("uriSpec")) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!entry.getValue().equals(JsonNull.INSTANCE)) {
|
|
|
+ String value = entry.getValue().getAsString();
|
|
|
+ map.put(key, value);
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ Class<?> clazz = SpringCloudService.class;
|
|
|
+ Field[] fields = clazz.getDeclaredFields();
|
|
|
+ Object object = clazz.getDeclaredConstructor().newInstance();
|
|
|
+ for (Field field : fields) {
|
|
|
+ String name = field.getName();
|
|
|
+ String value = map.get(name);
|
|
|
+ if (value == null) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ Class<?> clazzType = field.getType();
|
|
|
+ field.setAccessible(true);
|
|
|
+ if (clazzType.equals(String.class)) {
|
|
|
+ field.set(object, value);
|
|
|
+ } else if (clazzType.equals(Long.class)) {
|
|
|
+ field.set(object, Long.parseLong(value));
|
|
|
+ } else if (clazzType.equals(Integer.class)) {
|
|
|
+ field.set(object, Integer.parseInt(value));
|
|
|
+ } else if (clazzType.equals(Boolean.class)){
|
|
|
+ field.set(object, Boolean.valueOf(value));
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return (SpringCloudService) object;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void listenNode(String zkPath, String serviceType) {
|
|
|
+ try {
|
|
|
+ TreeCache treeCache = new TreeCache(client, zkPath);
|
|
|
+ TreeCacheListener listener = new TreeCacheListener() {
|
|
|
+ @Override
|
|
|
+ public void childEvent(CuratorFramework client, TreeCacheEvent event) {
|
|
|
+ ChildData data = event.getData();
|
|
|
+ if (data == null) {
|
|
|
+ log.info("数据为 null");
|
|
|
+ return;
|
|
|
+ } else if (data.getData().length == 0) {
|
|
|
+ log.info("数据为空");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ String dataPath = data.getPath();
|
|
|
+ String url = URI.create(dataPath).getPath();
|
|
|
+ if (serviceType.equals("dubbo")) {
|
|
|
+ if (!url.contains("?")) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ if (dataPath.startsWith("/dubbo/config") || dataPath.startsWith("/dubbo/metadata")) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (dataPath.endsWith("/routers")
|
|
|
+ || dataPath.startsWith("/providers")
|
|
|
+ || dataPath.startsWith("/consumers")) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ String dataContent = new String(data.getData(), StandardCharsets.UTF_8);
|
|
|
+ try {
|
|
|
+ switch (event.getType()) {
|
|
|
+ case NODE_ADDED:
|
|
|
+ log.info("[{}-TreeCache]节点增加", serviceType);
|
|
|
+ if (serviceType.equals("springcloud")) {
|
|
|
+ springCloudMap.put(dataPath, getSpringCloudService(data.getData()));
|
|
|
+ } else if (serviceType.equals("dubbo")) {
|
|
|
+ dubboSrvMap.put(dataPath, getDubboSrv(dataPath));
|
|
|
+ } else {
|
|
|
+ log.error("serviceType {} unknown", serviceType);
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ case NODE_UPDATED:
|
|
|
+ //log.info("[{}-TreeCache]节点更新, path={}, data={}", serviceType, dataPath, dataContent);
|
|
|
+ if (serviceType.equals("springcloud")) {
|
|
|
+ springCloudMap.put(dataPath, getSpringCloudService(data.getData()));
|
|
|
+ } else if (serviceType.equals("dubbo")) {
|
|
|
+ dubboSrvMap.put(dataPath, getDubboSrv(dataPath));
|
|
|
+ } else {
|
|
|
+ log.error("serviceType {} unknown", serviceType);
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ case NODE_REMOVED:
|
|
|
+ log.info("[{}-TreeCache]节点删除", serviceType);
|
|
|
+ if (serviceType.equals("springcloud")) {
|
|
|
+ springCloudMap.remove(dataPath);
|
|
|
+ } else if (serviceType.equals("dubbo")) {
|
|
|
+ dubboSrvMap.remove(dataPath);
|
|
|
+ } else {
|
|
|
+ log.error("serviceType {} unknown", serviceType);
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ treeCache.getListenable().addListener(listener);
|
|
|
+ treeCache.start();
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("PathCache 监听失败, path={}", zkPath);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public void deleteNode(String appName) {
|
|
|
+ try {
|
|
|
+ Map<String, List<SpringCloudService>> listMap = getSpringCloudServices();
|
|
|
+ List<SpringCloudService> cloudServices = listMap.get(appName);
|
|
|
+ cloudServices.forEach(cloudService -> {
|
|
|
+ String zkPath = "";
|
|
|
+ //client.delete().forPath(zkPath);
|
|
|
+ });
|
|
|
+ } catch (Exception e) {
|
|
|
+ e.printStackTrace();
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|