首页 > 作文

Java中多线程Reactor模式的实现

更新时间:2023-04-04 00:15:48 阅读: 评论:0

目录
1、 主服务器让世界充满爱演讲下载2、io请求handler+线程池3、客户端

多线程reactor模式旨在分配多个reactor每一个reactor独立拥有一个lector,在网络通信中大体设计为负责连接的主reactor,其中在主reactor的run函数中若lector检测到了连接事件的发生则dispatch该事件。

让负责管理连接的handler处理连接,其中在这个负责连接的handler处理器中创建子handler用以处理io请求。这样一来连接请求与io请求分开执行提高通道的并发量。同时多个reactor带来的好处是多个lector可以提高通道的检索速度

1、 主服务器

package com.crazymakercircle.reactormodel;import com.crazymakercircle.niodemoconfig;import com.crazymakercircle.util.logger;import java.io.ioexception;import java.net.inetsocketaddress;import java.nio.channels.lectionke河南检查职业学院y;import java.nio.channels.lector;import java.nio.channels.rversocketchannel;import java.nio.channels.socketchannel;import java.util.iterator;import java.util.t;import java.util.concurrent.atomic.atomicinteger;class multithreadechorverreactor {    rversocketchannel rversocket;    atomicinteger next = new atomicinteger(0);    lector bosslector = null;    reactor bossreactor = null;    //lectors集合,引入多个lector选择器    //多个选择器可以更好的提高通道的并发量    lector[] worklectors = new lector[2];    //引入多个子反应器    //如果cpu是多核的可以开启多个子reactor反应器,这样每一个子reactor反应器还可以独立分配一个线程。    //每一个线程可以单独绑定一个单独的lector选择器以提高通道并发量    reactor[] workreactors = null;    multithreadechorverreactor() throws ioexception {        bosslector = lector.open();        //初始化多个lector选择器        worklectors[0] = lector.open();        worklectors[1] = lector.open();        rversocket = rversocketchannel.open();        inetsocketaddress address =                new inetsocketaddress(niodemoconfig.socket_rver_ip,                        niodemoconfig.socket_rver_port);        rversocket.socket().bind(address);        //非阻塞        rversocket.configureblocking(fal);        //第一个lector,负责监控新连接事件        lectionkey sk =                rversocket.register(bosslector, lectionkey.op_accept);        //附加新连接处理handler处理器到lectionkey(选择键)        sk.attach(new acceptorhandler());        //处理新连接的反应器        bossreactor = new reactor(bosslector);        //第一个子反应器,一子反应器负责一个选择器        reactor subreactor1 = new reactor(worklectors[0]);        //第二个子反应器,一子反应器负责一个赏析无可奈何花落去选择器        reactor subreactor2 = new reactor(worklectors[1]);        workreactors = new reactor[]{subreactor1, subreactor2};    }    private void startrvice() {        new thread(bossreactor).start();        // 一子反应器对应一条线程        new thread(workreactors[0]).start();        new thread(workreactors[1]).start();    }    //反应器    class reactor implements runnable {        //每条线程负责一个选择器的查询        final lector lector;        public reactor(lector lector) {            this.lector = lector;        }        public void run() {            try {                while (!thread.interrupted()) {                    //单位为毫秒                    //每隔一秒列出选择器感应列表                    lector.lect(1000);                    t<lectionkey> lectedkeys = lector.lectedkeys();                    if (null == lectedkeys || lectedkeys.size() == 0) {                        //如果列表中的通道注册事件没有发生那就继续执行                        continue;                    }                    iterator<lectionkey> it = lectedkeys.iterator();                    while (it.hasnext()) {                        //reactor负责dispatch收到的事件                        lectionkey sk = it.next();                        dispatch(sk);                    }                    //清楚掉已经处理过的感应事件,防止重复处理                    lectedkeys.clear();                }            } catch (ioexception ex) {                ex.printstacktrace();            }        }        void dispatch(lectionkey sk) {            runnable handler = (runnable) sk.attachment();            //调用之前attach绑定到选择键的handler处理器对象            if (handler != null) {                handler.run();            }        }    }    // handler:新连接处理器    class acceptorhandler implements runnable {        public void run() {        try {                socketchannel channel = rversocket.accept();                logger.info("接收到一个新的连接");                if (channel != null) {                    int index = next.get();                    logger.info("选择器的编号:" + index);                    lector lector = worklecto个人转租房rs[index];                    new multithreadechohandler(lector, channel);                }            } catch (ioexception e) {                e.printstacktrace();            }            if (next.incrementandget() == worklectors.length) {                next.t(0);            }        }    }    public static void main(string[] args) throws ioexception {        multithreadechorverreactor rver =                new multithreadechorverreactor();        rver.startrvice();    }}

按上述的设计思想,在主服务器中实际上设计了三个reactor,一个主reactor专门负责连接请求并配已单独的lector,但是三个reactor的线程run函数是做的相同的功能,都是根据每个线程内部的lector进行检索事件列表,若注册的监听事件发生了则调用dispactch分发到每个reactor对应的handler。

这里需要注意的一开始其实只有负责连接事件的主reactor在注册lector的时候给相应的key配了一个acceptorhandler()。

 //第一个lector,负责监控新连接事件        lectionkey sk =                rversocket.register(bosslector, lectionkey.op_accept);        //附加新连接处理handler处理器到lectionkey(选择键)        sk.attach(new acceptorhandler());

但是reactor的run方法里若相应的lector key发生了便要dispatch到一个handler。这里其他两个子reactor的handler在哪里赋值的呢?其实在处理连接请求的reactor中便创建了各个子handler,如下代码所示:
主handler中先是根据服务器channel创建出客服端channel,在进行子lector与channel的绑定。

                   int index = next.get();                   logger.info("选择器的编号:" + index);                   lector lector = worklectors[index];                   new multithreadechohandler(lector, channel);

2、io请求handler+线程池

package com.crazymakercircle.reactormodel;import com.crazymakercircle.util.logger;import java.io.ioexception;import java.nio.bytebuffer;import java.nio.channels.lectionkey;import java.nio.channels.lector;import java.nio.channels.socketchannel;import java.util.concurrent.executorrvice;import java.util.concurrent.executors;class multithreadechohandler implements runnable {    final socketchannel channel;    final lectionkey sk;    final bytebuffer bytebuffer = bytebuffer.allocate(1024);    static final int recieving = 0, nding = 1;    int state = recieving;    //引入线程池    static executorrvice pool = executors.newfixedthreadpool(4);    multithreadechohandler(lector lector, socketchannel c) throws ioexception {        channel = c;        channel.configureblocking(fal);        //唤醒选择,防止register时 boss线程被阻塞,netty 处理方式比较优雅,会在同一个线程注册事件,避免阻塞boss        lector.wakeup();        //仅仅取得选择键,后设置感兴趣的io事件        sk = channel.register(lector, 0);        //将本handler作为sk选择键的附件,方便事件dispatch        sk.attach(this);        //向sk选择键注册read就绪事件        sk.interestops(lectionkey.op_read);        //唤醒选择,是的op_read生效        lector.wakeup();        logger.info("新的连接 注册完成");    }    public void run() {        //异步任务,在独立的线程池中执行        pool.execute(new asynctask());    }    //异步任务,不在reactor线程中执行    public synchronized void asyncrun() {        try {            if (state == nding) {                //写入通道                channel.write(bytebuffer);                //写完后,准备开始从通道读,bytebuffer切换成写模式                bytebuffer.clear();                //写完后,注册read就绪事件                sk.interestops(lectionkey.op_read);                //写完后,进入接收的状态                state = recieving;            } el if (state == recieving) {                //从通道读                int length = 0;                while ((length = channel.read(bytebuffer)) > 0) {                    logger.info(new string(bytebuffer.array(), 0, length));                }                //读完后,准备开始写入通道,bytebuffer切换成读模式                bytebuffer.flip();                //读完后,注册write就绪事件                sk.interestops(lectionkey.op_write);                //读完后,进入发送的状态                state = nding;            }            //处理结束了, 这里不能关闭lect key,需要重复使用            //sk.cancel();        } catch (ioexception ex) {            ex.printstacktrace();        }    }    //异步任务的内部类    class asynctask implements runnable {        public void run() {            multithreadechohandler.this.asyncrun();        }    }}

3、客户端

在处理io请求的handler中采用了线程池,已达到异步处理的目的。

package com.crazymakercircle.reactormodel;import com.crazymakercircle.niodemoconfig;import com.crazymakercircle.util.dateutil;import com.crazymakercircle.util.logger;import java.io.ioexception;import java.net.inetsocketaddress;import java.nio.bytebuffer;import java.nio.channels.lectionkey;import java.nio.channels.lector;import java.nio.channels.socketchannel;import java.util.iterator;import java.util.scanner;import java.util.t;/** * create by 尼恩 @ 疯狂创客圈 **/public class echoclient {    public void start() throws ioexception {        inetsocketaddress address =                new inetsocketaddress(niodemoconfig.socket_rver_ip,                        niodemoconfig.socket_rver_port);        // 1、获取通道(channel)        socketchannel socketchannel = socketchannel.open(address);        logger.info("客户端连接成功");        // 2、切换成非阻塞模式        socketchannel.configureblocking(fal);        //不断的自旋、等待连接完成,或者做一些其他的事情        while (!socketchannel.finishconnect()) {        }        logger.tcfo("客户端启动成功!");        //启动接受线程        procesr procesr = new procesr(socketchannel);        new thread(procesr).start();    }    static class procesr implements runnable {        final lector lector;        final socketchannel channel;        procesr(socketchannel channel) throws ioexception {            //reactor初始化            lector = lector.open();            this.channel = channel;            channel.register(lector,                    lectionkey.op_read | lectionkey.op_write);        }        public void run() {            try {                while (!thread.interrupted()) {                    lector.lect();                    t<lectionkey> lected = lector.lectedkeys();                    iterator<lectionkey> it = lected.iterator();                    while (it.hasnext()) {                        lectionkey sk = it.next();                        if (sk.iswritable()) {                            bytebuffer buffer = bytebuffer.allocate(niodemoconfig.nd_buffer_size);                            scanner scanner = new scanner(system.in);                            logger.tcfo("请输入发送内容:");                            if (scanner.hasnext()) {                                socketchannel socketchannel = (socketchannel) sk.channel();                                string next = scanner.next();                                buffer.put((dateutil.getnow() + " >>" + next).getbytes());                                buffer.flip();                                // 操作三:发送数据                                socketchannel.write(buffer);                                buffer.clear();                            }                        }                        if (sk.isreadable()) {                            // 若选择键的io事件是“可读”事件,读取数据                            socketchannel socketchannel = (socketchannel) sk.channel();                            //读取数据                            bytebuffer bytebuffer = bytebuffer.allocate(1024);                            int length = 0;                            while ((length = socketchannel.read(bytebuffer)) > 0) {                                bytebuffer.flip();                                logger.info("rver echo:" + new string(bytebuffer.array(), 0, length));                                bytebuffer.clear();                            }    安徽大专                    }                        //处理结束了, 这里不能关闭lect key,需要重复使用                        //lectionkey.cancel();                    }                    lected.clear();                }            } catch (ioexception ex) {                ex.printstacktrace();            }        }    }    public static void main(string[] args) throws ioexception {        new echoclient().start();    }}

到此这篇关于java中多线程reactor模式的实现的文章就介绍到这了,更多相关java 多线程reactor内容请搜索www.887551.com以前的文章或继续浏览下面的相关文章希望大家以后多多支持www.887551.com!

本文发布于:2023-04-04 00:15:47,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/zuowen/a2385c6b0c8260b69f25d0e68008f6c6.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

本文word下载地址:Java中多线程Reactor模式的实现.doc

本文 PDF 下载地址:Java中多线程Reactor模式的实现.pdf

标签:线程   反应器   事件   多个
相关文章
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图