侧边栏壁纸
博主头像
XiaoLin's Blog 博主等级

XiaoLin的个人博客~

  • 累计撰写 33 篇文章
  • 累计创建 33 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

retrofit 添加对 SpringCloud LoadBalancer 的支特

XiaoLin
2024-02-27 / 0 评论 / 0 点赞 / 241 阅读 / 0 字
温馨提示:
本文最后更新于2024-03-07,若内容或图片失效,请留言反馈。 部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

Retrofit 是一个类型安全的 HTTP 客户端,用于 Android 和 Java 应用程序。它是由 Square 公司开发的。Retrofit 让网络请求的实现变得非常简单和高效,通过将 REST API 调用转换为 Java 接口中的方法调用来工作。这样,开发者可以将注意力集中在与服务器交互的数据上,而不是处理底层的 HTTP 通信细节。

Retrofit 提供了一系列强大的功能,包括:

  1. 类型安全:通过使用注解来描述 HTTP 请求(如 GET、POST、PUT、DELETE 等),并将这些请求映射到 Java 接口中的方法上,可以减少运行时错误。
  2. 简化数据解析:Retrofit 可以配合 Gson 等库自动地将 JSON 或其他格式的响应数据序列化成 Java 对象。也就是说,你不需要手动解析网络响应;Retrofit 会为你处理这一切。
  3. 异步和同步方式:支持异步和同步网络请求,使其更加灵活地适应不同的使用场景。
  4. 可定制性:Retrofit 允许你通过 OkHttp 拦截器自定义 HTTP 请求和响应过程,例如添加通用头信息、监控请求时间或管理 Cookies 等。
  5. 多种数据格式支持:默认支持 Gson 库进行 JSON 解析,但也可通过转换器(Converter)轻松切换到其他如 Moshi、Jackson 或 Protobuf 等格式。
  6. 文件上传和下载:支持多种方式上传文件(如表单上传、二进制流)以及高效处理文件下载。
  • 如何使用

spring boot 可以直接使用 lianjiatech 大佬提供的 LianjiaTech/retrofit-spring-boot-starter: A spring-boot starter for retrofit, supports rapid integration and feature enhancements.(适用于 retrofit 的 spring-boot-starter,支持快速集成和功能增强) (github.com)
以下的集成配置都是基于 retrofit-spring-boot-starter

要在项目中使用 Retrofit, 你首先需要在 pom.xml 文件中添加依赖:

<dependency>  
  <groupId>com.github.lianjiatech</groupId>  
  <artifactId>retrofit-spring-boot-starter</artifactId>
  <version>latest</version>
</dependency>

之后就可以创建一个接口,并使用注解描述 API 端点了:

public interface MyApiService {
  @GET("users/{user}/repos")
  Call<List<Repo>> listRepos(@Path("user") String user);
}

最后,在目标类中注入对象,并直接调用目标方法即可:

@Autowird
private MyApiService myApiService;

myApiService.listRepos("name");

Spring Cloud Loadbalancer 是什么?

Spring Cloud LoadBalancer 是 Spring Cloud 的一个模块,它提供了一个客户端负载均衡的解决方案。在微服务架构中,服务之间的通信是不可避免的,而且常常需要通过网络调用远程服务。随着服务实例数量的增加,如何有效地分配请求以避免某些实例过载而其他实例空闲,成为了一个需要解决的问题。这就是负载均衡要解决的问题。Spring Cloud LoadBalancer 提供了一种简单有效的方式来在客户端进行负载均衡。

主要特性包括以下几点:

  1. 客户端负载均衡:与传统的服务端负载均衡相比(例如:Nginx),客户端负载均衡将决策权交给了客户端。这样可以根据实际情况(如响应时间、错误率等)动态地选择最佳的服务器实例。
  2. 与 Spring 生态系统集成:作为 Spring Cloud 项目的一部分,它与 Spring Boot、Spring WebFlux 等框架无缝集成,使得开发者可以轻松地在 Spring 应用中使用它。
  3. 灵活性和扩展性:提供多种负载均衡策略(如轮询、随机等),并且支持自定义策略。此外,它还支持对健康检查、缓存、重试等高级功能进行配置和扩展。
  4. 替代 Netflix Ribbon:随着 Netflix Ribbon 进入维护模式,Spring Cloud LoadBalancer 被设计为 Ribbon 的替代品,提供更现代化和可维护性更强的方案。

整合步骤

OpenFeign 实现 LoadBalancer 的思路

这里我们借鉴 OpenFeign 的实现思路,通过 LoadBalancerClientLoadBalancerClientFactory 来整合 Loadbanlancer 并实现 RPC,以下是 OpenFeign 的源码介绍。

我们通过 OkHttpFeignLoadBalancerConfiguration 自动配置类中,可以发现 Feign 的负载均衡客户端是通过 FeignBlockingLoadBalancerClient 来创建的,我们查看 FeignBlockingLoadBalancerClient 的源码。

  • FeignBlockingLoadBalancerClient
package org.springframework.cloud.openfeign.loadbalancer;

@SuppressWarnings({ "unchecked", "rawtypes" })
public class FeignBlockingLoadBalancerClient implements Client {

	private static final Log LOG = LogFactory.getLog(FeignBlockingLoadBalancerClient.class);

	private final Client delegate;

	private final LoadBalancerClient loadBalancerClient;

	private final LoadBalancerClientFactory loadBalancerClientFactory;

	private final List<LoadBalancerFeignRequestTransformer> transformers;
	
	@Override
	public Response execute(Request request, Request.Options options) throws IOException {
    	// 寻找服务id
		final URI originalUri = URI.create(request.url());
		String serviceId = originalUri.getHost();
		Assert.state(serviceId != null, "Request URI does not contain a valid hostname: " + originalUri);

        // 获取负载均衡的提示信息
		String hint = getHint(serviceId);

        // 创建负载均衡请求,并获取相关的生命周期集合,并将状态设置为开始
		DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(
				new RequestDataContext(buildRequestData(request), hint));
		Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
				.getSupportedLifecycleProcessors(
						loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
						RequestDataContext.class, ResponseData.class, ServiceInstance.class);
		supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));

        // 通过负载均衡客户端,获取到服务实例
		ServiceInstance instance = loadBalancerClient.choose(serviceId, lbRequest);
		org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(
				instance);

        // 如果没有获取到服务实例的处理逻辑
		if (instance == null) {
			String message = "Load balancer does not contain an instance for the service " + serviceId;
			if (LOG.isWarnEnabled()) {
				LOG.warn(message);
			}
			supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
					.onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
							CompletionContext.Status.DISCARD, lbRequest, lbResponse)));
			return Response.builder().request(request).status(HttpStatus.SERVICE_UNAVAILABLE.value())
					.body(message, StandardCharsets.UTF_8).build();
		}

        // 重构原始url,将host:port替换为负载均衡实例得到的地址
		String reconstructedUrl = loadBalancerClient.reconstructURI(instance, originalUri).toString();

        // 根据新的url,构建新的Feign Request对象
		Request newRequest = buildRequest(request, reconstructedUrl, instance);
		return executeWithLoadBalancerLifecycleProcessing(delegate, options, newRequest, lbRequest, lbResponse,
				supportedLifecycleProcessors);
	}

	protected Request buildRequest(Request request, String reconstructedUrl) {
    	// 根据重构的URL创建新的请求对象
		return Request.create(request.httpMethod(), reconstructedUrl, request.headers(), request.body(),
				request.charset(), request.requestTemplate());
	}

	protected Request buildRequest(Request request, String reconstructedUrl, ServiceInstance instance) {
    	// 构建基础请求对象
		Request newRequest = buildRequest(request, reconstructedUrl);
		// 如果存在请求转换器,则对请求进行转换
		if (transformers != null) {
			for (LoadBalancerFeignRequestTransformer transformer : transformers) {
				newRequest = transformer.transformRequest(newRequest, instance);
			}
		}
		return newRequest;
	}

	// Visible for Sleuth instrumentation
	public Client getDelegate() {
		return delegate;
	}

	private String getHint(String serviceId) {
        // 加载负载均衡器属性
        LoadBalancerProperties properties = loadBalancerClientFactory.getProperties(serviceId);
        // 获取默认提示信息
        String defaultHint = properties.getHint().getOrDefault("default", "default");
        // 获取服务ID对应的提示信息
        String hintPropertyValue = properties.getHint().get(serviceId);
        // 如果服务ID对应的提示信息存在,则使用该信息,否则使用默认提示信息
        return hintPropertyValue != null ? hintPropertyValue : defaultHint;
	}
}

可以看到,在 execute 方法的最后,调用了 executeWithLoadBalancerLifecycleProcessing 方法完成具体的负载均衡生命周期请求,我们进入方法,发现最后由 LoadBalancerUtilsexecuteWithLoadBalancerLifecycleProcessing 方法完成处理,具体的逻辑如下。

static Response executeWithLoadBalancerLifecycleProcessing(
        Client feignClient, // Feign客户端,用于执行实际的HTTP请求
        Request.Options options, // 请求选项,包含连接超时和读取超时设置
        Request feignRequest, // Feign请求对象,包含请求的URL、HTTP方法、请求头等信息
        org.springframework.cloud.client.loadbalancer.Request lbRequest, // 负载均衡器请求对象,用于负载均衡过程中的上下文信息传递
        org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse, // 负载均衡器响应对象,包含被选中的服务实例
        Set<LoadBalancerLifecycle> supportedLifecycleProcessors, // 支持的负载均衡生命周期处理器集合
        boolean loadBalanced // 标志位,指示请求是否通过负载均衡器处理
) throws IOException {
    // 在请求开始之前,通过所有支持的生命周期处理器执行onStartRequest方法
    supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, lbResponse));
    try {
        // 使用Feign客户端执行实际的HTTP请求,并获取响应
        Response response = feignClient.execute(feignRequest, options);
        if (loadBalanced) {
            // 如果请求通过负载均衡器处理,请求成功完成后,通过所有支持的生命周期处理器执行onComplete方法
            // 并将状态设置为SUCCESS
            supportedLifecycleProcessors.forEach(
                    lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.SUCCESS,
                            lbRequest, lbResponse, buildResponseData(response))));
        }
        // 返回Feign客户端得到的响应
        return response;
    }
    catch (Exception exception) {
        // 如果在请求过程中发生异常
        if (loadBalanced) {
            // 如果请求通过负载均衡器处理,通过所有支持的生命周期处理器执行onComplete方法
            // 并将状态设置为FAILED,同时传递异常信息
            supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
                    new CompletionContext<>(CompletionContext.Status.FAILED, exception, lbRequest, lbResponse)));
        }
        // 将异常抛出,以便调用方可以处理
        throw exception;
    }
}

负载均衡的 Hint 信息

负载均衡器的提示信息(Hint)是用于指导负载均衡器如何选择服务实例的附加信息。在微服务架构中,一个服务可能有多个实例运行在不同的服务器或环境中。负载均衡器负责根据一定的策略(如轮询、随机、权重等)从这些实例中选择一个进行请求转发。提示信息可以被视为一种配置或元数据,帮助负载均衡器更智能地进行选择。

例如,提示信息可以包含如下内容:

  • 环境标签:如果服务实例分布在不同的环境(如开发、测试、生产)中,提示信息可以用来指定应该选择哪个环境的实例。
  • 版本号:在进行蓝绿部署或灰度发布时,提示信息可以用来选择特定版本的服务实例。
  • 地理位置:对于分布在不同地理位置的服务实例,提示信息可以用来优先选择离用户最近的实例,以减少延迟。
  • 自定义策略:根据特定的业务需求,提示信息还可以用来实现自定义的选择策略,如基于请求参数、用户特征等进行智能路由。

在代码中,提示信息通常以键值对的形式存在,负载均衡器在选择服务实例时会参考这些信息。例如,Spring Cloud 中的 LoadBalancerClient 接口允许通过 Request 对象提供的属性或元数据来传递提示信息,负载均衡策略实现可以读取这些信息来影响实例选择的过程。

LoadBalancerLifecycle 接口

LoadBalancerLifecycle 是 Spring Cloud LoadBalancer 中的一个接口,它定义了在负载均衡过程中的不同阶段可以执行的回调方法。通过实现这个接口,可以在请求的生命周期中的关键时刻(如请求开始、完成或失败等)执行自定义逻辑,从而增加了对负载均衡过程的可观察性和可控制性。

LoadBalancerLifecycle 接口主要包含以下方法:

  1. onStart(Request request, Response response): 当负载均衡器开始处理请求时调用。可以用于记录日志、收集统计信息或执行其他初始化逻辑。
  2. onStartRequest(Request request, Response response): 在发送请求到实际服务实例之前调用。与 onStart 相比,它更接近于请求被发送的时刻。
  3. onComplete(CompletionContext completionContext): 当请求完成时调用,无论请求是成功、失败还是被取消。CompletionContext 对象提供了请求完成的详细信息,包括请求和响应的数据、完成的状态(成功、失败、取消)以及任何异常信息。这个方法可以用于执行清理工作、记录请求结果、更新统计信息等。

通过实现这些方法,开发者可以在负载均衡过程中插入自定义逻辑,实现如下功能:

  • 日志记录和监控:在请求的不同阶段记录详细日志,收集性能指标和统计信息,帮助监控负载均衡器的行为和服务实例的健康状况。
  • 故障处理:在请求失败时执行自定义的故障恢复逻辑,例如重试、回退到备用实例或记录错误信息。
  • 请求追踪:在请求的生命周期中添加追踪信息,与分布式追踪系统集成,提高系统的可观察性。

LoadBalancerLifecycle 提供的这些回调方法使得负载均衡逻辑不仅限于选择服务实例,还可以包含更丰富的上下文处理和定制化逻辑,从而使得整个负载均衡过程更加灵活和强大。

通过 retrofit 拦截器集成 LoadBanlancer

这里使用 retrofit 提供的 Interceptor 功能,来实现 spring-cloud-loadbalancer 的整合。

Interceptor 在 Retrofit 库中是一个非常强大的机制,它允许你拦截并修改或重写请求和响应的方式。这在很多场景中都非常有用,比如添加通用的请求头(例如认证信息)、日志记录、请求重试等等。Retrofit 本身是基于 OkHttp 库构建的,因此 Retrofit 的 Interceptor 实质上就是使用 OkHttp 的拦截器。

要创建一个 Interceptor,你需要实现 OkHttp 的 Interceptor 接口,并重写其中的 intercept 方法。在这个方法里,你可以获取到原始的请求信息,进行修改后再继续执行请求,或者直接构造并返回一个响应。

import okhttp3.Interceptor;
import okhttp3.Request;
import okhttp3.Response;

import java.io.IOException;

public class MyInterceptor implements Interceptor {

    @Override
    public Response intercept(Chain chain) throws IOException {
        Request originalRequest = chain.request();

        // 在原始请求上添加新的请求头
        Request newRequest = originalRequest.newBuilder()
                .header("Authorization", "Bearer your_token_here")
                .build();

        return chain.proceed(newRequest);
    }
}

创建了自定义的 Interceptor 后,需要将其添加到 Retrofit 使用的 OkHttp 客户端中。这样,每次通过 Retrofit 发起请求时,都会通过这个拦截器进行处理。

OkHttpClient okHttpClient = new OkHttpClient.Builder()
        .addInterceptor(new MyInterceptor())  // 添加自定义拦截器
        .build();

Retrofit retrofit = new Retrofit.Builder()
        .baseUrl("https://api.example.com")
        .client(okHttpClient)
        .build();

OkHttp 提供了两种类型的拦截器:

  1. 应用级别拦截器 (Application Interceptors):通过调用 addInterceptor() 方法添加。它们不关心 OkHttp 注入的头部像 If-None-Match,并且允许重试和多次响应转换。
  2. 网络级别拦截器 (Network Interceptors):通过调用 addNetworkInterceptor() 方法添加。它们能够操作重定向和重试等操作,并且能够看到数据被传输过程中自动添加的头部。

创建 RetrofitLoadBalancerInterceptor 并实现 ServiceChooseInterceptor

ServiceChooseInterceptorretrofit-spring-boot-starter 提供的一个负载均衡实现父类,根据官方的文档,只需要实现 ServiceInstanceChooser 接口就可以了,这里是覆盖了 ServiceChooseInterceptor 的实现类的做法。

package xyz.tiegangan.tools.common.retrofit.core.loadbalancer.interceptor;

/**
 * 负载均衡器拦截器
 *
 * @author huangmuhong
 * @version 1.0.0
 * @date 2024/01/19
 * @see GlobalInterceptor
 */
@Slf4j
public class RetrofitLoadBalancerInterceptor extends ServiceChooseInterceptor {

    private final LoadBalancerClient loadBalancer;

    private final LoadBalancerClientFactory loadBalancerClientFactory;

    public RetrofitLoadBalancerInterceptor(LoadBalancerClient loadBalancer,
        LoadBalancerClientFactory loadBalancerClientFactory) {
        super(null);
        this.loadBalancer = loadBalancer;
        this.loadBalancerClientFactory = loadBalancerClientFactory;
    }

    @NotNull
    @Override
    @SuppressWarnings("all")
    public Response intercept(@NotNull Chain chain) throws IOException {

        Request request = chain.request();
        Method method = RetrofitUtils.getMethodFormRequest(request);
        if (method == null) {
            return chain.proceed(request);
        }
        Class<?> declaringClass = method.getDeclaringClass();
        RetrofitClient retrofitClient =
            AnnotatedElementUtils.findMergedAnnotation(declaringClass, RetrofitClient.class);
        String baseUrl = retrofitClient.baseUrl();
        if (StringUtils.hasText(baseUrl)) {
            return chain.proceed(request);
        }
        // serviceId服务发现
        String serviceId = retrofitClient.serviceId();
        final String hint = getHint(serviceId);
        DefaultRequest<RequestDataContext> lbRequest =
            new DefaultRequest<>(new RequestDataContext(buildRequestData(request), hint));
        Set<LoadBalancerLifecycle> supportedLifecycleProcessors =
            LoadBalancerLifecycleValidator.getSupportedLifecycleProcessors(
                loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
                RequestDataContext.class, ResponseData.class, ServiceInstance.class);
        supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
        ServiceInstance instance = loadBalancer.choose(serviceId, lbRequest);
        org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse =
            new DefaultResponse(instance);
        if (instance == null) {
            String message = "负载均衡器不包含一个服务实例: " + serviceId;
            log.warn(message);
            supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
                new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
                    CompletionContext.Status.DISCARD, lbRequest, lbResponse)));
            return new Response.Builder().request(request).code(HttpStatus.SERVICE_UNAVAILABLE.value()).message(message)
                .build();
        }

        String reconstructedUrl = loadBalancer.reconstructURI(instance, request.url().uri()).toString();
        final Request newRequest = buildRequest(request, reconstructedUrl);
        supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, lbResponse));
        try {
            Response response = chain.proceed(newRequest);
            supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
                new CompletionContext<>(CompletionContext.Status.SUCCESS, lbRequest, lbResponse,
                    buildResponseData(response))));
            return response;
        } catch (Exception exception) {
            supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
                new CompletionContext<>(CompletionContext.Status.FAILED, exception, lbRequest, lbResponse)));

            throw exception;
        }
    }

    /**
     * 构建请求
     *
     * @param request          要求
     * @param reconstructedUrl 重建网址
     * @return {@link Request }
     * @author huangmuhong
     * @date 2024/01/19
     * @since 1.0.0
     */
    private Request buildRequest(Request request, String reconstructedUrl) {
        Request.Builder builder = request.newBuilder();
        builder.url(reconstructedUrl);
        return builder.build();
    }

    /**
     * 构建响应数据
     *
     * @param response 回复
     * @return {@link ResponseData }
     * @author huangmuhong
     * @date 2023/11/03
     * @since 1.0.0
     */
    protected ResponseData buildResponseData(Response response) {
        HttpHeaders responseHeaders = new HttpHeaders();
        response.headers()
            .forEach((pair) -> responseHeaders.put(pair.getFirst(), Lists.newArrayList(pair.getSecond())));
        return new ResponseData(HttpStatusCode.valueOf(response.code()), responseHeaders, null,
            buildRequestData(response.request()));
    }

    /**
     * 构建请求数据
     *
     * @param request 要求
     * @return {@link RequestData }
     * @author huangmuhong
     * @date 2023/11/03
     * @since 1.0.0
     */
    protected RequestData buildRequestData(Request request) {
        HttpHeaders requestHeaders = new HttpHeaders();
        final Headers headers = request.headers();
        headers.forEach(pair -> requestHeaders.put(pair.getFirst(), Lists.newArrayList(pair.getSecond())));
        return new RequestData(HttpMethod.valueOf(request.method()), request.url().uri(), requestHeaders, null,
            new HashMap<>());
    }

    /**
     * 获得提示
     *
     * @param serviceId 服务id
     * @return {@link String }
     * @author huangmuhong
     * @date 2023/11/03
     * @since 1.0.0
     */
    protected String getHint(String serviceId) {
        LoadBalancerProperties properties = loadBalancerClientFactory.getProperties(serviceId);
        String defaultHint = properties.getHint().getOrDefault("default", "default");
        String hintPropertyValue = properties.getHint().get(serviceId);
        return hintPropertyValue != null ? hintPropertyValue : defaultHint;
    }
}

通过配置类的方式,覆盖 ServiceChooseInterceptor 的 bean

/**
     * 负载均衡器拦截器
     *
     * @param factory                   工厂
     * @param loadBalancerClientFactory 负载均衡器客户端工厂
     * @return {@link RetrofitLoadBalancerInterceptor }
     * @author huangmuhong
     * @date 2024/01/19
     * @since 1.0.0
     */
    @Bean
    @ConditionalOnLoadBalancer
    public ServiceChooseInterceptor retrofitLoadBalancerInterceptor(LoadBalancerClient factory,
        LoadBalancerClientFactory loadBalancerClientFactory) {
        return new RetrofitLoadBalancerInterceptor(factory, loadBalancerClientFactory);
    }

至此,retrofit 对 LoadBalancer 的集成就实现了!

0

评论区