netty 与 websocket
Netty是一个网络应用框架,可以用来处理大量的客户端请求。它也可以用来开发高性能的、可扩展的网络应用服务器,例如WebSocket服务器。
WebSocket 是一种为双向通信开发的技术,允许浏览器和服务端进行全双工通信。 它是 HTML5 的一部分,它使得浏览器可以建立持久性连接,从而允许浏览器和服务端在各自之间进行实时通信。
将 Netty 与 WebSocket 结合起来,我们就可以利用 Netty 的处理能力(例如事件驱动、异步 I/O、高性能等特性)来开发出高性能的 WebSocket 服务端应用。Spring Boot 允许我们使用 Netty 来创建 WebSocket 服务端应用,并且它还内置了一些常用的 Netty 配置,因此我们不需要写大量的代码就能快速地开发出一个 WebSocket 服务端应用。
快速整合
Spring Boot 是一个用于创建独立的、基于生产级别的 Spring 应用程序的框架。它提供了自动配置、快速开发等特性,让开发人员能够更加便捷地构建应用程序。
Netty 是一个高性能的网络通信框架,它提供了异步的、事件驱动的网络编程模型。Netty 可以轻松地实现各种协议和传输方式,包括 HTTP、WebSocket 等。
在 Spring Boot 中整合 Netty WebSocket Server 的过程如下:
- 创建一个基于 Spring Boot 的 Maven 项目。
- 在 pom.xml 文件中添加 Netty 和 WebSocket 的依赖:
<dependencies>
<!-- Netty -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.63.Final</version>
</dependency>
<!-- WebSocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
</dependencies>
- 创建一个 NettyWebSocketServer 类,作为 WebSocket 服务器的启动类:
package xyz.tiegangan.chat.common.websocket;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.NettyRuntime;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Value;
/**
* 基于 netty 的 websocket 服务端,独立于 webmvc 的端口
*
* @author huangmuhong
* @version 1.0.0
* @date 2023/07/02
*/
@Slf4j
@RequiredArgsConstructor
public class NettyWebSocketServer implements DisposableBean, SmartInitializingSingleton {
@Value("${server.websocket.port:8090}")
public int webSocketPort;
// 创建线程池执行器
private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private final EventLoopGroup workerGroup =
new NioEventLoopGroup(NettyRuntime.availableProcessors());
private final List<ChannelInboundHandler> channelInboundHandlerAdapters;
@Override
public void destroy() throws Exception {
bossGroup.shutdownGracefully().syncUninterruptibly();
workerGroup.shutdownGracefully().syncUninterruptibly();
log.info("NettyWebSocketServer destroy success");
}
@Override
public void afterSingletonsInstantiated() {
try {
run();
} catch (Throwable e) {
log.error("NettyWebSocketServer start error", e);
}
}
private void run() {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap
.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new LoggingHandler(LogLevel.INFO)) // 为 bossGroup 添加 日志处理器
.channel(NioServerSocketChannel.class)
.childHandler(
new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new NettyWebSocketServerInitializer());
}
});
}
}
- 创建一个 WebSocketServerInitializer 类,用于初始化 WebSocket 服务器:
package xyz.tiegangan.chat.common.websocket;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
/**
* netty websocket 链式处理器初始化类
*
* @author huangmuhong
* @version 1.0.0
* @date 2023/07/02
*/
public class NettyWebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel
.pipeline()
// 30秒客户端没有向服务器发送心跳则关闭连接
.addLast(new IdleStateHandler(30, 0, 0))
// 因为使用http协议,所以需要使用http的编码器,解码器
.addLast(new HttpServerCodec())
// 以块方式写,添加 chunkedWriter 处理器
.addLast(new ChunkedWriteHandler())
/**
* 说明: 1. http数据在传输过程中是分段的,HttpObjectAggregator可以把多个段聚合起来; 2. 这就是为什么当浏览器发送大量数据时,就会发出多次
* http请求的原因
*/
.addLast(new HttpObjectAggregator(8192))
/**
* 说明: 1. 对于 WebSocket,它的数据是以帧frame 的形式传递的; 2. 可以看到 WebSocketFrame 下面有6个子类 3. 浏览器发送请求时:
* ws://localhost:7000/hello 表示请求的uri 4. WebSocketServerProtocolHandler 核心功能是把 http协议升级为 ws
* 协议,保持长连接; 是通过一个状态码 101 来切换的
*/
.addLast(new WebSocketServerProtocolHandler("/"));
}
}
- 创建一个 WebSocketServerHandler 类,用于处理 WebSocket 请求:
package xyz.tiegangan.chat.common.websocket;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
/**
* @author huangmuhong
* @version 1.0.0
* @date 2023/7/2
*/
public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {
// 当web客户端连接后,触发该方法
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {}
// 客户端离线
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {}
/**
* 取消绑定
*
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {}
/**
* 心跳检查
*
* @param ctx
* @param evt
* @throws Exception
*/
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {}
// 处理异常
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {}
// 处理报文
@Override
protected void channelRead0(
ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame)
throws Exception {}
}
在启动类(例如 Application.java)中添加 @EnableWebSocket 注解,启用 Spring Boot 的 WebSocket 功能。
运行项目,并使用浏览器或其他工具连接到 ws://localhost:8080/websocket,即可与 Netty WebSocket Server 进行通信。
评论区