Netty添加线程池实现异步处理
Tomcat 的异步线程模型大概可以理解为:acceptor 负责接受新来的连接,然后把连接初始化后丢给 poller 来做 IO,然后又交给处理业务的 exec 线程池异步处理业务逻辑。
Netty 也是类似的道理:如果 IO 线程和 handler 在同一个线程里面,而 handler 执行的某个逻辑比较耗时(比如查数据库、服务间通信等),就会严重影响整个 Netty 的性能。这时候就需要考虑将耗时操作异步处理。
Netty 中加入线程池有两种方式:
第一种是 handler 中加入线程池
第二种是 Context 中加入线程池
1. handler 加入线程池
核心代码如下:
1. 服务端相关代码
EchoServer
ho;
bootstrap.ServerBootstrap;
channel.*;
channel.nio.NioEventLoopGroup;
channel.socket.SocketChannel;
channel.socket.nio.NioServerSocketChannel;
handler.logging.LogLevel;
handler.logging.LoggingHandler;
handler.ssl.SslContext;
handler.ssl.SslContextBuilder;
handler.ssl.util.SelfSignedCertificate;
public final class EchoServer {
static final boolean SSL = Property("ssl") != null;
static final int PORT = Integer.Property("port", "8007"));
public static void main(String[] args) throws Exception {
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.ificate(), ssc.privateKey()).build();
} el {
sslCtx = null;
}
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler rverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.wHandler(ch.alloc()));
}
p.addLast(new EchoServerHandler2());
p.addLast(rverHandler);
}
});
ChannelFuture f = b.bind(PORT).sync();
f.channel().cloFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
EchoServerHandler
ho;
buffer.ByteBuf;
buffer.Unpooled;
channel.ChannelHandlerContext;
channel.ChannelInboundHandlerAdapter;
channel.DefaultEventLoopGroup;
util.ChartUtil;
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
private static final DefaultEventLoopGroup eventExecutors = new DefaultEventLoopGroup(16);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("ho.EchoServerHandler.channelRead thread: " + Thread.currentThread().getName());
// 强转为netty的ByteBuffer(实际就是包装的ByteBuffer)
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("客户端发送的消息是:" + String(ChartUtil.UTF_8));
System.out.println("客户端地址:" + ctx.channel().remoteAddress());
ctx.piedBuffer("hello, 客户端!0!", ChartUtil.UTF_8));
// ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
// ⽐如这⾥我们将⼀个特别耗时的任务转为异步执⾏(也就是任务提交到NioEventLoop的taskQueue中)
System.out.println("java.lang.Runnable.run thread: " + Thread.currentThread().getName());
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
ctx.piedBuffer("hello, 客户端!1!", ChartUtil.UTF_8));
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cau) {
// Clo the connection when an exception is raid.
cau.printStackTrace();
ctx.clo();
}
}
EchoServerHandler2
ho;
channel.ChannelHandlerContext;
channel.ChannelOutboundHandlerAdapter;
channel.ChannelPromi;
channel.DefaultEventLoopGroup;
/**
 * Outbound handler that forwards every write unchanged and prints the name of the
 * thread the write runs on, to show which thread finally performs the write.
 */
public class EchoServerHandler2 extends ChannelOutboundHandlerAdapter {

    /**
     * Delegates the write to the default implementation, then logs the current thread.
     *
     * @param ctx     context of this handler in the channel pipeline
     * @param msg     message being written
     * @param promise promise notified when the write completes
     * @throws Exception if the delegated write fails
     */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        super.write(ctx, msg, promise);
        System.out.println("ho.EchoServerHandler2.write called, threadName: " + Thread.currentThread().getName());
    }

    /**
     * Prints the failure and closes the connection.
     *
     * @param ctx   context of this handler in the channel pipeline
     * @param cause the exception that was raised
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
2. client 代码
EchoClient
ho;
bootstrap.Bootstrap;
channel.*;
channel.nio.NioEventLoopGroup;
channel.socket.SocketChannel;
channel.socket.nio.NioSocketChannel;
handler.ssl.SslContext;
handler.ssl.SslContextBuilder;
handler.ssl.util.IncureTrustManagerFactory;
public final class EchoClient {
static final boolean SSL = Property("ssl") != null;
static final String HOST = Property("host", "127.0.0.1");
static final int PORT = Integer.Property("port", "8007"));
public static void main(String[] args) throws Exception {
final SslContext sslCtx;
if (SSL) {
sslCtx = SslContextBuilder.forClient()
.trustManager(IncureTrustManagerFactory.INSTANCE).build();
} el {
sslCtx = null;
}
// Configure the client.
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap();
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.wHandler(ch.alloc(), HOST, PORT));
}
p.addLast(new EchoClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(HOST, PORT).sync();
// Wait until the connection is clod.
f.channel().cloFuture().sync();
} finally {
// Shut down the event loop to terminate all threads.
group.shutdownGracefully();
}
}
}
EchoClientHandler
ho;
buffer.ByteBuf;
buffer.Unpooled;
channel.ChannelHandlerContext;
channel.ChannelInboundHandlerAdapter;
util.ChartUtil;
public class EchoClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("ClientHandler ctx: " + ctx);
ctx.piedBuffer("hello,服务器!", ChartUtil.UTF_8));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
// 强转为netty的ByteBuffer(实际就是包装的ByteBuffer)
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("服务器会送的消息是:" + String(ChartUtil.UTF_8));
System.out.println("服务器地址:" + ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cau) {
cau.printStackTrace();
ctx.clo();
}
}
3. 测试
先启动服务端,然后启动客户端,然后查看服务端控制台如下:
客户端发送的消息是:hello,服务器!
客户端地址:/127.0.0.1:54247
4. 分析
可以看到上面的逻辑是:
(1) 当 IO 线程轮询到一个 socket 事件后,IO 线程开始处理,当走到 EchoServerHandler 中比较耗时的操作时,将耗时任务交给线程池。
(2) 当耗时任务执行完毕再执行 ctx.writeAndFlush 时,会将这个写任务再交给 IO 线程(也就是最终的写操作都会交给 IO 线程),过程如下:
1》io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, boolean, io.netty.channel.ChannelPromise)
private void write(Object msg, boolean flush, ChannelPromi promi) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = uch(msg, next);
EventExecutor executor = utor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promi);
} el {
next.invokeWrite(m, promi);
}
} el {
AbstractWriteTask task;
if (flush) {
task = wInstance(next, m, promi);
} el {
task = wInstance(next, m, promi);
}
safeExecute(executor, task, promi, m);
}
}
这里走的是 else 代码块的代码,因为当前线程不是 IO 线程,所以就走 else。else 代码块的逻辑是创建一个写 Task,然后调用 io.netty.channel.AbstractChannelHandlerContext#safeExecute:
/**
 * Submits {@code runnable} to {@code executor}; if submission throws, fails the promise
 * and releases the message so it is not leaked.
 *
 * @param executor executor that should run the task
 * @param runnable the task to submit
 * @param promise  promise to fail when the task cannot be submitted
 * @param msg      message associated with the task; released on failure when non-null
 */
private static void safeExecute(EventExecutor executor, Runnable runnable, ChannelPromise promise, Object msg) {
    try {
        executor.execute(runnable);
    } catch (Throwable cause) {
        try {
            promise.setFailure(cause);
        } finally {
            if (msg != null) {
                ReferenceCountUtil.release(msg);
            }
        }
    }
}
可以看到是调⽤ute ⽅法加⼊⾃⼰的任务队列⾥⾯。io.urrent.SingleThreadEventExecutor#execute public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} el {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
补充:Handler 中加异步还有一种方式,就是创建一个任务,加入到当前 Channel 所属 EventLoop 自己的任务队列中,这种方式实际占用的仍然是 IO 线程。
ho;
buffer.ByteBuf;
buffer.Unpooled;
channel.ChannelHandlerContext;
channel.ChannelInboundHandlerAdapter;
util.ChartUtil;
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
System.out.println("ho.EchoServerHandler.channelRead thread: " + Thread.currentThread().getName());
// 强转为netty的ByteBuffer(实际就是包装的ByteBuffer)
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("客户端发送的消息是:" + String(ChartUtil.UTF_8));
System.out.println("客户端地址:" + ctx.channel().remoteAddress());
ctx.piedBuffer("hello, 客户端!0!", ChartUtil.UTF_8));
ctx.channel().eventLoop().execute(new Runnable() {
@Override
public void run() {
// ⽐如这⾥我们将⼀个特别耗时的任务转为异步执⾏(也就是任务提交到NioEventLoop的taskQueue中) System.out.println("java.lang.Runnable.run thread: " + Thread.currentThread().getNam
e());
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
ctx.piedBuffer("hello, 客户端!1!", ChartUtil.UTF_8));
}
});
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cau) {
// Clo the connection when an exception is raid.
cau.printStackTrace();
ctx.clo();
}
}
测试:从下面的输出可以看出,这种异步方式用的仍然是当前的 IO 线程(线程名为 nioEventLoopGroup-3-1)
客户端发送的消息是:hello,服务器!
客户端地址:/127.0.0.1:53721
java.lang.Runnable.run thread: nioEventLoopGroup-3-1
2. Context 中增加异步线程池
1. 代码改造
EchoServer 代码改造
ho;
bootstrap.ServerBootstrap;
channel.*;
channel.nio.NioEventLoopGroup;
channel.socket.SocketChannel;
channel.socket.nio.NioServerSocketChannel;
handler.logging.LogLevel;
handler.logging.LoggingHandler;
handler.ssl.SslContext;
handler.ssl.SslContextBuilder;
handler.ssl.util.SelfSignedCertificate;
public final class EchoServer {
static final boolean SSL = Property("ssl") != null;
static final int PORT = Integer.Property("port", "8007"));
public static void main(String[] args) throws Exception {
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.ificate(), ssc.privateKey()).build();
} el {
sslCtx = null;
}
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
DefaultEventLoopGroup group = new DefaultEventLoopGroup(16);
final EchoServerHandler rverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {