多线程reactor模式旨在分配多个reactor每一个reactor独立拥有一个lector,在网络通信中大体设计为负责连接的主reactor,其中在主reactor的run函数中若lector检测到了连接事件的发生则dispatch该事件。
让负责管理连接的handler处理连接,其中在这个负责连接的handler处理器中创建子handler用以处理io请求。这样一来连接请求与io请求分开执行提高通道的并发量。同时多个reactor带来的好处是多个lector可以提高通道的检索速度
package com.crazymakercircle.reactormodel;import com.crazymakercircle.niodemoconfig;import com.crazymakercircle.util.logger;import java.io.ioexception;import java.net.inetsocketaddress;import java.nio.channels.lectionke河南检查职业学院y;import java.nio.channels.lector;import java.nio.channels.rversocketchannel;import java.nio.channels.socketchannel;import java.util.iterator;import java.util.t;import java.util.concurrent.atomic.atomicinteger;class multithreadechorverreactor { rversocketchannel rversocket; atomicinteger next = new atomicinteger(0); lector bosslector = null; reactor bossreactor = null; //lectors集合,引入多个lector选择器 //多个选择器可以更好的提高通道的并发量 lector[] worklectors = new lector[2]; //引入多个子反应器 //如果cpu是多核的可以开启多个子reactor反应器,这样每一个子reactor反应器还可以独立分配一个线程。 //每一个线程可以单独绑定一个单独的lector选择器以提高通道并发量 reactor[] workreactors = null; multithreadechorverreactor() throws ioexception { bosslector = lector.open(); //初始化多个lector选择器 worklectors[0] = lector.open(); worklectors[1] = lector.open(); rversocket = rversocketchannel.open(); inetsocketaddress address = new inetsocketaddress(niodemoconfig.socket_rver_ip, niodemoconfig.socket_rver_port); rversocket.socket().bind(address); //非阻塞 rversocket.configureblocking(fal); //第一个lector,负责监控新连接事件 lectionkey sk = rversocket.register(bosslector, lectionkey.op_accept); //附加新连接处理handler处理器到lectionkey(选择键) sk.attach(new acceptorhandler()); //处理新连接的反应器 bossreactor = new reactor(bosslector); //第一个子反应器,一子反应器负责一个选择器 reactor subreactor1 = new reactor(worklectors[0]); //第二个子反应器,一子反应器负责一个赏析无可奈何花落去选择器 reactor subreactor2 = new reactor(worklectors[1]); workreactors = new reactor[]{subreactor1, subreactor2}; } private void startrvice() { new thread(bossreactor).start(); // 一子反应器对应一条线程 new thread(workreactors[0]).start(); new thread(workreactors[1]).start(); } //反应器 class reactor implements runnable { //每条线程负责一个选择器的查询 final lector lector; public reactor(lector lector) { this.lector = lector; } public void run() { try { while (!thread.interrupted()) { //单位为毫秒 //每隔一秒列出选择器感应列表 lector.lect(1000); t<lectionkey> lectedkeys = lector.lectedkeys(); if (null == lectedkeys || lectedkeys.size() == 0) { //如果列表中的通道注册事件没有发生那就继续执行 continue; } iterator<lectionkey> it = lectedkeys.iterator(); while (it.hasnext()) { //reactor负责dispatch收到的事件 lectionkey sk = it.next(); dispatch(sk); } //清楚掉已经处理过的感应事件,防止重复处理 lectedkeys.clear(); } } catch (ioexception ex) { ex.printstacktrace(); } } void dispatch(lectionkey sk) { runnable handler = (runnable) sk.attachment(); //调用之前attach绑定到选择键的handler处理器对象 if (handler != null) { handler.run(); } } } // handler:新连接处理器 class acceptorhandler implements runnable { public void run() { try { socketchannel channel = rversocket.accept(); logger.info("接收到一个新的连接"); if (channel != null) { int index = next.get(); logger.info("选择器的编号:" + index); lector lector = worklecto个人转租房rs[index]; new multithreadechohandler(lector, channel); } } catch (ioexception e) { e.printstacktrace(); } if (next.incrementandget() == worklectors.length) { next.t(0); } } } public static void main(string[] args) throws ioexception { multithreadechorverreactor rver = new multithreadechorverreactor(); rver.startrvice(); }}
按上述的设计思想,在主服务器中实际上设计了三个reactor,一个主reactor专门负责连接请求并配已单独的lector,但是三个reactor的线程run函数是做的相同的功能,都是根据每个线程内部的lector进行检索事件列表,若注册的监听事件发生了则调用dispactch分发到每个reactor对应的handler。
这里需要注意的一开始其实只有负责连接事件的主reactor在注册lector的时候给相应的key配了一个acceptorhandler()。
//第一个lector,负责监控新连接事件 lectionkey sk = rversocket.register(bosslector, lectionkey.op_accept); //附加新连接处理handler处理器到lectionkey(选择键) sk.attach(new acceptorhandler());
但是reactor的run方法里若相应的lector key发生了便要dispatch到一个handler。这里其他两个子reactor的handler在哪里赋值的呢?其实在处理连接请求的reactor中便创建了各个子handler,如下代码所示:
主handler中先是根据服务器channel创建出客服端channel,在进行子lector与channel的绑定。
int index = next.get(); logger.info("选择器的编号:" + index); lector lector = worklectors[index]; new multithreadechohandler(lector, channel);
package com.crazymakercircle.reactormodel;import com.crazymakercircle.util.logger;import java.io.ioexception;import java.nio.bytebuffer;import java.nio.channels.lectionkey;import java.nio.channels.lector;import java.nio.channels.socketchannel;import java.util.concurrent.executorrvice;import java.util.concurrent.executors;class multithreadechohandler implements runnable { final socketchannel channel; final lectionkey sk; final bytebuffer bytebuffer = bytebuffer.allocate(1024); static final int recieving = 0, nding = 1; int state = recieving; //引入线程池 static executorrvice pool = executors.newfixedthreadpool(4); multithreadechohandler(lector lector, socketchannel c) throws ioexception { channel = c; channel.configureblocking(fal); //唤醒选择,防止register时 boss线程被阻塞,netty 处理方式比较优雅,会在同一个线程注册事件,避免阻塞boss lector.wakeup(); //仅仅取得选择键,后设置感兴趣的io事件 sk = channel.register(lector, 0); //将本handler作为sk选择键的附件,方便事件dispatch sk.attach(this); //向sk选择键注册read就绪事件 sk.interestops(lectionkey.op_read); //唤醒选择,是的op_read生效 lector.wakeup(); logger.info("新的连接 注册完成"); } public void run() { //异步任务,在独立的线程池中执行 pool.execute(new asynctask()); } //异步任务,不在reactor线程中执行 public synchronized void asyncrun() { try { if (state == nding) { //写入通道 channel.write(bytebuffer); //写完后,准备开始从通道读,bytebuffer切换成写模式 bytebuffer.clear(); //写完后,注册read就绪事件 sk.interestops(lectionkey.op_read); //写完后,进入接收的状态 state = recieving; } el if (state == recieving) { //从通道读 int length = 0; while ((length = channel.read(bytebuffer)) > 0) { logger.info(new string(bytebuffer.array(), 0, length)); } //读完后,准备开始写入通道,bytebuffer切换成读模式 bytebuffer.flip(); //读完后,注册write就绪事件 sk.interestops(lectionkey.op_write); //读完后,进入发送的状态 state = nding; } //处理结束了, 这里不能关闭lect key,需要重复使用 //sk.cancel(); } catch (ioexception ex) { ex.printstacktrace(); } } //异步任务的内部类 class asynctask implements runnable { public void run() { multithreadechohandler.this.asyncrun(); } }}
在处理io请求的handler中采用了线程池,已达到异步处理的目的。
package com.crazymakercircle.reactormodel;import com.crazymakercircle.niodemoconfig;import com.crazymakercircle.util.dateutil;import com.crazymakercircle.util.logger;import java.io.ioexception;import java.net.inetsocketaddress;import java.nio.bytebuffer;import java.nio.channels.lectionkey;import java.nio.channels.lector;import java.nio.channels.socketchannel;import java.util.iterator;import java.util.scanner;import java.util.t;/** * create by 尼恩 @ 疯狂创客圈 **/public class echoclient { public void start() throws ioexception { inetsocketaddress address = new inetsocketaddress(niodemoconfig.socket_rver_ip, niodemoconfig.socket_rver_port); // 1、获取通道(channel) socketchannel socketchannel = socketchannel.open(address); logger.info("客户端连接成功"); // 2、切换成非阻塞模式 socketchannel.configureblocking(fal); //不断的自旋、等待连接完成,或者做一些其他的事情 while (!socketchannel.finishconnect()) { } logger.tcfo("客户端启动成功!"); //启动接受线程 procesr procesr = new procesr(socketchannel); new thread(procesr).start(); } static class procesr implements runnable { final lector lector; final socketchannel channel; procesr(socketchannel channel) throws ioexception { //reactor初始化 lector = lector.open(); this.channel = channel; channel.register(lector, lectionkey.op_read | lectionkey.op_write); } public void run() { try { while (!thread.interrupted()) { lector.lect(); t<lectionkey> lected = lector.lectedkeys(); iterator<lectionkey> it = lected.iterator(); while (it.hasnext()) { lectionkey sk = it.next(); if (sk.iswritable()) { bytebuffer buffer = bytebuffer.allocate(niodemoconfig.nd_buffer_size); scanner scanner = new scanner(system.in); logger.tcfo("请输入发送内容:"); if (scanner.hasnext()) { socketchannel socketchannel = (socketchannel) sk.channel(); string next = scanner.next(); buffer.put((dateutil.getnow() + " >>" + next).getbytes()); buffer.flip(); // 操作三:发送数据 socketchannel.write(buffer); buffer.clear(); } } if (sk.isreadable()) { // 若选择键的io事件是“可读”事件,读取数据 socketchannel socketchannel = (socketchannel) sk.channel(); //读取数据 bytebuffer bytebuffer = bytebuffer.allocate(1024); int length = 0; while ((length = socketchannel.read(bytebuffer)) > 0) { bytebuffer.flip(); logger.info("rver echo:" + new string(bytebuffer.array(), 0, length)); bytebuffer.clear(); } 安徽大专 } //处理结束了, 这里不能关闭lect key,需要重复使用 //lectionkey.cancel(); } lected.clear(); } } catch (ioexception ex) { ex.printstacktrace(); } } } public static void main(string[] args) throws ioexception { new echoclient().start(); }}
到此这篇关于java中多线程reactor模式的实现的文章就介绍到这了,更多相关java 多线程reactor内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!
本文发布于:2023-04-04 00:15:47,感谢您对本站的认可!
本文链接:https://www.wtabcd.cn/fanwen/zuowen/a2385c6b0c8260b69f25d0e68008f6c6.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文word下载地址:Java中多线程Reactor模式的实现.doc
本文 PDF 下载地址:Java中多线程Reactor模式的实现.pdf
留言与评论(共有 0 条评论) |