Spring Cloud Feign传输Header,并保证多线程情况下也适用
一、现象
微服务在生产中,常遇到需要把 header 传递到下一子服务的情况(如服务A访问服务B的接口,需要传递header),网上大多数的方案是实现 RequestInterceptor 接口,在重写方法中,把 header 填进 Feign 的请求中。我们先按这种方式,简单实现代码如下:
1、继承RequestInterceptor
服务A新建类,继承 RequestInterceptor,把 header 设置到请求中,注意 header 的key若是大写时,请求中一般会被转为小写,所以建议header的key一般设置为小写。
package com.he.feign.config;import feign.RequestInterceptor;
import feign.RequestTemplate;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;import javax.servlet.http.HttpServletRequest;
import java.util.Enumeration;/*** <b>@Desc</b>: 1、继承RequestInterceptor,把header设置到请求中,注意header的key若是大写时,请求中会被转为小写* <b>@Author</b>: hesh* <b>@Date</b>: 2020/6/21* <b>@Modify</b>:*/
@Configuration
public class FeignConfig implements RequestInterceptor {@Overridepublic void apply(RequestTemplate requestTemplate) {ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();//当主线程的请求执行完毕后,Servlet容器会被销毁当前的Servlet,因此在这里需要做判空if (attributes != null) {HttpServletRequest request = attributes.getRequest();Enumeration<String> headerNames = request.getHeaderNames();while (headerNames.hasMoreElements()) {String name = headerNames.nextElement();//不能把所有消息头都传递下去,否则会引起其他异常;header的name都是小写if (name.equals("feignheader")) {requestTemplate.header(name,request.getHeader(name));}}}}}
2、修改 hystrix 的隔离策略为 semaphore
RequestContextHolder.getRequestAttributes()方法,实际上是从ThreadLocal变量中取得相应信息的。hystrix断路器的默认隔离策略为THREAD,该策略是无法取得ThreadLocal值的,所以需要修改hystrix的隔离策略,一般是改为[semaphore],在服务A中的 yml 新增配置如下
#2、hystrix 的隔离策略改为 SEMAPHORE
hystrix:command:default:execution:timeout:enable: trueisolation:strategy: SEMAPHOREthread:timeoutInMilleseconds: 60000
3、客户端A的测试代码
3.1、服务A的controller接口
package com.he.feign.controller;import com.he.feign.feign.HeaderFeign;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;/*** <b>@Desc</b>: 测试* <b>@Author</b>: hesh* <b>@Date</b>: 2020/6/21* <b>@Modify</b>:*/
@Slf4j
@RequestMapping("/test_header")
@RestController
public class TestHeaderController {@Autowiredprivate HeaderFeign headerFeign;@Autowiredprivate HttpServletRequest servletRequest;//请求需要带请求头(key-value): feignheader-test@GetMapping("/main_thread")public String mainThread() {String resp = headerFeign.test();log.info("resp: {}", resp);return resp;}@GetMapping("/sub_thread")public void subThread() {new Thread(() -> {String resp = headerFeign.test();log.info("resp: {}", resp);}).start();}@GetMapping("/sub_thread/block")public String subThreadBlock() {//在主线程阻塞等待结果,由于请求仍有效没执行完毕,此时Servlet容器不会销毁HttpServletRequest,//所以请求属性还保存在请求链路中,能被传递下去CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> headerFeign.test());String resp = null;try {resp = future.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}log.info("resp: ", resp);return resp;}
}
3.2、Feign类
feignclient的注解可以省略configuration配置,即configuration = FeignConfig.class可不声明
package com.he.feign.feign;import com.he.feign.feign.hystrix.HeaderFeignFallback;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;/*** <b>@Desc</b>: TODO* <b>@Author</b>: hesh* <b>@Date</b>: 2020/6/21* <b>@Modify</b>:*/
//@FeignClient(value = "eureka-client",path = "/header",fallback = HeaderFeignFallback.class,configuration = FeignConfig.class)//可以省略configuration配置
@FeignClient(value = "eureka-client",path = "/header",fallback = HeaderFeignFallback.class)
public interface HeaderFeign {@GetMapping("/test")String test();
}
package com.he.feign.feign.hystrix;import com.he.feign.feign.HeaderFeign;
import org.springframework.stereotype.Component;/*** <b>@Desc</b>: TODO* <b>@Author</b>: hesh* <b>@Date</b>: 2020/6/1* <b>@Modify</b>:*/
@Component
public class HeaderFeignFallback implements HeaderFeign {@Overridepublic String test() {return null;}
}
4、服务端B的接口代码
package com.he.eurekaclient.controller;import com.he.eurekaclient.feign.HelloFeign;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.servlet.http.HttpServletRequest;
import java.util.Enumeration;/*** <b>@Desc</b>: 测试header传递* <b>@Author</b>: hesh* <b>@Date</b>: 2020/6/21* <b>@Modify</b>:*/
@RequestMapping("/header")
@RestController
public class HeaderController {@Value("${spring.application.name}")private String appName;@Autowiredprivate HttpServletRequest servletRequest;//请求需要带请求头(key-value): feignHeader-test@GetMapping("/test")public String test() {StringBuffer sb = new StringBuffer("hello from ").append(appName).append("\n");StringBuffer requestURL = servletRequest.getRequestURL();sb.append("requestURL: ").append(requestURL).append("\n");boolean isContain = false;sb.append("headers: \n");Enumeration<String> headerNames = servletRequest.getHeaderNames();//header的name都是小写while (headerNames.hasMoreElements()){String headername = headerNames.nextElement();String headerValue = servletRequest.getHeader(headername);sb.append(headername).append("-").append(headerValue).append("\n");if (headername.equals("feignheader")) isContain = true;}if (!isContain) {sb.append("--error--").append("not contain required header!");}return sb.toString();}
}
5、启动服务,在postman中测试如下
5.1、调用接口 http://localhost:8060/test_header/main_thread,结果如下
5.2、调用接口 http://localhost:8060/test_header/sub_thread ,结果如下
5.3、调用 http://localhost:8060/test_header/sub_thread/block,结果如下
从5.1 – 5.3的查询结果,可以得到结论
- 经过上述的配置后,用户线程(主线程)中调用非feign请求,可把header传递到服务B中;
- 若在用户线程(主线程)中启动子线程,并在子线程中调用feign请求,header传递不到服务B中;
- 即是子线程最终异步转同步阻塞等待结果,header仍传递不到服务B中。
二、网络上大多数的解决方案
出现上面的原因, 主要是 RequestAttributes 默认不是线程共享的;主线程调用子线程时,没把 RequestAttributes 共享给子线程。因此,只要在主线程调用其他线程前将RequestAttributes对象设置为子线程共享,就能把header等信息传递下去。
1、因此,网络上大多数的解决方案如下,在主线程调用子线程前,增加下面配置
RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承,线程共享
修改后的代码如下
package com.he.feign.controller;import com.he.feign.feign.HeaderFeign;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.RequestContextHolder;import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;/*** <b>@Desc</b>: 测试* <b>@Author</b>: hesh* <b>@Date</b>: 2020/6/21* <b>@Modify</b>:*/
@Slf4j
@RequestMapping("/test_header")
@RestController
public class TestHeaderController {@Autowiredprivate HeaderFeign headerFeign;@Autowiredprivate HttpServletRequest servletRequest;//请求需要带请求头(key-value): feignheader-test@GetMapping("/main_thread")public String mainThread() {String resp = headerFeign.test();log.info("resp: {}", resp);return resp;}@GetMapping("/sub_thread")public void subThread() {RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承new Thread(() -> {String resp = headerFeign.test();log.info("resp: {}", resp);}).start();}@GetMapping("/sub_thread/block")public String subThreadBlock() {//在主线程阻塞等待结果,由于请求仍有效没执行完毕,此时Servlet不会销毁,请求属性还保存在请求链路中,能被传递下去RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> headerFeign.test());String resp = null;try {resp = future.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}log.info("resp: ", resp);return resp;}
}
2、重新启动服务A,再次调用两个带子线程的接口,现象如下
调用 http://localhost:8060/test_header/sub_thread/block,结果如下
调用接口 http://localhost:8060/test_header/sub_thread ,结果如下
测试结果,有以下两种现象
- 在主线程get()阻塞等待子线程执行完毕时,每次请求都成功;
- 主线程直接启动子线程,且执行完自己逻辑后便结束不需理会子线程结果的,请求偶尔成功, 偶尔失败;
这是为什么呢,作者认为主要是以下原因
- Servlet容器中Servlet属性生命周期与接收请求的用户线程(父线程)同步, 随着父线程执行完destroy()而销毁;
- 子线程虽然可以从父线程共享信息中获得了请求属性,但这个属性由父线程维护
- 当父线程比子线程执行完慢时,请求属性还在,子线程请求成功;当快时,请求属性随着父线程结束而销毁,子线程的请求属性变为null,请求失败。
由此可见,简单的设置 RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);
在多线程情况下, 并非是一劳永逸的。
三、作者的解决方案
针对上面的问题,及问题根本原因,我们团队的解决方案仍是使用 ThreadLocal,进行线程间的变量共享通信。
1、新建 ThreadLocalUtil
package com.he.feign.thread;import java.util.HashMap;
import java.util.Map;/*** <b>@Desc</b>: 线程共享* <b>@Author</b>: hesh* <b>@Date</b>: 2020/6/22* <b>@Modify</b>:*/
public class ThreadLocalUtil {//使用InheritableThreadLocal,使得共享变量可被子线程继承private static final InheritableThreadLocal<Map<String,String>> headerMap = new InheritableThreadLocal<Map<String, String>>(){@Overrideprotected Map<String, String> initialValue() {return new HashMap<>();}};public static Map<String,String> get(){return headerMap.get();}public static String get(String key) {return headerMap.get().get(key);}public static void set(String key, String value){headerMap.get().put(key,value);}
}
2、修改服务A 的接口 TestHeaderController
package com.he.feign.controller;import com.he.feign.feign.HeaderFeign;
import com.he.feign.thread.ThreadLocalUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.servlet.http.HttpServletRequest;
import java.util.Enumeration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;/*** <b>@Desc</b>: 测试* <b>@Author</b>: hesh* <b>@Date</b>: 2020/6/21* <b>@Modify</b>:*/
@Slf4j
@RequestMapping("/test_header")
@RestController
public class TestHeaderController {@Autowiredprivate HeaderFeign headerFeign;@Autowiredprivate HttpServletRequest servletRequest;//请求需要带请求头(key-value): feignheader-test@GetMapping("/main_thread")public String mainThread() {String resp = headerFeign.test();log.info("resp: {}", resp);return resp;}@GetMapping("/sub_thread")public void subThread() {
// RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承Enumeration<String> headerNames = servletRequest.getHeaderNames();while (headerNames.hasMoreElements()){String name = headerNames.nextElement();if (Objects.equals(name,"feignheader")){ThreadLocalUtil.set(name,servletRequest.getHeader(name));}}new Thread(() -> {new Thread(() -> {new Thread(() -> {String resp = headerFeign.test();log.info("resp: {}", resp);}).start();}).start();}).start();}@GetMapping("/sub_thread/block")public String subThreadBlock() {//在主线程阻塞等待结果,由于请求仍有效没执行完毕,此时Servlet不会销毁,请求属性还保存在请求链路中,能被传递下去
// RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承Enumeration<String> headerNames = servletRequest.getHeaderNames();while (headerNames.hasMoreElements()){String name = headerNames.nextElement();if (Objects.equals(name,"feignheader")){ThreadLocalUtil.set(name,servletRequest.getHeader(name));}}CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> headerFeign.test());String resp = null;try {resp = future.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}log.info("resp: ", resp);return resp;}
}
3、修改服务A的 FeignConfig
package com.he.feign.config;import com.he.feign.thread.ThreadLocalUtil;
import feign.RequestInterceptor;
import feign.RequestTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;import java.util.Map;/*** <b>@Desc</b>: 1、继承RequestInterceptor,把header设置到请求中,注意header的key若是大写时,请求中会被转为小写* <b>@Author</b>: hesh* <b>@Date</b>: 2020/6/21* <b>@Modify</b>:*/
@Slf4j
@Configuration
public class FeignConfig implements RequestInterceptor {@Overridepublic void apply(RequestTemplate requestTemplate) {
// ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
// //当主线程的请求执行完毕后,Servlet会被销毁,因此在这里需要做判空
// if (attributes != null) {
// HttpServletRequest request = attributes.getRequest();
//
// Enumeration<String> headerNames = request.getHeaderNames();
//
// while (headerNames.hasMoreElements()) {
// String name = headerNames.nextElement();
// //不能把所有消息头都传递下去,否则会引起其他异常;header的name都是小写
// if (name.equals("feignheader")) {
// requestTemplate.header(name,request.getHeader(name));
// }
// }
// }//读取设置的header信息,传递到下一个服务Map<String, String> headerMap = ThreadLocalUtil.get();for (String key : headerMap.keySet()) {log.info("--从ThreadLocal获取消息头传递到下一个服务:key-[{}],value-[{}]",key,headerMap.get(key));requestTemplate.header(key,headerMap.get(key));}}
}
4、重启服务A,测试结果如下
4.1、连续调用 http://localhost:8060/test_header/sub_thread 接口,日志打印如下
2020-06-22 23:18:23.658 INFO 18236 --- [ Thread-131] com.he.feign.config.FeignConfig : --从ThreadLocal获取消息头传递到下一个服务:key-[feignheader],value-[test]
2020-06-22 23:18:23.662 INFO 18236 --- [ Thread-131] c.h.f.controller.TestHeaderController : resp: hello from eureka-client
requestURL: http://192.168.56.1:8200/header/test
headers:
feignheader-test
accept-*/*
user-agent-Java/1.8.0_162
host-192.168.56.1:8200
connection-keep-alive
结合执行日志可知,header信息通过feign成功传递到下一个服务,而且不再出现偶尔失败的情况!
4.2、连续调用接口 http://localhost:8060/test_header/sub_thread/block
综上可见,真正解决从网关或者上层链路,把header经过feign传递到另一个服务,既要配置feign,也需要结合threadlocal。
下一步的优化,可设置拦截器或者切面,把header信息统一设置到threadlocal中。
package com.he.feign.config;import com.he.feign.thread.ThreadLocalUtil;
import org.springframework.web.servlet.HandlerInterceptor;import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.Enumeration;
import java.util.Objects;/*** <b>@Desc</b>: 拦截器* <b>@Author</b>: hesh* <b>@Date</b>: 2020/6/22* <b>@Modify</b>:*/
public class MyInterceptor implements HandlerInterceptor {@Overridepublic boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {//拦截请求,设置header到ThreadLocal中Enumeration<String> headerNames = request.getHeaderNames();while (headerNames.hasMoreElements()){String name = headerNames.nextElement();if (Objects.equals(name,"feignheader")){ThreadLocalUtil.set(name,request.getHeader(name));}}return true;}}
package com.he.feign.config;import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurationSupport;/*** <b>@Desc</b>: web配置* <b>@Author</b>: hesh* <b>@Date</b>: 2020/6/25* <b>@Modify</b>:*/
@Configuration
public class WebConfig extends WebMvcConfigurationSupport {@Overrideprotected void addInterceptors(InterceptorRegistry registry) {//添加自定义的拦截器registry.addInterceptor(new MyInterceptor()).addPathPatterns("/**");}
}
TestHeaderController修改如下
package com.he.feign.controller;import com.he.feign.feign.HeaderFeign;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.servlet.http.HttpServletRequest;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;/*** <b>@Desc</b>: 测试* <b>@Author</b>: hesh* <b>@Date</b>: 2020/6/21* <b>@Modify</b>:*/
@Slf4j
@RequestMapping("/test_header")
@RestController
public class TestHeaderController {@Autowiredprivate HeaderFeign headerFeign;@Value("${server.port}")private String port;@Autowiredprivate HttpServletRequest servletRequest;//请求需要带请求头(key-value): feignheader-test@GetMapping("/main_thread")public String mainThread() {String resp = headerFeign.test();log.info("resp: {}", resp);return resp;}@GetMapping("/sub_thread")public void subThread() {
// RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承// Enumeration<String> headerNames = servletRequest.getHeaderNames();
// while (headerNames.hasMoreElements()){
// String name = headerNames.nextElement();
// if (Objects.equals(name,"feignheader")){
// ThreadLocalUtil.set(name,servletRequest.getHeader(name));
// }
// }new Thread(() -> {new Thread(() -> {new Thread(() -> {String resp = headerFeign.test();log.info("resp: {}", resp);}).start();}).start();}).start();}@GetMapping("/sub_thread/block")public String subThreadBlock() {//在主线程阻塞等待结果,由于请求仍有效没执行完毕,此时Servlet不会销毁,请求属性还保存在请求链路中,能被传递下去
// RequestContextHolder.setRequestAttributes(RequestContextHolder.getRequestAttributes(),true);//请求属性可继承// Enumeration<String> headerNames = servletRequest.getHeaderNames();
// while (headerNames.hasMoreElements()){
// String name = headerNames.nextElement();
// if (Objects.equals(name,"feignheader")){
// ThreadLocalUtil.set(name,servletRequest.getHeader(name));
// }
// }CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> headerFeign.test());String resp = null;try {resp = future.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();}log.info("resp: {}", resp);return resp;}}
以上,便是作者针对spring cloud feign 传递 header 信息在多线程情况下失败问题的解决方式,若有错误请指正,欢迎交流指导。