php实现mqtt发布发送消息到主题
php实现mqtt发布/发送消息到主题
mqtt是啥?我的博客有写这个东西:
php想要实现mqtt,需要使用php中的socket相关函数;
此次使用的是网上开源的mqtt案例,其中使用的是 stream_socket_xxxx 系列函数。
关于 stream 系列函数与 socket 扩展的区别,网上有一段解释,大概意思是:
正如你所指出的,'stream'是PHP核心的一部分(内置的,始终可用),而'套接字'(sockets)属于一个很少被包含的扩展。除此之外,它们几乎完全相同。您可以同时使用TCP和UDP两种流,也可以使用阻塞和非阻塞模式,这些模式涵盖了所有用例的99%。
我能想到的唯一常见的例外是ICMP,例如'ping'。但是,看起来目前还没有一种安全的方式可以从PHP执行ICMP。这种调用需要通过套接字扩展使用SOCK_RAW来实现,而这需要以“root”权限执行。此外,并非所有路由器都会路由TCP、UDP和ICMP之外的其他数据包类型,这限制了套接字扩展的实用性。
MQTT类代码:
/* phpMQTT */
class Mqtt {
private $socket;            /* holds the socket (stream resource opened by connect()) */
private $msgid = 1;         /* counter for message id */
public $keepalive = 10;     /* default keepalive timer, written into the CONNECT packet */
public $timesinceping;      /* host unix time, used to detect disconnects */
public $topics = array();   /* used to store currently subscribed topics */
public $debug = false;      /* should output debug messages */
public $address;            /* broker address */
public $port;               /* broker port */
public $clientid;           /* client id sent to broker */
public $will;               /* stores the will of the client */
private $urname;            /* stores username (property name kept as-is; it is used by connect()) */
private $password;          /* stores password */
public $cafile;             /* CA file path; when set, connect() opens a tls:// stream instead of tcp:// */
/**
 * Builds a client for one broker; simply forwards all arguments to broker().
 *
 * @param string      $address  broker address
 * @param int|string  $port     broker port
 * @param string      $clientid client id presented to the broker
 * @param string|null $cafile   optional CA file; stored for connect()
 */
function __construct($address, $port, $clientid, $cafile = null)
{
    $this->broker($address, $port, $clientid, $cafile);
}
/**
 * Sets the broker details.
 *
 * Only stores the values on the instance; no connection is opened here.
 *
 * @param string      $address  broker address
 * @param int|string  $port     broker port
 * @param string      $clientid client id sent to the broker
 * @param string|null $cafile   optional CA file path
 */
function broker($address, $port, $clientid, $cafile = NULL){
    $this->address = $address;
    $this->port = $port;
    $this->clientid = $clientid;
    $this->cafile = $cafile;
}
/**
 * Keeps calling connect() until it succeeds, sleeping 10 seconds between attempts.
 *
 * There is no retry limit, so this call does not return while connect() keeps failing.
 *
 * @param bool        $clean    forwarded to connect()
 * @param array|null  $will     forwarded to connect()
 * @param string|null $urname   username, forwarded to connect()
 * @param string|null $password forwarded to connect()
 * @return bool always true (returned once connect() has succeeded)
 */
function connect_auto($clean = true, $will = NULL, $urname = NULL, $password = NULL){
    while ($this->connect($clean, $will, $urname, $password) == false) {
        sleep(10);
    }
    return true;
}
/**
 * Connects to the broker: opens the stream, sends CONNECT and waits for CONNACK.
 *
 * @param bool        $clean    should the client send a clean session flag
 * @param array|null  $will     optional will; the keys 'topic', 'content', 'qos' and 'retain' are read
 * @param string|null $urname   optional username
 * @param string|null $password optional password
 * @return bool true when the broker accepted the connection, false otherwise
 */
function connect($clean = true, $will = NULL, $urname = NULL, $password = NULL){
    if ($will) $this->will = $will;
    if ($urname) $this->urname = $urname;
    if ($password) $this->password = $password;

    // A configured CA file switches the transport from plain TCP to TLS.
    if ($this->cafile) {
        $socketContext = stream_context_create(["ssl" => [
            "verify_peer_name" => true,
            "cafile" => $this->cafile
        ]]);
        $this->socket = stream_socket_client("tls://" . $this->address . ":" . $this->port, $errno, $errstr, 60, STREAM_CLIENT_CONNECT, $socketContext);
    } else {
        $this->socket = stream_socket_client("tcp://" . $this->address . ":" . $this->port, $errno, $errstr, 60, STREAM_CLIENT_CONNECT);
    }

    if (!$this->socket) {
        if ($this->debug) error_log("stream_socket_create() $errno, $errstr \n");
        return false;
    }

    stream_set_timeout($this->socket, 5);
    stream_set_blocking($this->socket, false);

    // $i counts the bytes of the variable header + payload (the "remaining length").
    // strwritestring() receives $i as well; it is expected to advance it by the
    // number of bytes it emits (helper is defined outside this block).
    $i = 0;
    $buffer = "";

    // Protocol name: 2-byte length (6) followed by "MQIsdp".
    $buffer .= chr(0x00); $i++;
    $buffer .= chr(0x06); $i++;
    $buffer .= chr(0x4d); $i++;
    $buffer .= chr(0x51); $i++;
    $buffer .= chr(0x49); $i++;
    $buffer .= chr(0x73); $i++;
    $buffer .= chr(0x64); $i++;
    $buffer .= chr(0x70); $i++;
    // Protocol version.
    $buffer .= chr(0x03); $i++;

    // Connect flags; starts with "no will".
    $var = 0;
    if ($clean) $var += 2;

    // Add will info to header
    if ($this->will != NULL) {
        $var += 4;                              // Set will flag
        $var += ($this->will['qos'] << 3);      // Set will qos
        if ($this->will['retain']) $var += 32;  // Set will retain
    }

    if ($this->urname != NULL) $var += 128;     // Add username to header
    if ($this->password != NULL) $var += 64;    // Add password to header

    $buffer .= chr($var); $i++;

    // Keep alive: 16-bit value, high byte first.
    $buffer .= chr($this->keepalive >> 8); $i++;
    $buffer .= chr($this->keepalive & 0xff); $i++;

    $buffer .= $this->strwritestring($this->clientid, $i);

    // Adding will to payload
    if ($this->will != NULL) {
        $buffer .= $this->strwritestring($this->will['topic'], $i);
        $buffer .= $this->strwritestring($this->will['content'], $i);
    }

    if ($this->urname) $buffer .= $this->strwritestring($this->urname, $i);
    if ($this->password) $buffer .= $this->strwritestring($this->password, $i);

    // Fixed header: packet type CONNECT (0x10) followed by the remaining length,
    // encoded 7 bits per byte with the top bit as continuation marker. For
    // lengths up to 127 this produces the same single byte as chr($i).
    $head = chr(0x10);
    $remaining = $i;
    do {
        $digit = $remaining % 128;
        $remaining = $remaining >> 7;
        if ($remaining > 0) $digit |= 0x80;
        $head .= chr($digit);
    } while ($remaining > 0);

    fwrite($this->socket, $head);
    fwrite($this->socket, $buffer);

    // CONNACK is 4 bytes: type in the high nibble of byte 0, return code in byte 3.
    $string = $this->read(4);

    if (strlen($string) < 4) {
        // The stream ended before a full CONNACK arrived.
        error_log("Connection failed! (no CONNACK received)\n");
        return false;
    }

    if (ord($string[0]) >> 4 == 2 && $string[3] == chr(0)) {
        if ($this->debug) echo "Connected to Broker\n";
    } else {
        error_log(sprintf("Connection failed! (Error: 0x%02x 0x%02x)\n",
            ord($string[0]), ord($string[3])));
        return false;
    }

    $this->timesinceping = time();

    return true;
}
/**
 * read: reads in so many bytes
 *
 * @param int  $int number of bytes wanted
 * @param bool $nb  when true, do a single fread() and return whatever it yields
 *                  (possibly an empty string or false) instead of looping
 * @return string|false the bytes read; in looping mode this may be shorter than
 *                      $int if the stream reaches EOF or a read error occurs
 */
function read($int = 8192, $nb = false){
    $string = "";
    $togo = $int;

    if ($nb) {
        return fread($this->socket, $togo);
    }

    while (!feof($this->socket) && $togo > 0) {
        $fread = fread($this->socket, $togo);
        if ($fread === false) {
            // A failed read that is not EOF would otherwise spin here forever.
            break;
        }
        $string .= $fread;
        $togo = $int - strlen($string);
    }

    return $string;
}
/**
 * subscribe: subscribes to topics
 *
 * Sends one SUBSCRIBE packet for all given topics, records them in
 * $this->topics, then reads and discards the broker's reply.
 *
 * @param array $topics map of topic filter => settings array; the 'qos' entry
 *                      is sent to the broker, the whole settings array is stored
 * @param int   $qos    QoS bits placed in the fixed header of the SUBSCRIBE packet
 *                      NOTE(review): MQTT expects SUBSCRIBE to be sent with QoS 1;
 *                      the default of 0 is kept for compatibility - confirm with callers.
 */
function subscribe($topics, $qos = 0){
    // $i counts the bytes in $buffer; strwritestring() is expected to advance it
    // by the bytes it emits (helper is defined outside this block).
    $i = 0;
    $buffer = "";

    // Message id: 16 bits, high byte first.
    $id = $this->msgid;
    $buffer .= chr($id >> 8); $i++;
    $buffer .= chr($id % 256); $i++;

    foreach ($topics as $key => $topic) {
        $buffer .= $this->strwritestring($key, $i);
        $buffer .= chr($topic["qos"]); $i++;
        $this->topics[$key] = $topic;
    }

    $cmd = 0x80;
    //$qos
    $cmd += ($qos << 1);

    // Fixed header: command byte followed by the remaining length, encoded
    // 7 bits per byte with the top bit as continuation marker. For lengths up
    // to 127 this is the same single byte as chr($i).
    $head = chr($cmd);
    $remaining = $i;
    do {
        $digit = $remaining % 128;
        $remaining = $remaining >> 7;
        if ($remaining > 0) $digit |= 0x80;
        $head .= chr($digit);
    } while ($remaining > 0);

    fwrite($this->socket, $head);
    fwrite($this->socket, $buffer, $i);

    // Reply: 2-byte fixed header whose second byte is the length of the rest.
    $string = $this->read(2);
    $bytes = ord(substr($string, 1, 1));
    $string = $this->read($bytes);
}
/**
 * ping: sends a keep alive ping
 *
 * Writes the two-byte packet 0xC0 0x00 to the socket. Does not wait for a reply.
 */
function ping(){
    $head = chr(0xc0);
    $head .= chr(0x00);
    fwrite($this->socket, $head, 2);
    if ($this->debug) echo "ping sent\n";
}
/**
 * disconnect: sends a proper disconnect cmd
 *
 * Writes the two-byte packet 0xE0 0x00 to the socket; the socket itself stays open.
 */
function disconnect(){
    $head = chr(0xe0) . chr(0x00);
    fwrite($this->socket, $head, 2);
}
/**
 * close: sends a proper disconnect, then shuts down the write side of the socket
 */
function close(){
    $this->disconnect();
    stream_socket_shutdown($this->socket, STREAM_SHUT_WR);
}

/**
 * clo: alias of close(), kept so existing callers of this name keep working
 */
function clo(){
    $this->close();
}
/**
 * publish: publishes $content on a $topic
 *
 * @param string $topic   topic to publish to
 * @param string $content raw payload bytes
 * @param int    $qos     QoS level; when non-zero a message id is added and
 *                        $this->msgid is incremented
 * @param int    $retain  non-zero sets the retain bit
 */
function publish($topic, $content, $qos = 0, $retain = 0){
    // $i counts the bytes in $buffer; strwritestring() is expected to advance it
    // by the bytes it emits (helper is defined outside this block).
    $i = 0;
    $buffer = "";

    $buffer .= $this->strwritestring($topic, $i);

    if ($qos) {
        // Message id: 16 bits, high byte first.
        $id = $this->msgid++;
        $buffer .= chr($id >> 8); $i++;
        $buffer .= chr($id % 256); $i++;
    }

    // The payload is appended as-is, without a length prefix.
    $buffer .= $content;
    $i += strlen($content);

    $cmd = 0x30;
    if ($qos) $cmd += $qos << 1;
    if ($retain) $cmd += 1;

    // Fixed header: command byte plus the length bytes produced by tmsglength()
    // (defined outside this block; presumably the MQTT remaining-length encoding - confirm).
    $head = chr($cmd);
    $head .= $this->tmsglength($i);

    fwrite($this->socket, $head, strlen($head));
    fwrite($this->socket, $buffer, $i);
}
/**
 * message: process a received topic
 *
 * Splits $msg into topic and payload, then calls the 'function' callback of
 * every subscription in $this->topics whose filter matches the topic. The
 * callback receives ($topic, $payload).
 *
 * @param string $msg packet body: 2-byte topic length (high byte first),
 *                    the topic, then the payload
 */
function message($msg){
    if (strlen($msg) < 2) {
        // Too short to even hold the topic length prefix.
        if ($this->debug) echo "msg recieved but no match in subscriptions\n";
        return;
    }

    $tlen = (ord($msg[0]) << 8) + ord($msg[1]);
    $topic = substr($msg, 2, $tlen);
    $msg = substr($msg, ($tlen + 2));
    $found = 0;

    foreach ($this->topics as $key => $top) {
        // Turn the subscription filter into a regex: escape everything first so
        // characters like "." or "(" are matched literally, then expand the two
        // wildcards ("+" = one level, "#" = anything). preg_quote() escapes "+"
        // always and "#" on newer PHP versions, so both spellings are handled.
        $pattern = preg_quote($key, '/');
        $pattern = str_replace(
            array('\\+', '\\#', '#'),
            array('[^\/]*', '.*', '.*'),
            $pattern
        );

        if (preg_match("/^" . $pattern . "$/", $topic)) {
            if (isset($top['function']) && is_callable($top['function'])) {
                call_user_func($top['function'], $topic, $msg);
                $found = 1;
            }
        }
    }

    if ($this->debug && !$found) echo "msg recieved but no match in subscriptions\n";
}
/* proc: the processing loop for an "allways on" client
t true when you are doing other stuff in the loop good for watching something el at the same time */ function proc( $loop = true){
if(1){
$sockets = array($this->socket);
$w = $e = NULL;
$cmd = 0;
//$byte = fgetc($this->socket);
if(feof($this->socket)){
if($this->debug) echo "eof receive going to reconnect for good measure\n";
fclo($this->socket);
$this->connect_auto(fal);
if(count($this->topics))
$this->subscribe($this->topics);
}
$byte = $this->read(1, true);
if(!strlen($byte)){
if($loop){
usleep(100000);
}
}el{
$cmd = (int)(ord($byte)/16);
if($this->debug) echo "Recevid: $cmd\n";