XiaoLin's Blog

Xiao Lin

retrofit 添加对 SpringCloud LoadBalancer 的支特

166
2024-02-27

Retrofit是一个类型安全的HTTP客户端,用于Android和Java应用程序。它是由Square公司开发的。Retrofit让网络请求的实现变得非常简单和高效,通过将REST API调用转换为Java接口中的方法调用来工作。这样,开发者可以将注意力集中在与服务器交互的数据上,而不是处理底层的HTTP通信细节。

Retrofit提供了一系列强大的功能,包括:

  1. 类型安全:通过使用注解来描述HTTP请求(如GET、POST、PUT、DELETE等),并将这些请求映射到Java接口中的方法上,可以减少运行时错误。
  2. 简化数据解析:Retrofit可以配合Gson等库自动地将JSON或其他格式的响应数据序列化成Java对象。也就是说,你不需要手动解析网络响应;Retrofit会为你处理这一切。
  3. 异步和同步方式:支持异步和同步网络请求,使其更加灵活地适应不同的使用场景。
  4. 可定制性:Retrofit允许你通过OkHttp拦截器自定义HTTP请求和响应过程,例如添加通用头信息、监控请求时间或管理Cookies等。
  5. 多种数据格式支持:默认支持Gson库进行JSON解析,但也可通过转换器(Converter)轻松切换到其他如Moshi、Jackson或Protobuf等格式。
  6. 文件上传和下载:支持多种方式上传文件(如表单上传、二进制流)以及高效处理文件下载。
  • 如何使用

spring boot可以直接使用lianjiatech 大佬提供的 LianjiaTech/retrofit-spring-boot-starter: A spring-boot starter for retrofit, supports rapid integration and feature enhancements.(适用于retrofit的spring-boot-starter,支持快速集成和功能增强) (github.com)
以下的集成配置都是基于 retrofit-spring-boot-starter

要在项目中使用Retrofit, 你首先需要在pom.xml文件中添加依赖:

<dependency>  
  <groupId>com.github.lianjiatech</groupId>  
  <artifactId>retrofit-spring-boot-starter</artifactId>
  <version>latest</version>
</dependency>

之后就可以创建一个接口,并使用注解描述API端点了:

public interface MyApiService {
  @GET("users/{user}/repos")
  Call<List<Repo>> listRepos(@Path("user") String user);
}

最后,在目标类中注入对象,并直接调用目标方法即可:

@Autowird
private MyApiService myApiService;

myApiService.listRepos("name");

Spring Cloud Loadbalancer 是什么?

Spring Cloud LoadBalancer 是 Spring Cloud 的一个模块,它提供了一个客户端负载均衡的解决方案。在微服务架构中,服务之间的通信是不可避免的,而且常常需要通过网络调用远程服务。随着服务实例数量的增加,如何有效地分配请求以避免某些实例过载而其他实例空闲,成为了一个需要解决的问题。这就是负载均衡要解决的问题。Spring Cloud LoadBalancer 提供了一种简单有效的方式来在客户端进行负载均衡。

主要特性包括以下几点:

  1. 客户端负载均衡:与传统的服务端负载均衡相比(例如:Nginx),客户端负载均衡将决策权交给了客户端。这样可以根据实际情况(如响应时间、错误率等)动态地选择最佳的服务器实例。
  2. 与Spring生态系统集成:作为Spring Cloud项目的一部分,它与Spring Boot、Spring WebFlux等框架无缝集成,使得开发者可以轻松地在Spring应用中使用它。
  3. 灵活性和扩展性:提供多种负载均衡策略(如轮询、随机等),并且支持自定义策略。此外,它还支持对健康检查、缓存、重试等高级功能进行配置和扩展。
  4. 替代Netflix Ribbon:随着Netflix Ribbon进入维护模式,Spring Cloud LoadBalancer被设计为Ribbon的替代品,提供更现代化和可维护性更强的方案。

整合步骤

OpenFeign 实现 LoadBalancer 的思路

这里我们借鉴 OpenFeign 的实现思路,通过 LoadBalancerClientLoadBalancerClientFactory 来整合 Loadbanlancer 并实现 RPC,以下是OpenFeign的源码介绍。

我们通过OkHttpFeignLoadBalancerConfiguration自动配置类中,可以发现Feign的负载均衡客户端是通过FeignBlockingLoadBalancerClient来创建的,我们查看FeignBlockingLoadBalancerClient的源码。

  • FeignBlockingLoadBalancerClient
package org.springframework.cloud.openfeign.loadbalancer;

@SuppressWarnings({ "unchecked", "rawtypes" })
public class FeignBlockingLoadBalancerClient implements Client {

	private static final Log LOG = LogFactory.getLog(FeignBlockingLoadBalancerClient.class);

	private final Client delegate;

	private final LoadBalancerClient loadBalancerClient;

	private final LoadBalancerClientFactory loadBalancerClientFactory;

	private final List<LoadBalancerFeignRequestTransformer> transformers;
	
	@Override
	public Response execute(Request request, Request.Options options) throws IOException {
    	// 寻找服务id
		final URI originalUri = URI.create(request.url());
		String serviceId = originalUri.getHost();
		Assert.state(serviceId != null, "Request URI does not contain a valid hostname: " + originalUri);

        // 获取负载均衡的提示信息
		String hint = getHint(serviceId);

        // 创建负载均衡请求,并获取相关的生命周期集合,并将状态设置为开始
		DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(
				new RequestDataContext(buildRequestData(request), hint));
		Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
				.getSupportedLifecycleProcessors(
						loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
						RequestDataContext.class, ResponseData.class, ServiceInstance.class);
		supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));

        // 通过负载均衡客户端,获取到服务实例
		ServiceInstance instance = loadBalancerClient.choose(serviceId, lbRequest);
		org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(
				instance);

        // 如果没有获取到服务实例的处理逻辑
		if (instance == null) {
			String message = "Load balancer does not contain an instance for the service " + serviceId;
			if (LOG.isWarnEnabled()) {
				LOG.warn(message);
			}
			supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
					.onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
							CompletionContext.Status.DISCARD, lbRequest, lbResponse)));
			return Response.builder().request(request).status(HttpStatus.SERVICE_UNAVAILABLE.value())
					.body(message, StandardCharsets.UTF_8).build();
		}

        // 重构原始url,将host:port替换为负载均衡实例得到的地址
		String reconstructedUrl = loadBalancerClient.reconstructURI(instance, originalUri).toString();

        // 根据新的url,构建新的Feign Request对象
		Request newRequest = buildRequest(request, reconstructedUrl, instance);
		return executeWithLoadBalancerLifecycleProcessing(delegate, options, newRequest, lbRequest, lbResponse,
				supportedLifecycleProcessors);
	}

	protected Request buildRequest(Request request, String reconstructedUrl) {
    	// 根据重构的URL创建新的请求对象
		return Request.create(request.httpMethod(), reconstructedUrl, request.headers(), request.body(),
				request.charset(), request.requestTemplate());
	}

	protected Request buildRequest(Request request, String reconstructedUrl, ServiceInstance instance) {
    	// 构建基础请求对象
		Request newRequest = buildRequest(request, reconstructedUrl);
		// 如果存在请求转换器,则对请求进行转换
		if (transformers != null) {
			for (LoadBalancerFeignRequestTransformer transformer : transformers) {
				newRequest = transformer.transformRequest(newRequest, instance);
			}
		}
		return newRequest;
	}

	// Visible for Sleuth instrumentation
	public Client getDelegate() {
		return delegate;
	}

	private String getHint(String serviceId) {
        // 加载负载均衡器属性
        LoadBalancerProperties properties = loadBalancerClientFactory.getProperties(serviceId);
        // 获取默认提示信息
        String defaultHint = properties.getHint().getOrDefault("default", "default");
        // 获取服务ID对应的提示信息
        String hintPropertyValue = properties.getHint().get(serviceId);
        // 如果服务ID对应的提示信息存在,则使用该信息,否则使用默认提示信息
        return hintPropertyValue != null ? hintPropertyValue : defaultHint;
	}
}

可以看到,在execute方法的最后,调用了executeWithLoadBalancerLifecycleProcessing方法完成具体的负载均衡生命周期请求,我们进入方法,发现最后由LoadBalancerUtilsexecuteWithLoadBalancerLifecycleProcessing方法完成处理,具体的逻辑如下。

static Response executeWithLoadBalancerLifecycleProcessing(
        Client feignClient, // Feign客户端,用于执行实际的HTTP请求
        Request.Options options, // 请求选项,包含连接超时和读取超时设置
        Request feignRequest, // Feign请求对象,包含请求的URL、HTTP方法、请求头等信息
        org.springframework.cloud.client.loadbalancer.Request lbRequest, // 负载均衡器请求对象,用于负载均衡过程中的上下文信息传递
        org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse, // 负载均衡器响应对象,包含被选中的服务实例
        Set<LoadBalancerLifecycle> supportedLifecycleProcessors, // 支持的负载均衡生命周期处理器集合
        boolean loadBalanced // 标志位,指示请求是否通过负载均衡器处理
) throws IOException {
    // 在请求开始之前,通过所有支持的生命周期处理器执行onStartRequest方法
    supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, lbResponse));
    try {
        // 使用Feign客户端执行实际的HTTP请求,并获取响应
        Response response = feignClient.execute(feignRequest, options);
        if (loadBalanced) {
            // 如果请求通过负载均衡器处理,请求成功完成后,通过所有支持的生命周期处理器执行onComplete方法
            // 并将状态设置为SUCCESS
            supportedLifecycleProcessors.forEach(
                    lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.SUCCESS,
                            lbRequest, lbResponse, buildResponseData(response))));
        }
        // 返回Feign客户端得到的响应
        return response;
    }
    catch (Exception exception) {
        // 如果在请求过程中发生异常
        if (loadBalanced) {
            // 如果请求通过负载均衡器处理,通过所有支持的生命周期处理器执行onComplete方法
            // 并将状态设置为FAILED,同时传递异常信息
            supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
                    new CompletionContext<>(CompletionContext.Status.FAILED, exception, lbRequest, lbResponse)));
        }
        // 将异常抛出,以便调用方可以处理
        throw exception;
    }
}

负载均衡的 Hint 信息

负载均衡器的提示信息(Hint)是用于指导负载均衡器如何选择服务实例的附加信息。在微服务架构中,一个服务可能有多个实例运行在不同的服务器或环境中。负载均衡器负责根据一定的策略(如轮询、随机、权重等)从这些实例中选择一个进行请求转发。提示信息可以被视为一种配置或元数据,帮助负载均衡器更智能地进行选择。

例如,提示信息可以包含如下内容:

  • 环境标签:如果服务实例分布在不同的环境(如开发、测试、生产)中,提示信息可以用来指定应该选择哪个环境的实例。
  • 版本号:在进行蓝绿部署或灰度发布时,提示信息可以用来选择特定版本的服务实例。
  • 地理位置:对于分布在不同地理位置的服务实例,提示信息可以用来优先选择离用户最近的实例,以减少延迟。
  • 自定义策略:根据特定的业务需求,提示信息还可以用来实现自定义的选择策略,如基于请求参数、用户特征等进行智能路由。

在代码中,提示信息通常以键值对的形式存在,负载均衡器在选择服务实例时会参考这些信息。例如,Spring Cloud中的LoadBalancerClient接口允许通过Request对象提供的属性或元数据来传递提示信息,负载均衡策略实现可以读取这些信息来影响实例选择的过程。

LoadBalancerLifecycle 接口

LoadBalancerLifecycle是Spring Cloud LoadBalancer中的一个接口,它定义了在负载均衡过程中的不同阶段可以执行的回调方法。通过实现这个接口,可以在请求的生命周期中的关键时刻(如请求开始、完成或失败等)执行自定义逻辑,从而增加了对负载均衡过程的可观察性和可控制性。

LoadBalancerLifecycle接口主要包含以下方法:

  1. onStart(Request request, Response response): 当负载均衡器开始处理请求时调用。可以用于记录日志、收集统计信息或执行其他初始化逻辑。
  2. onStartRequest(Request request, Response response): 在发送请求到实际服务实例之前调用。与onStart相比,它更接近于请求被发送的时刻。
  3. onComplete(CompletionContext completionContext): 当请求完成时调用,无论请求是成功、失败还是被取消。CompletionContext对象提供了请求完成的详细信息,包括请求和响应的数据、完成的状态(成功、失败、取消)以及任何异常信息。这个方法可以用于执行清理工作、记录请求结果、更新统计信息等。

通过实现这些方法,开发者可以在负载均衡过程中插入自定义逻辑,实现如下功能:

  • 日志记录和监控:在请求的不同阶段记录详细日志,收集性能指标和统计信息,帮助监控负载均衡器的行为和服务实例的健康状况。
  • 故障处理:在请求失败时执行自定义的故障恢复逻辑,例如重试、回退到备用实例或记录错误信息。
  • 请求追踪:在请求的生命周期中添加追踪信息,与分布式追踪系统集成,提高系统的可观察性。

LoadBalancerLifecycle提供的这些回调方法使得负载均衡逻辑不仅限于选择服务实例,还可以包含更丰富的上下文处理和定制化逻辑,从而使得整个负载均衡过程更加灵活和强大。

通过 retrofit 拦截器集成 LoadBanlancer

这里使用 retrofit 提供的 Interceptor 功能,来实现 spring-cloud-loadbalancer 的整合。

Interceptor 在 Retrofit 库中是一个非常强大的机制,它允许你拦截并修改或重写请求和响应的方式。这在很多场景中都非常有用,比如添加通用的请求头(例如认证信息)、日志记录、请求重试等等。Retrofit 本身是基于 OkHttp 库构建的,因此 Retrofit 的 Interceptor 实质上就是使用 OkHttp 的拦截器。

要创建一个 Interceptor,你需要实现 OkHttp 的 Interceptor 接口,并重写其中的 intercept 方法。在这个方法里,你可以获取到原始的请求信息,进行修改后再继续执行请求,或者直接构造并返回一个响应。

import okhttp3.Interceptor;
import okhttp3.Request;
import okhttp3.Response;

import java.io.IOException;

public class MyInterceptor implements Interceptor {

    @Override
    public Response intercept(Chain chain) throws IOException {
        Request originalRequest = chain.request();

        // 在原始请求上添加新的请求头
        Request newRequest = originalRequest.newBuilder()
                .header("Authorization", "Bearer your_token_here")
                .build();

        return chain.proceed(newRequest);
    }
}

创建了自定义的 Interceptor 后,需要将其添加到 Retrofit 使用的 OkHttp 客户端中。这样,每次通过 Retrofit 发起请求时,都会通过这个拦截器进行处理。

OkHttpClient okHttpClient = new OkHttpClient.Builder()
        .addInterceptor(new MyInterceptor())  // 添加自定义拦截器
        .build();

Retrofit retrofit = new Retrofit.Builder()
        .baseUrl("https://api.example.com")
        .client(okHttpClient)
        .build();

OkHttp 提供了两种类型的拦截器:

  1. 应用级别拦截器 (Application Interceptors):通过调用 addInterceptor() 方法添加。它们不关心 OkHttp 注入的头部像 If-None-Match,并且允许重试和多次响应转换。
  2. 网络级别拦截器 (Network Interceptors):通过调用 addNetworkInterceptor() 方法添加。它们能够操作重定向和重试等操作,并且能够看到数据被传输过程中自动添加的头部。

创建 RetrofitLoadBalancerInterceptor 并实现 ServiceChooseInterceptor

ServiceChooseInterceptorretrofit-spring-boot-starter 提供的一个负载均衡实现父类,根据官方的文档,只需要实现ServiceInstanceChooser接口就可以了,这里是覆盖了 ServiceChooseInterceptor 的实现类的做法。

package xyz.tiegangan.tools.common.retrofit.core.loadbalancer.interceptor;

/**
 * 负载均衡器拦截器
 *
 * @author huangmuhong
 * @version 1.0.0
 * @date 2024/01/19
 * @see GlobalInterceptor
 */
@Slf4j
public class RetrofitLoadBalancerInterceptor extends ServiceChooseInterceptor {

    private final LoadBalancerClient loadBalancer;

    private final LoadBalancerClientFactory loadBalancerClientFactory;

    public RetrofitLoadBalancerInterceptor(LoadBalancerClient loadBalancer,
        LoadBalancerClientFactory loadBalancerClientFactory) {
        super(null);
        this.loadBalancer = loadBalancer;
        this.loadBalancerClientFactory = loadBalancerClientFactory;
    }

    @NotNull
    @Override
    @SuppressWarnings("all")
    public Response intercept(@NotNull Chain chain) throws IOException {

        Request request = chain.request();
        Method method = RetrofitUtils.getMethodFormRequest(request);
        if (method == null) {
            return chain.proceed(request);
        }
        Class<?> declaringClass = method.getDeclaringClass();
        RetrofitClient retrofitClient =
            AnnotatedElementUtils.findMergedAnnotation(declaringClass, RetrofitClient.class);
        String baseUrl = retrofitClient.baseUrl();
        if (StringUtils.hasText(baseUrl)) {
            return chain.proceed(request);
        }
        // serviceId服务发现
        String serviceId = retrofitClient.serviceId();
        final String hint = getHint(serviceId);
        DefaultRequest<RequestDataContext> lbRequest =
            new DefaultRequest<>(new RequestDataContext(buildRequestData(request), hint));
        Set<LoadBalancerLifecycle> supportedLifecycleProcessors =
            LoadBalancerLifecycleValidator.getSupportedLifecycleProcessors(
                loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
                RequestDataContext.class, ResponseData.class, ServiceInstance.class);
        supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
        ServiceInstance instance = loadBalancer.choose(serviceId, lbRequest);
        org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse =
            new DefaultResponse(instance);
        if (instance == null) {
            String message = "负载均衡器不包含一个服务实例: " + serviceId;
            log.warn(message);
            supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
                new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
                    CompletionContext.Status.DISCARD, lbRequest, lbResponse)));
            return new Response.Builder().request(request).code(HttpStatus.SERVICE_UNAVAILABLE.value()).message(message)
                .build();
        }

        String reconstructedUrl = loadBalancer.reconstructURI(instance, request.url().uri()).toString();
        final Request newRequest = buildRequest(request, reconstructedUrl);
        supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, lbResponse));
        try {
            Response response = chain.proceed(newRequest);
            supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
                new CompletionContext<>(CompletionContext.Status.SUCCESS, lbRequest, lbResponse,
                    buildResponseData(response))));
            return response;
        } catch (Exception exception) {
            supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
                new CompletionContext<>(CompletionContext.Status.FAILED, exception, lbRequest, lbResponse)));

            throw exception;
        }
    }

    /**
     * 构建请求
     *
     * @param request          要求
     * @param reconstructedUrl 重建网址
     * @return {@link Request }
     * @author huangmuhong
     * @date 2024/01/19
     * @since 1.0.0
     */
    private Request buildRequest(Request request, String reconstructedUrl) {
        Request.Builder builder = request.newBuilder();
        builder.url(reconstructedUrl);
        return builder.build();
    }

    /**
     * 构建响应数据
     *
     * @param response 回复
     * @return {@link ResponseData }
     * @author huangmuhong
     * @date 2023/11/03
     * @since 1.0.0
     */
    protected ResponseData buildResponseData(Response response) {
        HttpHeaders responseHeaders = new HttpHeaders();
        response.headers()
            .forEach((pair) -> responseHeaders.put(pair.getFirst(), Lists.newArrayList(pair.getSecond())));
        return new ResponseData(HttpStatusCode.valueOf(response.code()), responseHeaders, null,
            buildRequestData(response.request()));
    }

    /**
     * 构建请求数据
     *
     * @param request 要求
     * @return {@link RequestData }
     * @author huangmuhong
     * @date 2023/11/03
     * @since 1.0.0
     */
    protected RequestData buildRequestData(Request request) {
        HttpHeaders requestHeaders = new HttpHeaders();
        final Headers headers = request.headers();
        headers.forEach(pair -> requestHeaders.put(pair.getFirst(), Lists.newArrayList(pair.getSecond())));
        return new RequestData(HttpMethod.valueOf(request.method()), request.url().uri(), requestHeaders, null,
            new HashMap<>());
    }

    /**
     * 获得提示
     *
     * @param serviceId 服务id
     * @return {@link String }
     * @author huangmuhong
     * @date 2023/11/03
     * @since 1.0.0
     */
    protected String getHint(String serviceId) {
        LoadBalancerProperties properties = loadBalancerClientFactory.getProperties(serviceId);
        String defaultHint = properties.getHint().getOrDefault("default", "default");
        String hintPropertyValue = properties.getHint().get(serviceId);
        return hintPropertyValue != null ? hintPropertyValue : defaultHint;
    }
}

通过配置类的方式,覆盖ServiceChooseInterceptor的bean

/**
     * 负载均衡器拦截器
     *
     * @param factory                   工厂
     * @param loadBalancerClientFactory 负载均衡器客户端工厂
     * @return {@link RetrofitLoadBalancerInterceptor }
     * @author huangmuhong
     * @date 2024/01/19
     * @since 1.0.0
     */
    @Bean
    @ConditionalOnLoadBalancer
    public ServiceChooseInterceptor retrofitLoadBalancerInterceptor(LoadBalancerClient factory,
        LoadBalancerClientFactory loadBalancerClientFactory) {
        return new RetrofitLoadBalancerInterceptor(factory, loadBalancerClientFactory);
    }

至此,retrofit对LoadBalancer的集成就实现了!