Spring Cloud 分流

背景

因为项目上需要做子系统迁移,为了尽量减少生产的错误,所以需要做分流,先打5%流量到新的子系统,逐渐增大到100%。

各种调用方式的分流

项目中总共有三种调用方式:Feign、Zuul和OkHttp,三种都找到了拦截器的方法:

Zuul

直接继承ZuulFilter即可。

public class UrlPathFilterExample extends ZuulFilter {
private static final Logger logger = LoggerFactory.getLogger(UrlPathFilterExample.class.getName());

@Override
public String filterType() {
    return "pre";
}

@Override
public int filterOrder() {
    return 11;
}

@Override
public boolean shouldFilter() {
    return true;
}

@Autowired
@Qualifier(GRAY_RELEASE_ZUUL_EXAMPLE)
private GrayReleaseService grayReleaseService;

@PostConstruct
public void init() {
    logger.info("UrlPathFilter is constructed");
}

@Override
public Object run() {

    try {
        RequestContext ctx = RequestContext.getCurrentContext();
        HttpServletRequest httpRequest = ctx.getRequest();
        String requestUrl = httpRequest.getRequestURI();
        //分流逻辑
        InterfaceGrayEntity grayEntity = grayReleaseService.matchedGrayRelease(requestUrl);
        if(!Objects.isNull(grayEntity)){
            requestUrl = requestUrl.replaceAll(grayEntity.getInitServiceName(), grayEntity.getRedirectServiceName());
            logger.info("new requestUrl: {}", requestUrl);
            ctx.put(FilterConstants.REQUEST_URI_KEY, requestUrl);
            ctx.put(FilterConstants.SERVICE_ID_KEY, grayEntity.getRedirectServiceName());
        }
    } catch (Exception e) {
        e.printStackTrace();
    }

    return null;
}

}

OkHttp

添加Interceptor

@Component
@ConditionalOnProperty(value = "interface.gray.enable")
public class GrayReleaseInterceptorExample implements Interceptor {
    private static final Logger logger = LoggerFactory.getLogger(GrayReleaseInterceptorExample.class.getName());

    @Autowired
    @Qualifier(GRAY_RELEASE_OKHTTP_EXAMPLE)
    private GrayReleaseService grayReleaseService;

    @Override
    public Response intercept(Chain chain) throws IOException {
        Request request = chain.request();
        HttpUrl oldHttpUrl = request.url();

        String url = oldHttpUrl.url().toString();

        //分流
        InterfaceGrayEntity entity = grayReleaseService.matchedGrayRelease(url);
        if(entity == null){
            LoggerUtil.info("didnt match gray release service");
        } else {
            url = url.replace(entity.getInitServiceName(),
                    entity.getRedirectServiceName());
            request = new Request.Builder().url(url).get().build();
        }

        return chain.proceed(request);
    }

}

生成OkHttp客户端:

@Component
@ConditionalOnProperty(value = "interface.gray.enable")
public class AsyncRequestUtilExample {

    @Autowired
    private GrayReleaseInterceptorExample grayReleaseInterceptor;

    @Bean(value = "grayReleaseOkHttpClient")
    public OkHttpClient buildOkHttpClient() {

        Dispatcher dispatcher = new Dispatcher();
        dispatcher.setMaxRequests(1000000);
        dispatcher.setMaxRequestsPerHost(100000);
        ConnectionPool pool = new ConnectionPool(30,5, TimeUnit.MINUTES);

        //在这里加interceptor
        return new OkHttpClient.Builder()
                .hostnameVerifier(new HostnameVerifier() {
                    public boolean verify(String hostnamem, SSLSession session) {
                        return true;
                    }
                })
                .dispatcher(dispatcher)
                .connectionPool(pool)
                .connectTimeout(2500, TimeUnit.MILLISECONDS)
                .readTimeout(6000, TimeUnit.MILLISECONDS)
                .writeTimeout(6000, TimeUnit.MILLISECONDS)
                .addInterceptor(grayReleaseInterceptor)
                .build();
    }
}

Feign

替换Client

一开始是通过实现 LoadBalancerFeignClient 提供自己的Client:

public class GrayReleaseLoadBalancerFeignClient extends LoadBalancerFeignClient {
    private static final Logger logger = LoggerFactory.getLogger(GrayReleaseLoadBalancerFeignClient.class.getName());

    private DiscoveryClient discoveryClient;

    private Client delegate;
    private CachingSpringLoadBalancerFactory lbClientFactory;
    private SpringClientFactory clientFactory;
    private GrayReleaseService grayReleaseService;

    public GrayReleaseLoadBalancerFeignClient(Client delegate, CachingSpringLoadBalancerFactory lbClientFactory,
                                              SpringClientFactory clientFactory, DiscoveryClient discoveryClient,
                                              GrayReleaseService grayReleaseService) {
        super(delegate, lbClientFactory, clientFactory);
        this.delegate = delegate;
        this.lbClientFactory = lbClientFactory;
        this.clientFactory = clientFactory;

        this.discoveryClient = discoveryClient;
        this.grayReleaseService = grayReleaseService;
        logger.info("gray release feign 负载均衡器初始化");
    }

    @Override
    public Response execute(Request request, Request.Options options) throws IOException {
        logger.info("execute request: {}", request);
        String url = request.url();
        String clientName = URI.create(url).getHost();

        InterfaceGrayEntity entity = grayReleaseService.matchedGrayRelease(clientName);

        if(!Objects.isNull(entity)) {
            url = url.toUpperCase().replace(entity.getInitServiceName().toUpperCase(), entity.getRedirectServiceName().toUpperCase());
            //重新构建 request 对象
            Request newRequest = Request.create(request.method(),
                    url, request.headers(), request.body(),
                    request.charset());

            return super.execute(newRequest, options);
        }

        return super.execute(request, options);
    }
}

开发环境运行的好好的,到了测试环境就失效了。。*
经过排查,发现如果开启了sleuth就会失败,原来sleuth也是实现了自己的Client,默认返回它实现的Client:可以看到下面的代码,在feign.Client处替换为了TraceFeignClient

@Aspect
class TraceFeignAspect {

	private final BeanFactory beanFactory;

	TraceFeignAspect(BeanFactory beanFactory) {
		this.beanFactory = beanFactory;
	}

	@Around("execution (* feign.Client.*(..))")
	public Object feignClientWasCalled(final ProceedingJoinPoint pjp) throws Throwable {
		Object[] args = pjp.getArgs();
		Request request = (Request) args[0];
		Request.Options options = (Request.Options) args[1];
		Object bean = pjp.getTarget();
		if (!(bean instanceof TraceFeignClient)) {
			return new TraceFeignClient(this.beanFactory, (Client) bean).execute(request, options);
		}
		return pjp.proceed();
	}
}

这个问题在sleuth和seata共用的时候也会出现,因为seata也是用同样的方式对FeignClient进行了代理。

官方issue:https://github.com/seata/seata/issues/2544

这里有一种通过替换TraceFeignClient的delegate的方法,但是未尝试:http://springcloud.cn/view/41

RequestInterceptor

想到可以用RequestInterceptor:

@Component
public class FeignRequestInterceptor implements RequestInterceptor {
    private static final Logger logger = LoggerFactory.getLogger(FeignRequestInterceptor.class.getName());
    @Override
    public void apply(RequestTemplate template) {
        logger.info("get templatTraceLoadBalancerFeignCliente {}", template);
    }

}

但是尝试了之后,发现2.x版本的RequestInterceptor不能修改url!!!
升到3.x即可,但是我们不能随便升,只能用公司提供的库。。。

AOP

往事不决找AOP。这里开始尝试使用AOP来解决问题。
一顿操作猛如虎,先使用Spring AOP,发现不行:

@Pointcut("execution(* feign.Client+.*(..))")
public void pointCutFeign(){}

看了Stack Overflow后,尝试在LoadBalancerFeignClient上做切面:

@Pointcut("execution(* org.springframework.cloud.openfeign.ribbon.LoadBalancerFeignClient.execute(..))")
public void pointCutFeign(){}

@Around("pointCutFeign()")
public Object myAroundFeign(ProceedingJoinPoint joinPoint) throws Throwable {
    if (joinPoint.getArgs().length > 0) {
        Request request = (Request) joinPoint.getArgs()[0];
        logger.info("REQUEST >>>>>>>>>>>>");
        logger.info("URL = "+request.url());
        logger.info("METHOD = "+request.httpMethod().name());
        logger.info("BODY = "+request.requestBody().asString());
        logger.info("HEADERS = "+request.headers().toString());
    }

    Response response = (Response) joinPoint.proceed();

    logger.info("RESPONSE <<<<<<<<<<<<<<");
    logger.info("STATUS = "+response.status());
    logger.info("HEADERS = "+response.headers().toString());
    logger.info("BODY = " + IOUtils.toString(response.body().asInputStream(), "UTF-8"));
    return response;
}

结果还是那个问题,不开sleuth是好的,开了就不行,因为sleuth把FeignClient代理了。
sleuth之后的版本是修改了这种设定,但是苦主无法决定能用到哪个版本的库,所以继续狗带。

再试一把使用AspectJ,还是不行:

@Component
@EnableAspectJAutoProxy
public class FeignCostTimeAdvisor extends AbstractPointcutAdvisor {
//    private static final Logger logger = LoggerFactory.getLogger(FeignCostTimeAdvisor.class.getName());

    public FeignCostTimeAdvisor() {
        System.out.println("初始化!!!!");
    }

    public static boolean existsAnnotation(Class<?> cs, Class<? extends Annotation> annotation) {
//        logger.info("this {} super {}-{}", Objects.isNull(cs.getName()) ? "NULL": cs.getName(),
//                Objects.isNull(cs.getSuperclass().getName()) ? "null" : cs.getSuperclass().getName(),
//                Objects.isNull(cs.getGenericSuperclass().getTypeName())? "null" : cs.getGenericSuperclass().getTypeName());
        System.out.println(cs.getName());
        boolean rs = cs.isAnnotationPresent(annotation);
        if (!rs) {
            rs = cs.getSuperclass() != null && existsAnnotation(cs.getSuperclass(), annotation);
        }
        if (!rs) {
            Class<?>[] interfaces = cs.getInterfaces();
            for (Class<?> f : interfaces) {
                rs = existsAnnotation(f, annotation);
                if (rs) {
                    break;
                }
            }
        }
        if (!rs) {
            Annotation[] annotations = cs.getAnnotations();
            for (Annotation an : annotations) {
                rs = annotation.isAssignableFrom(an.getClass());
                if (rs) {
                    break;
                }
            }
        }
        return rs;
    }

    public static boolean isLoadBalancerFeignClient(Class<?> cs) {
        boolean rs = cs.getName().contains("LoadBalancerFeignClient") ||
                cs.getName().contains("TraceLoadBalancerFeignClient");
        if(!rs){
            rs = cs.getName().equals("feign.Client");
        }
        if(!rs && cs.getInterfaces().length > 0){
            rs = cs.getInterfaces()[0].getName().equals("feign.Client");
        }
        if (!rs) {
            rs = cs.getSuperclass() != null && isLoadBalancerFeignClient(cs.getSuperclass());
        }
        if (!rs) {
            Class<?>[] interfaces = cs.getInterfaces();
            for (Class<?> f : interfaces) {
                rs = isLoadBalancerFeignClient(f);
                if (rs) {
                    break;
                }
            }
        }
        return rs;
    }

    @Override
    public Pointcut getPointcut() {
        return new Pointcut() {
            @Override
            public ClassFilter getClassFilter() {
                return (Class<?> clazz) -> isLoadBalancerFeignClient(clazz);
//                return (Class<?> clazz) -> existsAnnotation(clazz, FeignClient.class);
            }

            @Override
            public MethodMatcher getMethodMatcher() {
                return MethodMatcher.TRUE;
            }
        };
    }

    @Override
    public Advice getAdvice() {
        return (MethodInterceptor) (MethodInvocation invocation) -> {
            long start = System.currentTimeMillis();
            Object[] args = invocation.getArguments();
            Object result = invocation.proceed();
            long end = System.currentTimeMillis();
            System.out.println(invocation.getClass().getName() + " call " + invocation.getMethod() +
                    ",耗时(ms):" + (end - start));
            return result;
        };
    }


}

自定义负载均衡

最后还是使用自定义的负载均衡。debug后发现默认使用的负载均衡是ZoneAvoidanceRule,那么久继承它并实现自己的负载均衡就好了:

public class GrayReleaseRule extends ZoneAvoidanceRule implements ApplicationContextAware {
    private static final Logger logger = LoggerFactory.getLogger(GrayReleaseRule.class.getName());

    private static ApplicationContext applicationContext;

    private GrayReleaseService grayReleaseService;

    private DiscoveryClient discoveryClient;

    public GrayReleaseRule() {
    }

    @Override
    public Server choose(Object key) {
        ILoadBalancer lb = getLoadBalancer();

        if(grayReleaseService == null){
            grayReleaseService = (GrayReleaseService)
                    GrayReleaseRule.applicationContext.getBean(GRAY_RELEASE_FEIGN_EXAMPLE);
        }
        if(discoveryClient == null){
            discoveryClient = GrayReleaseRule.applicationContext.getBean(DiscoveryClient.class);
        }

        //分流
        String clientName = ((ZoneAwareLoadBalancer) lb).getName();
        logger.info("begin to gray release {}", clientName);

        InterfaceGrayEntity entity = grayReleaseService.matchedGrayRelease(clientName);
        if(!Objects.isNull(entity)) {
            Server server =  getRedirectService(entity);
            if(!Objects.isNull(server)){
                return server;
            }
        }
        //原始逻辑
        return super.choose(key);
    }

    private Server getRedirectService(InterfaceGrayEntity entity){
        List<ServiceInstance> serviceInstanceList = discoveryClient.getInstances(entity.getRedirectServiceName());
        List<Server> serviceList = new ArrayList<>();
        for(ServiceInstance serviceInstance : serviceInstanceList){
            serviceList.add(new Server(serviceInstance.getHost() + ":" + serviceInstance.getPort()));
        }
        if(serviceList.size() == 0){
            logger.error("empty service endpoint for {}", entity.getRedirectServiceName());
            return null;
        }
        Server server =  serviceList.get(chooseRandomInt(serviceList.size()));
        logger.info("request ivap service-name {} iphost {}:{}",
                entity.getRedirectServiceName(), server.getHost(), server.getPort());
        return server;
    }

    //随机获取一个随机数
    protected int chooseRandomInt(int serverCount) {
        return ThreadLocalRandom.current().nextInt(serverCount);
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        if(GrayReleaseRule.applicationContext == null){
            GrayReleaseRule.applicationContext = applicationContext;
        }
    }
}

注意,这里不能直接配置其为一个bean,需要在配置文件中指定:

<your-server-name>.ribbon.NFLoadBalancerRuleClassName=<package-name>.GrayReleaseRule

如果直接配置为bean,会发现不同的服务之间会错乱,因为不同的服务需要自己对应一个IRule实例,即路由策略为原型模式。

注意,使用这种方式的话,zuul就不需要继承ZuulFilter实现自己的过滤器了。

refer

https://blog.gomyck.com/posts/zuulCustomLoadBalanceRule/
https://blog.csdn.net/xichenguan/article/details/77448288

comments powered by Disqus