Преглед изворни кода

update user-service hystrix

reghao пре 8 месеци
родитељ
комит
db11730374

+ 37 - 7
user/user-service/src/main/java/cn/reghao/tnb/user/app/controller/UserControllerHystrix.java

@@ -1,6 +1,10 @@
 package cn.reghao.tnb.user.app.controller;
 
 import cn.reghao.jutil.jdk.serializer.JsonConverter;
+import cn.reghao.tnb.user.api.dto.UserInfo;
+import cn.reghao.tnb.user.app.hystrix.SimpleHystrixCommand;
+import cn.reghao.tnb.user.app.hystrix.observable.HaloSemaphoreIsolationCommand;
+import cn.reghao.tnb.user.app.hystrix.observable.HaloThreadIsolationCommand;
 import cn.reghao.tnb.user.app.service.UserServiceHystrix;
 import cn.reghao.tnb.user.app.service.UserProfileService;
 import com.netflix.hystrix.contrib.javanica.annotation.DefaultProperties;
@@ -11,6 +15,7 @@ import io.swagger.v3.oas.annotations.Operation;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.http.MediaType;
 import org.springframework.web.bind.annotation.*;
+import rx.Observable;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -26,14 +31,40 @@ import java.util.Map;
 public class UserControllerHystrix {
     private final UserProfileService userProfileService;
     private final UserServiceHystrix userServiceHystrix;
+    private HaloSemaphoreIsolationCommand haloSemaphoreIsolationCommand;
+    private HaloThreadIsolationCommand haloThreadIsolationCommand;
 
-    public UserControllerHystrix(UserProfileService userProfileService, UserServiceHystrix userServiceHystrix) {
+    public UserControllerHystrix(UserProfileService userProfileService, UserServiceHystrix userServiceHystrix,
+                                 HaloSemaphoreIsolationCommand haloSemaphoreIsolationCommand,
+                                 HaloThreadIsolationCommand haloThreadIsolationCommand) {
         this.userProfileService = userProfileService;
         this.userServiceHystrix = userServiceHystrix;
+        this.haloSemaphoreIsolationCommand = haloSemaphoreIsolationCommand;
+        this.haloThreadIsolationCommand = haloThreadIsolationCommand;
     }
 
     @Operation(summary = "获取用户资料", description = "N")
     @GetMapping(value = "/info", produces = MediaType.APPLICATION_JSON_VALUE)
+    public String getUserInfo(@RequestParam("userId") Long userId) {
+        UserHolder.setUser("reghao");
+        userServiceHystrix.hello();
+        log.info("user -> {}", UserHolder.getUser());
+
+        /*Observable<String> observable = haloThreadIsolationCommand.toObservable();
+        String result = observable.toBlocking().single();*/
+
+        /*Observable<String> observable1 = haloSemaphoreIsolationCommand.toObservable();
+        String result1 = observable1.toBlocking().single();*/
+
+        /*SimpleHystrixCommand simpleHystrixCommand = new SimpleHystrixCommand(userServiceHystrix);
+        String result2 = simpleHystrixCommand.execute();*/
+
+        //UserInfo userInfo = userProfileService.getUserInfo(userId);
+        String userInfoJson = userServiceHystrix.getUserInfo(userId);
+        return userInfoJson;
+    }
+
+    @GetMapping(value = "/info1", produces = MediaType.APPLICATION_JSON_VALUE)
     @HystrixCommand(
             fallbackMethod = "getUserInfoFallback",
             commandProperties = {
@@ -45,21 +76,20 @@ public class UserControllerHystrix {
                     @HystrixProperty(name="circuitBreaker.errorThresholdPercentage", value="50"),
                     @HystrixProperty(name="circuitBreaker.sleepWindowInMilliseconds", value="3000")
             })
-    public String getUserInfo(@RequestParam("userId") Long userId) {
+    public UserInfo getUserInfo1(@RequestParam("userId") Long userId) {
         UserHolder.setUser("reghao");
         userServiceHystrix.hello();
         log.info("user -> {}", UserHolder.getUser());
-        //UserInfo userInfo = userProfileService.getUserInfo(userId);
-        String userInfoJson = userServiceHystrix.getUserInfo(userId);
-        return userInfoJson;
+        UserInfo userInfo = userServiceHystrix.getUserInfo1(userId);
+        return userInfo;
     }
 
-    public String getUserInfoFallback(Long userId) {
+    public UserInfo getUserInfoFallback(Long userId) {
         System.out.printf("hystrix fallback with userId %s\n", userId);
         Map<String, String> map = new HashMap<>();
         map.put("code", "0");
         map.put("msg", "hystrix fallback");
-        return JsonConverter.objectToJson(map);
+        return null;
     }
 
     public String defaultFallback() {

+ 70 - 0
user/user-service/src/main/java/cn/reghao/tnb/user/app/hystrix/MyHystrixConcurrencyStrategy.java

@@ -0,0 +1,70 @@
+package cn.reghao.tnb.user.app.hystrix;
+
+import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
+import lombok.extern.slf4j.Slf4j;
+
+import java.lang.reflect.Field;
+import java.util.concurrent.Callable;
+
+/**
+ * 将当前线程的 threadLocal 变量传给 callable
+ * callable 在执行时会使用传入的调用者线程的 threadLocal 变量, 然后再执行任务
+ * callable 执行前后需要保存 worker 线程自身的线程局部变量以供恢复
+ *
+ * @author reghao
+ * @date 2025-07-22 16:04:38
+ */
+@Slf4j
+public class MyHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {
+    @Override
+    public <T> Callable<T> wrapCallable(Callable<T> callable) {
+        // 获取当前线程的 threadLocalMap
+        Object currentThreadLocalMap = getCurrentThreadLocalMap();
+
+        Callable<T> finalCallable = new Callable<T>() {
+            private Object callerThreadLocalMap = currentThreadLocalMap;
+            private Callable<T> targetCallable = callable;
+
+            @Override
+            public T call() throws Exception {
+                // 保存工作线程现有的线程变量
+                Object oldThreadLocalMapOfWorkThread = getCurrentThreadLocalMap();
+                // 将本线程的线程变量设置为 caller 的线程变量
+                setCurrentThreadLocalMap(callerThreadLocalMap);
+                try {
+                    return targetCallable.call();
+                } finally {
+                    setCurrentThreadLocalMap(oldThreadLocalMapOfWorkThread);
+                    log.info("restore work thread's threadlocal");
+                }
+            }
+        };
+
+        return finalCallable;
+    }
+
+    private Object getCurrentThreadLocalMap() {
+        Thread thread = Thread.currentThread();
+        try {
+            Field field = Thread.class.getDeclaredField("threadLocals");
+            field.setAccessible(true);
+            Object o = field.get(thread);
+            return o;
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            log.error("{}", e);
+        }
+        return null;
+    }
+
+    private void setCurrentThreadLocalMap(Object newThreadLocalMap) {
+        Thread thread = Thread.currentThread();
+        try {
+            Field field = Thread.class.getDeclaredField("threadLocals");
+            field.setAccessible(true);
+            field.set(thread,newThreadLocalMap);
+
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            log.error("{}", e);
+        }
+    }
+}

+ 78 - 0
user/user-service/src/main/java/cn/reghao/tnb/user/app/hystrix/SimpleHystrixCommand.java

@@ -0,0 +1,78 @@
+package cn.reghao.tnb.user.app.hystrix;
+
+import cn.reghao.tnb.user.app.service.UserServiceHystrix;
+import com.netflix.hystrix.*;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Random;
+
+/**
+ * 普通的HystrixCommand,实现了fallback方法
+ *
+ * @author reghao
+ * @date 2025-07-22 16:03:20
+ */
+@Slf4j
+public class SimpleHystrixCommand extends HystrixCommand<String> {
+    private final UserServiceHystrix userServiceHystrix;
+
+    public SimpleHystrixCommand(UserServiceHystrix userServiceHystrix) {
+        super(setter());
+        this.userServiceHystrix = userServiceHystrix;
+    }
+
+    @Override
+    protected String run() throws Exception {
+        String s = userServiceHystrix.getResult();
+        log.info("get thread local:{}",s);
+
+        /**
+         * 如果睡眠时间,超过2s,会降级
+         * {@link #getFallback()}
+         */
+        int millis = new Random().nextInt(3000);
+        log.info("will sleep {} millis",millis);
+        Thread.sleep(millis);
+
+        return s;
+    }
+
+    private static Setter setter() {
+
+        // 服务分组
+        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("members");
+        // 服务标识
+        HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("member");
+        // 线程池名称
+        HystrixThreadPoolKey threadPoolKey = HystrixThreadPoolKey.Factory.asKey("member-pool");
+
+        // 线程池配置 线程池大小为10,线程存活时间15秒 队列等待的阈值为100,超过100执行拒绝策略
+        HystrixThreadPoolProperties.Setter threadPoolProperties = HystrixThreadPoolProperties.Setter().withCoreSize(10)
+                .withKeepAliveTimeMinutes(15).withQueueSizeRejectionThreshold(100);
+
+        // 命令属性配置Hystrix 开启超时
+        HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter()
+                // 采用线程池方式实现服务隔离
+                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
+                /**
+                 * 执行时间,2s,2s执行不完成,算超时,会降级
+                 */
+                .withExecutionTimeoutEnabled(true)
+                .withExecutionTimeoutInMilliseconds(2000);
+
+        return Setter.withGroupKey(groupKey).andCommandKey(commandKey).andThreadPoolKey(threadPoolKey)
+                .andThreadPoolPropertiesDefaults(threadPoolProperties).andCommandPropertiesDefaults(commandProperties);
+
+    }
+
+    /**
+     * 执行业务方法超时时,会进入本方法进行降级
+     * @return
+     */
+    @Override
+    protected String getFallback() {
+
+        log.error("fallback occur");
+        return "fallback occur";
+    }
+}

+ 47 - 0
user/user-service/src/main/java/cn/reghao/tnb/user/app/hystrix/observable/HaloSemaphoreIsolationCommand.java

@@ -0,0 +1,47 @@
+package cn.reghao.tnb.user.app.hystrix.observable;
+
+import cn.reghao.tnb.user.app.service.UserServiceHystrix;
+import com.netflix.hystrix.HystrixCommandGroupKey;
+import com.netflix.hystrix.HystrixObservableCommand;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+import rx.Observable;
+import rx.Subscriber;
+
+/**
+ * 使用HystrixObservableCommand时,默认会使用信号量隔离
+ * 甚至连线程池都不能控制了
+ *
+ * @author reghao
+ * @date 2025-07-23 17:54:37
+ */
+@Component
+@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
+public class HaloSemaphoreIsolationCommand extends HystrixObservableCommand<String> {
+    private final UserServiceHystrix userServiceHystrix;
+
+    public HaloSemaphoreIsolationCommand(UserServiceHystrix userServiceHystrix) {
+        super(HystrixCommandGroupKey.Factory.asKey("HaloGroup"));
+        this.userServiceHystrix = userServiceHystrix;
+    }
+
+    @Override
+    protected Observable<String> construct() {
+        return Observable.create(new Observable.OnSubscribe<String>() {
+            @Override
+            public void call(Subscriber<? super String> observer) {
+                try {
+                    if (!observer.isUnsubscribed()) {
+                        // a real example would do work like a network call here
+                        String s = userServiceHystrix.getResult();
+                        observer.onNext(s);
+                        observer.onCompleted();
+                    }
+                } catch (Exception e) {
+                    observer.onError(e);
+                }
+            }
+        });
+    }
+}

+ 68 - 0
user/user-service/src/main/java/cn/reghao/tnb/user/app/hystrix/observable/HaloThreadIsolationCommand.java

@@ -0,0 +1,68 @@
+package cn.reghao.tnb.user.app.hystrix.observable;
+
+import cn.reghao.tnb.user.app.service.UserServiceHystrix;
+import com.netflix.hystrix.HystrixCommandGroupKey;
+import com.netflix.hystrix.HystrixCommandKey;
+import com.netflix.hystrix.HystrixCommandProperties;
+import com.netflix.hystrix.HystrixObservableCommand;
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
+import org.springframework.context.annotation.Scope;
+import org.springframework.stereotype.Component;
+import rx.Observable;
+import rx.Subscriber;
+
+/**
+ * 使用HystrixObservableCommand时,默认会使用信号量隔离
+ * 但是,即使这里强制使用了线程隔离,但是线程池不能控制了。
+ * {@link Setter}里,没把控制线程池的方法提供出来
+ *
+ * @author reghao
+ * @date 2025-07-23 17:53:30
+ */
+@Component
+@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
+public class HaloThreadIsolationCommand extends HystrixObservableCommand<String> {
+    private final UserServiceHystrix userServiceHystrix;
+
+    public HaloThreadIsolationCommand(UserServiceHystrix userServiceHystrix) {
+        super(setter());
+        this.userServiceHystrix = userServiceHystrix;
+    }
+
+    private static Setter setter() {
+        // 服务分组
+        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("halo");
+
+        // 服务标识
+        HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey("halo");
+
+        // 命令属性配置Hystrix 开启超时
+        HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter()
+                // 采用线程池方式实现服务隔离
+                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
+                // 禁止
+                .withExecutionTimeoutEnabled(false);
+
+        return Setter.withGroupKey(groupKey).andCommandKey(commandKey)
+                .andCommandPropertiesDefaults(commandProperties);
+    }
+
+    @Override
+    protected Observable<String> construct() {
+        return Observable.create(new Observable.OnSubscribe<String>() {
+            @Override
+            public void call(Subscriber<? super String> observer) {
+                try {
+                    if (!observer.isUnsubscribed()) {
+                        // a real example would do work like a network call here
+                        String s = userServiceHystrix.getResult();
+                        observer.onNext(s);
+                        observer.onCompleted();
+                    }
+                } catch (Exception e) {
+                    observer.onError(e);
+                }
+            }
+        });
+    }
+}

+ 19 - 0
user/user-service/src/main/java/cn/reghao/tnb/user/app/service/HystrixService.java

@@ -0,0 +1,19 @@
+package cn.reghao.tnb.user.app.service;
+
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+/**
+ * @author reghao
+ * @date 2025-07-23 17:51:45
+ */
+@Slf4j
+@Service
+public class HystrixService {
+    public String getResult() {
+        /*UserVO userVO = RequestContextHolder.get();
+        log.info("I am  hystrix pool thread,try to get threadlocal:{}", userVO);
+        return userVO.toString();*/
+        return null;
+    }
+}

+ 22 - 0
user/user-service/src/main/java/cn/reghao/tnb/user/app/service/UserServiceHystrix.java

@@ -63,9 +63,24 @@ public class UserServiceHystrix {
         }
 
         UserInfo userInfo = userProfileService.getUserInfo(userId);
+        log.info("{} -> {}", userInfo.getUserId(), userInfo.getScreenName());
         return JsonConverter.objectToJson(userInfo);
     }
 
+    public UserInfo getUserInfo1(Long userId) {
+        if (userId % 3 == 0) {
+            try {
+                log.info("{} 模拟耗时, 休眠 10s", userId);
+                Thread.sleep(10_000);
+            } catch (InterruptedException e) {
+                e.printStackTrace();
+            }
+        }
+
+        UserInfo userInfo = userProfileService.getUserInfo(userId);
+        return userInfo;
+    }
+
     public String getUserInfoFallback(Long userId) {
         log.info("hystrix fallback with userId {}", userId);
         Map<String, String> map = new HashMap<>();
@@ -73,4 +88,11 @@ public class UserServiceHystrix {
         map.put("msg", "hystrix fallback");
         return JsonConverter.objectToJson(map);
     }
+
+    public String getResult() {
+        /*UserVO userVO = RequestContextHolder.get();
+        log.info("I am  hystrix pool thread,try to get threadlocal:{}", userVO);
+        return userVO.toString();*/
+        return null;
+    }
 }