XiaoLin's Blog

Xiao Lin

SpringBoot 整合 NettyWebsocketServer

10
2024-02-04

netty 与 websocket

Netty是一个网络应用框架,可以用来处理大量的客户端请求。它也可以用来开发高性能的、可扩展的网络应用服务器,例如WebSocket服务器。

WebSocket 是一种为双向通信开发的技术,允许浏览器和服务端进行全双工通信。 它是 HTML5 的一部分,它使得浏览器可以建立持久性连接,从而允许浏览器和服务端在各自之间进行实时通信。

将 Netty 与 WebSocket 结合起来,我们就可以利用 Netty 的处理能力(例如事件驱动、异步 I/O、高性能等特性)来开发出高性能的 WebSocket 服务端应用。Spring Boot 允许我们使用 Netty 来创建 WebSocket 服务端应用,并且它还内置了一些常用的 Netty 配置,因此我们不需要写大量的代码就能快速地开发出一个 WebSocket 服务端应用。

快速整合

Spring Boot 是一个用于创建独立的、基于生产级别的 Spring 应用程序的框架。它提供了自动配置、快速开发等特性,让开发人员能够更加便捷地构建应用程序。

Netty 是一个高性能的网络通信框架,它提供了异步的、事件驱动的网络编程模型。Netty 可以轻松地实现各种协议和传输方式,包括 HTTP、WebSocket 等。

在 Spring Boot 中整合 Netty WebSocket Server 的过程如下:

  1. 创建一个基于 Spring Boot 的 Maven 项目。
  2. 在 pom.xml 文件中添加 Netty 和 WebSocket 的依赖:
<dependencies>
    <!-- Netty -->
    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.63.Final</version>
    </dependency>

    <!-- WebSocket -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>
</dependencies>
  1. 创建一个 NettyWebSocketServer 类,作为 WebSocket 服务器的启动类:
package xyz.tiegangan.chat.common.websocket;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.util.NettyRuntime;
import java.util.List;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.SmartInitializingSingleton;
import org.springframework.beans.factory.annotation.Value;

/**
 * 基于 netty 的 websocket 服务端,独立于 webmvc 的端口
 *
 * @author huangmuhong
 * @version 1.0.0
 * @date 2023/07/02
 */
@Slf4j
@RequiredArgsConstructor
public class NettyWebSocketServer implements DisposableBean, SmartInitializingSingleton {

  @Value("${server.websocket.port:8090}")
  public int webSocketPort;
  // 创建线程池执行器
  private final EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  private final EventLoopGroup workerGroup =
      new NioEventLoopGroup(NettyRuntime.availableProcessors());

  private final List<ChannelInboundHandler> channelInboundHandlerAdapters;

  @Override
  public void destroy() throws Exception {
    bossGroup.shutdownGracefully().syncUninterruptibly();
    workerGroup.shutdownGracefully().syncUninterruptibly();
    log.info("NettyWebSocketServer destroy success");
  }

  @Override
  public void afterSingletonsInstantiated() {
    try {
      run();
    } catch (Throwable e) {
      log.error("NettyWebSocketServer start error", e);
    }
  }

  private void run() {
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    serverBootstrap
        .group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .option(ChannelOption.SO_BACKLOG, 128)
        .option(ChannelOption.SO_KEEPALIVE, true)
        .handler(new LoggingHandler(LogLevel.INFO)) // 为 bossGroup 添加 日志处理器
        .channel(NioServerSocketChannel.class)
        .childHandler(
            new ChannelInitializer<SocketChannel>() {
              @Override
              protected void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast(new NettyWebSocketServerInitializer());
              }
            });
  }
}
  1. 创建一个 WebSocketServerInitializer 类,用于初始化 WebSocket 服务器:
package xyz.tiegangan.chat.common.websocket;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;

/**
 * netty websocket 链式处理器初始化类
 *
 * @author huangmuhong
 * @version 1.0.0
 * @date 2023/07/02
 */
public class NettyWebSocketServerInitializer extends ChannelInitializer<SocketChannel> {
  @Override
  protected void initChannel(SocketChannel socketChannel) throws Exception {
    socketChannel
        .pipeline()
        // 30秒客户端没有向服务器发送心跳则关闭连接
        .addLast(new IdleStateHandler(30, 0, 0))
        // 因为使用http协议,所以需要使用http的编码器,解码器
        .addLast(new HttpServerCodec())
        // 以块方式写,添加 chunkedWriter 处理器
        .addLast(new ChunkedWriteHandler())
        /**
         * 说明: 1. http数据在传输过程中是分段的,HttpObjectAggregator可以把多个段聚合起来; 2. 这就是为什么当浏览器发送大量数据时,就会发出多次
         * http请求的原因
         */
        .addLast(new HttpObjectAggregator(8192))
        /**
         * 说明: 1. 对于 WebSocket,它的数据是以帧frame 的形式传递的; 2. 可以看到 WebSocketFrame 下面有6个子类 3. 浏览器发送请求时:
         * ws://localhost:7000/hello 表示请求的uri 4. WebSocketServerProtocolHandler 核心功能是把 http协议升级为 ws
         * 协议,保持长连接; 是通过一个状态码 101 来切换的
         */
        .addLast(new WebSocketServerProtocolHandler("/"));
  }
}

  1. 创建一个 WebSocketServerHandler 类,用于处理 WebSocket 请求:
package xyz.tiegangan.chat.common.websocket;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

/**
 * @author huangmuhong
 * @version 1.0.0
 * @date 2023/7/2
 */
public class NettyWebSocketServerHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

  // 当web客户端连接后,触发该方法
  @Override
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {}

  // 客户端离线
  @Override
  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {}

  /**
   * 取消绑定
   *
   * @param ctx
   * @throws Exception
   */
  @Override
  public void channelInactive(ChannelHandlerContext ctx) throws Exception {}

  /**
   * 心跳检查
   *
   * @param ctx
   * @param evt
   * @throws Exception
   */
  @Override
  public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {}

  // 处理异常
  @Override
  public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {}

  // 处理报文
  @Override
  protected void channelRead0(
      ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame)
      throws Exception {}
}
  1. 在启动类(例如 Application.java)中添加 @EnableWebSocket 注解,启用 Spring Boot 的 WebSocket 功能。

  2. 运行项目,并使用浏览器或其他工具连接到 ws://localhost:8080/websocket,即可与 Netty WebSocket Server 进行通信。