首页 > 作文

基于 Hyperf 实现 RabbitMQ + WebSocket 消息推送

更新时间:2023-04-08 12:12:15 阅读: 评论:0

思路

利用 websocket 协议让客户端和服务器端保持有状态的长链接,保存链接上来的客户端 id。订阅发布者发布的消息针对已保存的客户端 id 进行广播消息。

websocket 服务

compor require hyperf/websocket-rver

  

配置文件 [config/autoload/rver.php]

<?phpreturn [    'mode' => swoole_process,    'rvers' => [        [            'name' => 'http',            'type' => rver::rver_http,            'host' => '0.0.0.0',            'port' => 11111,            'sock_type' => swoole_sock_tcp,            'callbacks' => [                swooleevent::on_request => [hyperf\httprver\rver::class, 'onrequest'],            ],        ],        [            'name' => 'ws',            'type' => rver::rver_websocket,            'host' => '0.0.0.0',            'port' => 12222,            'sock_type' => swoole_sock_tcp,            'callbacks' => [                swooleevent::on_hand_shake => [hyperf\websocketrver\rver::class, 'onhandshake'],                swooleevent::on_message => [hyperf\websocketrver\rver::class, 'onmessage'],                swooleevent::on_clo => [hyperf\websocketrver\rver::class, 'onclo'],            ],        ],    ],

  

websocket 服务器端代码示例

<?phpdeclare(strict_types=1);/** * this file is part of hyperf. * * @link     /d/file/titlepic/www.hyperf.io * @document https://doc.hyperf.io * @contact  group@hyperf.io * @licen  https://github.com/hyperf-cloud/hyperf/blob/master/licen */namespace app\controller;u hyperf\contract\onclointerface;u hyperf\contract\onmessageinterface;u hyperf\contract\onopeninterface;u swoole\http\request;u swoole\rver;u swoole\websocket\frame;u swoole\websocket\rver as websocketrver;class websocketcontroller extends controller implements onmessageinterface, onopeninterface, onclointerface{    /**     * 发送消息     * @param websocketrver $rver     * @param frame $frame     */    public function onmessage(websocketrver $rver, frame $frame): void    {        //心跳刷新缓存        $redis = $this->container->get(\redis::class);        //获取所有的客户端id        $fdlist = $redis->smembers('websocket_sjd_1');        //如果当前客户端在客户端集合中,就刷新    怎样当演员    if (in_array($frame->fd, $fdlist)) {            $redis->sadd('websocket_sjd_1', $frame->fd);            $redis->expire('websocket_sjd_1', 7200);        }        $rver->push($frame->fd, 'recv: ' . $frame->data);    }    /**     * 客户端失去链接     * @param rver hit过去式$rver     * @param int $fd     * @param int $reactorid     */    public function onclo(rver $rver, int $fd, int $reactorid): void    {        //删掉客户端id        $redis = $this->container->get(\redis::class);        //移除集合中指定的value        $redis->srem('websocket_sjd_1', $fd);        var_dump('clod');    }    /**     * 客户端链接     * @param websocketrver $rver     * @param request $request     */    public function onopen(websocketrver $rver, request $request): void    {        //保存客户端id        $redis = $this->container->get(\redis::class);        $res1 = $redis->sadd('websocket_sjd_1', $request->fd);        var_dump($res1);        $res = $redis->expire('websocket_sjd_1', 7200);        var_dump($res);        $rver->push($request->fd, 'opened');    }}

  

websocket 前端代码

function websockettest() { if ("websocket" in window) {            console.log("您的浏览器支持 websocket!");            var num = 0            // 打开一个 web socket            var ws = new websocket("ws://127.0.0.1:12222");            ws.onopen = function () {                // web socket 已连接上,使用 nd() 方法发送数据                //alert("数据发送中...");                //ws.nd("发送数据");            };            window.tinterval(function () { //每隔5秒钟发送一次心跳,避免websocket连接因超时而自动断开                var ping = {"type": "ping"};                ws.nd(json.stringify(ping));            }, 5000);            ws.onmessage = function (evt) {                var d = json.par(evt.data);                console.log(d);                if (d.code == 300) {                    $(".address").text(d.address)                }                if (d.code == 200) {                    var v = d.data                    console.log(v);                    num++                    var str = `<div class="item">                                    <p>${v.recordouttime}</p>                                    <p>${v.uroutname}</p>                                    <p>${v.uroutnum}</p>                                    <p>${v.dooroutname}</p>                                </div>`                    $(".tablehead").after(str)                    if (num > 7) {                        num--                        $(".table .item:nth-last-child(1)").remove()                    }                }            };            ws.error = function (e) {                console.log(e)                alert(e)            }            ws.onclo = function () {                // 关闭 websocket                alert("连接已关闭...");            };        } el {            alert("您的浏览器不支持 websocket!");        }    }

  

amqp 组件

compor require hyperf/amqp

配置文件 [config/autoload/amqp.php]

<?phpreturn [    'default' => [        'host' => 'localhost',        'port' => 5672,        'ur' => 'guest',        'password' => 'guest',        'vhost' => '/',        'pool' => [            'min_connections' => 1,            'max_connections' => 10,            'connect_timeout' => 10.0,            'wait_timeout' => 3.0,            'heartbeat' => -1,        ],        'params' => [            'insist' => fal,            'login_method' => 'amqplain',            'login_respon' => null,            'locale' => 'en_us',            'connection_timeout' => 3.0,            'read_write_timeout' => 6.0,            'context' => null,            'keepalive' => fal,            'heartbeat' => 3,        ],    ],];

  

mq 消费者代码

<?phpdeclare(strict_types=1);namespace app\amqp\consumer;u hyperf\amqp\annotation\consumer;u hyperf\amqp\message\consumermessage;u hyperf\amqp\result;u hyperf\rver\rver;u hyperf\rver\rverfactory;/** * @consumer(exchange="hyperf", routingkey="hyperf", queue="hyperf", nums=1) */class democonsumer extends consumermessage{    /**     * rabbm顺流而下的意思itmq消费端代码     * @param $data     * @return string     */    public function consume($data): string    {        print_r($data);        //获取集合中所有的value        $redis = $this->container->get(\redis::class);        $fdlist=$redis->smembers('w文科生有哪些专业可选ebsocket_sjd_1');        $rver=$this->container->get(rverfactory::class)->getrver()->getrver();        foreach($fdlist as $key=>$v){            if(!empty($v)){                $rver->push((int)$v, $data);            }        }        return result::ack;    }

  

} 

控制器代码

/**    * test     * @return array     */    public function test()    {        $data = array(            'code' => 200,            'data' => [                'uroutname' => 'ccflow',                'uroutnum' => '9999',                'recordouttime' => date("y-m-d h:i:s", time()),                'dooroutname' => '教师公寓',            ]        );        $data = \guzzlehttp\json_encode($data);        $message = new demoproducer($data);        $producer = applicationcontext::getcontaine光子脱毛激光脱毛r()->get(producer::class);        $result = $producer->produce($message);        var_dump($result);        $ur = $this->request->input('ur', 'hyperf');        $method = $this->request->getmethod();        return [            'method' => $method,            'message' => "{$ur}.",        ];    }

  

最终效果

本文发布于:2023-04-08 12:11:03,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/zuowen/00debd21e99601326fe1e113ec1fd63e.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

本文word下载地址:基于 Hyperf 实现 RabbitMQ + WebSocket 消息推送.doc

本文 PDF 下载地址:基于 Hyperf 实现 RabbitMQ + WebSocket 消息推送.pdf

上一篇:宫缩多久会生
下一篇:返回列表
标签:客户端   代码   您的   链接
相关文章
留言与评论(共有 0 条评论)
   
验证码:
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图