Dubbo线程模型和调度策略
⼀、服务调⽤
⾸先服务消费者通过代理对象 Proxy 发起远程调⽤,接着通过⽹络客户端 Client 将编码后的请求发送给服务提供⽅的⽹络层上,也就是Server。Server 在收到请求后,⾸先要做的事情是对数据包进⾏解码。然后将解码后的请求发送⾄分发器 Dispatcher,再由分发器将请求派发到指定的线程池上,最后由线程池调⽤具体的服务。这就是⼀个远程调⽤请求的发送与接收过程。
那么在dubbo中请求是如何派发的?以及线程模型是什么样的那?
⼆、I/O线程和业务线程分离
如果事件处理的逻辑能迅速完成,并且不会发起新的 IO请求,⽐如只是在内存中记个标识,则直接在 IO线程上处理更快,因为减少了线程池调度。
但如果事件处理逻辑较慢,或者需要发起新的 IO 请求,⽐如需要查询数据库,则必须派发到线程池,否则 IO 线程阻塞,将导致不能接收其它请求。
如果⽤ IO 线程处理事件,⼜在事件处理过程中发起新的 IO 请求,⽐如在连接事件中发起登录请求,会报“可能引发死锁”异常,但不会真死锁。
所以在真实的业务场景中是需要将业务线程和I/O线程进⾏分离处理的。dubbo作为⼀个服务治理框架,底层的采⽤Netty作为⽹络通信的组件,在请求派发的时候⽀持不同的派发策略。
参考⽂章:
三、请求派发策略
连接建⽴
从官⽅描述来看,duboo⽀持五种派发策略,下⾯看下是如何实现的。以Ntty4.x为例:
1. NettyServer
public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
super(url, ChannelHandlers.wrap(handler, ExecutorUtil.tThreadName(url, SERVER_THREAD_POOL_NAME)));
}
复制代码
2. ChannelHandlers#wrapInternal
protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
// 选择调度策略默认是all
return new MultiMessageHandler(new ExtensionLoader(Dispatcher.class)
.getAdaptiveExtension().dispatch(handler, url)));
}
复制代码
在NettyServer的构造⽅法中通过ChannelHandlers#wrap⽅法设置MultiMessageHandler,HeartbeatHandler并通过SPI扩展选择调度策略。
3. NettyServer#doOpen
protected void doOpen() throws Throwable {
bootstrap = new ServerBootstrap();
// 多线程模型
// boss线程池,负责和消费者建⽴新的连接
bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));
// worker线程池,负责连接的数据交换
workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
new DefaultThreadFactory("NettyServerWorker", true));
final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
channels = Channels();
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE) // nagele 算法
.childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)// TIME_WAIT
.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) //内存池
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
.addLast("decoder", Decoder()) //设置编解码器
.addLast("encoder", Encoder())
.
addLast("handler", nettyServerHandler);
}
});
// bind 端⼝
ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
channelFuture.syncUninterruptibly();
channel = channelFuture.channel();
}
复制代码
设置Netty的boss线程池数量为1,worker线程池(也就是I/O线程)为cpu核⼼数+1和向Netty中注测Handler⽤于消息的编解码和处理。
如果我们在⼀个JVM进程只暴露⼀个Dubbo服务端⼝,那么⼀个JVM进程只会有⼀个NettyServer实例,也会只有⼀个NettyHandler实例。并且设置了三个handler,⽤来处理编解码、连接的创建、消息读写等。在dubbo内部定义了⼀个ChannelHandler⽤来和Netty
的Channel关联起来,通过上述的代码会发现NettyServer本⾝也是⼀个ChannelHandler。通过NettyServer#doOpen暴露服务端⼝后,客户端就能和服务端建⽴连接了,⽽提供者在初始化连接后会调⽤NettyHandler#channelActive⽅法来创建⼀个NettyChannel
4. NettyChannel
public void channelActive(ChannelHandlerContext ctx) throws Exception {
logger.debug("channelActive <" + AddressString((InetSocketAddress) ctx.channel().remoteAddress()) + ">" + " channle <" + ctx.channel()); //获取或者创建⼀个NettyChannel
NettyChannel channel = OrAddChannel(ctx.channel(), url, handler);
try {
if (channel != null) {
// <ip:port, channel>
channels.AddressString((InetSocketAddress) ctx.channel().remoteAddress()), channel);
}
// 这⾥的 handler就是NettyServer
} finally {
}
}
复制代码
与Netty和Dubbo都有⾃⼰的ChannelHandler⼀样,Netty和Dubbo也有着⾃⼰的Channel。该⽅法最后会调⽤NettyServer#connected⽅
法来检查新添加channel后是否会超出提供者配置的accepts配置,如果超出,则直接打印错误⽇志并关闭该Channel,这样的话消费者端
⾃然会收到连接中断的异常信息,详细可以见AbstractServer#connected⽅法。
5. AbstractServer#connected
public void connected(Channel ch) throws RemotingException {
// If the rver has entered the shutdown process, reject any new connection
if (this.isClosing() || this.isClod()) {
logger.warn("Clo new channel " + ch + ", cau: rver is closing or has been clod. For example, receive a new connect request while in shutdown proc ch.clo();
return;
}
Collection<Channel> channels = getChannels();
//⼤于accepts的tcp连接直接关闭
if (accepts > 0 && channels.size() > accepts) {
<("Clo channel " + ch + ", cau: The rver " + ch.getLocalAddress() + " connections greater than max config " + accepts);
ch.clo();
return;
}
}
复制代码
在dubbo中消费者和提供者默认只建⽴⼀个TCP长连接(详细代码请参考官⽹源码导读,服务引⽤⼀节),为了增加消费者调⽤服务提供者的吞吐量,可以在消费⽅增加如下配置:
<dubbo:reference id="demoService" check="fal" interface="org.apache.dubbo.demo.DemoService" connections="20"/>
复制代码
提供者可以使⽤accepts控制长连接的数量防⽌连接数量过多,配置如下:
<dubbo:protocol name="dubbo" port="20880" accepts="10"/>
复制代码
请求接收
当连接建⽴完成后,消费者就可以请求提供者的服务了,当请求到来,提供者这边会依次经过如下Handler的处理:
--->NettyServerHandler#channelRead:接收请求消息。
-
-->AbstractPeer#received:如果服务已经关闭,则返回,否则调⽤下⼀个Handler来处理。
--->MultiMessageHandler#received:如果是批量请求,则依次对请求调⽤下⼀个Handler来处理。
--->HeartbeatHandler#received: 处理⼼跳消息。
--->AllChannelHandler#received:该Dubbo的Handler⾮常重要,因为从这⾥是IO线程池和业务线程池的隔离。
--->DecodeHandler#received: 消息解码。
--->HeaderExchangeHandler#received:消息处理。
--->DubboProtocol : 调⽤服务。
1. AllChannelHandler#received:
public void received(Channel channel, Object message) throws RemotingException {
// 获取业务线程池
ExecutorService cexecutor = getExecutorService();
try {
// 使⽤线程池处理消息
} catch (Throwable t) {
throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
}
}
复制代码
这⾥对execute进⾏了异常捕获,这是因为I/O线程池是⽆界的,但业务线程池可能是有界的,所以进⾏execute提交可能会遇到RejectedExecutionException异常 。
那么这⾥是如何获取到业务线程池的那?实际上WrappedChannelHandler是xxxChannelHandlerd的装饰类,根据dubbo spi可以知道,获取AllChannelHandler会⾸先实例化WrappedChannelHandler。
2. WrappedChannelHandler
public WrappedChannelHandler(ChannelHandler handler, URL url) {
this.handler = handler;
this.url = url;
// 获取业务线程池
executor = (ExecutorService) ExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);
String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
if (Constants.CONSUMER_SIDE.Parameter(Constants.SIDE_KEY))) {
componentKey = Constants.CONSUMER_SIDE;
}
DataStore dataStore = ExtensionLoader(DataStore.class).getDefaultExtension();
dataStore.put(componentKey, Port()), executor);
}
复制代码
线程模型
1. FixedThreadPool
public class FixedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
// 线程池名称DubboServerHanler-rver:port
String name = Parameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// 缺省线程数量200
int threads = Parameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
// 任务队列类型
int queues = Parameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
复制代码
缺省情况下使⽤200个线程和SynchronousQueue这意味着如果如果线程池所有线程都在⼯作再有新任务会直接拒绝。
2. CachedThreadPool
public class CachedThreadPool implements ThreadPool {
@Override
public Executor getExecutor(URL url) {
String name = Parameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
// 核⼼线程数量缺省为0
int cores = Parameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THR
EADS);
// 最⼤线程数量缺省为Integer.MAX_VALUE
int threads = Parameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
// queue 缺省为0
int queues = Parameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
// 空闲线程存活时间
int alive = Parameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
queues == 0 ? new SynchronousQueue<Runnable>() :
(queues < 0 ? new LinkedBlockingQueue<Runnable>()
: new LinkedBlockingQueue<Runnable>(queues)),
new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url));
}
}
复制代码
缓存线程池,可以看出如果提交任务的速度⼤于maxThreads将会不断创建线程,极端条件下将会耗尽CPU和内存资源。在突发⼤流量进⼊时不适合使⽤。
3. LimitedThreadPool