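Spring Cloud Traffic Splitting
Background
Our project needs to migrate a subsystem. To keep production errors to a minimum, we split the traffic: first send 5% of requests to the new subsystem, then raise the share step by step until it reaches 100%.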
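The article never shows how the 5% decision itself is made, so here is a minimal sketch of one way to do it. The class name GraySplitDecider and both methods are hypothetical and are not part of the project's GrayReleaseService; the sketch only assumes that the share is a whole percentage between 0 and 100.

```java
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical helper: decides whether one request goes to the new subsystem. */
final class GraySplitDecider {
    // Share of traffic (0..100) sent to the new subsystem; volatile so it can be raised at runtime.
    private volatile int percent;

    GraySplitDecider(int percent) {
        setPercent(percent);
    }

    /** Raise this step by step (for example 5, 20, 50, 100) as the migration proceeds. */
    void setPercent(int percent) {
        if (percent < 0 || percent > 100) {
            throw new IllegalArgumentException("percent must be in [0, 100]: " + percent);
        }
        this.percent = percent;
    }

    /** Random split: each request is redirected independently with probability percent/100. */
    boolean shouldRedirect() {
        return ThreadLocalRandom.current().nextInt(100) < percent;
    }

    /** Sticky split: the same key (for example a user id) always gets the same answer. */
    boolean shouldRedirect(String stickyKey) {
        int bucket = Math.floorMod(stickyKey.hashCode(), 100);
        return bucket < percent;
    }

    public static void main(String[] args) {
        GraySplitDecider decider = new GraySplitDecider(5);
        int redirected = 0;
        for (int i = 0; i < 100_000; i++) {
            if (decider.shouldRedirect()) {
                redirected++;
            }
        }
        System.out.println("redirected " + redirected + " of 100000 requests");
    }
}
```

The random variant is the simplest. The sticky variant may be preferable when one caller should not bounce between the old and the new subsystem from request to request.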
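Traffic splitting for each call style
The project calls other services in three ways: Feign, Zuul and OkHttp. I found an interception point for each of them: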
Zuul
Simply extend ZuulFilter.
public class UrlPathFilterExample extends ZuulFilter {
private static final Logger logger = LoggerFactory.getLogger(UrlPathFilterExample.class.getName());
@Override
public String filterType() {
return "pre";
}
@Override
public int filterOrder() {
return 11;
}
@Override
public boolean shouldFilter() {
return true;
}
@Autowired
@Qualifier(GRAY_RELEASE_ZUUL_EXAMPLE)
private GrayReleaseService grayReleaseService;
@PostConstruct
public void init() {
logger.info("UrlPathFilter is constructed");
}
@Override
public Object run() {
try {
RequestContext ctx = RequestContext.getCurrentContext();
HttpServletRequest httpRequest = ctx.getRequest();
String requestUrl = httpRequest.getRequestURI();
// traffic-splitting logic
InterfaceGrayEntity grayEntity = grayReleaseService.matchedGrayRelease(requestUrl);
if(!Objects.isNull(grayEntity)){
// replace() matches literally; replaceAll() would treat the service name as a regular expression
requestUrl = requestUrl.replace(grayEntity.getInitServiceName(), grayEntity.getRedirectServiceName());
logger.info("new requestUrl: {}", requestUrl);
ctx.put(FilterConstants.REQUEST_URI_KEY, requestUrl);
ctx.put(FilterConstants.SERVICE_ID_KEY, grayEntity.getRedirectServiceName());
}
} catch (Exception e) {
logger.error("gray release routing failed in zuul filter", e);
}
return null;
}
}
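A plain string replacement on the request URI changes every occurrence of the service name, including ones deeper in the path. The sketch below shows a stricter variant that only swaps the first path segment. It assumes the Zuul route prefix is the service name (for example /order-service/...); the class PathPrefixRewriter and the service names in the usage note are made up for illustration.

```java
/** Hypothetical helper: swaps the service name only when it is the first path segment. */
final class PathPrefixRewriter {
    private PathPrefixRewriter() {
    }

    static String rewrite(String requestUri, String fromService, String toService) {
        String prefix = "/" + fromService;
        if (!requestUri.startsWith(prefix)) {
            return requestUri; // a different route: leave it untouched
        }
        String rest = requestUri.substring(prefix.length());
        // Require a segment boundary so "/orders-v2/..." is not mistaken for "/orders/...".
        if (!rest.isEmpty() && rest.charAt(0) != '/') {
            return requestUri;
        }
        return "/" + toService + rest;
    }
}
```

For example, rewrite("/order-service/api/order-service/1", "order-service", "order-service-new") changes only the leading segment and leaves the second occurrence in the path alone.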
OkHttp
Add an Interceptor:
@Component
@ConditionalOnProperty(value = "interface.gray.enable")
public class GrayReleaseInterceptorExample implements Interceptor {
private static final Logger logger = LoggerFactory.getLogger(GrayReleaseInterceptorExample.class.getName());
@Autowired
@Qualifier(GRAY_RELEASE_OKHTTP_EXAMPLE)
private GrayReleaseService grayReleaseService;
@Override
public Response intercept(Chain chain) throws IOException {
Request request = chain.request();
HttpUrl oldHttpUrl = request.url();
String url = oldHttpUrl.url().toString();
// traffic splitting
InterfaceGrayEntity entity = grayReleaseService.matchedGrayRelease(url);
if(entity == null){
logger.info("did not match any gray release rule");
} else {
url = url.replace(entity.getInitServiceName(),
entity.getRedirectServiceName());
// keep the original method, headers and body; only the URL changes
request = request.newBuilder().url(url).build();
}
return chain.proceed(request);
}
}
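Replacing the service name anywhere in the full URL string can also touch the path or the query. As an alternative, here is a minimal sketch that swaps only the host, using nothing but java.net.URI. It assumes the service name appears as the URL host; the class HostRewriter is hypothetical and not part of the project.

```java
import java.net.URI;

/** Hypothetical helper: replaces the host of a URL and keeps everything else as-is. */
final class HostRewriter {
    private HostRewriter() {
    }

    static String rewriteHost(String url, String fromHost, String toHost) {
        URI uri = URI.create(url);
        String host = uri.getHost();
        // getHost() is null for opaque URIs and for host names java.net.URI rejects (e.g. with '_').
        if (host == null || !host.equalsIgnoreCase(fromHost)) {
            return url;
        }
        // Rebuild from the raw (still percent-encoded) parts so nothing is re-encoded.
        StringBuilder sb = new StringBuilder();
        sb.append(uri.getScheme()).append("://");
        if (uri.getRawUserInfo() != null) {
            sb.append(uri.getRawUserInfo()).append('@');
        }
        sb.append(toHost);
        if (uri.getPort() != -1) {
            sb.append(':').append(uri.getPort());
        }
        if (uri.getRawPath() != null) {
            sb.append(uri.getRawPath());
        }
        if (uri.getRawQuery() != null) {
            sb.append('?').append(uri.getRawQuery());
        }
        if (uri.getRawFragment() != null) {
            sb.append('#').append(uri.getRawFragment());
        }
        return sb.toString();
    }
}
```

Host names are compared case-insensitively, so there is no need to change the case of the rest of the URL to make the match succeed.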
Build the OkHttp client:
@Component
@ConditionalOnProperty(value = "interface.gray.enable")
public class AsyncRequestUtilExample {
@Autowired
private GrayReleaseInterceptorExample grayReleaseInterceptor;
@Bean(value = "grayReleaseOkHttpClient")
public OkHttpClient buildOkHttpClient() {
Dispatcher dispatcher = new Dispatcher();
dispatcher.setMaxRequests(1000000);
dispatcher.setMaxRequestsPerHost(100000);
ConnectionPool pool = new ConnectionPool(30,5, TimeUnit.MINUTES);
// the interceptor is registered on the builder below
return new OkHttpClient.Builder()
// WARNING: this verifier accepts every host name, which disables TLS host name verification
.hostnameVerifier(new HostnameVerifier() {
public boolean verify(String hostname, SSLSession session) {
return true;
}
})
.dispatcher(dispatcher)
.connectionPool(pool)
.connectTimeout(2500, TimeUnit.MILLISECONDS)
.readTimeout(6000, TimeUnit.MILLISECONDS)
.writeTimeout(6000, TimeUnit.MILLISECONDS)
.addInterceptor(grayReleaseInterceptor)
.build();
}
}
Feign
Replacing the Client
At first I supplied my own Client by extending LoadBalancerFeignClient:
public class GrayReleaseLoadBalancerFeignClient extends LoadBalancerFeignClient {
private static final Logger logger = LoggerFactory.getLogger(GrayReleaseLoadBalancerFeignClient.class.getName());
private DiscoveryClient discoveryClient;
private Client delegate;
private CachingSpringLoadBalancerFactory lbClientFactory;
private SpringClientFactory clientFactory;
private GrayReleaseService grayReleaseService;
public GrayReleaseLoadBalancerFeignClient(Client delegate, CachingSpringLoadBalancerFactory lbClientFactory,
SpringClientFactory clientFactory, DiscoveryClient discoveryClient,
GrayReleaseService grayReleaseService) {
super(delegate, lbClientFactory, clientFactory);
this.delegate = delegate;
this.lbClientFactory = lbClientFactory;
this.clientFactory = clientFactory;
this.discoveryClient = discoveryClient;
this.grayReleaseService = grayReleaseService;
logger.info("gray release feign load balancer client initialized");
}
@Override
public Response execute(Request request, Request.Options options) throws IOException {
logger.info("execute request: {}", request);
String url = request.url();
String clientName = URI.create(url).getHost();
InterfaceGrayEntity entity = grayReleaseService.matchedGrayRelease(clientName);
if(!Objects.isNull(entity)) {
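// replace only the service name, ignoring case; upper-casing the whole URL would also change the path and query
// (Pattern and Matcher are from java.util.regex)
url = url.replaceFirst("(?i)" + Pattern.quote(entity.getInitServiceName()), Matcher.quoteReplacement(entity.getRedirectServiceName()));
// rebuild the request object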
Request newRequest = Request.create(request.method(),
url, request.headers(), request.body(),
request.charset());
return super.execute(newRequest, options);
}
return super.execute(request, options);
}
}
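It ran fine in the development environment but stopped working in the test environment.
After some digging, it turned out that this fails whenever Sleuth is enabled. Sleuth also provides its own Client implementation and returns it by default. As the code below shows, calls on feign.Client are intercepted and handed to a TraceFeignClient: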
@Aspect
class TraceFeignAspect {
private final BeanFactory beanFactory;
TraceFeignAspect(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
}
@Around("execution (* feign.Client.*(..))")
public Object feignClientWasCalled(final ProceedingJoinPoint pjp) throws Throwable {
Object[] args = pjp.getArgs();
Request request = (Request) args[0];
Request.Options options = (Request.Options) args[1];
Object bean = pjp.getTarget();
if (!(bean instanceof TraceFeignClient)) {
return new TraceFeignClient(this.beanFactory, (Client) bean).execute(request, options);
}
return pjp.proceed();
}
}
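The same problem appears when Sleuth and Seata are used together, because Seata proxies the Feign Client in the same way.
There is an approach that replaces the delegate of TraceFeignClient, but I have not tried it: http://springcloud.cn/view/41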
RequestInterceptor
The next idea was to use a RequestInterceptor:
@Component
public class FeignRequestInterceptor implements RequestInterceptor {
private static final Logger logger = LoggerFactory.getLogger(FeignRequestInterceptor.class.getName());
@Override
public void apply(RequestTemplate template) {
logger.info("get template {}", template);
}
}
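After trying it, however, I found that the RequestInterceptor in the 2.x version cannot modify the URL.
Upgrading to 3.x would fix that, but we cannot upgrade at will; we can only use the libraries the company provides.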
AOP
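When all else fails, turn to AOP. So I started trying to solve the problem with AOP.
After a flurry of work, I first tried Spring AOP with a pointcut on feign.Client, which did not work: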
@Pointcut("execution(* feign.Client+.*(..))")
public void pointCutFeign(){}
After reading Stack Overflow, I tried putting the aspect on LoadBalancerFeignClient instead:
@Pointcut("execution(* org.springframework.cloud.openfeign.ribbon.LoadBalancerFeignClient.execute(..))")
public void pointCutFeign(){}
@Around("pointCutFeign()")
public Object myAroundFeign(ProceedingJoinPoint joinPoint) throws Throwable {
if (joinPoint.getArgs().length > 0) {
Request request = (Request) joinPoint.getArgs()[0];
logger.info("REQUEST >>>>>>>>>>>>");
logger.info("URL = "+request.url());
logger.info("METHOD = "+request.httpMethod().name());
logger.info("BODY = "+request.requestBody().asString());
logger.info("HEADERS = "+request.headers().toString());
}
Response response = (Response) joinPoint.proceed();
logger.info("RESPONSE <<<<<<<<<<<<<<");
logger.info("STATUS = "+response.status());
logger.info("HEADERS = "+response.headers().toString());
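// note: reading the stream here consumes the body, so the caller receives a response whose body has already been read
logger.info("BODY = " + IOUtils.toString(response.body().asInputStream(), "UTF-8"));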
return response;
}
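The result was the same problem again: it works with Sleuth disabled and fails with Sleuth enabled, because Sleuth proxies the Feign Client.
Later versions of Sleuth changed this behavior, but I have no say in which library versions we get, so I was still stuck.
One more attempt, this time with AspectJ auto-proxying and a custom advisor, did not work either: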
@Component
@EnableAspectJAutoProxy
public class FeignCostTimeAdvisor extends AbstractPointcutAdvisor {
// private static final Logger logger = LoggerFactory.getLogger(FeignCostTimeAdvisor.class.getName());
public FeignCostTimeAdvisor() {
System.out.println("FeignCostTimeAdvisor initialized");
}
public static boolean existsAnnotation(Class<?> cs, Class<? extends Annotation> annotation) {
// logger.info("this {} super {}-{}", Objects.isNull(cs.getName()) ? "NULL": cs.getName(),
// Objects.isNull(cs.getSuperclass().getName()) ? "null" : cs.getSuperclass().getName(),
// Objects.isNull(cs.getGenericSuperclass().getTypeName())? "null" : cs.getGenericSuperclass().getTypeName());
System.out.println(cs.getName());
boolean rs = cs.isAnnotationPresent(annotation);
if (!rs) {
rs = cs.getSuperclass() != null && existsAnnotation(cs.getSuperclass(), annotation);
}
if (!rs) {
Class<?>[] interfaces = cs.getInterfaces();
for (Class<?> f : interfaces) {
rs = existsAnnotation(f, annotation);
if (rs) {
break;
}
}
}
if (!rs) {
Annotation[] annotations = cs.getAnnotations();
for (Annotation an : annotations) {
rs = annotation.isAssignableFrom(an.getClass());
if (rs) {
break;
}
}
}
return rs;
}
public static boolean isLoadBalancerFeignClient(Class<?> cs) {
boolean rs = cs.getName().contains("LoadBalancerFeignClient") ||
cs.getName().contains("TraceLoadBalancerFeignClient");
if(!rs){
rs = cs.getName().equals("feign.Client");
}
if(!rs && cs.getInterfaces().length > 0){
rs = cs.getInterfaces()[0].getName().equals("feign.Client");
}
if (!rs) {
rs = cs.getSuperclass() != null && isLoadBalancerFeignClient(cs.getSuperclass());
}
if (!rs) {
Class<?>[] interfaces = cs.getInterfaces();
for (Class<?> f : interfaces) {
rs = isLoadBalancerFeignClient(f);
if (rs) {
break;
}
}
}
return rs;
}
@Override
public Pointcut getPointcut() {
return new Pointcut() {
@Override
public ClassFilter getClassFilter() {
return (Class<?> clazz) -> isLoadBalancerFeignClient(clazz);
// return (Class<?> clazz) -> existsAnnotation(clazz, FeignClient.class);
}
@Override
public MethodMatcher getMethodMatcher() {
return MethodMatcher.TRUE;
}
};
}
@Override
public Advice getAdvice() {
return (MethodInterceptor) (MethodInvocation invocation) -> {
long start = System.currentTimeMillis();
Object[] args = invocation.getArguments();
Object result = invocation.proceed();
long end = System.currentTimeMillis();
System.out.println(invocation.getClass().getName() + " call " + invocation.getMethod() +
", elapsed (ms): " + (end - start));
return result;
};
}
}
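Custom load-balancing rule
In the end I went with a custom load-balancing rule. Debugging showed that the default rule is ZoneAvoidanceRule, so it is enough to extend it and put the splitting logic into the subclass: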
public class GrayReleaseRule extends ZoneAvoidanceRule implements ApplicationContextAware {
private static final Logger logger = LoggerFactory.getLogger(GrayReleaseRule.class.getName());
private static ApplicationContext applicationContext;
private GrayReleaseService grayReleaseService;
private DiscoveryClient discoveryClient;
public GrayReleaseRule() {
}
@Override
public Server choose(Object key) {
ILoadBalancer lb = getLoadBalancer();
if(grayReleaseService == null){
grayReleaseService = (GrayReleaseService)
GrayReleaseRule.applicationContext.getBean(GRAY_RELEASE_FEIGN_EXAMPLE);
}
if(discoveryClient == null){
discoveryClient = GrayReleaseRule.applicationContext.getBean(DiscoveryClient.class);
}
// traffic splitting
String clientName = ((ZoneAwareLoadBalancer) lb).getName();
logger.info("begin to gray release {}", clientName);
InterfaceGrayEntity entity = grayReleaseService.matchedGrayRelease(clientName);
if(!Objects.isNull(entity)) {
Server server = getRedirectService(entity);
if(!Objects.isNull(server)){
return server;
}
}
// no match or no redirect instance available: fall back to the default logic
return super.choose(key);
}
private Server getRedirectService(InterfaceGrayEntity entity){
List<ServiceInstance> serviceInstanceList = discoveryClient.getInstances(entity.getRedirectServiceName());
List<Server> serviceList = new ArrayList<>();
for(ServiceInstance serviceInstance : serviceInstanceList){
serviceList.add(new Server(serviceInstance.getHost() + ":" + serviceInstance.getPort()));
}
if(serviceList.size() == 0){
logger.error("empty service endpoint for {}", entity.getRedirectServiceName());
return null;
}
Server server = serviceList.get(chooseRandomInt(serviceList.size()));
logger.info("request ivap service-name {} iphost {}:{}",
entity.getRedirectServiceName(), server.getHost(), server.getPort());
return server;
}
// pick a random index in [0, serverCount)
protected int chooseRandomInt(int serverCount) {
return ThreadLocalRandom.current().nextInt(serverCount);
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
if(GrayReleaseRule.applicationContext == null){
GrayReleaseRule.applicationContext = applicationContext;
}
}
}
Note that this class must not be registered directly as a bean; specify it in the configuration file instead:
<your-server-name>.ribbon.NFLoadBalancerRuleClassName=<package-name>.GrayReleaseRule
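If it is registered directly as a bean, routing gets mixed up between services, because every service needs its own IRule instance. In other words, the routing rule has to follow the prototype pattern.
Note that with this approach Zuul no longer needs its own ZuulFilter subclass.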
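Why a single shared rule instance gets mixed up can be shown without Ribbon at all. In the rule above, choose() asks getLoadBalancer() which service it is serving, so the rule carries a reference to one load balancer. The sketch below imitates that relationship with two stand-in classes, FakeLoadBalancer and FakeRule, which are hypothetical simplifications and not Ribbon types.

```java
/** Stand-in model of the rule/load-balancer relationship; not real Ribbon classes. */
final class SharedRuleDemo {
    private SharedRuleDemo() {
    }

    /** Stand-in for a rule: remembers the one load balancer it was last attached to. */
    static final class FakeRule {
        private FakeLoadBalancer loadBalancer;

        void setLoadBalancer(FakeLoadBalancer loadBalancer) {
            this.loadBalancer = loadBalancer;
        }

        String choose() {
            return loadBalancer.serviceName;
        }
    }

    /** Stand-in for a load balancer: one exists per target service. */
    static final class FakeLoadBalancer {
        final String serviceName;
        private final FakeRule rule;

        FakeLoadBalancer(String serviceName, FakeRule rule) {
            this.serviceName = serviceName;
            this.rule = rule;
            rule.setLoadBalancer(this); // the balancer registers itself on its rule
        }

        String route() {
            return rule.choose();
        }
    }

    public static void main(String[] args) {
        // One shared rule, as when the rule is a singleton bean.
        FakeRule shared = new FakeRule();
        FakeLoadBalancer a = new FakeLoadBalancer("service-a", shared);
        FakeLoadBalancer b = new FakeLoadBalancer("service-b", shared);
        // The shared rule now points at service-b, so service-a is routed to the wrong service.
        System.out.println("shared rule, service-a routes to " + a.route());

        // One rule per service, as when the rule class is named in the configuration.
        FakeLoadBalancer c = new FakeLoadBalancer("service-a", new FakeRule());
        FakeLoadBalancer d = new FakeLoadBalancer("service-b", new FakeRule());
        System.out.println("own rule, service-a routes to " + c.route());
        System.out.println("own rule, service-b routes to " + d.route());
    }
}
```

Naming the rule class in the per-service configuration gives each client its own instance, which corresponds to the second half of the demo.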
References
https://blog.gomyck.com/posts/zuulCustomLoadBalanceRule/
https://blog.csdn.net/xichenguan/article/details/77448288