mqtt消息发送订阅自动化测试脚本demo

更新时间:2023-07-01 11:54:28 阅读: 评论:0

mqtt消息发送订阅⾃动化测试脚本demo怎么更新系统
# -*- coding:utf-8 -*-
from multiprocessing import Process
import paho.mqtt.publish as publish
import paho.mqtt.subscribe as subscribe
import time,json
HOST = "127.0.0.1"
PORT = 11883
NAME = "mqname"
PASSWORD = "mqpwd"
TOPIC = "test/hito"
我的收藏夹
client_id = time.strftime('mq:test:%Y%m%d%H%M%S',time.localtime(time.time()))
def mqtt_publish():
print"===============>pub"
msg = {"msg_id": "101343506507657"}
msg = json.dumps(msg)
publish.single(TOPIC, msg, qos = 1,hostname=HOST,port=PORT, client_id=client_id,auth = {'urname':NAME, 'password':PASSWORD}) print"===============> pub over"
def mqtt_subscribe():
print"===============>sub"
msg = subscribe.simple(TOPIC, qos = 1,hostname=HOST,port=PORT, client_id=client_id,auth = {'urname':NAME, 'password':PASSWORD}) pic+" :"+msg.payload.decode("utf-8")k1x
if__name__ == '__main__':
sub=Process(target=mqtt_subscribe)
pub=Process(target=mqtt_publish)
sub.start()
葱油千层饼
time.sleep(1)
pub.start()attributed
sub.join()
教师轮岗制度print("main-------")宣传的英文

本文发布于:2023-07-01 11:54:28,感谢您对本站的认可!

本文链接:https://www.wtabcd.cn/fanwen/fan/82/1072086.html

版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。

标签:订阅   动化   轮岗
相关文章
留言与评论(共有 0 条评论)
   
验证码:
推荐文章
排行榜
Copyright ©2019-2022 Comsenz Inc.Powered by © 专利检索| 网站地图