博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java netty
阅读量:4038 次
发布时间:2019-05-24

本文共 9514 字,大约阅读时间需要 31 分钟。

netty基于NIO对其进行了一系列的封装改造。

这里主要介绍基于服务端的netty开发,客户端其实跟服务端差不多,当然客户端也可以直接使用socket与netty服务端通讯。

在netty开发中,我觉得比较关键的对象有:

1、EventLoopGroup 

对于NIO来讲,其实现类就是NioEventLoopGroup。这个就相当于线程池,用来处理事件。

对于服务端,需要有两个NioEventLoopGroup对象,一个是parentGroup,用来处理accept,称之为acceptor;另一个是childGroup,用来处理client事件,称之为client。

parentGroup和childGroup两者的区别可以理解为,parentGroup这个线程池的线程用来处理自身通道的事件,childGroup这个线程池用来处理与客户端交互的事件。

对于客户端来讲,只需要建一个NioEventLoopGroup对象即可。

 

2、ServerBootstrap / Bootstrap

是netty的引导类,用来引导注册EventLoopGroup,注册channel类型,注册回调Handler,建立连接或绑定端口等

对于服务端来讲:

ServerBootstrap b = new ServerBootstrap();ChannelFuture f = b.group(parentGroup, childGroup).channel(NioServerSocketChannel.class).childHandler(new ServerChannelInitializer()).bind(9999);

对于客户端来讲:

Bootstrap b = new Bootstrap();ChannelFuture f = b.group(group).channel(NioSocketChannel.class).handler(new ClientChannelInit()).connect("127.0.0.1",9999);

3、ChannelFuture

它是Channel异步IO操作的结果。它实现的是Future接口。因为是异步,ChannelFuture提供了sync()方法,以保证ChannelFuture对象的初始化。异步处理的结果都会封装到future对象里。

 

4、ChannelInitializer

这个是childHandler()或handler()方法的参数类型,是一个抽象类。其类图如下:

第2步中的ServerChannelInitializer和ClientChannelInit,都是集成自ChannelInitializer。这是channel初始化的关键类。

具体需要重写initChannel(Channel ch)方法,实现方式如下:

class ServerChannelInitializer extends ChannelInitializer
{ @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ServerHandlerAdapter()); }}

在方法中可以拿到初试化的channel对象,使用channel对象的pipeline()方法可以获取ChannelPipeline对象,这个对象就相当于是channel里面的事件链条(双向链表),它上面可以添加一系列的ChannelHandler,就相当于一些事件处理器(责任链模式)。这些处理器可以分为I/O两种。源码解释pipeline如下图

I/O Request*                                            via {@link Channel} or*                                        {@link ChannelHandlerContext}*                                                      |*  +---------------------------------------------------+---------------+*  |                           ChannelPipeline         |               |*  |                                                  \|/              |*  |    +---------------------+            +-----------+----------+    |*  |    | Inbound Handler  N  |            | Outbound Handler  1  |    |*  |    +----------+----------+            +-----------+----------+    |*  |              /|\                                  |               |*  |               |                                  \|/              |*  |    +----------+----------+            +-----------+----------+    |*  |    | Inbound Handler N-1 |            | Outbound Handler  2  |    |*  |    +----------+----------+            +-----------+----------+    |*  |              /|\                                  .               |*  |               .                                   .               |*  | ChannelHandlerContext.fireIN_EVT() ChannelHandlerContext.OUT_EVT()|*  |        [ method call]                       [method call]         |*  |               .                                   .               |*  |               .                                  \|/              |*  |    +----------+----------+            +-----------+----------+    |*  |    | Inbound Handler  2  |            | Outbound Handler M-1 |    |*  |    +----------+----------+            +-----------+----------+    |*  |              /|\                                  |               |*  |               |                                  \|/              |*  |    +----------+----------+            +-----------+----------+    |*  |    | Inbound Handler  1  |            | Outbound Handler  M  |    |*  |    +----------+----------+            +-----------+----------+    |*  |              /|\                                  |               |*  +---------------+-----------------------------------+---------------+*                  |                                  \|/*  +---------------+-----------------------------------+---------------+*  |               |                                   |               |*  |       [ Socket.read() ]                    [ Socket.write() ]     |*  |                                                                   |*  |  Netty Internal I/O Threads (Transport Implementation)            |*  +-------------------------------------------------------------------+

所以对于NIO来讲,所有处理消息的方法或者事件,都是放在pipeline里面。

 

5、ChannelHandler

这个就是具体处理事件的接口,他用来处理I/O事件或拦截I/O操作,并将其转发给它的下一个pipeline里的处理程序。其常用的实现类是ChannelInboundHandlerAdapter。其提供了两个方法 channelActive(ChannelHandlerContext ctx)  和  channelRead(ChannelHandlerContext ctx, Object msg),分别是用来处理通道激活时的处理以及接收到数据时的处理。msg就是接收到的信息,但是需要转化为ByteBuf对象。

 

6、ByteBuf

是对NIO的ByteBuffer对象的封装。netty通讯的内容都会以ByteBuf形式接收或发送。

 

贴一下客户端及服务端的完整代码,基于此可以做个聊天室了。

package com.iotest.netty;import io.netty.bootstrap.ServerBootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.*;import io.netty.channel.group.ChannelGroup;import io.netty.channel.group.DefaultChannelGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.util.concurrent.GlobalEventExecutor;public class Server {    public static ChannelGroup clients = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);    public static void main(String[] args) {        EventLoopGroup parentGroup = new NioEventLoopGroup(1);        EventLoopGroup childGroup = new NioEventLoopGroup(2);        ServerBootstrap b = new ServerBootstrap();        try {            ChannelFuture f = b.group(parentGroup, childGroup)                    .channel(NioServerSocketChannel.class)                    .childHandler(new ServerChannelInitializer())                    .bind(8888)                    .sync();            System.out.println("服务端已经启动");            f.channel().closeFuture().sync();        } catch (InterruptedException e) {            e.printStackTrace();        } finally {            childGroup.shutdownGracefully();            parentGroup.shutdownGracefully();        }    }}class ServerChannelInitializer extends ChannelInitializer
{ @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ServerHandlerAdapter()); }}class ServerHandlerAdapter extends ChannelInboundHandlerAdapter{ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Server.clients.add(ctx.channel()); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buff = (ByteBuf)msg; byte[] bytes = new byte[buff.readableBytes()]; buff.getBytes(buff.readerIndex(), bytes);// buff.readBytes(bytes); System.out.println(new String(bytes)); Server.clients.writeAndFlush(msg); System.out.println("服务器转发成功"); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); }}
package com.iotest.netty;import io.netty.bootstrap.Bootstrap;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.util.ReferenceCountUtil;public class Client {    private static Channel channel;    public static void main(String[] args) {        EventLoopGroup group = new NioEventLoopGroup();        Bootstrap b = new Bootstrap();        try {            ChannelFuture f = b.group(group)                    .channel(NioSocketChannel.class)                    .handler(new ClientChannelInit())                    .connect("127.0.0.1", 8888);            f.addListener(new ChannelFutureListener() {                @Override                public void operationComplete(ChannelFuture future) throws Exception {                    if(future.isSuccess()){                        System.out.println("客户端链接成功");                        channel = future.channel();//                        sendMsg("hello server");                    }else{                        System.out.println("客户端链接失败");                    }                }            });            f.sync();            f.channel().closeFuture().sync();        } catch (InterruptedException e) {            e.printStackTrace();        } finally {            group.shutdownGracefully();        }    }    public static void sendMsg(String str){        ByteBuf buf = Unpooled.copiedBuffer(str.getBytes());        channel.writeAndFlush(buf);        System.out.println("给服务器发送消息:"+str);    }}class ClientChannelInit extends ChannelInitializer
{ @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ClientChannelHandler()); }}class ClientChannelHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("接收到服务器消息:"); ByteBuf buf = null; try { buf = (ByteBuf) msg; byte[] bytes = new byte[buf.readableBytes()]; buf.getBytes(buf.readerIndex(), bytes); System.out.println(new String(bytes)); } finally { if(buf != null) ReferenceCountUtil.release(buf); } } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ByteBuf buf = Unpooled.copiedBuffer("hello".getBytes()); ctx.writeAndFlush(buf); }}

 

 

 

 

转载地址:http://bhcdi.baihongyu.com/

你可能感兴趣的文章
Android之TelephonyManager类的方法详解
查看>>
android raw读取超过1M文件的方法
查看>>
ubuntu下SVN服务器安装配置
查看>>
MPMoviePlayerViewController和MPMoviePlayerController的使用
查看>>
CocoaPods实践之制作篇
查看>>
[Mac]Mac 操作系统 常见技巧
查看>>
苹果Swift编程语言入门教程【中文版】
查看>>
捕鱼忍者(ninja fishing)之游戏指南+游戏攻略+游戏体验
查看>>
iphone开发基础之objective-c学习
查看>>
iphone开发之SDK研究(待续)
查看>>
计算机网络复习要点
查看>>
Variable property attributes or Modifiers in iOS
查看>>
NSNotificationCenter 用法总结
查看>>
C primer plus 基础总结(一)
查看>>
剑指offer算法题分析与整理(一)
查看>>
剑指offer算法题分析与整理(三)
查看>>
Ubuntu 13.10使用fcitx输入法
查看>>
pidgin-lwqq 安装
查看>>
mint/ubuntu安装搜狗输入法
查看>>
C++动态申请数组和参数传递问题
查看>>