Retrofit 是一个类型安全的 HTTP 客户端,用于 Android 和 Java 应用程序。它是由 Square 公司开发的。Retrofit 让网络请求的实现变得非常简单和高效,通过将 REST API 调用转换为 Java 接口中的方法调用来工作。这样,开发者可以将注意力集中在与服务器交互的数据上,而不是处理底层的 HTTP 通信细节。
Retrofit 提供了一系列强大的功能,包括:
- 类型安全:通过使用注解来描述 HTTP 请求(如 GET、POST、PUT、DELETE 等),并将这些请求映射到 Java 接口中的方法上,可以减少运行时错误。
- 简化数据解析:Retrofit 可以配合 Gson 等库自动地将 JSON 或其他格式的响应数据序列化成 Java 对象。也就是说,你不需要手动解析网络响应;Retrofit 会为你处理这一切。
- 异步和同步方式:支持异步和同步网络请求,使其更加灵活地适应不同的使用场景。
- 可定制性:Retrofit 允许你通过 OkHttp 拦截器自定义 HTTP 请求和响应过程,例如添加通用头信息、监控请求时间或管理 Cookies 等。
- 多种数据格式支持:默认支持 Gson 库进行 JSON 解析,但也可通过转换器(Converter)轻松切换到其他如 Moshi、Jackson 或 Protobuf 等格式。
- 文件上传和下载:支持多种方式上传文件(如表单上传、二进制流)以及高效处理文件下载。
- 如何使用
spring boot 可以直接使用 lianjiatech
大佬提供的 LianjiaTech/retrofit-spring-boot-starter: A spring-boot starter for retrofit, supports rapid integration and feature enhancements.(适用于 retrofit 的 spring-boot-starter,支持快速集成和功能增强) (github.com)
以下的集成配置都是基于 retrofit-spring-boot-starter
的
要在项目中使用 Retrofit, 你首先需要在 pom.xml 文件中添加依赖:
<dependency>
<groupId>com.github.lianjiatech</groupId>
<artifactId>retrofit-spring-boot-starter</artifactId>
<version>latest</version>
</dependency>
之后就可以创建一个接口,并使用注解描述 API 端点了:
public interface MyApiService {
@GET("users/{user}/repos")
Call<List<Repo>> listRepos(@Path("user") String user);
}
最后,在目标类中注入对象,并直接调用目标方法即可:
@Autowird
private MyApiService myApiService;
myApiService.listRepos("name");
Spring Cloud Loadbalancer 是什么?
Spring Cloud LoadBalancer 是 Spring Cloud 的一个模块,它提供了一个客户端负载均衡的解决方案。在微服务架构中,服务之间的通信是不可避免的,而且常常需要通过网络调用远程服务。随着服务实例数量的增加,如何有效地分配请求以避免某些实例过载而其他实例空闲,成为了一个需要解决的问题。这就是负载均衡要解决的问题。Spring Cloud LoadBalancer 提供了一种简单有效的方式来在客户端进行负载均衡。
主要特性包括以下几点:
- 客户端负载均衡:与传统的服务端负载均衡相比(例如:Nginx),客户端负载均衡将决策权交给了客户端。这样可以根据实际情况(如响应时间、错误率等)动态地选择最佳的服务器实例。
- 与 Spring 生态系统集成:作为 Spring Cloud 项目的一部分,它与 Spring Boot、Spring WebFlux 等框架无缝集成,使得开发者可以轻松地在 Spring 应用中使用它。
- 灵活性和扩展性:提供多种负载均衡策略(如轮询、随机等),并且支持自定义策略。此外,它还支持对健康检查、缓存、重试等高级功能进行配置和扩展。
- 替代 Netflix Ribbon:随着 Netflix Ribbon 进入维护模式,Spring Cloud LoadBalancer 被设计为 Ribbon 的替代品,提供更现代化和可维护性更强的方案。
整合步骤
OpenFeign 实现 LoadBalancer 的思路
这里我们借鉴 OpenFeign
的实现思路,通过 LoadBalancerClient
和 LoadBalancerClientFactory
来整合 Loadbanlancer 并实现 RPC,以下是 OpenFeign
的源码介绍。
我们通过 OkHttpFeignLoadBalancerConfiguration
自动配置类中,可以发现 Feign 的负载均衡客户端是通过 FeignBlockingLoadBalancerClient
来创建的,我们查看 FeignBlockingLoadBalancerClient
的源码。
- FeignBlockingLoadBalancerClient
package org.springframework.cloud.openfeign.loadbalancer;
@SuppressWarnings({ "unchecked", "rawtypes" })
public class FeignBlockingLoadBalancerClient implements Client {
private static final Log LOG = LogFactory.getLog(FeignBlockingLoadBalancerClient.class);
private final Client delegate;
private final LoadBalancerClient loadBalancerClient;
private final LoadBalancerClientFactory loadBalancerClientFactory;
private final List<LoadBalancerFeignRequestTransformer> transformers;
@Override
public Response execute(Request request, Request.Options options) throws IOException {
// 寻找服务id
final URI originalUri = URI.create(request.url());
String serviceId = originalUri.getHost();
Assert.state(serviceId != null, "Request URI does not contain a valid hostname: " + originalUri);
// 获取负载均衡的提示信息
String hint = getHint(serviceId);
// 创建负载均衡请求,并获取相关的生命周期集合,并将状态设置为开始
DefaultRequest<RequestDataContext> lbRequest = new DefaultRequest<>(
new RequestDataContext(buildRequestData(request), hint));
Set<LoadBalancerLifecycle> supportedLifecycleProcessors = LoadBalancerLifecycleValidator
.getSupportedLifecycleProcessors(
loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
RequestDataContext.class, ResponseData.class, ServiceInstance.class);
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
// 通过负载均衡客户端,获取到服务实例
ServiceInstance instance = loadBalancerClient.choose(serviceId, lbRequest);
org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse = new DefaultResponse(
instance);
// 如果没有获取到服务实例的处理逻辑
if (instance == null) {
String message = "Load balancer does not contain an instance for the service " + serviceId;
if (LOG.isWarnEnabled()) {
LOG.warn(message);
}
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle
.onComplete(new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
CompletionContext.Status.DISCARD, lbRequest, lbResponse)));
return Response.builder().request(request).status(HttpStatus.SERVICE_UNAVAILABLE.value())
.body(message, StandardCharsets.UTF_8).build();
}
// 重构原始url,将host:port替换为负载均衡实例得到的地址
String reconstructedUrl = loadBalancerClient.reconstructURI(instance, originalUri).toString();
// 根据新的url,构建新的Feign Request对象
Request newRequest = buildRequest(request, reconstructedUrl, instance);
return executeWithLoadBalancerLifecycleProcessing(delegate, options, newRequest, lbRequest, lbResponse,
supportedLifecycleProcessors);
}
protected Request buildRequest(Request request, String reconstructedUrl) {
// 根据重构的URL创建新的请求对象
return Request.create(request.httpMethod(), reconstructedUrl, request.headers(), request.body(),
request.charset(), request.requestTemplate());
}
protected Request buildRequest(Request request, String reconstructedUrl, ServiceInstance instance) {
// 构建基础请求对象
Request newRequest = buildRequest(request, reconstructedUrl);
// 如果存在请求转换器,则对请求进行转换
if (transformers != null) {
for (LoadBalancerFeignRequestTransformer transformer : transformers) {
newRequest = transformer.transformRequest(newRequest, instance);
}
}
return newRequest;
}
// Visible for Sleuth instrumentation
public Client getDelegate() {
return delegate;
}
private String getHint(String serviceId) {
// 加载负载均衡器属性
LoadBalancerProperties properties = loadBalancerClientFactory.getProperties(serviceId);
// 获取默认提示信息
String defaultHint = properties.getHint().getOrDefault("default", "default");
// 获取服务ID对应的提示信息
String hintPropertyValue = properties.getHint().get(serviceId);
// 如果服务ID对应的提示信息存在,则使用该信息,否则使用默认提示信息
return hintPropertyValue != null ? hintPropertyValue : defaultHint;
}
}
可以看到,在 execute
方法的最后,调用了 executeWithLoadBalancerLifecycleProcessing
方法完成具体的负载均衡生命周期请求,我们进入方法,发现最后由 LoadBalancerUtils
的 executeWithLoadBalancerLifecycleProcessing
方法完成处理,具体的逻辑如下。
static Response executeWithLoadBalancerLifecycleProcessing(
Client feignClient, // Feign客户端,用于执行实际的HTTP请求
Request.Options options, // 请求选项,包含连接超时和读取超时设置
Request feignRequest, // Feign请求对象,包含请求的URL、HTTP方法、请求头等信息
org.springframework.cloud.client.loadbalancer.Request lbRequest, // 负载均衡器请求对象,用于负载均衡过程中的上下文信息传递
org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse, // 负载均衡器响应对象,包含被选中的服务实例
Set<LoadBalancerLifecycle> supportedLifecycleProcessors, // 支持的负载均衡生命周期处理器集合
boolean loadBalanced // 标志位,指示请求是否通过负载均衡器处理
) throws IOException {
// 在请求开始之前,通过所有支持的生命周期处理器执行onStartRequest方法
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, lbResponse));
try {
// 使用Feign客户端执行实际的HTTP请求,并获取响应
Response response = feignClient.execute(feignRequest, options);
if (loadBalanced) {
// 如果请求通过负载均衡器处理,请求成功完成后,通过所有支持的生命周期处理器执行onComplete方法
// 并将状态设置为SUCCESS
supportedLifecycleProcessors.forEach(
lifecycle -> lifecycle.onComplete(new CompletionContext<>(CompletionContext.Status.SUCCESS,
lbRequest, lbResponse, buildResponseData(response))));
}
// 返回Feign客户端得到的响应
return response;
}
catch (Exception exception) {
// 如果在请求过程中发生异常
if (loadBalanced) {
// 如果请求通过负载均衡器处理,通过所有支持的生命周期处理器执行onComplete方法
// 并将状态设置为FAILED,同时传递异常信息
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
new CompletionContext<>(CompletionContext.Status.FAILED, exception, lbRequest, lbResponse)));
}
// 将异常抛出,以便调用方可以处理
throw exception;
}
}
负载均衡的 Hint 信息
负载均衡器的提示信息(Hint)是用于指导负载均衡器如何选择服务实例的附加信息。在微服务架构中,一个服务可能有多个实例运行在不同的服务器或环境中。负载均衡器负责根据一定的策略(如轮询、随机、权重等)从这些实例中选择一个进行请求转发。提示信息可以被视为一种配置或元数据,帮助负载均衡器更智能地进行选择。
例如,提示信息可以包含如下内容:
- 环境标签:如果服务实例分布在不同的环境(如开发、测试、生产)中,提示信息可以用来指定应该选择哪个环境的实例。
- 版本号:在进行蓝绿部署或灰度发布时,提示信息可以用来选择特定版本的服务实例。
- 地理位置:对于分布在不同地理位置的服务实例,提示信息可以用来优先选择离用户最近的实例,以减少延迟。
- 自定义策略:根据特定的业务需求,提示信息还可以用来实现自定义的选择策略,如基于请求参数、用户特征等进行智能路由。
在代码中,提示信息通常以键值对的形式存在,负载均衡器在选择服务实例时会参考这些信息。例如,Spring Cloud 中的 LoadBalancerClient
接口允许通过 Request
对象提供的属性或元数据来传递提示信息,负载均衡策略实现可以读取这些信息来影响实例选择的过程。
LoadBalancerLifecycle 接口
LoadBalancerLifecycle
是 Spring Cloud LoadBalancer 中的一个接口,它定义了在负载均衡过程中的不同阶段可以执行的回调方法。通过实现这个接口,可以在请求的生命周期中的关键时刻(如请求开始、完成或失败等)执行自定义逻辑,从而增加了对负载均衡过程的可观察性和可控制性。
LoadBalancerLifecycle
接口主要包含以下方法:
- onStart(Request request, Response response): 当负载均衡器开始处理请求时调用。可以用于记录日志、收集统计信息或执行其他初始化逻辑。
- onStartRequest(Request request, Response response): 在发送请求到实际服务实例之前调用。与
onStart
相比,它更接近于请求被发送的时刻。 - onComplete(CompletionContext completionContext): 当请求完成时调用,无论请求是成功、失败还是被取消。
CompletionContext
对象提供了请求完成的详细信息,包括请求和响应的数据、完成的状态(成功、失败、取消)以及任何异常信息。这个方法可以用于执行清理工作、记录请求结果、更新统计信息等。
通过实现这些方法,开发者可以在负载均衡过程中插入自定义逻辑,实现如下功能:
- 日志记录和监控:在请求的不同阶段记录详细日志,收集性能指标和统计信息,帮助监控负载均衡器的行为和服务实例的健康状况。
- 故障处理:在请求失败时执行自定义的故障恢复逻辑,例如重试、回退到备用实例或记录错误信息。
- 请求追踪:在请求的生命周期中添加追踪信息,与分布式追踪系统集成,提高系统的可观察性。
LoadBalancerLifecycle
提供的这些回调方法使得负载均衡逻辑不仅限于选择服务实例,还可以包含更丰富的上下文处理和定制化逻辑,从而使得整个负载均衡过程更加灵活和强大。
通过 retrofit
拦截器集成 LoadBanlancer
这里使用 retrofit
提供的 Interceptor
功能,来实现 spring-cloud-loadbalancer 的整合。
Interceptor
在 Retrofit 库中是一个非常强大的机制,它允许你拦截并修改或重写请求和响应的方式。这在很多场景中都非常有用,比如添加通用的请求头(例如认证信息)、日志记录、请求重试等等。Retrofit 本身是基于 OkHttp 库构建的,因此 Retrofit 的 Interceptor
实质上就是使用 OkHttp 的拦截器。
要创建一个 Interceptor
,你需要实现 OkHttp 的 Interceptor
接口,并重写其中的 intercept
方法。在这个方法里,你可以获取到原始的请求信息,进行修改后再继续执行请求,或者直接构造并返回一个响应。
import okhttp3.Interceptor;
import okhttp3.Request;
import okhttp3.Response;
import java.io.IOException;
public class MyInterceptor implements Interceptor {
@Override
public Response intercept(Chain chain) throws IOException {
Request originalRequest = chain.request();
// 在原始请求上添加新的请求头
Request newRequest = originalRequest.newBuilder()
.header("Authorization", "Bearer your_token_here")
.build();
return chain.proceed(newRequest);
}
}
创建了自定义的 Interceptor
后,需要将其添加到 Retrofit 使用的 OkHttp 客户端中。这样,每次通过 Retrofit 发起请求时,都会通过这个拦截器进行处理。
OkHttpClient okHttpClient = new OkHttpClient.Builder()
.addInterceptor(new MyInterceptor()) // 添加自定义拦截器
.build();
Retrofit retrofit = new Retrofit.Builder()
.baseUrl("https://api.example.com")
.client(okHttpClient)
.build();
OkHttp 提供了两种类型的拦截器:
- 应用级别拦截器 (Application Interceptors):通过调用
addInterceptor()
方法添加。它们不关心 OkHttp 注入的头部像 If-None-Match,并且允许重试和多次响应转换。 - 网络级别拦截器 (Network Interceptors):通过调用
addNetworkInterceptor()
方法添加。它们能够操作重定向和重试等操作,并且能够看到数据被传输过程中自动添加的头部。
创建 RetrofitLoadBalancerInterceptor
并实现 ServiceChooseInterceptor
ServiceChooseInterceptor
是 retrofit-spring-boot-starter
提供的一个负载均衡实现父类,根据官方的文档,只需要实现 ServiceInstanceChooser
接口就可以了,这里是覆盖了 ServiceChooseInterceptor
的实现类的做法。
package xyz.tiegangan.tools.common.retrofit.core.loadbalancer.interceptor;
/**
* 负载均衡器拦截器
*
* @author huangmuhong
* @version 1.0.0
* @date 2024/01/19
* @see GlobalInterceptor
*/
@Slf4j
public class RetrofitLoadBalancerInterceptor extends ServiceChooseInterceptor {
private final LoadBalancerClient loadBalancer;
private final LoadBalancerClientFactory loadBalancerClientFactory;
public RetrofitLoadBalancerInterceptor(LoadBalancerClient loadBalancer,
LoadBalancerClientFactory loadBalancerClientFactory) {
super(null);
this.loadBalancer = loadBalancer;
this.loadBalancerClientFactory = loadBalancerClientFactory;
}
@NotNull
@Override
@SuppressWarnings("all")
public Response intercept(@NotNull Chain chain) throws IOException {
Request request = chain.request();
Method method = RetrofitUtils.getMethodFormRequest(request);
if (method == null) {
return chain.proceed(request);
}
Class<?> declaringClass = method.getDeclaringClass();
RetrofitClient retrofitClient =
AnnotatedElementUtils.findMergedAnnotation(declaringClass, RetrofitClient.class);
String baseUrl = retrofitClient.baseUrl();
if (StringUtils.hasText(baseUrl)) {
return chain.proceed(request);
}
// serviceId服务发现
String serviceId = retrofitClient.serviceId();
final String hint = getHint(serviceId);
DefaultRequest<RequestDataContext> lbRequest =
new DefaultRequest<>(new RequestDataContext(buildRequestData(request), hint));
Set<LoadBalancerLifecycle> supportedLifecycleProcessors =
LoadBalancerLifecycleValidator.getSupportedLifecycleProcessors(
loadBalancerClientFactory.getInstances(serviceId, LoadBalancerLifecycle.class),
RequestDataContext.class, ResponseData.class, ServiceInstance.class);
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStart(lbRequest));
ServiceInstance instance = loadBalancer.choose(serviceId, lbRequest);
org.springframework.cloud.client.loadbalancer.Response<ServiceInstance> lbResponse =
new DefaultResponse(instance);
if (instance == null) {
String message = "负载均衡器不包含一个服务实例: " + serviceId;
log.warn(message);
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
new CompletionContext<ResponseData, ServiceInstance, RequestDataContext>(
CompletionContext.Status.DISCARD, lbRequest, lbResponse)));
return new Response.Builder().request(request).code(HttpStatus.SERVICE_UNAVAILABLE.value()).message(message)
.build();
}
String reconstructedUrl = loadBalancer.reconstructURI(instance, request.url().uri()).toString();
final Request newRequest = buildRequest(request, reconstructedUrl);
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onStartRequest(lbRequest, lbResponse));
try {
Response response = chain.proceed(newRequest);
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
new CompletionContext<>(CompletionContext.Status.SUCCESS, lbRequest, lbResponse,
buildResponseData(response))));
return response;
} catch (Exception exception) {
supportedLifecycleProcessors.forEach(lifecycle -> lifecycle.onComplete(
new CompletionContext<>(CompletionContext.Status.FAILED, exception, lbRequest, lbResponse)));
throw exception;
}
}
/**
* 构建请求
*
* @param request 要求
* @param reconstructedUrl 重建网址
* @return {@link Request }
* @author huangmuhong
* @date 2024/01/19
* @since 1.0.0
*/
private Request buildRequest(Request request, String reconstructedUrl) {
Request.Builder builder = request.newBuilder();
builder.url(reconstructedUrl);
return builder.build();
}
/**
* 构建响应数据
*
* @param response 回复
* @return {@link ResponseData }
* @author huangmuhong
* @date 2023/11/03
* @since 1.0.0
*/
protected ResponseData buildResponseData(Response response) {
HttpHeaders responseHeaders = new HttpHeaders();
response.headers()
.forEach((pair) -> responseHeaders.put(pair.getFirst(), Lists.newArrayList(pair.getSecond())));
return new ResponseData(HttpStatusCode.valueOf(response.code()), responseHeaders, null,
buildRequestData(response.request()));
}
/**
* 构建请求数据
*
* @param request 要求
* @return {@link RequestData }
* @author huangmuhong
* @date 2023/11/03
* @since 1.0.0
*/
protected RequestData buildRequestData(Request request) {
HttpHeaders requestHeaders = new HttpHeaders();
final Headers headers = request.headers();
headers.forEach(pair -> requestHeaders.put(pair.getFirst(), Lists.newArrayList(pair.getSecond())));
return new RequestData(HttpMethod.valueOf(request.method()), request.url().uri(), requestHeaders, null,
new HashMap<>());
}
/**
* 获得提示
*
* @param serviceId 服务id
* @return {@link String }
* @author huangmuhong
* @date 2023/11/03
* @since 1.0.0
*/
protected String getHint(String serviceId) {
LoadBalancerProperties properties = loadBalancerClientFactory.getProperties(serviceId);
String defaultHint = properties.getHint().getOrDefault("default", "default");
String hintPropertyValue = properties.getHint().get(serviceId);
return hintPropertyValue != null ? hintPropertyValue : defaultHint;
}
}
通过配置类的方式,覆盖 ServiceChooseInterceptor
的 bean
/**
* 负载均衡器拦截器
*
* @param factory 工厂
* @param loadBalancerClientFactory 负载均衡器客户端工厂
* @return {@link RetrofitLoadBalancerInterceptor }
* @author huangmuhong
* @date 2024/01/19
* @since 1.0.0
*/
@Bean
@ConditionalOnLoadBalancer
public ServiceChooseInterceptor retrofitLoadBalancerInterceptor(LoadBalancerClient factory,
LoadBalancerClientFactory loadBalancerClientFactory) {
return new RetrofitLoadBalancerInterceptor(factory, loadBalancerClientFactory);
}
至此,retrofit 对 LoadBalancer 的集成就实现了!
评论区