Netty 入门
概述
什么是 Netty?
- 异步:用的多线程,不是异步 IO
- 基于事件驱动:表示用的 Selector
Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端
Netty 的地位
Netty 在 Java 网络应用框架中的地位就好比:Spring 框架在 JavaEE 开发中的地位
以下的框架都使用了 Netty,因为它们有网络通信需求!
- Cassandra - nosql 数据库
- Spark - 大数据分布式计算框架
- Hadoop - 大数据分布式存储框架
- RocketMQ - ali 开源的消息队列
- ElasticSearch - 搜索引擎
- gRPC - rpc 框架
- Dubbo - rpc 框架
- Spring 5.x - flux api 完全抛弃了 tomcat ,使用 netty 作为服务器端
- Zookeeper - 分布式协调框架
Netty 的优势
Netty vs NIO,工作量大,bug 多
- 需要自己构建协议
- 解决 TCP 传输问题,如粘包、半包
- epoll 空轮询导致 CPU 100%
- 对 API 进行增强,使之更易用,如 FastThreadLocal => ThreadLocal,ByteBuf => ByteBuffer
Netty vs 其它网络应用框架
- Mina 由 apache 维护,将来 3.x 版本可能会有较大重构,破坏 API 向下兼容性,Netty 的开发迭代更迅速,API 更简洁、文档更优秀
创建服务器 / 客户端
服务端
// 1. 启动器,负责组装 netty组件,启动服务器
new ServerBootstrap()
// BossEventLoop,WorkerEventLoop(selector, thread)
// 创建一个事件循环线程组,适用于 NIO 实现,可以简单理解为 `线程池 + Selector`
.group(new NioEventLoopGroup())
// 指定要使用的服务器 Channel 实现,适用于 NIO 实现
.channel(NioServerSocketChannel.class) // OIO BIO
// BOSS 负责处理连接Work(child)负责处理读写,决定了work(child)能执行哪些(handler)
// 配置服务器 Channel 处理器,即 ChannelPipeline 中的一组 ChannelHandler
.childHandler(
// 5. channel代表客户端进行数据读写的通道Initializer初始化,负责添加到别的 handler
new ChannelInitializer() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 添加具体的 handler
// 将 bytebuf 转换为字符串
nioSocketChannel.pipeline().addLast(new StringDecoder());
// 自定义handler
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override// 读事件
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 打印转好的字符串
System.out.println(msg);
}
});
}
})
// 监听端口
.bind(8080);
客户端
// 启动类
new Bootstrap()
// 添加 EventLoop
.group(new NioEventLoopGroup())
// 选择客户端 channel 实现
.channel(NioSocketChannel.class)
// 添加处理器
.handler(new ChannelInitializer() {
@Override // 在建立连接后被调用
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 把字符串编码成字节
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
// 连接服务器
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel()
// 向服务器发送数据
.writeAndFlush("hello, world");
流程分析
channel数据传输通道,可读出来可写进。和nio概念一致
handel中的message流动数据,handel 是一个工序,对原始数据进行一道道工序进行处理
pipeline就是流水线,一道工序,进行添加工序。
handel 分为 inbound和outbound
入站和出站 读入就走入站,写出就走出站
eventloop就是线程,相当于工人
一旦某一个工人负责一个事件,那就会负责到底。一个工人是可以管理多个事件的
eventloop既可以执行io操作,也可以进行普通任务,每个工人都有自己的任务队列,依次处理。底层用的单线程的线程池。
任务可以是定时任务也可以是普通任务
组件
EventLoop
EventLoop是事件循环对象
本质是一个单线程执行器,同时维护了Selector,里面有run方法处理 channel 上源源不断的 io 事件。
继承关系:
- 一条线是继承自 java.util.concurrent.ScheduledExecutorService
- 因此包含了线程池的所有方法
- 另一条是继承自 netty 自己的 OrderedEventExecutor
- 提供了 Boolean inEventLoop(Thread thread)方法判断一个线程是否属于此EventLoop
- 提供了 parent方法来看看自己属于哪个 EventLoopGroup
// eventloop
public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
EventLoopGroup parent();
}
// 继承线程池接口
public interface EventExecutorGroup extends ScheduledExecutorService, Iterable {
EventLoopGroup 事件循环组,我们一般使用这个
EventLoopGroup 是一组 EventLoop,channel 一般会调用 EventLoopGroup 的 register 方法俩绑定其中一个 EventLoop,后续这个 channel 上的 io 事件都由同一个 EventLoop 来处理(保证了 io 事件处理时的线程安全)
- 继承自 netty 自己的 EventExecutorGroup
- 实现了 Iterable 接口提供遍历 EventLoop 的能力
- 另有 next 方法获取集合中下一个 EventLoop
我们平常使用的是下面两个循环组实现类
// 可以实现普通任务,io事件,定时任务
EventLoopGroup nioGroup = new NioEventLoopGroup();
// 普通任务和定时任务
EventLoopGroup defaultGroup = new DefaultEventLoopGroup();
那么我们空参创建对象,线程数是多少呢
下面这段代码,是用来初始化 EventLoopGroup对象的。
如果没有指定线程数,会采用默认的,也就是当前系统的CPU核心数 * 2
/**
* 构造函数,创建MultithreadEventLoopGroup对象
* @param nThreads 表示EventLoopGroup中EventLoop的数量,如果为0则使用默认的线程数
* @param executor 用于执行任务的Executor对象
* @param args 可选参数列表
*/
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
// 调用父类的构造函数,初始化EventLoopGroup对象
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
获取当前CPU核数--我的是 16核
NettyRuntime.availableProcessors();
指定线程数量
EventLoopGroup nioGroup = new NioEventLoopGroup(2);
获取下一个线程
// 创建两个线程的事件循环组
EventLoopGroup nioGroup = new NioEventLoopGroup(2);
// 获取下一个线程
System.out.println(nioGroup.next()); // 第一次打印第一个
System.out.println(nioGroup.next()); // 打印第二个
System.out.println(nioGroup.next()); // 打印第一个,因为一共就两个
System.out.println(nioGroup.next()); // 打印第二个
// io.netty.channel.nio.NioEventLoop@737996a0
// io.netty.channel.nio.NioEventLoop@61dc03ce
// io.netty.channel.nio.NioEventLoop@737996a0
// io.netty.channel.nio.NioEventLoop@61dc03ce
执行普通任务
- 这里的execute和submit都是一样的效果
// 创建两个线程的事件循环组
EventLoopGroup nioGroup = new NioEventLoopGroup(2);
// 执行普通任务
nioGroup.next().execute(() -> {
log.debug("ok");
});
log.debug("main");
定时任务
// 创建两个线程的事件循环组
EventLoopGroup nioGroup = new NioEventLoopGroup(2);
// 执行定时任务 以一定的频率执行
// 参数1:执行的方法
// 参数2:表示第一次启动多久开始
// 参数3:表示间隔多久再次触发
// 参数4:单位
nioGroup.next().scheduleAtFixedRate(() -> {
log.debug("ok");
}, 3, 5, TimeUnit.SECONDS);
log.debug("main");
IO 事件
下面是服务端,客户端同上面。
一个客户端绑定一个服务的 EventLoop线程。
下面再编写客户端的时候,如果要打上断点实现阻塞,需要将idea的断点卡成单线程的
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
// 如果我们没有用 nio的处理字节方法,这个msg是 bytebuf类型
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
// 这个实际开发是要指定字符类型的,不要默认
log.debug(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
我们来分工细化一下
我们可以把 eventloop 划分为 boss 和 work 将accept 和 read 分开处理
那 第一个事件循环组 线程是否可以设置为 1 呢
因为服务器这一有个,它也只会和里面一个 eventloop 进行绑定
new ServerBootstrap()
// 将 参数1:只处理accept 参数2:处理read
.group(new NioEventLoopGroup(), new NioEventLoopGroup())
继续分工细化--如果其中一个Nio线程执行中
在读操作时执行太久,会影响其他 channel读操作
最好不要让它占用 work nio线程,所以我们继续细分
// 细分2:创建独立的 EventLoopGroup
DefaultEventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
.group(new NioEventLoopGroup(), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline()
.addLast("handler1", new ChannelInboundHandlerAdapter() {
@Override
// 如果我们没有用 nio的处理字节方法,这个msg是 bytebuf类型
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
// 这个实际开发是要指定字符类型的,不要默认
log.debug(buf.toString(Charset.defaultCharset()));
// 让消息传递给下一个handler
ctx.fireChannelRead(msg);
}
})
// 指定事件循环组和名称
.addLast(group, "handler2", new ChannelInboundHandlerAdapter() {
@Override
// 如果我们没有用 nio的处理字节方法,这个msg是 bytebuf类型
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
// 这个实际开发是要指定字符类型的,不要默认
log.debug(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
那么是怎么实现切换事件循环组,也就是换人处理的呢?
如果两个 handler 绑定的是同一个线程,那么直接调用,否则调用的代码封装为一个任务对象。由一下一个 handler的线程调用
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 下一个 handler 的事件循环是否与当前的事件循环是同一个线程
/**
* EventExecutor 是事件循环组
*/
EventExecutor executor = next.executor();
// 当前 handler 中的线程是否和 eventloop 是同一个线程
// 是,直接调用
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
}
// 不是,将要执行的代码作为任务提交给下一个事件循环处理(换人)
else {
// 使用runnable 执行一个事件
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
Channel & 连接 & 关闭问题 & 异步
channel 的主要作用
- close() 可以用来关闭 channel
- closeFuture() 用来处理 channel 的关闭
- sync 方法作用是同步等待 channel 关闭
- 而 addListener 方法是异步等待 channel 关闭
- pipeline() 方法添加处理器
- write() 方法将数据写入,只是写进入缓冲区
- writeAndFlush() 方法将数据写入并刷出
connect 连接问题
connect:是异步阻塞,main方法发起,执行是Nio的线程
所以如果在发送数据前,一定要保证已经连接好了,否则数据是发送不出去的
ChannelFuture:带有Future或者 promise都是和异步配套使用,用来处理结果
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
nioSocketChannel.pipeline().addLast(new StringEncoder());
}
})
// connect 是异步阻塞,main方法发起调用,真正执行连接的是Nio线程
.connect(new InetSocketAddress("localhost", 8080));
// channelFuture.sync();
// 会无阻塞向下执行获取channel,最后发送数据
Channel channel = channelFuture.channel();
// log.debug("{}", channel); // 如果没有获取到打印的就是没有连接的 channel,所以需要执行 sync
channel.writeAndFlush("123");
解决连接问题,保证发送数据前一定是正确连接的
// 阻塞当前线程,直到连接建立完毕
channelFuture.sync();
Channel channel = channelFuture.channel();
log.debug("{}", channel);
channel.writeAndFlush("123");
- addListener(回调对象)方法异步处理结果
channelFuture.addListener(new ChannelFutureListener() {
@Override
// 在nio 线程连接建立好之后,会调用operationComplete
public void operationComplete(ChannelFuture channelFuture) throws Exception {
Channel channel = channelFuture.channel();
log.debug("{}", channel);
channel.writeAndFlush("123");
}
});
关闭问题
我们看下面这段代码
我们可以输入内容,发送给服务端,按 q 关闭传输通道
但是我们关闭的时候,想做一些操作,怎么整
直接在close下面写吗,那是不对的,因为close方法是异步的
channelFuture.sync();
Channel channel = channelFuture.channel();
new Thread(new Runnable() {
@Override
public void run() {
Scanner sc = new Scanner(System.in);
while (true) {
String line = sc.nextLine();
if ("q".equals(line)) {
channel.close();
break;
}
channel.writeAndFlush(line);
}
}
}, "input").start();
解决异步问题
添加日志调试
nioSocketChannel.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
new Thread(() -> {
Scanner sc = new Scanner(System.in);
while (true) {
String line = sc.nextLine();
if ("q".equals(line)) {
channel.close();
break;
}
channel.writeAndFlush(line);
}
}, "input").start();
// 添加关闭处理器
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.sync();// 设置同步
log.debug("处理关闭之后....");
new Thread(() -> {
Scanner sc = new Scanner(System.in);
while (true) {
String line = sc.nextLine();
if ("q".equals(line)) {
channel.close();
break;
}
channel.writeAndFlush(line);
}
}, "input").start();
ChannelFuture closeFuture = channel.closeFuture();
// 设置关闭回调函数
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
log.debug("处理关闭之后....");
}
});
优雅的关闭
- 它会等待所有正在处理的任务完成后,再进行关闭。
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
log.debug("处理关闭之后的操作");
group.shutdownGracefully(); // 优雅的关闭
}
});
异步
异步:就是一个线程发起连接一个线程去建立连接
思考下面的场景,4 个医生给人看病,每个病人花费 20 分钟,而且医生看病的过程中是以病人为单位的,一个病人看完了,才能看下一个病人。假设病人源源不断地来,可以计算一下 4 个医生一天工作 8 小时,处理的病人总数是:4 * 8 * 3 = 96
个病人
那我们就可以细分一下,四个医生分别处理四个事情
只有一开始,医生 2、3、4 分别要等待 5、10、15 分钟才能执行工作,但只要后续病人源源不断地来,他们就能够满负荷工作,并且处理病人的能力提高到了 4 * 8 * 12
效率几乎是原来的四倍
重点:
单线程没法异步提高效率,必须配合多线程、多核CPU才能发挥异步的优势 异步并没有缩短响应时间,反而有所增加 合理进行任务拆分,也是利用异步的关键
Future & Promise
处理异步时:经常用到的两个接口, futer 核 promise
首先要说明 netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口,netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展
- jdk Future:只能同步等待任务结束(或成功、或失败)才能得到结果
- netty Future:可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
- netty Promise;不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称 | jdk Future | netty Future | Promise |
---|---|---|---|
cancel | 取消任务 | - | - |
isCanceled | 任务是否取消 | - | - |
isDone | 任务是否完成,不能区分成功失败 | - | - |
get | 获取任务结果,阻塞等待 | - | - |
getNow | - | 获取任务结果,非阻塞,还未产生结果时返回 null | - |
await | - | 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 | - |
sync | - | 等待任务结束,如果任务失败,抛出异常 | - |
isSuccess | - | 判断任务是否成功 | - |
cause | - | 获取失败信息,非阻塞,如果没有失败,返回null | - |
addLinstener | - | 添加回调,异步接收结果 | - |
setSuccess | - | - | 设置成功结果 |
setFailure | - | - | 设置失败结果 |
jdk future
// 线程池
ExecutorService pool = Executors.newFixedThreadPool(2);
// 提交任务
Future future = pool.submit(new Callable() {
@Override
public Integer call() throws Exception {
log.debug("执行计算"); // poll 线程
Thread.sleep(1000);
return 50;
}
});
// 主线程通过 future 获取结果
log.debug("等待结果"); // 主线程
log.debug("结果是 {}", future.get()); // 主线程
nio future
// 创建事件循环组
NioEventLoopGroup eventGroup = new NioEventLoopGroup();
// 获取执行事件
EventLoop eventLoop = eventGroup.next();
// 执行方法
Future future = eventLoop.submit(new Callable() {
@Override
public Integer call() throws Exception {
log.debug("执行计算"); // nio 线程
Thread.sleep(1000);
return 50;
}
});
同步获取线程返回结果
// 主线程通过 future 获取结果
log.debug("等待结果"); // 主线程
log.debug("结果是 {}", future.get()); // 主线程
异步
future.addListener(new GenericFutureListener