利用 WebSocket 协议让客户端和服务器端保持有状态的长连接,并保存连接上来的客户端 id。订阅发布者发布的消息后,针对已保存的客户端 id 广播消息。
composer require hyperf/websocket-server
<?php

declare(strict_types=1);

use Hyperf\Server\Server;
use Hyperf\Server\SwooleEvent;

// Server definitions: one HTTP listener and one WebSocket listener.
// NOTE(review): the excerpt this was recovered from ends after the 'servers'
// key; any further top-level keys of the original config are not shown here.
return [
    'mode' => SWOOLE_PROCESS,
    'servers' => [
        // Plain HTTP endpoint.
        [
            'name' => 'http',
            'type' => Server::SERVER_HTTP,
            'host' => '0.0.0.0',
            'port' => 11111,
            'sock_type' => SWOOLE_SOCK_TCP,
            'callbacks' => [
                SwooleEvent::ON_REQUEST => [Hyperf\HttpServer\Server::class, 'onRequest'],
            ],
        ],
        // WebSocket endpoint; the browser client connects to this port.
        [
            'name' => 'ws',
            'type' => Server::SERVER_WEBSOCKET,
            'host' => '0.0.0.0',
            'port' => 12222,
            'sock_type' => SWOOLE_SOCK_TCP,
            'callbacks' => [
                SwooleEvent::ON_HAND_SHAKE => [Hyperf\WebSocketServer\Server::class, 'onHandShake'],
                SwooleEvent::ON_MESSAGE => [Hyperf\WebSocketServer\Server::class, 'onMessage'],
                SwooleEvent::ON_CLOSE => [Hyperf\WebSocketServer\Server::class, 'onClose'],
            ],
        ],
    ],
];
<?php

declare(strict_types=1);
/**
 * This file is part of Hyperf.
 *
 * @link     https://www.hyperf.io
 * @document https://doc.hyperf.io
 * @contact  group@hyperf.io
 * @license  https://github.com/hyperf-cloud/hyperf/blob/master/LICENSE
 */

namespace App\Controller;

use Hyperf\Contract\OnCloseInterface;
use Hyperf\Contract\OnMessageInterface;
use Hyperf\Contract\OnOpenInterface;
use Swoole\Http\Request;
use Swoole\Server;
use Swoole\WebSocket\Frame;
use Swoole\WebSocket\Server as WebSocketServer;

/**
 * WebSocket callbacks that track connected client ids (fds) in a Redis set
 * so that other code can broadcast to every saved client.
 *
 * NOTE(review): relies on `$this->container` being provided by the parent
 * Controller class, which is not visible here.
 */
class WebSocketController extends Controller implements OnMessageInterface, OnOpenInterface, OnCloseInterface
{
    /** Redis set holding the fds of all connected clients. */
    private const CLIENT_SET_KEY = 'websocket_sjd_1';

    /** Lifetime of the client set in seconds; refreshed on open and on each message. */
    private const CLIENT_SET_TTL = 7200;

    /**
     * Handles an incoming frame: refreshes the client-set expiry and echoes the data back.
     *
     * @param WebSocketServer $server
     * @param Frame $frame
     */
    public function onMessage(WebSocketServer $server, Frame $frame): void
    {
        $redis = $this->container->get(\Redis::class);

        // Any message (including the client's heartbeat) from a known client
        // extends the TTL of the set. A membership check replaces fetching the
        // whole set; re-adding an existing member would be a no-op, so only
        // the expiry is refreshed.
        if ($redis->sIsMember(self::CLIENT_SET_KEY, $frame->fd)) {
            $redis->expire(self::CLIENT_SET_KEY, self::CLIENT_SET_TTL);
        }

        $server->push($frame->fd, 'recv: ' . $frame->data);
    }

    /**
     * Handles a client disconnect by removing its fd from the client set.
     *
     * @param Server $server
     * @param int $fd
     * @param int $reactorId
     */
    public function onClose(Server $server, int $fd, int $reactorId): void
    {
        $redis = $this->container->get(\Redis::class);
        $redis->sRem(self::CLIENT_SET_KEY, $fd);
        var_dump('closed');
    }

    /**
     * Handles a new client connection: saves its fd and greets the client.
     *
     * @param WebSocketServer $server
     * @param Request $request
     */
    public function onOpen(WebSocketServer $server, Request $request): void
    {
        $redis = $this->container->get(\Redis::class);

        $added = $redis->sAdd(self::CLIENT_SET_KEY, $request->fd);
        var_dump($added);

        $expiring = $redis->expire(self::CLIENT_SET_KEY, self::CLIENT_SET_TTL);
        var_dump($expiring);

        $server->push($request->fd, 'opened');
    }
}
/**
 * Opens a WebSocket connection to the push server, keeps it alive with a
 * periodic ping, and renders pushed records into the page.
 *
 * Expects jQuery (`$`) and elements matching `.address`, `.tablehead` and
 * `.table .item` to exist in the page.
 *
 * NOTE(review): the payload field names (recordouttime, uroutname, uroutnum,
 * dooroutname) are kept exactly as found; they presumably lost characters or
 * casing during extraction - confirm against the producer.
 */
function websockettest() {
    if (!("WebSocket" in window)) {
        alert("您的浏览器不支持 websocket!");
        return;
    }

    console.log("您的浏览器支持 websocket!");

    var num = 0;
    var heartbeatTimer = null;
    var ws = new WebSocket("ws://127.0.0.1:12222");

    // Builds one <p> cell; .text() escapes the value so pushed data cannot
    // inject markup. String() keeps jQuery from treating undefined as a getter call.
    function cell(value) {
        return $("<p></p>").text(String(value));
    }

    ws.onopen = function () {
        // Send a heartbeat every 5 seconds so the connection is not dropped
        // for being idle. Started only once the socket is open, because
        // send() throws while the socket is still connecting.
        heartbeatTimer = window.setInterval(function () {
            if (ws.readyState === WebSocket.OPEN) {
                var ping = {"type": "ping"};
                ws.send(JSON.stringify(ping));
            }
        }, 5000);
    };

    ws.onmessage = function (evt) {
        var d;
        try {
            d = JSON.parse(evt.data);
        } catch (err) {
            // Plain-text frames (e.g. the greeting or an echo) are not JSON.
            console.log(evt.data);
            return;
        }
        console.log(d);
        if (d === null || typeof d !== "object") {
            return;
        }

        if (d.code == 300) {
            $(".address").text(d.address);
        }

        if (d.code == 200) {
            var v = d.data;
            console.log(v);
            num++;

            var item = $('<div class="item"></div>')
                .append(cell(v.recordouttime))
                .append(cell(v.uroutname))
                .append(cell(v.uroutnum))
                .append(cell(v.dooroutname));
            $(".tablehead").after(item);

            // Keep at most 7 rows: drop the oldest (last) one.
            if (num > 7) {
                num--;
                $(".table .item:nth-last-child(1)").remove();
            }
        }
    };

    ws.onerror = function (e) {
        console.log(e);
        alert(e);
    };

    ws.onclose = function () {
        // Stop the heartbeat once the socket is gone.
        if (heartbeatTimer !== null) {
            window.clearInterval(heartbeatTimer);
            heartbeatTimer = null;
        }
        alert("连接已关闭...");
    };
}
composer require hyperf/amqp
<?php

// AMQP (RabbitMQ) connection settings for the "default" connection.
return [
    'default' => [
        'host' => 'localhost',
        'port' => 5672,
        'user' => 'guest',
        'password' => 'guest',
        'vhost' => '/',
        // Connection pool limits and timeouts (seconds).
        'pool' => [
            'min_connections' => 1,
            'max_connections' => 10,
            'connect_timeout' => 10.0,
            'wait_timeout' => 3.0,
            'heartbeat' => -1,
        ],
        // Low-level connection parameters.
        'params' => [
            'insist' => false,
            'login_method' => 'AMQPLAIN',
            'login_response' => null,
            'locale' => 'en_US',
            'connection_timeout' => 3.0,
            'read_write_timeout' => 6.0,
            'context' => null,
            'keepalive' => false,
            'heartbeat' => 3,
        ],
    ],
];
<?php

declare(strict_types=1);

namespace App\Amqp\Consumer;

use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use Hyperf\Server\Server;
use Hyperf\Server\ServerFactory;

/**
 * RabbitMQ consumer that forwards every consumed message to all WebSocket
 * clients whose fds are stored in the Redis set 'websocket_sjd_1'.
 *
 * @Consumer(exchange="hyperf", routingKey="hyperf", queue="hyperf", nums=1)
 */
class DemoConsumer extends ConsumerMessage
{
    /**
     * Broadcasts one consumed message to every saved WebSocket client.
     *
     * @param mixed $data message payload as delivered by the AMQP component
     * @return string Result::ACK in all cases
     */
    public function consume($data): string
    {
        print_r($data);

        // push() needs a string; encode anything else as JSON.
        $payload = is_string($data) ? $data : json_encode($data, JSON_UNESCAPED_UNICODE);
        if ($payload === false) {
            // Not encodable, so there is nothing to broadcast; acknowledge so
            // the message is not delivered again.
            return Result::ACK;
        }

        // All client fds saved by the WebSocket controller.
        $redis = $this->container->get(\Redis::class);
        $fdList = $redis->sMembers('websocket_sjd_1');

        // Underlying Swoole server, used to push frames to clients.
        $server = $this->container->get(ServerFactory::class)->getServer()->getServer();

        foreach ($fdList as $value) {
            if (empty($value)) {
                continue;
            }

            $fd = (int) $value;
            // The set may still contain fds whose connection is already gone.
            if ($server->isEstablished($fd)) {
                $server->push($fd, $payload);
            }
        }

        return Result::ACK;
    }
}
/**
 * Publishes one demo record through the AMQP producer and returns the
 * request method together with the "user" input.
 *
 * NOTE(review): the payload keys (uroutname, uroutnum, recordouttime,
 * dooroutname) are kept exactly as found because the browser client reads
 * the same keys; they presumably lost characters or casing during
 * extraction - confirm.
 *
 * @return array
 */
public function test()
{
    $data = [
        'code' => 200,
        'data' => [
            'uroutname' => 'ccflow',
            'uroutnum' => '9999',
            'recordouttime' => date('Y-m-d H:i:s'),
            'dooroutname' => '教师公寓',
        ],
    ];
    $data = \GuzzleHttp\json_encode($data);

    // Hand the encoded payload to the producer for publishing.
    $message = new DemoProducer($data);
    $producer = ApplicationContext::getContainer()->get(Producer::class);
    $result = $producer->produce($message);
    var_dump($result);

    $user = $this->request->input('user', 'hyperf');
    $method = $this->request->getMethod();

    return [
        'method' => $method,
        'message' => "{$user}.",
    ];
}
本文发布于:2023-04-08 12:11:03,感谢您对本站的认可!
本文链接:https://www.wtabcd.cn/fanwen/zuowen/00debd21e99601326fe1e113ec1fd63e.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文word下载地址:基于 Hyperf 实现 RabbitMQ + WebSocket 消息推送.doc
本文 PDF 下载地址:基于 Hyperf 实现 RabbitMQ + WebSocket 消息推送.pdf
留言与评论(共有 0 条评论) |