本文共 9514 字,大约阅读时间需要 31 分钟。
netty基于NIO对其进行了一系列的封装改造。
这里主要介绍基于服务端的netty开发,客户端其实跟服务端差不多,当然客户端也可以直接使用socket与netty服务端通讯。
在netty开发中,我觉得比较关键的对象有:
1、EventLoopGroup
对于NIO来讲,其实现类就是NioEventLoopGroup。这个就相当于线程池,用来处理事件。
对于服务端,需要有两个NioEventLoopGroup对象,一个是parentGroup,用来处理accept,称之为acceptor;另一个是childGroup,用来处理client事件,称之为client。
parentGroup和childGroup两者的区别可以理解为,parentGroup这个线程池的线程用来处理自身通道的事件,childGroup这个线程池用来处理与客户端交互的事件。
对于客户端来讲,只需要建一个NioEventLoopGroup对象即可。
2、ServerBootstrap / Bootstrap
是netty的引导类,用来引导注册EventLoopGroup,注册channel类型,注册回调Handler,建立连接或绑定端口等
对于服务端来讲:
ServerBootstrap b = new ServerBootstrap();ChannelFuture f = b.group(parentGroup, childGroup).channel(NioServerSocketChannel.class).childHandler(new ServerChannelInitializer()).bind(9999);
对于客户端来讲:
Bootstrap b = new Bootstrap();ChannelFuture f = b.group(group).channel(NioSocketChannel.class).handler(new ClientChannelInit()).connect("127.0.0.1",9999);
3、ChannelFuture
它是Channel异步IO操作的结果。它实现的是Future接口。因为是异步,ChannelFuture提供了sync()方法,以保证ChannelFuture对象的初始化。异步处理的结果都会封装到future对象里。
4、ChannelInitializer
这个是childHandler()或handler()方法的参数类型,是一个抽象类。其类图如下:
第2步中的ServerChannelInitializer和ClientChannelInit,都是集成自ChannelInitializer。这是channel初始化的关键类。
具体需要重写initChannel(Channel ch)方法,实现方式如下:
class ServerChannelInitializer extends ChannelInitializer{ @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ServerHandlerAdapter()); }}
在方法中可以拿到初试化的channel对象,使用channel对象的pipeline()方法可以获取ChannelPipeline对象,这个对象就相当于是channel里面的事件链条(双向链表),它上面可以添加一系列的ChannelHandler,就相当于一些事件处理器(责任链模式)。这些处理器可以分为I/O两种。源码解释pipeline如下图
I/O Request* via {@link Channel} or* {@link ChannelHandlerContext}* |* +---------------------------------------------------+---------------+* | ChannelPipeline | |* | \|/ |* | +---------------------+ +-----------+----------+ |* | | Inbound Handler N | | Outbound Handler 1 | |* | +----------+----------+ +-----------+----------+ |* | /|\ | |* | | \|/ |* | +----------+----------+ +-----------+----------+ |* | | Inbound Handler N-1 | | Outbound Handler 2 | |* | +----------+----------+ +-----------+----------+ |* | /|\ . |* | . . |* | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|* | [ method call] [method call] |* | . . |* | . \|/ |* | +----------+----------+ +-----------+----------+ |* | | Inbound Handler 2 | | Outbound Handler M-1 | |* | +----------+----------+ +-----------+----------+ |* | /|\ | |* | | \|/ |* | +----------+----------+ +-----------+----------+ |* | | Inbound Handler 1 | | Outbound Handler M | |* | +----------+----------+ +-----------+----------+ |* | /|\ | |* +---------------+-----------------------------------+---------------+* | \|/* +---------------+-----------------------------------+---------------+* | | | |* | [ Socket.read() ] [ Socket.write() ] |* | |* | Netty Internal I/O Threads (Transport Implementation) |* +-------------------------------------------------------------------+
所以对于NIO来讲,所有处理消息的方法或者事件,都是放在pipeline里面。
5、ChannelHandler
这个就是具体处理事件的接口,他用来处理I/O事件或拦截I/O操作,并将其转发给它的下一个pipeline里的处理程序。其常用的实现类是ChannelInboundHandlerAdapter。其提供了两个方法 channelActive(ChannelHandlerContext ctx) 和 channelRead(ChannelHandlerContext ctx, Object msg),分别是用来处理通道激活时的处理以及接收到数据时的处理。msg就是接收到的信息,但是需要转化为ByteBuf对象。
6、ByteBuf
是对NIO的ByteBuffer对象的封装。netty通讯的内容都会以ByteBuf形式接收或发送。
贴一下客户端及服务端的完整代码,基于此可以做个聊天室了。
package com.iotest.netty;import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.*;import io.netty.channel.group.ChannelGroup;import io.netty.channel.group.DefaultChannelGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.util.concurrent.GlobalEventExecutor;public class Server { public static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); public static void main(String[] args) { EventLoopGroup parentGroup = new NioEventLoopGroup(1); EventLoopGroup childGroup = new NioEventLoopGroup(2); ServerBootstrap b = new ServerBootstrap(); try { ChannelFuture f = b.group(parentGroup, childGroup) .channel(NioServerSocketChannel.class) .childHandler(new ServerChannelInitializer()) .bind(8888) .sync(); System.out.println("服务端已经启动"); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { childGroup.shutdownGracefully(); parentGroup.shutdownGracefully(); } }}class ServerChannelInitializer extends ChannelInitializer{ @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ServerHandlerAdapter()); }}class ServerHandlerAdapter extends ChannelInboundHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Server.clients.add(ctx.channel()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buff = (ByteBuf)msg; byte[] bytes = new byte[buff.readableBytes()]; buff.getBytes(buff.readerIndex(), bytes);// buff.readBytes(bytes); System.out.println(new String(bytes)); Server.clients.writeAndFlush(msg); System.out.println("服务器转发成功"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}
package com.iotest.netty;import io.netty.bootstrap.Bootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.util.ReferenceCountUtil;public class Client { private static Channel channel; public static void main(String[] args) { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); try { ChannelFuture f = b.group(group) .channel(NioSocketChannel.class) .handler(new ClientChannelInit()) .connect("127.0.0.1", 8888); f.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if(future.isSuccess()){ System.out.println("客户端链接成功"); channel = future.channel();// sendMsg("hello server"); }else{ System.out.println("客户端链接失败"); } } }); f.sync(); f.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } public static void sendMsg(String str){ ByteBuf buf = Unpooled.copiedBuffer(str.getBytes()); channel.writeAndFlush(buf); System.out.println("给服务器发送消息:"+str); }}class ClientChannelInit extends ChannelInitializer{ @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ClientChannelHandler()); }}class ClientChannelHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("接收到服务器消息:"); ByteBuf buf = null; try { buf = (ByteBuf) msg; byte[] bytes = new byte[buf.readableBytes()]; buf.getBytes(buf.readerIndex(), bytes); System.out.println(new String(bytes)); } finally { if(buf != null) ReferenceCountUtil.release(buf); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf buf = Unpooled.copiedBuffer("hello".getBytes()); ctx.writeAndFlush(buf); }}
转载地址:http://bhcdi.baihongyu.com/