avatar

java网络编程_Netty入门

Netty异步模型

基本介绍

异步的概念和同步相对。当一个异步过程调用发出后,调用者不用立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者

Netty中的I/O操作都是异步的,包括Bind、Write、Connect等操作会简单的返回一个ChannelFuture。调用者并不能立刻获取结果,而是通过Future-Listener机制。用户可以方便的主动获取或者通过通知机制获取IO操作结果,Netty的异步模型是建立在future和callback之上的。callback就是回调。重点说Future,它的核心思想是:假设一个方法run,计算过程非常耗时,等待fun返回显然不合适。那么可以在调用fun的时候,立马返回一个Future,后续可以通过Future去监控方法run的处理过程(即:Future-Listener机制)

Future和Future-Listener

1、Future
表示异步的执行结果,可以通过它提供的方法来检测执行是否完成,ChannelFuture是子接口,ChannelFuture可以添加监听器,当监听的事件发生时,就会通知到监听器
当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作。
常用的方法:

  • sync方法:阻塞等待程序执行结果返回
  • isDone:判断当前操作是否完成
  • isSuccess:判断已经完成的当前操作是否成功
  • getCause:获取已经完成的当前操作失败原因
  • isCancelled:判断已完成的当前操作是否被取消
  • addListener:注册监听器,当操作已经完成(isDone方法返回完成),将会通知指定的监听器

2、Future-Lisener机制
给Future添加监听器,监听操作结果
代码实现:

ChannelFuture future = serverBootstrap.bind(9999).sync();
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
if(channelFuture.isSuccess()){
System.out.println("端口绑定成功");
}else {
System.out.println("端口绑定失败");
}
}
});

Netty编解码器

java编解码器

1、编码(Encode)称为序列化,它将对象序列化为字节数组,用于网络传输、数据持久化或者其它用途
2、解码(Decode)称为反序列化,它把网络、磁盘等读取的字节数组还原成原始对象(通常是原始对象的拷贝),以方便后续的业务逻辑操作
java序列化对象只需要实现java.io.Serializable接口并生成序列化ID,这个类就能够通过java.io.ObjectInput和java.io.ObjectOutput序列化和反序列化
java序列化的目的:1、网络传输,2、对象持久化
java序列化缺点:1、无法跨语言。2、序列化后码流太大。3、序列化性能太低
java序列化仅仅是java编解码技术的一种,由于它的种种缺陷,衍生出多种编解码技术和框架,这些编解码技术框架实现消息的高效序列化

Netty编解码器

1、概念
在网络应用中需要实现某种编解码器,将原始字节数据与自定义的消息对象进行相互转换。网络中都是以字节码的数据形式来传输数据的,服务器编码数据后发送到客户端,客户端需要对数据进行解码
对Netty来言,编辑骂起由两部分组成:编码器、解码器

  • 解码器:负责将消息从字节或者其他序列化形式转成指定的消息对象
  • 编码器:将消息对象转成字节或者其他序列形式在网络上传输
    Netty的编解码器实现了ChannelHandlerAdapter,也是一种特殊的ChannelHandler,所以依赖于ChannelPipeline,可以将多个编解码器链接在一起,以实现复杂的转换逻辑
  • Netty里面的编解码:编码器:负责“出站OutboundHandler”数据。解码器“入站InboundHandler”数据*

解码器(Decode)

解码器负责解码“入站”数据从一种格式到另一种格式,解码器处理入站数据最抽象ChannelInboundHandler的实现。需要将解码器放在ChannelPipeline中,对于解码器,Netty中主要提供了抽象基类ByteToMessageDecoder和MessageToMessageDecoder

抽象解码器:

  • ByteToMessageDecoder:用于将字节转为消息,需要检查缓冲区是否有足够的字节
  • ReplayingDecoder:继承ByteToMessageDecoder,不需要检查缓冲区是否有足够的字节,但是RepalyingDecoder速度略慢于ByteToMessageDecoder,同时不是所有的ByteBuf都支持,项目复杂性高则使用ReplayingDecoder,否则使用ByteToMessageDecoder
  • MessageToMessageDecoder:用于从一种消息解码为另一种消息(例如POJO到POJO)

核心方法:

  • decode(ChannelHandlerContext ctx,ByteBuf msg,List out)

    代码实现:

    public class MessageDecoder extends MessageToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext channelHandlerContext, Object o, List list) throws Exception {
    System.out.println("正在进行消息解码。。。");
    ByteBuf byteBuf = (ByteBuf) o;
    list.add(byteBuf.toString(CharsetUtil.UTF_8));
    }
    }
    // 编写完以后需要将该解码器注册到pipeline中
    .childHandler(new ChannelInitializer<SocketChannel>() { // workGroup创建体格通道初始化对象
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
    // 添加解码器,解码器一定要在业务handler前面
    channel.pipeline().addLast("messageDecoder",new MessageDecoder());
    // 向pipeline中添加自定义的业务处理handler
    channel.pipeline().addLast(new NettyServerHandler());
    }
    });
    // 此时管道读取的时候 就不需要进行转换了,这个时候的object就是转换后的字符串了

    编码器(Encoder)

    与ByteToMessageDecoder和MessageToMessageDecoder相对应,Netty提供了对应的编码器实现MessageToByteEncoder和MessageToMessageEncoder,二者都是实现ChannelOutboundHandler接口

    抽象编码器:

    • MessageToByteEncoder:将消息转换为字节
    • MessageToMessageEncoder:用于从一种消息编码为另一种消息(例如POJO到POJO)

    代码实现:

    public class MessageEncoder extends MessageToMessageEncoder {
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, List out) throws Exception {
    System.out.println("消息正在进行编码....");
    String str = (String)msg;
    out.add(Unpooled.copiedBuffer(str, CharsetUtil.UTF_8));
    }
    }
    // 添加到管道中
    .childHandler(new ChannelInitializer<SocketChannel>() { // workGroup创建体格通道初始化对象
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
    // 添加解码器,解码器一定要在业务handler前面
    channel.pipeline().addLast("messageDecoder",new MessageDecoder());
    // 添加编码器
    channel.pipeline().addLast("messageEncoder",new MessageEncoder());
    // 向pipeline中添加自定义的业务处理handler
    channel.pipeline().addLast(new NettyServerHandler());
    }
    });
    // 发送出去的信息就不需要进行ByteBuf的转换了,我们就直接可以传输String即可

    编解码器(Codec)

    编解码器:同时具有编码与解码的功能,特点同时实现了ChannelInboundHandler和ChannelOutboundHandler接口,因此在数据输入和输出时都能进行处理

    Netty提供了一个CHannelDuplexHandler适配器类,编码解码类的抽象基类ByteToMessageCodec和MessageToMessageCodec都继承此类。
    代码实现:

    public class MessageCodec extends MessageToMessageCodec {
    /**
    * 编码器
    * @param ctx
    * @param msg
    * @param out
    * @throws Exception
    */
    @Override
    protected void encode(ChannelHandlerContext ctx, Object msg, List out) throws Exception {
    System.out.println("消息正在进行编码....");
    String str = (String)msg;
    out.add(Unpooled.copiedBuffer(str, CharsetUtil.UTF_8));
    }

    /**
    * 解码器
    * @param ctx
    * @param msg
    * @param out
    * @throws Exception
    */
    @Override
    protected void decode(ChannelHandlerContext ctx, Object msg, List out) throws Exception {
    System.out.println("正在进行消息解码。。。");
    ByteBuf byteBuf = (ByteBuf) msg;
    out.add(byteBuf.toString(CharsetUtil.UTF_8));
    }
    }
    // 还必须要注册到管道中才能使用。
    channel.pipeline().addLast(new MessageCodec());

    Netty案例-群聊天室

    案例需求:

    • 编写一个Netty群聊系统,实现服务器和客户端之间的数据简单通讯
    • 实现多人聊天
    • 服务器端:可以检测用户上线,离线,并实现消息的转发功能
    • 客户端:可以发送消息给其他所有用户,同时可以接受其他用户发送的消息。

    聊天室服务端编写

    netty中每次有新的请求传入的时候,都会创建新的Handler来处理请求,如果想要共用一个需要在自定义Handler中加入注解@ChannelHandler.Sharable

    public class NettyChatServer {

    private int port ;

    public NettyChatServer(int port) {
    this.port = port;
    }

    public void run() throws InterruptedException {
    EventLoopGroup bossGroup = null;
    EventLoopGroup workerGroup = null;
    try{
    bossGroup = new NioEventLoopGroup(1);
    workerGroup = new NioEventLoopGroup();
    ServerBootstrap serverBootstrap = new ServerBootstrap()
    .group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG,128)
    .childOption(ChannelOption.SO_KEEPALIVE,Boolean.TRUE)
    .childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    // 添加编码器
    ch.pipeline().addLast(new StringDecoder());
    ch.pipeline().addLast(new StringEncoder());
    // netty中每次有新的请求传入的时候,都会创建新的Handler来处理请求,如果想要共用一个需要在自定义Handler中加入注解@ChannelHandler.Sharable
    ch.pipeline().addLast(new NettyChatServerHandler());
    }
    });
    ChannelFuture channelFuture = serverBootstrap.bind(port);
    channelFuture.addListener((ChannelFutureListener) future -> {
    if (future.isSuccess()){
    System.out.println("聊天室服务端启动成功");
    }else{
    System.out.println("聊天室服务端启动失败");
    }
    });
    channelFuture.channel().closeFuture().sync();
    }finally {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    }
    }

    public static void main(String[] args) throws InterruptedException {
    new NettyChatServer(9999).run();
    }
    }

    编写业务处理Handler

    public class NettyChatServerHandler extends SimpleChannelInboundHandler<String> {

    public static List<Channel> channels = new ArrayList<>();

    /**
    * 通道就绪事件
    * @param ctx
    * @throws Exception
    */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    Channel channel = ctx.channel();
    // 当有新的客户端连接的时候,将通道放入集合
    channels.add(channel);
    System.out.println("[Server]:"+channel.remoteAddress().toString().substring(1)+"在线。");
    }

    /**
    * 通道下线事件
    * @param ctx
    * @throws Exception
    */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    Channel channel = ctx.channel();
    // 当有客户端断开连接的时候就移除
    channels.remove(channel);
    System.out.println("[Server]:"+channel.remoteAddress().toString().substring(1)+"下线。");
    }

    /**
    * 异常处理
    * @param ctx
    * @param cause
    * @throws Exception
    */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    Channel channel = ctx.channel();
    channels.remove(channel);
    cause.printStackTrace();
    System.out.println("[Server]:"+channel.remoteAddress().toString().substring(1)+"异常。");
    }

    /**
    * 通道读取事件
    * @param ctx
    * @param msg
    * @throws Exception
    */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
    // 当前发送消息的客户端连接
    Channel channel = ctx.channel();
    for (Channel channel1 : channels) {
    // 排除自身
    if(channel != channel1){
    channel1.writeAndFlush("["+channel.remoteAddress().toString().substring(1)+"]说:"+msg);
    }
    }
    }
    }

    现在聊天室的服务端编写完毕。

    聊天室客户端编写

    编写handler,职责就是打印出来信息即可

    public class NettyChatClientHandler extends SimpleChannelInboundHandler<String> {
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
    System.out.println(s);
    }
    }

    编写主启动类

    public class NettyChatClient {

    private String ip;

    private int port;

    public NettyChatClient(String ip, int port) {
    this.ip = ip;
    this.port = port;
    }

    public void run() throws InterruptedException {
    EventLoopGroup group = null;
    try{
    group = new NioEventLoopGroup();
    Bootstrap bootstrap = new Bootstrap()
    .group(group)
    .channel(NioSocketChannel.class)
    .handler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
    socketChannel.pipeline().addLast(new StringDecoder());
    socketChannel.pipeline().addLast(new StringEncoder());
    socketChannel.pipeline().addLast(new NettyChatClientHandler());
    }
    });
    // 启动客户端,等待连接服务器,将异步改为同步
    ChannelFuture channelFuture = bootstrap.connect(ip, port).sync();
    Channel channel = channelFuture.channel();
    System.out.println("--------------"+channel.localAddress().toString().substring(1)+"-------------");
    Scanner scanner = new Scanner(System.in);
    while (scanner.hasNextLine()){
    String line = scanner.nextLine();
    // 向服务端发起消息
    channel.writeAndFlush(line);
    }
    // 关闭通道和连接池
    channelFuture.channel().closeFuture().sync();
    }finally {
    group.shutdownGracefully();
    }
    }

    public static void main(String[] args) throws InterruptedException {
    new NettyChatClient("127.0.0.1",9999).run();
    }
    }

    运行多个客户端与一个服务端,就可以实现群聊功能。

    基于Netty的Http服务器开发

    介绍

    Netty的Http协议栈无论在性能上还是可靠性上,都表现优异,非常适合在非web容器的场景下应用,相比于传统的tomcat、jetty等web容器,它更加轻量和小巧,灵活性和定制性也更好。

    功能需求

    • Netty服务器在8080端口监听
    • 浏览器发出请求”http://localhost:8080"
    • 服务器可以回复消息给客户端”Hello!我是Netty服务器”,并对特定请求资源进行过滤

    服务端代码实现

    NettyHttpServer 代码与上述几个的代码类似,重点区别就是编码器的区别,这里用到了HttpServerCodec的编码器

    // NettyHttpServer
    public class NettyHttpServer {
    private int port ;

    public NettyHttpServer(int port) {
    this.port = port;
    }

    public void run() throws InterruptedException {
    EventLoopGroup bossGroup = null;
    EventLoopGroup workerGroup = null;
    try{
    bossGroup = new NioEventLoopGroup(1);
    workerGroup = new NioEventLoopGroup();
    ServerBootstrap serverBootstrap = new ServerBootstrap()
    .group(bossGroup, workerGroup)
    .channel(NioServerSocketChannel.class)
    .option(ChannelOption.SO_BACKLOG,128)
    .childOption(ChannelOption.SO_KEEPALIVE,Boolean.TRUE)
    .childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
    // 添加编码器
    ch.pipeline().addLast(new HttpServerCodec());
    ch.pipeline().addLast(new NettyHttpServerHandler());
    }
    });
    ChannelFuture channelFuture = serverBootstrap.bind(port);
    channelFuture.addListener((ChannelFutureListener) future -> {
    if (future.isSuccess()){
    System.out.println("Http服务端启动成功");
    }else{
    System.out.println("Http服务端启动失败");
    }
    });
    channelFuture.channel().closeFuture().sync();
    }finally {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    }
    }

    public static void main(String[] args) throws InterruptedException {
    new NettyHttpServer(8080).run();
    }
    }

    编写NettyHttpServerHandler

    public class NettyHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
    /**
    * 读取就绪事件
    * @param channelHandlerContext
    * @param httpObject
    * @throws Exception
    */
    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception {
    // 判断请求是不是http请求
    if(httpObject instanceof HttpRequest){
    DefaultHttpRequest request = (DefaultHttpRequest) httpObject;
    // 打印路径
    System.out.println("浏览器请求路径:"+request.uri());
    if ("/favicon.ico".equals(request.uri())){
    System.out.println("图标不响应");
    return;
    }
    // 2、 给浏览器进行响应
    ByteBuf byteBuf = Unpooled.copiedBuffer("Hello!我是Netty服务器", CharsetUtil.UTF_8);
    DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK,byteBuf);
    // 设置响应头
    response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/html;charset=utf-8");
    response.headers().set(HttpHeaderNames.CONTENT_LENGTH,byteBuf.readableBytes());
    channelHandlerContext.writeAndFlush(response);
    }
    }
    }

    编写完成后,在页面启动,输入”http://localhost:8080",页面会返回对应的信息。

    基于Netty的WebSocket开发网页版聊天室

    WebSocket介绍

    WebSocket是一种在单个TCP连接上进行的全双工通信的协议。WebSocket使得客户端和服务器之间交换变得更加简单,允许服务器主动向客户端发送消息数据。在WebSocket API中,客户端和服务端只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。
    应用场景:

    • 社交订阅
    • 协同编辑/编程
    • 股票基金报价
    • 体育实况更新
    • 多媒体聊天
    • 在线教育

    WebSocket与Http的区别

    Http协议是在应用层的协议,它是基于tcp协议的,http协议建立连接也必须要有三次握手才能发消息。http连接分为短链接和长连接,短连接时每次请求都要三次握手才能发送自己的消息,即每一个request对应一个response,长连接时在一定期限内保持连接,保持TCP连接不断开,客户端与服务端通信,必须要有客户端先发起,然后服务端返回结果,客户端时主动的,服务端是被动的,客户端要想实时获取服务端消息就得不断发送长连接到服务端。
    Websocket实现了多路复用,它是全双工的。在WebSocket协议下服务端和客户端可以同时发送信息,建立了WebSocket连接之后,服务端可以主动发送信息到客户端,而且信息当中不必带有heand的部分信息了,与http的长连接相比,这种方式,不仅能降低服务器压力,而且信息当中也减少了部分多余的信息

    环境搭建以及关键编码实现

    因为是webSocket,为了方便我们引入SpringBoot用来编写除WebSocket以外的web服务器端功能,包括http请求、以及静态页面的资源请求等
    NettyWebSocketServer,Netty服务的主启动类

    /**
    * Netty服务器
    */
    @Component
    public class NettyWebSocketServer implements Runnable {

    @Autowired
    NettyConfig nettyConfig;

    @Autowired
    WebSocketChannelInit webSocketChannelInit;

    private EventLoopGroup bossGroup = new NioEventLoopGroup(1);

    private EventLoopGroup workerGroup = new NioEventLoopGroup();

    /**
    * 资源关闭--在容器销毁是关闭
    */
    @PreDestroy
    public void close() {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    }

    @Override
    public void run() {
    try {
    //1.创建服务端启动助手
    ServerBootstrap serverBootstrap = new ServerBootstrap();
    //2.设置线程组
    serverBootstrap.group(bossGroup, workerGroup);
    //3.设置参数
    serverBootstrap.channel(NioServerSocketChannel.class)
    // 设置一个日志处理器
    .handler(new LoggingHandler(LogLevel.DEBUG))
    .childHandler(webSocketChannelInit);
    //4.启动
    ChannelFuture channelFuture = serverBootstrap.bind(nettyConfig.getPort()).sync();
    System.out.println("--Netty服务端启动成功---");
    channelFuture.channel().closeFuture().sync();
    } catch (Exception e) {
    e.printStackTrace();
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    } finally {
    bossGroup.shutdownGracefully();
    workerGroup.shutdownGracefully();
    }
    }
    }

    我们还需要编写一个WebSocketChannelInit,用于初始化管道

    /**
    * 通道初始化对象
    */
    @Component
    public class WebSocketChannelInit extends ChannelInitializer {

    @Autowired
    NettyConfig nettyConfig;

    @Autowired
    WebSocketHandler webSocketHandler;

    @Override
    protected void initChannel(Channel channel) throws Exception {
    ChannelPipeline pipeline = channel.pipeline();
    //对http协议的支持.
    pipeline.addLast(new HttpServerCodec());
    // 对大数据流的支持
    pipeline.addLast(new ChunkedWriteHandler());
    //post请求分三部分. request line / request header / message body
    // HttpObjectAggregator将多个信息转化成单一的request或者response对象
    pipeline.addLast(new HttpObjectAggregator(8000));
    // 将http协议升级为ws协议. websocket的支持
    pipeline.addLast(new WebSocketServerProtocolHandler(nettyConfig.getPath()));
    // 自定义处理handler
    pipeline.addLast(webSocketHandler);

    }
    }

    编写WebSocketHandler 真正的业务处理类

    /**
    * 自定义处理类
    * TextWebSocketFrame: websocket数据是帧的形式处理
    */
    @Component
    @ChannelHandler.Sharable //设置通道共享 因为在Netty中每次存在新的连接都会创建新的Handler 设置了共享,那么每次都会用一个Handler
    public class WebSocketHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    public static List<Channel> channelList = new ArrayList<>();

    /**
    * 通道就绪事件
    *
    * @param ctx
    * @throws Exception
    */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    Channel channel = ctx.channel();
    //当有新的客户端连接的时候, 将通道放入集合
    channelList.add(channel);
    System.out.println("有新的连接.");
    }


    /**
    * 通道未就绪--channel下线
    *
    * @param ctx
    * @throws Exception
    */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    Channel channel = ctx.channel();
    //当有客户端断开连接的时候,就移除对应的通道
    channelList.remove(channel);
    }

    /**
    * 读就绪事件
    *
    * @param ctx
    * @param textWebSocketFrame
    * @throws Exception
    */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception {
    // 客户端发送的消息
    String msg = textWebSocketFrame.text();
    System.out.println("msg:" + msg);
    //当前发送消息的通道, 当前发送的客户端连接
    Channel channel = ctx.channel();
    for (Channel channel1 : channelList) {
    //排除自身通道
    if (channel != channel1) {
    channel1.writeAndFlush(new TextWebSocketFrame(msg));
    }
    }
    }


    /**
    * 异常处理事件
    *
    * @param ctx
    * @param cause
    * @throws Exception
    */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    cause.printStackTrace();
    Channel channel = ctx.channel();
    //移除集合
    channelList.remove(channel);
    }
    }

    编写完成这三个类后,需要加入到Spring中并且启动,那么就需要编写一个类实现CommandLineRunner接口中的run方法,该方法是在Spring启动完成后调用的

    public class NettySpringbootApplication implements CommandLineRunner {
    @Autowired
    NettyWebSocketServer nettyWebSocketServer;

    @Override
    public void run(String... args) throws Exception {
    new Thread(nettyWebSocketServer).start();
    }
    }

    编写客户端的javascript访问程序

    var ws = new WebSocket("ws://localhost:8081/chat");
    ws.onopen = function () {
    console.log("连接成功.")
    }
    ws.onmessage = function (evt) {
    showMessage(evt.data);
    }
    ws.onclose = function (){
    console.log("连接关闭")
    }

    ws.onerror = function (){
    console.log("连接异常")
    }
    // ws.send(message); 发送消息

    这样我们的基于Netty的webSocket就完成了。

    Netty中粘包和拆包的解决方案

    粘包和拆包简介

    粘包和拆包是TCP网络编程中不可避免的,无论是服务端还是客户端,当我们读取或者发送消息的时候,都需要考虑TCP底层的粘包/拆包机制
    TCP是个”流”协议,所谓流,就是没有界限的一串数字。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个打的数据包发送,这就是所谓的TCP粘包与拆包问题
    如下图所示,假设客户端分别发送两个数据包D1和D2给服务端,由于服务端一次读取到 的字节数不确定的,故看了能存在下面四种情况:

    TCP粘包和拆包产生的原因:
    数据从发送方到接收方需要经过操作系统的缓冲区,而造成粘包和拆包的主要原因就是在这个缓冲区上,粘包可以理解为缓冲区数据堆积,导致多个请求数据粘在一起,而拆包可以理解为发送的数据大于缓冲区,进行拆分处理

    粘包拆包的解决方案

    1、业内解决方案
    由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下:

    • 消息长度固定,累计读取到长度和为定长LEN的报文后,就认为读取到一个完整的信息
    • 将换行符作为消息结束符
    • 将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符
    • 通过在消息头定义长度字段来标识消息的总长度

    2、Netty中的粘包和拆包解决方案,Netty提供了4种编码器来解决

    • 固定长度的拆包器FixedLengthFrameDecoder,每个应用层数据包的都拆分都是固定长度的大小
    • 行拆分器LineBasedFrameDecoder,每个应用层数据包,都以换行符作为分割符,进行分割拆分
    • 分隔符拆包器DelimiterBasedFrameDecoder,每个应用层数据包,都通过自定义的分割符,进行分割拆分
    • 基于数据包长度的拆包器LengthFieldBasedFrameDecoder,将应用层数据包的长度,作为接收端应用层数据包的拆分依据,按照应用层数据包的大小拆包。这个拆包器,有一个要求,就是应用层协议种包含数据包的长度

    3、代码实现
    只需要添加相应的解码器即可

    • LineBasedFrameDecoder:以换行符为每一次发送语句的结束,使用时需要在每一次发送完成的结尾增加换行符\n.
      .childHandler((ChannelInitializer)(ch)->{
      // 添加行解码器
      ch.pipeline().addLast(new LineBasedFrameDecoder(2048));
      //...
      })
    • DelimiterBasedFrameDecoder: 设置特殊分隔符用于防止粘包
      .childHandler((ChannelInitializer)(ch)->{
      // 添加行解码器 $ 就是用于处理粘包的分隔符
      ByteBuf bytebuf = Unpooled.copiedBuffer("$".getByte(StandardCharsets.UTF_8))
      ch.pipeline().addLast(new DelimiterBasedFrameDecoder(2048,bytebuf));
      //...
      })
    • 对于拆包的解决办法是一样的。同样也是使用,需要在服务端设置最大包的字节数,并且需要用换行符进行分割或者特定字符进行分割。如果客户端超出了服务端设置的最大限制则会报错。
    文章作者: zenshin
    文章链接: https://zlh.giserhub.com/2022/01/27/cl35o0nft00i2p4tgf331hkrr/
    版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 zenshin's blog
    打赏
    • 微信
      微信
    • 支付宝
      支付宝

    评论