Netty异步模型 基本介绍 异步的概念和同步相对。当一个异步过程调用发出后,调用者不用立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者
Netty中的I/O操作都是异步的,包括Bind、Write、Connect等操作会简单的返回一个ChannelFuture。调用者并不能立刻获取结果,而是通过Future-Listener机制。用户可以方便的主动获取或者通过通知机制获取IO操作结果,Netty的异步模型是建立在future和callback之上的。callback就是回调。重点说Future,它的核心思想是:假设一个方法run,计算过程非常耗时,等待fun返回显然不合适。那么可以在调用fun的时候,立马返回一个Future,后续可以通过Future去监控方法run的处理过程(即:Future-Listener机制)
Future和Future-Listener 1、Future 表示异步的执行结果,可以通过它提供的方法来检测执行是否完成,ChannelFuture是子接口,ChannelFuture可以添加监听器,当监听的事件发生时,就会通知到监听器 当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作。 常用的方法:
sync方法:阻塞等待程序执行结果返回
isDone:判断当前操作是否完成
isSuccess:判断已经完成的当前操作是否成功
getCause:获取已经完成的当前操作失败原因
isCancelled:判断已完成的当前操作是否被取消
addListener:注册监听器,当操作已经完成(isDone方法返回完成),将会通知指定的监听器
2、Future-Lisener机制 给Future添加监听器,监听操作结果 代码实现:
ChannelFuture future = serverBootstrap.bind(9999 ).sync(); future.addListener(new ChannelFutureListener() { @Override public void operationComplete (ChannelFuture channelFuture) throws Exception { if (channelFuture.isSuccess()){ System.out.println("端口绑定成功" ); }else { System.out.println("端口绑定失败" ); } } });
Netty编解码器 java编解码器 1、编码(Encode)称为序列化,它将对象序列化为字节数组,用于网络传输、数据持久化或者其它用途 2、解码(Decode)称为反序列化,它把网络、磁盘等读取的字节数组还原成原始对象(通常是原始对象的拷贝),以方便后续的业务逻辑操作 java序列化对象只需要实现java.io.Serializable接口并生成序列化ID,这个类就能够通过java.io.ObjectInput和java.io.ObjectOutput序列化和反序列化 java序列化的目的:1、网络传输,2、对象持久化 java序列化缺点:1、无法跨语言。2、序列化后码流太大。3、序列化性能太低 java序列化仅仅是java编解码技术的一种,由于它的种种缺陷,衍生出多种编解码技术和框架,这些编解码技术框架实现消息的高效序列化
Netty编解码器 1、概念 在网络应用中需要实现某种编解码器,将原始字节数据与自定义的消息对象进行相互转换。网络中都是以字节码的数据形式来传输数据的,服务器编码数据后发送到客户端,客户端需要对数据进行解码 对Netty来言,编辑骂起由两部分组成:编码器、解码器
解码器:负责将消息从字节或者其他序列化形式转成指定的消息对象
编码器:将消息对象转成字节或者其他序列形式在网络上传输 Netty的编解码器实现了ChannelHandlerAdapter,也是一种特殊的ChannelHandler,所以依赖于ChannelPipeline,可以将多个编解码器链接在一起,以实现复杂的转换逻辑
Netty里面的编解码:编码器:负责“出站OutboundHandler”数据。解码器“入站InboundHandler”数据*
解码器(Decode) 解码器负责解码“入站”数据从一种格式到另一种格式,解码器处理入站数据最抽象ChannelInboundHandler的实现。需要将解码器放在ChannelPipeline中,对于解码器,Netty中主要提供了抽象基类ByteToMessageDecoder和MessageToMessageDecoder
抽象解码器:
ByteToMessageDecoder:用于将字节转为消息,需要检查缓冲区是否有足够的字节
ReplayingDecoder:继承ByteToMessageDecoder,不需要检查缓冲区是否有足够的字节,但是RepalyingDecoder速度略慢于ByteToMessageDecoder,同时不是所有的ByteBuf都支持,项目复杂性高则使用ReplayingDecoder,否则使用ByteToMessageDecoder
MessageToMessageDecoder:用于从一种消息解码为另一种消息(例如POJO到POJO)
核心方法:
decode(ChannelHandlerContext ctx,ByteBuf msg,List out)
代码实现:
public class MessageDecoder extends MessageToMessageDecoder { @Override protected void decode (ChannelHandlerContext channelHandlerContext, Object o, List list) throws Exception { System.out.println("正在进行消息解码。。。" ); ByteBuf byteBuf = (ByteBuf) o; list.add(byteBuf.toString(CharsetUtil.UTF_8)); } } .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel (SocketChannel channel) throws Exception { channel.pipeline().addLast("messageDecoder" ,new MessageDecoder()); channel.pipeline().addLast(new NettyServerHandler()); } });
编码器(Encoder) 与ByteToMessageDecoder和MessageToMessageDecoder相对应,Netty提供了对应的编码器实现MessageToByteEncoder和MessageToMessageEncoder,二者都是实现ChannelOutboundHandler接口
抽象编码器:
MessageToByteEncoder:将消息转换为字节
MessageToMessageEncoder:用于从一种消息编码为另一种消息(例如POJO到POJO)
代码实现:
public class MessageEncoder extends MessageToMessageEncoder { @Override protected void encode (ChannelHandlerContext ctx, Object msg, List out) throws Exception { System.out.println("消息正在进行编码...." ); String str = (String)msg; out.add(Unpooled.copiedBuffer(str, CharsetUtil.UTF_8)); } } .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel (SocketChannel channel) throws Exception { channel.pipeline().addLast("messageDecoder" ,new MessageDecoder()); channel.pipeline().addLast("messageEncoder" ,new MessageEncoder()); channel.pipeline().addLast(new NettyServerHandler()); } });
编解码器(Codec) 编解码器:同时具有编码与解码的功能,特点同时实现了ChannelInboundHandler和ChannelOutboundHandler接口,因此在数据输入和输出时都能进行处理
Netty提供了一个CHannelDuplexHandler适配器类,编码解码类的抽象基类ByteToMessageCodec和MessageToMessageCodec都继承此类。 代码实现:
public class MessageCodec extends MessageToMessageCodec { @Override protected void encode (ChannelHandlerContext ctx, Object msg, List out) throws Exception { System.out.println("消息正在进行编码...." ); String str = (String)msg; out.add(Unpooled.copiedBuffer(str, CharsetUtil.UTF_8)); } @Override protected void decode (ChannelHandlerContext ctx, Object msg, List out) throws Exception { System.out.println("正在进行消息解码。。。" ); ByteBuf byteBuf = (ByteBuf) msg; out.add(byteBuf.toString(CharsetUtil.UTF_8)); } } channel.pipeline().addLast(new MessageCodec());
Netty案例-群聊天室 案例需求:
编写一个Netty群聊系统,实现服务器和客户端之间的数据简单通讯
实现多人聊天
服务器端:可以检测用户上线,离线,并实现消息的转发功能
客户端:可以发送消息给其他所有用户,同时可以接受其他用户发送的消息。
聊天室服务端编写 netty中每次有新的请求传入的时候,都会创建新的Handler来处理请求,如果想要共用一个需要在自定义Handler中加入注解@ChannelHandler.Sharable
public class NettyChatServer { private int port ; public NettyChatServer (int port) { this .port = port; } public void run () throws InterruptedException { EventLoopGroup bossGroup = null ; EventLoopGroup workerGroup = null ; try { bossGroup = new NioEventLoopGroup(1 ); workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class ) .option (ChannelOption .SO_BACKLOG ,128) .childOption (ChannelOption .SO_KEEPALIVE ,Boolean .TRUE ) .childHandler (new ChannelInitializer <SocketChannel >() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); ch.pipeline().addLast(new NettyChatServerHandler()); } }); ChannelFuture channelFuture = serverBootstrap.bind(port); channelFuture.addListener((ChannelFutureListener) future -> { if (future.isSuccess()){ System.out.println("聊天室服务端启动成功" ); }else { System.out.println("聊天室服务端启动失败" ); } }); channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main (String[] args) throws InterruptedException { new NettyChatServer(9999 ).run(); } }
编写业务处理Handler
public class NettyChatServerHandler extends SimpleChannelInboundHandler <String > { public static List<Channel> channels = new ArrayList<>(); @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channels.add(channel); System.out.println("[Server]:" +channel.remoteAddress().toString().substring(1 )+"在线。" ); } @Override public void channelInactive (ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channels.remove(channel); System.out.println("[Server]:" +channel.remoteAddress().toString().substring(1 )+"下线。" ); } @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { Channel channel = ctx.channel(); channels.remove(channel); cause.printStackTrace(); System.out.println("[Server]:" +channel.remoteAddress().toString().substring(1 )+"异常。" ); } @Override protected void channelRead0 (ChannelHandlerContext ctx, String msg) throws Exception { Channel channel = ctx.channel(); for (Channel channel1 : channels) { if (channel != channel1){ channel1.writeAndFlush("[" +channel.remoteAddress().toString().substring(1 )+"]说:" +msg); } } } }
现在聊天室的服务端编写完毕。
聊天室客户端编写 编写handler,职责就是打印出来信息即可
public class NettyChatClientHandler extends SimpleChannelInboundHandler <String > { @Override protected void channelRead0 (ChannelHandlerContext channelHandlerContext, String s) throws Exception { System.out.println(s); } }
编写主启动类
public class NettyChatClient { private String ip; private int port; public NettyChatClient (String ip, int port) { this .ip = ip; this .port = port; } public void run () throws InterruptedException { EventLoopGroup group = null ; try { group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap() .group(group) .channel(NioSocketChannel.class ) .handler (new ChannelInitializer <SocketChannel >() { @Override protected void initChannel (SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new StringDecoder()); socketChannel.pipeline().addLast(new StringEncoder()); socketChannel.pipeline().addLast(new NettyChatClientHandler()); } }); ChannelFuture channelFuture = bootstrap.connect(ip, port).sync(); Channel channel = channelFuture.channel(); System.out.println("--------------" +channel.localAddress().toString().substring(1 )+"-------------" ); Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()){ String line = scanner.nextLine(); channel.writeAndFlush(line); } channelFuture.channel().closeFuture().sync(); }finally { group.shutdownGracefully(); } } public static void main (String[] args) throws InterruptedException { new NettyChatClient("127.0.0.1" ,9999 ).run(); } }
运行多个客户端与一个服务端,就可以实现群聊功能。
基于Netty的Http服务器开发 介绍 Netty的Http协议栈无论在性能上还是可靠性上,都表现优异,非常适合在非web容器的场景下应用,相比于传统的tomcat、jetty等web容器,它更加轻量和小巧,灵活性和定制性也更好。
功能需求
服务端代码实现 NettyHttpServer
代码与上述几个的代码类似,重点区别就是编码器的区别,这里用到了HttpServerCodec
的编码器
public class NettyHttpServer { private int port ; public NettyHttpServer (int port) { this .port = port; } public void run () throws InterruptedException { EventLoopGroup bossGroup = null ; EventLoopGroup workerGroup = null ; try { bossGroup = new NioEventLoopGroup(1 ); workerGroup = new NioEventLoopGroup(); ServerBootstrap serverBootstrap = new ServerBootstrap() .group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class ) .option (ChannelOption .SO_BACKLOG ,128) .childOption (ChannelOption .SO_KEEPALIVE ,Boolean .TRUE ) .childHandler (new ChannelInitializer <SocketChannel >() { @Override protected void initChannel (SocketChannel ch) throws Exception { ch.pipeline().addLast(new HttpServerCodec()); ch.pipeline().addLast(new NettyHttpServerHandler()); } }); ChannelFuture channelFuture = serverBootstrap.bind(port); channelFuture.addListener((ChannelFutureListener) future -> { if (future.isSuccess()){ System.out.println("Http服务端启动成功" ); }else { System.out.println("Http服务端启动失败" ); } }); channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main (String[] args) throws InterruptedException { new NettyHttpServer(8080 ).run(); } }
编写NettyHttpServerHandler
类
public class NettyHttpServerHandler extends SimpleChannelInboundHandler <HttpObject > { @Override protected void channelRead0 (ChannelHandlerContext channelHandlerContext, HttpObject httpObject) throws Exception { if (httpObject instanceof HttpRequest){ DefaultHttpRequest request = (DefaultHttpRequest) httpObject; System.out.println("浏览器请求路径:" +request.uri()); if ("/favicon.ico" .equals(request.uri())){ System.out.println("图标不响应" ); return ; } ByteBuf byteBuf = Unpooled.copiedBuffer("Hello!我是Netty服务器" , CharsetUtil.UTF_8); DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,HttpResponseStatus.OK,byteBuf); response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/html;charset=utf-8" ); response.headers().set(HttpHeaderNames.CONTENT_LENGTH,byteBuf.readableBytes()); channelHandlerContext.writeAndFlush(response); } } }
编写完成后,在页面启动,输入”http://localhost:8080",页面会返回对应的信息。
基于Netty的WebSocket开发网页版聊天室 WebSocket介绍 WebSocket是一种在单个TCP连接上进行的全双工通信的协议。WebSocket使得客户端和服务器之间交换变得更加简单,允许服务器主动向客户端发送消息数据。在WebSocket API中,客户端和服务端只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。 应用场景:
社交订阅
协同编辑/编程
股票基金报价
体育实况更新
多媒体聊天
在线教育
WebSocket与Http的区别 Http协议是在应用层的协议,它是基于tcp协议的,http协议建立连接也必须要有三次握手才能发消息。http连接分为短链接和长连接,短连接时每次请求都要三次握手才能发送自己的消息,即每一个request对应一个response,长连接时在一定期限内保持连接,保持TCP连接不断开,客户端与服务端通信,必须要有客户端先发起,然后服务端返回结果,客户端时主动的,服务端是被动的,客户端要想实时获取服务端消息就得不断发送长连接到服务端。 Websocket实现了多路复用,它是全双工的。在WebSocket协议下服务端和客户端可以同时发送信息,建立了WebSocket连接之后,服务端可以主动发送信息到客户端,而且信息当中不必带有heand的部分信息了,与http的长连接相比,这种方式,不仅能降低服务器压力,而且信息当中也减少了部分多余的信息
环境搭建以及关键编码实现 因为是webSocket,为了方便我们引入SpringBoot用来编写除WebSocket以外的web服务器端功能,包括http请求、以及静态页面的资源请求等 NettyWebSocketServer,Netty服务的主启动类
@Component public class NettyWebSocketServer implements Runnable { @Autowired NettyConfig nettyConfig; @Autowired WebSocketChannelInit webSocketChannelInit; private EventLoopGroup bossGroup = new NioEventLoopGroup(1 ); private EventLoopGroup workerGroup = new NioEventLoopGroup(); @PreDestroy public void close () { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } @Override public void run () { try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup); serverBootstrap.channel(NioServerSocketChannel.class ) // 设置一个日志处理器 .handler (new LoggingHandler (LogLevel .DEBUG )) .childHandler (webSocketChannelInit ) ; ChannelFuture channelFuture = serverBootstrap.bind(nettyConfig.getPort()).sync(); System.out.println("--Netty服务端启动成功---" ); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
我们还需要编写一个WebSocketChannelInit,用于初始化管道
@Component public class WebSocketChannelInit extends ChannelInitializer { @Autowired NettyConfig nettyConfig; @Autowired WebSocketHandler webSocketHandler; @Override protected void initChannel (Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new HttpServerCodec()); pipeline.addLast(new ChunkedWriteHandler()); pipeline.addLast(new HttpObjectAggregator(8000 )); pipeline.addLast(new WebSocketServerProtocolHandler(nettyConfig.getPath())); pipeline.addLast(webSocketHandler); } }
编写WebSocketHandler 真正的业务处理类
@Component @ChannelHandler .Sharable public class WebSocketHandler extends SimpleChannelInboundHandler <TextWebSocketFrame > { public static List<Channel> channelList = new ArrayList<>(); @Override public void channelActive (ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelList.add(channel); System.out.println("有新的连接." ); } @Override public void channelInactive (ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); channelList.remove(channel); } @Override protected void channelRead0 (ChannelHandlerContext ctx, TextWebSocketFrame textWebSocketFrame) throws Exception { String msg = textWebSocketFrame.text(); System.out.println("msg:" + msg); Channel channel = ctx.channel(); for (Channel channel1 : channelList) { if (channel != channel1) { channel1.writeAndFlush(new TextWebSocketFrame(msg)); } } } @Override public void exceptionCaught (ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); Channel channel = ctx.channel(); channelList.remove(channel); } }
编写完成这三个类后,需要加入到Spring中并且启动,那么就需要编写一个类实现CommandLineRunner
接口中的run方法,该方法是在Spring启动完成后调用的
public class NettySpringbootApplication implements CommandLineRunner { @Autowired NettyWebSocketServer nettyWebSocketServer; @Override public void run (String... args) throws Exception { new Thread(nettyWebSocketServer).start(); } }
编写客户端的javascript访问程序
var ws = new WebSocket("ws://localhost:8081/chat" ); ws.onopen = function ( ) { console .log("连接成功." ) } ws.onmessage = function (evt ) { showMessage(evt.data); } ws.onclose = function ( ) { console .log("连接关闭" ) } ws.onerror = function ( ) { console .log("连接异常" ) }
这样我们的基于Netty的webSocket就完成了。
Netty中粘包和拆包的解决方案 粘包和拆包简介 粘包和拆包是TCP网络编程中不可避免的,无论是服务端还是客户端,当我们读取或者发送消息的时候,都需要考虑TCP底层的粘包/拆包机制 TCP是个”流”协议,所谓流,就是没有界限的一串数字。TCP底层并不了解上层业务数据的具体含义,它会根据TCP缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被TCP拆分成多个包进行发送,也有可能把多个小的包封装成一个打的数据包发送,这就是所谓的TCP粘包与拆包问题 如下图所示,假设客户端分别发送两个数据包D1和D2给服务端,由于服务端一次读取到 的字节数不确定的,故看了能存在下面四种情况:
TCP粘包和拆包产生的原因: 数据从发送方到接收方需要经过操作系统的缓冲区,而造成粘包和拆包的主要原因就是在这个缓冲区上,粘包可以理解为缓冲区数据堆积,导致多个请求数据粘在一起,而拆包可以理解为发送的数据大于缓冲区,进行拆分处理
粘包拆包的解决方案 1、业内解决方案 由于底层的TCP无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下:
消息长度固定,累计读取到长度和为定长LEN的报文后,就认为读取到一个完整的信息
将换行符作为消息结束符
将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符
通过在消息头定义长度字段来标识消息的总长度
2、Netty中的粘包和拆包解决方案,Netty提供了4种编码器来解决
固定长度的拆包器FixedLengthFrameDecoder
,每个应用层数据包的都拆分都是固定长度的大小
行拆分器LineBasedFrameDecoder
,每个应用层数据包,都以换行符作为分割符,进行分割拆分
分隔符拆包器DelimiterBasedFrameDecoder
,每个应用层数据包,都通过自定义的分割符,进行分割拆分
基于数据包长度的拆包器LengthFieldBasedFrameDecoder
,将应用层数据包的长度,作为接收端应用层数据包的拆分依据,按照应用层数据包的大小拆包。这个拆包器,有一个要求,就是应用层协议种包含数据包的长度
3、代码实现 只需要添加相应的解码器即可
LineBasedFrameDecoder:以换行符为每一次发送语句的结束,使用时需要在每一次发送完成的结尾增加换行符\n..childHandler((ChannelInitializer)(ch)->{ ch.pipeline().addLast(new LineBasedFrameDecoder(2048 )); })
DelimiterBasedFrameDecoder: 设置特殊分隔符用于防止粘包.childHandler((ChannelInitializer)(ch)->{ ByteBuf bytebuf = Unpooled.copiedBuffer("$" .getByte(StandardCharsets.UTF_8)) ch.pipeline().addLast(new DelimiterBasedFrameDecoder(2048 ,bytebuf)); })
对于拆包的解决办法是一样的。同样也是使用,需要在服务端设置最大包的字节数,并且需要用换行符进行分割或者特定字符进行分割。如果客户端超出了服务端设置的最大限制则会报错。