netty-学习

作者 : admin 本文共22931个字,预计阅读时间需要58分钟 发布时间: 2024-06-10 共2人阅读

Netty

    • Netty 的核心概念
    • Netty 的主要特性
    • Netty 的应用场景
    • Netty 的基本使用
        • 服务器端
        • 处理器
      • 总结
  • 代码分析
    • 1.心跳检测
      • 代码解析
        • 类和成员变量
        • `userEventTriggered`方法
        • 总结
    • 4.参数详解
        • `ChannelHandlerContext ctx`
        • `Object evt`
      • 事件来源
      • 示例:配置 `IdleStateHandler`
      • 事件处理
      • 示例回调
    • 2.Netty WebSocket 服务器启动器类
      • 1.代码解析
        • 类和成员变量
        • 资源关闭方法
        • `run` 方法
      • 2.总结
      • 3.Netty WebSocket 处理器 HandlerWebSocket
      • 代码解析
        • 类和成员变量
        • `channelActive` 方法
        • `channelInactive` 方法
        • `channelRead0` 方法
        • `userEventTriggered` 方法
        • `getToken` 方法
      • 总结
    • 完整过程
      • 1. 客户端连接到 WebSocket 服务器
      • 2. 服务器端初始化和配置
        • Netty 服务器启动
      • 3. 通道初始化
      • 4. 握手和连接事件
        • WebSocket 协议处理和心跳检测
        • 心跳检测
      • 5. 消息处理
      • 6. 连接关闭
      • 总结

Netty 是一个基于 Java 的异步事件驱动的网络应用框架,用于快速开发高性能、高可靠性的网络应用。Netty 提供了丰富的 API,支持多种传输协议和多种编解码方式,广泛应用于高性能的网络服务器和客户端的开发。

Netty 的核心概念

  1. Channel:Netty 中的基本网络操作抽象。它代表一个打开的连接,比如一个到远程服务器的 TCP 连接。Channel 提供了异步的网络 I/O 操作,如读、写、连接和绑定。

  2. EventLoop:Netty 中处理 I/O 操作的核心。EventLoop 是一个处理 I/O 事件、运行任务和处理定时任务的循环。每个 Channel 都会分配给一个 EventLoop。

  3. ChannelFuture:Netty 中的异步操作结果。ChannelFuture 提供了在操作完成时通知的机制,允许你在操作完成后执行一些特定的操作(如写操作完成后的回调处理)。

  4. ChannelHandler:用于处理 Channel 的 I/O 事件和数据。ChannelHandler 是处理网络事件和数据的核心接口。你可以实现 ChannelHandler 来处理入站和出站数据。

  5. ChannelPipeline:Netty 中的责任链模式实现。每个 Channel 都有一个 ChannelPipeline,它持有一个 ChannelHandler 的链。当一个 I/O 事件发生时,事件在 ChannelPipeline 中传播,通过其中的 ChannelHandler 进行处理。

  6. ByteBuf:Netty 中的数据容器,比 Java 的 NIO ByteBuffer 更高效和灵活。ByteBuf 提供了丰富的 API,支持动态扩展、零拷贝等特性。

Netty 的主要特性

  1. 异步和事件驱动:Netty 基于异步和事件驱动的编程模型,能够处理大量并发连接,并提供高吞吐量和低延迟。

  2. 多协议支持:Netty 支持多种协议,如 HTTP、WebSocket、TCP、UDP 等。你可以轻松实现自定义协议。

  3. 高效的内存管理:Netty 提供了高效的内存管理机制,避免了频繁的垃圾回收,提升了应用的性能。

  4. 丰富的工具类和扩展点:Netty 提供了大量的工具类和扩展点,方便开发者定制和扩展。

Netty 的应用场景

  1. 高性能的网络服务器:如 HTTP 服务器、游戏服务器、聊天服务器等。
  2. 分布式系统中的网络通信:如 RPC 框架、消息中间件等。
  3. 代理服务器:如反向代理、网关等。
  4. 任何需要高并发和低延迟的网络应用

Netty 的基本使用

以下是一个简单的 Netty 服务器示例:

服务器端
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
public class NettyServer {
private final int port;
public NettyServer(int port) {
this.port = port;
}
public void start() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 处理连接事件
EventLoopGroup workerGroup = new NioEventLoopGroup(); // 处理I/O事件
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture f = b.bind(port).sync(); // 绑定端口
f.channel().closeFuture().sync(); // 等待服务器关闭
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
new NettyServer(port).start();
}
}
处理器
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg); // 将接收到的消息写回去
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush(); // 刷新所有写入到客户端的消息
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close(); // 发生异常时关闭连接
}
}

总结

Netty 是一个强大的网络编程框架,适用于构建高性能、高可用性的网络应用。它提供了丰富的 API 和灵活的扩展机制,使得开发高性能的网络应用变得更加简单和高效。通过异步事件驱动模型和高效的内存管理,Netty 可以处理大量并发连接并提供低延迟的服务。


代码分析

1.心跳检测

public class HandlerHeartBeat extends ChannelDuplexHandler {
private static final Logger logger = LoggerFactory.getLogger(HandlerHeartBeat.class);
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.READER_IDLE) {
Attribute<String> attribute = ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString()));
String userId = attribute.get();
logger.info("用户{}没有发送心跳断开连接", userId);
ctx.close();
} else if (e.state() == IdleState.WRITER_IDLE) {
ctx.writeAndFlush("heart");
}
}
}
}

代码解析

这是一个Netty的处理器类HandlerHeartBeat,继承了ChannelDuplexHandler。这个类主要用于处理心跳检测逻辑,以确保连接的存活性。以下是对代码的详细解析:

类和成员变量
public class HandlerHeartBeat extends ChannelDuplexHandler {
private static final Logger logger = LoggerFactory.getLogger(HandlerHeartBeat.class);
}
  • HandlerHeartBeat:继承自ChannelDuplexHandler,它是Netty提供的一个用于处理双向事件的处理器类。
userEventTriggered方法
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.READER_IDLE) {
Attribute<String> attribute = ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString()));
String userId = attribute.get();
logger.info("用户{}没有发送心跳断开连接", userId);
ctx.close();
} else if (e.state() == IdleState.WRITER_IDLE) {
ctx.writeAndFlush("heart");
}
}
}
  • userEventTriggered:这是Netty中的一个回调方法,当用户事件触发时会调用这个方法。在心跳检测中,当连接变为空闲时,Netty会触发一个IdleStateEvent事件。

  • if (evt instanceof IdleStateEvent):检查事件是否为IdleStateEventIdleStateEvent是Netty提供的一个特殊事件,用于表示连接的空闲状态。

  • IdleStateEvent e = (IdleStateEvent) evt:将事件强制转换为IdleStateEvent

  • 处理READER_IDLE状态

    if (e.state() == IdleState.READER_IDLE) {
    Attribute<String> attribute = ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString()));
    String userId = attribute.get();
    logger.info("用户{}没有发送心跳断开连接", userId);
    ctx.close();
    }
    
    • IdleState.READER_IDLE:表示读取通道空闲,即长时间没有从客户端读取到数据。
    • ctx.channel().attr(...):获取与通道相关的属性。在这里,通过通道的ID作为键来获取用户ID。
    • logger.info(...):记录日志,说明哪个用户没有发送心跳包导致连接断开。
    • ctx.close():关闭连接。
  • 处理WRITER_IDLE状态

    else if (e.state() == IdleState.WRITER_IDLE) {
    ctx.writeAndFlush("heart");
    }
    
    • IdleState.WRITER_IDLE:表示写入通道空闲,即长时间没有向客户端发送数据。
    • ctx.writeAndFlush("heart"):向客户端发送一个心跳消息 "heart",保持连接的活跃状态。
总结

这个处理器类HandlerHeartBeat主要用于处理心跳检测逻辑,以确保客户端和服务器之间的连接在长时间没有数据交互时保持活跃或及时关闭:

  • 如果长时间没有读取到数据(READER_IDLE),则关闭连接并记录日志。
  • 如果长时间没有向客户端发送数据(WRITER_IDLE),则发送一个心跳消息 "heart" 以保持连接活跃。
    在Netty中,userEventTriggered 方法的参数是 ChannelHandlerContextObject 类型的事件。以下是详细解释:

4.参数详解

ChannelHandlerContext ctx
  • 类型ChannelHandlerContext
  • 作用ChannelHandlerContext 提供了各种操作以触发 IO 操作和事件处理(如读取、写入、连接、断开等)。它关联了一个 Channel,并且允许访问 ChannelPipeline 中的其他 ChannelHandler
Object evt
  • 类型Object
  • 作用:这是传递给该方法的事件对象。在心跳检测的场景下,这个事件对象通常是 IdleStateEvent,它表示连接的空闲状态(读空闲、写空闲、读写空闲)。

事件来源

在Netty中,空闲状态检测通常是通过 IdleStateHandler 来实现的。IdleStateHandler 会监测通道的读写操作,如果通道在指定的时间内没有读或写操作,就会触发 IdleStateEvent 事件。

示例:配置 IdleStateHandler

以下是一个示例,展示如何将 IdleStateHandler 添加到 ChannelPipeline 中,以便在通道空闲时触发 IdleStateEvent

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.timeout.IdleStateHandler;
public class NettyServer {
public static void main(String[] args) throws Exception {
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
// 添加 IdleStateHandler,配置读、写空闲时间
p.addLast(new IdleStateHandler(60, 30, 0)); // 读空闲60秒,写空闲30秒
// 添加自定义的心跳检测处理器
p.addLast(new HandlerHeartBeat());
}
});
b.bind(8080).sync().channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

事件处理

IdleStateHandler 发现通道空闲时,会触发 IdleStateEvent,并调用 HandlerHeartBeat 中的 userEventTriggered 方法。此时,ctxevt 参数的具体信息如下:

  • ctx:当前通道的上下文,提供了通道的各种操作方法。
  • evt:具体的事件对象,这里是 IdleStateEvent,表示通道的空闲状态。

示例回调

假设配置的读空闲时间是60秒,写空闲时间是30秒:

  • 如果60秒内没有读取到任何数据,IdleStateHandler 会触发 IdleStateEvent,其中状态为 IdleState.READER_IDLE
  • 如果30秒内没有向通道写入数据,IdleStateHandler 会触发 IdleStateEvent,其中状态为 IdleState.WRITER_IDLE

HandlerHeartBeat 中:

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.READER_IDLE) {
Attribute<String> attribute = ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString()));
String userId = attribute.get();
logger.info("用户{}没有发送心跳断开连接", userId);
ctx.close();
} else if (e.state() == IdleState.WRITER_IDLE) {
ctx.writeAndFlush("heart");
}
}
}
  • IdleStateEvent:包含了通道的空闲状态(读空闲、写空闲、读写空闲)。
  • IdleState.READER_IDLE:表示读空闲事件。
  • IdleState.WRITER_IDLE:表示写空闲事件。

2.Netty WebSocket 服务器启动器类

@Component
public class NettyWebSocketStarter implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(NettyWebSocketStarter.class);
@Resource
private AppConfig appConfig;
@Resource
private HandlerWebSocket handlerWebSocket;
/**
* boss线程组,用于处理连接
*/
private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
/**
* work线程组,用于处理消息
*/
private EventLoopGroup workerGroup = new NioEventLoopGroup();
/**
* 资源关闭——在容器销毁时关闭
*/
@PreDestroy
public void close() {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
@Override
public void run() {
try {
//创建服务端启动助手
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) {
ChannelPipeline pipeline = channel.pipeline();
//设置几个重要的处理器
// 对http协议的支持,使用http的编码器,解码器
pipeline.addLast(new HttpServerCodec());
//聚合解码 httpRequest/htppContent/lastHttpContent到fullHttpRequest
//保证接收的http请求的完整性
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
//心跳 long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit
// readerIdleTime  读超时事件 即测试段一定事件内未接收到被测试段消息
// writerIdleTime  为写超时时间 即测试端一定时间内想被测试端发送消息
//allIdleTime  所有类型的超时时间
pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));
pipeline.addLast(new HandlerHeartBeat());
//将http协议升级为ws协议,对websocket支持
pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 64 * 1024, true, true, 10000L));
pipeline.addLast(handlerWebSocket);
}
});
//启动
ChannelFuture channelFuture = serverBootstrap.bind(appConfig.getWsPort()).sync();
logger.info("Netty服务端启动成功,端口:{}", appConfig.getWsPort());
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

1.代码解析

这段代码定义了一个 Netty WebSocket 服务器启动器类 NettyWebSocketStarter,该类实现了 Runnable 接口,可以在独立的线程中运行。这个类负责配置并启动 Netty WebSocket 服务器。

类和成员变量
@Component
public class NettyWebSocketStarter implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(NettyWebSocketStarter.class);
@Resource
private AppConfig appConfig;
@Resource
private HandlerWebSocket handlerWebSocket;
/**
* boss线程组,用于处理连接
*/
private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
/**
* work线程组,用于处理消息
*/
private EventLoopGroup workerGroup = new NioEventLoopGroup();
}
  • NettyWebSocketStarter:实现了 Runnable 接口,使得该类可以在独立的线程中运行。
  • logger:用于记录日志的 Logger 对象。
  • appConfig:注入的应用配置类,用于获取配置项,如 WebSocket 端口。
  • handlerWebSocket:注入的 WebSocket 处理器,用于处理 WebSocket 消息。
  • bossGroupworkerGroup:分别用于处理连接和处理消息的线程组。
资源关闭方法
/**
* 资源关闭——在容器销毁时关闭
*/
@PreDestroy
public void close() {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
  • @PreDestroy:在 Spring 容器销毁之前调用 close 方法,优雅地关闭 bossGroupworkerGroup,释放资源。
run 方法
@Override
public void run() {
try {
//创建服务端启动助手
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) {
ChannelPipeline pipeline = channel.pipeline();
//设置几个重要的处理器
// 对http协议的支持,使用http的编码器,解码器
pipeline.addLast(new HttpServerCodec());
//聚合解码 httpRequest/htppContent/lastHttpContent到fullHttpRequest
//保证接收的http请求的完整性
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
//心跳 long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit
// readerIdleTime  读超时事件 即测试段一定事件内未接收到被测试段消息
// writerIdleTime  为写超时时间 即测试端一定时间内想被测试端发送消息
//allIdleTime  所有类型的超时时间
pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));
pipeline.addLast(new HandlerHeartBeat());
//将http协议升级为ws协议,对websocket支持
pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 64 * 1024, true, true, 10000L));
pipeline.addLast(handlerWebSocket);
}
});
//启动
ChannelFuture channelFuture = serverBootstrap.bind(appConfig.getWsPort()).sync();
logger.info("Netty服务端启动成功,端口:{}", appConfig.getWsPort());
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
  1. 创建并配置 ServerBootstrap

    • ServerBootstrap 是 Netty 用于引导服务器的启动助手类。
    • serverBootstrap.group(bossGroup, workerGroup) 设置了用于处理连接的 bossGroup 和用于处理消息的 workerGroup
    • serverBootstrap.channel(NioServerSocketChannel.class) 设置了服务器通道类型为 NioServerSocketChannel,它适用于 NIO 传输。
  2. 添加处理器

    • serverBootstrap.handler(new LoggingHandler(LogLevel.DEBUG)):添加一个日志处理器,用于记录调试级别的日志。
    • serverBootstrap.childHandler(new ChannelInitializer() {...}):添加一个子处理器,用于初始化每个新连接的通道。
  3. 初始化通道

    • HttpServerCodec:添加 HTTP 编解码器,支持 HTTP 协议。
    • HttpObjectAggregator:添加 HTTP 对象聚合器,确保接收完整的 HTTP 请求。
    • IdleStateHandler:添加空闲状态处理器,用于检测读超时(60秒)。
    • HandlerHeartBeat:添加心跳检测处理器,用于处理空闲事件。
    • WebSocketServerProtocolHandler:添加 WebSocket 协议处理器,将 HTTP 协议升级为 WebSocket 协议。
    • handlerWebSocket:添加自定义的 WebSocket 消息处理器。
  4. 启动服务器

    • ChannelFuture channelFuture = serverBootstrap.bind(appConfig.getWsPort()).sync();:绑定端口并启动服务器。
    • logger.info("Netty服务端启动成功,端口:{}", appConfig.getWsPort());:记录服务器启动成功的日志。
    • channelFuture.channel().closeFuture().sync();:等待服务器通道关闭。
  5. 异常处理和资源释放

    • 如果发生异常,记录堆栈跟踪并优雅地关闭 bossGroupworkerGroup,释放资源。

2.总结

NettyWebSocketStarter 是一个用于启动 Netty WebSocket 服务器的类。它通过配置一系列处理器(如 HTTP 编解码器、心跳检测处理器、WebSocket 协议处理器等)来初始化服务器,并在指定端口上启动服务器。同时,通过 @PreDestroy 注解确保在 Spring 容器销毁时优雅地关闭资源。该类实现了 Runnable 接口,使其可以在独立线程中运行,通常可以用于多线程环境中启动 Netty 服务器。

3.Netty WebSocket 处理器 HandlerWebSocket

/**
* 设置通道共享
*/
@ChannelHandler.Sharable
@Component("handlerWebSocket")
public class HandlerWebSocket extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static final Logger logger = LoggerFactory.getLogger(HandlerWebSocket.class);
//    @Resource
//    private ChannelContextUtils channelContextUtils;
@Resource
private RedisComponet redisComponet;
/**
* 当通道就绪后会调用此方法,通常我们会在这里做一些初始化操作
*
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// Channel channel = ctx.channel();
logger.info("有新的连接加入。。。");
}
/**
* 当通道不再活跃时(连接关闭)会调用此方法,我们可以在这里做一些清理工作
*
* @param ctx
* @throws Exception
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.info("有连接已经断开。。。");
//channelContextUtils.removeContext(ctx.channel());
}
/**
* 读就绪事件 当有消息可读时会调用此方法,我们可以在这里读取消息并处理。
*
* @param ctx
* @param textWebSocketFrame
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
//接收心跳
Channel channel = ctx.channel();
logger.info("接收到消息,{}", textWebSocketFrame.text());
// Attribute attribute = channel.attr(AttributeKey.valueOf(channel.id().toString()));
//String userId = attribute.get();
//redisComponet.saveUserHeartBeat(userId);
}
//用于处理用户自定义的事件  当有用户事件触发时会调用此方法,例如连接超时,异常等。
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
String url = complete.requestUri();
String token = getToken(url);
if (token == null) {
ctx.channel().close();
return;
}
TokenUserInfoDto tokenUserInfoDto = redisComponet.getTokenUserInfoDto(token);
if (null == tokenUserInfoDto) {
ctx.channel().close();
return;
}
/**
* 用户加入
*/
//channelContextUtils.addContext(tokenUserInfoDto.getUserId(), ctx.channel());
}
}
/**
* 获取url中的token
*
* @param url
* @return
*/
private String getToken(String url) {
if (StringTools.isEmpty(url) || url.indexOf("?") == -1) {
return null;
}
String[] queryParams = url.split("\?");
if (queryParams.length < 2) {
return url;
}
String[] params = queryParams[1].split("=");
if (params.length != 2) {
return url;
}
return params[1];
}
}

代码解析

这段代码定义了一个 Netty WebSocket 处理器 HandlerWebSocket,继承自 SimpleChannelInboundHandler。该处理器主要用于处理 WebSocket 的各种事件,如连接建立、消息接收、连接关闭等。以下是对代码的详细解析:

类和成员变量
@ChannelHandler.Sharable
@Component("handlerWebSocket")
public class HandlerWebSocket extends SimpleChannelInboundHandler<TextWebSocketFrame> {
private static final Logger logger = LoggerFactory.getLogger(HandlerWebSocket.class);
@Resource
private RedisComponet redisComponet;
}
  • @ChannelHandler.Sharable:注解表明这个处理器是可以在多个 Channel 之间共享的。
  • @Component("handlerWebSocket"):将这个类注册为 Spring 的一个组件,并指定组件名称为 handlerWebSocket
  • logger:用于记录日志的 Logger 对象。
  • redisComponet:注入的 Redis 组件,用于与 Redis 进行交互。
channelActive 方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.info("有新的连接加入。。。");
}
  • channelActive:当通道就绪时调用这个方法,通常用于初始化操作。
  • logger.info:记录有新的连接加入的日志。
channelInactive 方法
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.info("有连接已经断开。。。");
//channelContextUtils.removeContext(ctx.channel());
}
  • channelInactive:当通道不再活跃(连接关闭)时调用这个方法,通常用于清理工作。
  • logger.info:记录有连接断开的日志。
channelRead0 方法
@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
Channel channel = ctx.channel();
logger.info("接收到消息,{}", textWebSocketFrame.text());
// Attribute attribute = channel.attr(AttributeKey.valueOf(channel.id().toString()));
// String userId = attribute.get();
// redisComponet.saveUserHeartBeat(userId);
}
  • channelRead0:当有消息可读时调用这个方法,读取并处理消息。
  • Channel:获取当前的通道。
  • logger.info:记录接收到的消息。
  • 注释掉的部分代码:可以用于获取用户 ID 并保存心跳到 Redis。
userEventTriggered 方法
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
String url = complete.requestUri();
String token = getToken(url);
if (token == null) {
ctx.channel().close();
return;
}
TokenUserInfoDto tokenUserInfoDto = redisComponet.getTokenUserInfoDto(token);
if (null == tokenUserInfoDto) {
ctx.channel().close();
return;
}
// 用户加入
// channelContextUtils.addContext(tokenUserInfoDto.getUserId(), ctx.channel());
}
}
  • userEventTriggered:处理用户自定义事件,如连接超时、异常等。
  • WebSocketServerProtocolHandler.HandshakeComplete:处理 WebSocket 握手完成事件。
  • getToken:从 URL 中获取 token。
  • ctx.channel().close():关闭通道,如果 token 为空或无效。
getToken 方法
private String getToken(String url) {
if (StringTools.isEmpty(url) || url.indexOf("?") == -1) {
return null;
}
String[] queryParams = url.split("\?");
if (queryParams.length < 2) {
return url;
}
String[] params = queryParams[1].split("=");
if (params.length != 2) {
return url;
}
return params[1];
}
  • getToken:从 URL 中提取 token 参数。
  • StringTools.isEmpty(url):检查 URL 是否为空。
  • url.indexOf("?"):检查 URL 是否包含查询参数。
  • url.split("\?"):分割 URL 获取查询参数部分。
  • params.length:检查参数长度是否符合预期。

总结

HandlerWebSocket 是一个 Netty WebSocket 处理器,用于处理各种 WebSocket 事件。它包含以下功能:

  1. channelActive:当通道就绪时,记录新的连接加入。
  2. channelInactive:当通道不再活跃时,记录连接断开。
  3. channelRead0:读取并处理接收到的 WebSocket 消息,记录消息内容。
  4. userEventTriggered:处理 WebSocket 握手完成事件,从 URL 中提取 token 并验证,如果无效则关闭连接。
  5. getToken:从 URL 中提取 token 参数,用于验证用户身份。

通过这些方法,HandlerWebSocket 能够有效地管理 WebSocket 连接的生命周期和消息处理,确保连接的可靠性和安全性。



完整过程

当然可以,我会详细解释从访问 WebSocket 接口到完成所有步骤的整个流程,包括各个代码片段是如何协同工作的。

1. 客户端连接到 WebSocket 服务器

当客户端尝试连接到 WebSocket 服务器时,例如通过 ws://localhost:5051/ws?token=16515151,Netty 会接受这个连接请求并触发一系列的事件来处理这个连接。

2. 服务器端初始化和配置

在服务器端,我们使用了 NettyWebSocketStarter 类来初始化和配置 Netty 服务器。

Netty 服务器启动
@Component
public class NettyWebSocketStarter implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(NettyWebSocketStarter.class);
@Resource
private AppConfig appConfig;
@Resource
private HandlerWebSocket handlerWebSocket;
private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
private EventLoopGroup workerGroup = new NioEventLoopGroup();
@PreDestroy
public void close() {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
@Override
public void run() {
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.DEBUG))
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) {
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(64 * 1024));
pipeline.addLast(new IdleStateHandler(60, 0, 0, TimeUnit.SECONDS));
pipeline.addLast(new HandlerHeartBeat());
pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 64 * 1024, true, true, 10000L));
pipeline.addLast(handlerWebSocket);
}
});
ChannelFuture channelFuture = serverBootstrap.bind(appConfig.getWsPort()).sync();
logger.info("Netty服务端启动成功,端口:{}", appConfig.getWsPort());
channelFuture.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

3. 通道初始化

当客户端连接到服务器时,Netty 会初始化通道并调用配置的处理器。这里的处理器包括 HttpServerCodecHttpObjectAggregatorIdleStateHandlerHandlerHeartBeatWebSocketServerProtocolHandler 以及我们的 HandlerWebSocket

4. 握手和连接事件

WebSocket 协议处理和心跳检测

WebSocketServerProtocolHandler 处理 WebSocket 协议的握手。当握手完成时,会触发 userEventTriggered 方法。

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
if (evt instanceof WebSocketServerProtocolHandler.HandshakeComplete) {
WebSocketServerProtocolHandler.HandshakeComplete complete = (WebSocketServerProtocolHandler.HandshakeComplete) evt;
String url = complete.requestUri();
String token = getToken(url);
if (token == null) {
ctx.channel().close();
return;
}
TokenUserInfoDto tokenUserInfoDto = redisComponet.getTokenUserInfoDto(token);
if (null == tokenUserInfoDto) {
ctx.channel().close();
return;
}
// 用户加入
// channelContextUtils.addContext(tokenUserInfoDto.getUserId(), ctx.channel());
}
}
  • 握手完成:当 WebSocket 握手完成时,WebSocketServerProtocolHandler 触发 HandshakeComplete 事件。
  • 提取 Token:从 URL 中提取 token
  • 验证 Token:从 Redis 中验证 Token 是否有效。
  • 管理连接:将用户信息和通道关联(注释部分)。
心跳检测

IdleStateHandler 会检测连接的空闲状态(例如60秒内没有读取到数据),触发相应的事件。

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
IdleStateEvent e = (IdleStateEvent) evt;
if (e.state() == IdleState.READER_IDLE) {
Attribute<String> attribute = ctx.channel().attr(AttributeKey.valueOf(ctx.channel().id().toString()));
String userId = attribute.get();
logger.info("用户{}没有发送心跳断开连接", userId);
ctx.close();
} else if (e.state() == IdleState.WRITER_IDLE) {
ctx.writeAndFlush("heart");
}
}
}
  • 读空闲:如果长时间未读取到数据(READER_IDLE),则关闭连接。
  • 写空闲:如果长时间未写入数据(WRITER_IDLE),发送心跳消息。

5. 消息处理

当客户端发送消息时,Netty 会触发 channelRead0 方法,处理接收到的 WebSocket 消息。

@Override
protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
Channel channel = ctx.channel();
logger.info("接收到消息,{}", textWebSocketFrame.text());
// Attribute attribute = channel.attr(AttributeKey.valueOf(channel.id().toString()));
// String userId = attribute.get();
// redisComponet.saveUserHeartBeat(userId);
}
  • 接收消息:记录接收到的消息内容。
  • 心跳更新(注释部分):可以从消息中提取用户ID,并在 Redis 中更新心跳记录。

6. 连接关闭

当连接关闭时,Netty 会调用 channelInactive 方法。

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
logger.info("有连接已经断开。。。");
//channelContextUtils.removeContext(ctx.channel());
}
  • 记录日志:记录连接断开的日志。
  • 清理资源(注释部分):从上下文中移除断开的连接。

总结

整个过程包括以下步骤:

  1. 客户端连接:客户端通过 WebSocket URL 连接到服务器(如 ws://localhost:5051/ws?token=16515151)。
  2. 服务器初始化NettyWebSocketStarter 启动服务器,并配置各个处理器。
  3. 握手和协议升级WebSocketServerProtocolHandler 处理握手并升级协议。
  4. 事件触发和处理
    • 握手完成:在 userEventTriggered 中处理握手完成事件,提取并验证 token。
    • 心跳检测:在 HandlerHeartBeat 中处理读空闲和写空闲事件。
  5. 消息处理:在 HandlerWebSocketchannelRead0 方法中处理接收到的 WebSocket 消息。
  6. 连接关闭:在 HandlerWebSocketchannelInactive 方法中处理连接关闭事件。

通过这些步骤,Netty 能够高效地管理 WebSocket 连接,确保连接的可靠性和数据的实时性。

本站无任何商业行为
个人在线分享 » netty-学习
E-->