What is the Hystrix Isolation Policy?

Official documentation: https://github.com/Netflix/Hystrix/wiki/Configuration#executionisolationstrategy

Executing an isolation policy

This property instructs HystrixCommand.run() which isolation policy to execute, being one of the following two options.

  • THREAD - it is executed on a separate thread, and concurrent requests are limited by the number of threads in the thread pool
  • SEMAPHORE - it is executed on the calling thread, and concurrent requests are limited by the amount of semaphore

Problem

When the isolation policy is THREAD, there is no way to get the value in ThreadLocal.

So, when a feign remote call is made between services, there is no way to get the request object from the request header and thus the token.

Solution 1 - Violently modify the Hystrix isolation policy

Set the isolation policy to SEMAPHORE

1
hystrix.command.default.execution.isolation.strategy: SEMAPHORE

However, Hystrix officially does not recommend this practice and strongly recommends using SEMAPHORE as an isolation policy.

Thread or Semaphore The default, and the recommended setting, is to run HystrixCommands using thread isolation (THREAD) and HystrixObservableCommands using semaphore isolation (SEMAPHORE).

Commands executed in threads have an extra layer of protection against latencies beyond what network timeouts can offer.

Generally the only time you should use semaphore isolation for HystrixCommands is when the call is so high volume (hundreds per second, per instance) that the overhead of separate threads is too high; this typically only applies to non-network calls.

Solution 2 - Custom Concurrency Policy

Custom Concurrency Strategy

Just write a class that inherits from HystrixConcurrencyStrategy and override the wrapCallable method.

CustomFeignHystrixConcurrencyStrategy :

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
package com.lzhpo.common.feign.strategy;

import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import com.netflix.hystrix.strategy.HystrixPlugins;
import com.netflix.hystrix.strategy.concurrency.HystrixConcurrencyStrategy;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariable;
import com.netflix.hystrix.strategy.concurrency.HystrixRequestVariableLifecycle;
import com.netflix.hystrix.strategy.eventnotifier.HystrixEventNotifier;
import com.netflix.hystrix.strategy.executionhook.HystrixCommandExecutionHook;
import com.netflix.hystrix.strategy.metrics.HystrixMetricsPublisher;
import com.netflix.hystrix.strategy.properties.HystrixPropertiesStrategy;
import com.netflix.hystrix.strategy.properties.HystrixProperty;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
 * Custom isolation strategy to solve the problem that {@code RequestContextHolder.getRequestAttributes()} is empty
 *
 * @author Zhaopo Liu
 */
@Slf4j
@Primary
@Component
public class CustomFeignHystrixConcurrencyStrategy extends HystrixConcurrencyStrategy {

    private HystrixConcurrencyStrategy hystrixConcurrencyStrategy;

    public CustomFeignHystrixConcurrencyStrategy() {
        try {
            this.hystrixConcurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
            if (this.hystrixConcurrencyStrategy instanceof CustomFeignHystrixConcurrencyStrategy) {
                // Welcome to singleton hell...
                return;
            }
            HystrixCommandExecutionHook commandExecutionHook = HystrixPlugins.getInstance().getCommandExecutionHook();
            HystrixEventNotifier eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
            HystrixMetricsPublisher metricsPublisher = HystrixPlugins.getInstance().getMetricsPublisher();
            HystrixPropertiesStrategy propertiesStrategy = HystrixPlugins.getInstance().getPropertiesStrategy();
            this.logCurrentStateOfHystrixPlugins(eventNotifier, metricsPublisher, propertiesStrategy);
            HystrixPlugins.reset();
            HystrixPlugins.getInstance().registerConcurrencyStrategy(this);
            HystrixPlugins.getInstance().registerCommandExecutionHook(commandExecutionHook);
            HystrixPlugins.getInstance().registerEventNotifier(eventNotifier);
            HystrixPlugins.getInstance().registerMetricsPublisher(metricsPublisher);
            HystrixPlugins.getInstance().registerPropertiesStrategy(propertiesStrategy);
        } catch (Exception e) {
            log.error("Failed to register Sleuth Hystrix Concurrency Strategy", e);
        }
    }

    private void logCurrentStateOfHystrixPlugins(HystrixEventNotifier eventNotifier,
        HystrixMetricsPublisher metricsPublisher, HystrixPropertiesStrategy propertiesStrategy) {
        if (log.isDebugEnabled()) {
            log.info(
                "Current Hystrix plugins configuration is [" + "concurrencyStrategy [" + this.hystrixConcurrencyStrategy
                    + "]," + "eventNotifier [" + eventNotifier + "]," + "metricPublisher [" + metricsPublisher + "],"
                    + "propertiesStrategy [" + propertiesStrategy + "]," + "]");
            log.info("Registering Sleuth Hystrix Concurrency Strategy.");
        }
    }

    @Override
    public <T> Callable<T> wrapCallable(Callable<T> callable) {
        RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
        return new WrappedCallable<>(callable, requestAttributes);
    }

    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey, HystrixProperty<Integer> corePoolSize,
        HystrixProperty<Integer> maximumPoolSize, HystrixProperty<Integer> keepAliveTime, TimeUnit unit,
        BlockingQueue<Runnable> workQueue) {
        return this.hystrixConcurrencyStrategy.getThreadPool(threadPoolKey, corePoolSize, maximumPoolSize,
            keepAliveTime, unit, workQueue);
    }

    @Override
    public ThreadPoolExecutor getThreadPool(HystrixThreadPoolKey threadPoolKey,
        HystrixThreadPoolProperties threadPoolProperties) {
        return this.hystrixConcurrencyStrategy.getThreadPool(threadPoolKey, threadPoolProperties);
    }

    @Override
    public BlockingQueue<Runnable> getBlockingQueue(int maxQueueSize) {
        return this.hystrixConcurrencyStrategy.getBlockingQueue(maxQueueSize);
    }

    @Override
    public <T> HystrixRequestVariable<T> getRequestVariable(HystrixRequestVariableLifecycle<T> rv) {
        return this.hystrixConcurrencyStrategy.getRequestVariable(rv);
    }

    static class WrappedCallable<T> implements Callable<T> {
        private final Callable<T> target;
        private final RequestAttributes requestAttributes;

        public WrappedCallable(Callable<T> target, RequestAttributes requestAttributes) {
            this.target = target;
            this.requestAttributes = requestAttributes;
        }

        /**
         * feign opens the fuse (hystrix): feign.hystrix.enabled=ture, and uses the default signal isolation level,
         * The HttpServletRequest object is independent of each other in the parent thread
         * and the child thread and is not shared.
         * So the HttpServletRequest data of the parent thread used in the child thread is null,
         * naturally it is impossible to obtain the token information of the request header
         * In a multithreaded environment, call before the request, set the context before the call
         *
         * @return T
         * @throws Exception Exception
         */
        @Override
        public T call() throws Exception {
            try {
                // Set true to share the parent thread's HttpServletRequest object setting
                RequestContextHolder.setRequestAttributes(requestAttributes, true);
                return target.call();
            } finally {
                RequestContextHolder.resetRequestAttributes();
            }
        }
    }
}

SpringCloud integration with Feign

Configure Feign request interceptor (carry token in request header)

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.lzhpo.common.feign.starter.config;

import feign.RequestInterceptor;
import feign.RequestTemplate;
import java.util.Enumeration;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import javax.servlet.http.HttpServletRequest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

/**
 * Feign interceptor config, set the request header.
 *
 * @author Zhaopo Liu
 */
@Slf4j
@Configuration
public class CustomFeignConfig implements RequestInterceptor {

  private static final String TOKEN_HEADER = "Authorization";

  @Override
  public void apply(RequestTemplate requestTemplate) {
    Optional.ofNullable(getHttpServletRequest())
        .ifPresent(
            request -> {
              Map<String, String> headers = getHeaders(request);
              String token = headers.get(TOKEN_HEADER);
              log.info("Feign request header: {}", token);
              requestTemplate.header(TOKEN_HEADER, token);
            });
  }

  private HttpServletRequest getHttpServletRequest() {
    return Optional.ofNullable(RequestContextHolder.getRequestAttributes())
        .map(ServletRequestAttributes.class::cast)
        .map(ServletRequestAttributes::getRequest)
        .orElse(null);
  }

  private Map<String, String> getHeaders(HttpServletRequest request) {
    Map<String, String> map = new LinkedHashMap<>();
    Enumeration<String> enumeration = request.getHeaderNames();
    while (enumeration.hasMoreElements()) {
      String key = enumeration.nextElement();
      String value = request.getHeader(key);
      map.put(key, value);
    }

    log.info("Request header carried between services:{}", map);
    return map;
  }
}

Configure OkHttp

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.lzhpo.common.feign.config;

import com.lzhpo.common.feign.interceptor.LogInterceptor;
import feign.Feign;
import okhttp3.ConnectionPool;
import okhttp3.OkHttpClient;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
import org.springframework.cloud.openfeign.FeignAutoConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.concurrent.TimeUnit;

/**
 * OkHttp config
 *
 * @author Zhaopo Liu
 */
@Configuration
@ConditionalOnClass(Feign.class)
@AutoConfigureBefore(FeignAutoConfiguration.class)
public class FeignOkHttpConfig {

    @Bean
    public OkHttpClient okHttpClient() {
        return new OkHttpClient.Builder()
                // Connect timeout
                .connectTimeout(60, TimeUnit.SECONDS)
                // Read timeout
                .readTimeout(60, TimeUnit.SECONDS)
                // Write timeout
                .writeTimeout(60, TimeUnit.SECONDS)
                // Whether to reconnect automatically
                .retryOnConnectionFailure(true).connectionPool(new ConnectionPool())
                // Log interceptor
                .addInterceptor(new LogInterceptor())
                .build();
    }
}

Configure RestTemplate

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
package com.lzhpo.common.feign.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestTemplate;

/**
 * RestTemplateConfig
 *
 * @author Zhaopo Liu
 */
@Configuration
public class RestTemplateConfig {

    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}

Log interceptor

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
package com.lzhpo.common.feign.interceptor;

import lombok.extern.slf4j.Slf4j;
import okhttp3.Interceptor;
import okhttp3.Request;
import okhttp3.Response;

import java.io.IOException;

/**
 * Log interceptor
 *
 * @author Zhaopo Liu
 */
@Slf4j
public class LogInterceptor implements Interceptor {

    @Override
    public Response intercept(Chain chain) throws IOException {
        long t1 = System.nanoTime();
        Request request = chain.request();
        log.info(String.format("sending %s request %s%n%s", request.method(), request.url(), request.headers()));
        Response response = chain.proceed(request);
        long t2 = System.nanoTime();
        log.info(String.format("received response for %s in %.1fms%n%s", response.request().url(), (t2 - t1) / 1e6d,
            response.headers()));
        return response;
    }
}

Auto-injection configuration

I am a common-feign dependency and need to configure spring.facts for auto-injection.

So, configure: /resources/META-INF/spring.facts

1
2
3
4
5
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.lzhpo.common.feign.config.CustomFeignConfig,\
  com.lzhpo.common.feign.config.FeignOkHttpConfig,\
  com.lzhpo.common.feign.config.RestTemplateConfig,\
  com.lzhpo.common.feign.strategy.CustomFeignHystrixConcurrencyStrategy

–Integrated user service demo–

Configuration file

application.yml

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# feign config
feign:
  httpclient:
    enabled: false
  okhttp:
    enabled: true
  hystrix:
    enabled: true

# hystrix config
hystrix:
  shareSecurityContext: true
  command:
    default:
      execution:
        isolation:
          thread:
            timeoutInMilliseconds: 60000

management:
  endpoints:
    web:
      exposure:
        include: "*"
  endpoint:
    health:
      show-details: ALWAYS

User Service Client

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.lzhpo.systemservice.api.feign;

import com.lzhpo.common.constant.SysConst;
import com.lzhpo.common.feign.config.CustomFeignConfig;
import com.lzhpo.common.response.ResultVo;
import com.lzhpo.common.entity.SecurityUser;
import com.lzhpo.systemservice.api.entity.SysRole;
import com.lzhpo.systemservice.api.feign.factory.UserServiceClientFallbackFactory;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;

import java.util.List;

/** @author Zhaopo Liu */
@FeignClient(value = SysConst.SYSTEM_SERVICE, configuration = CustomFeignConfig.class,
        fallbackFactory = UserServiceClientFallbackFactory.class)
public interface UserServiceClient {

    @GetMapping("/api/user/v1/user/findUserInfoByUsername")
    ResultVo<SecurityUser> findUserInfoByUsername(@RequestParam("username") String username);
}

Log breaker, Feign fallback method

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
package com.lzhpo.systemservice.api.feign.fallback;

import com.lzhpo.common.response.ResultVo;
import com.lzhpo.common.entity.SecurityUser;
import com.lzhpo.systemservice.api.entity.SysRole;
import com.lzhpo.systemservice.api.feign.UserServiceClient;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.util.List;

/**
 * Log circuit breaker implementation, fallback method called by feign
 *
 * @author Zhaopo Liu
 */
@Slf4j
@Component
public class UserServiceClientFallbackImpl implements UserServiceClient {

    private Throwable throwable;

    public Throwable getThrowable() {
        return throwable;
    }

    public void setThrowable(Throwable throwable) {
        this.throwable = throwable;
    }

    @Override
    public ResultVo<SecurityUser> findUserInfoByUsername(String username) {
        log.error("Query user information based on user name [{}] failed! Exception information: {}", username, throwable.getMessage());
        return null;
    }
}

User circuit breaker factory

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package com.lzhpo.systemservice.api.feign.factory;

import com.lzhpo.systemservice.api.feign.UserServiceClient;
import com.lzhpo.systemservice.api.feign.fallback.UserServiceClientFallbackImpl;
import feign.hystrix.FallbackFactory;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * User circuit breaker factory
 *
 * @author Zhaopo Liu
 */
@Component
@Slf4j
public class UserServiceClientFallbackFactory implements FallbackFactory<UserServiceClient> {

    @Override
    public UserServiceClient create(Throwable throwable) {
        UserServiceClientFallbackImpl userServiceClientFallback = new UserServiceClientFallbackImpl();
        userServiceClientFallback.setThrowable(throwable);
        log.error("The user circuit breaker detects the feign abnormality, the reason for the abnormality: {}", throwable.getMessage());
        return userServiceClientFallback;
    }
}

Reference

  • http://www.lzhpo.com/article/167