# MQTT publish/subscribe automated test script demo (mqtt消息发送订阅自动化测试脚本demo)
# -*- coding:utf-8 -*-
from multiprocessing import Process
import paho.mqtt.publish as publish
import paho.mqtt.subscribe as subscribe
import time,json
# Address of the MQTT broker that the demo connects to.
HOST = "127.0.0.1"
PORT = 11883
# Credentials handed to paho through its ``auth`` argument.
NAME = "mqname"
PASSWORD = "mqpwd"
# Topic the demo both publishes to and subscribes on.
TOPIC = "test/hito"
# Client identifier stamped with the local time at import,
# e.g. "mq:test:20240131120000" (22 characters).
client_id = time.strftime('mq:test:%Y%m%d%H%M%S', time.localtime(time.time()))
def mqtt_publish():
    """Publish one JSON-encoded test message to TOPIC.

    Serializes ``{"msg_id": "101343506507657"}`` with ``json.dumps`` and
    sends it with QoS 1 through ``paho.mqtt.publish.single``, printing a
    marker line before and after the call.
    """
    print("===============>pub")
    msg = json.dumps({"msg_id": "101343506507657"})
    publish.single(
        TOPIC,
        msg,
        qos=1,
        hostname=HOST,
        port=PORT,
        # An MQTT broker disconnects an existing client when another one
        # connects with the same identifier, so the publisher must not
        # reuse the bare module-level id. A one-character suffix keeps the
        # id at 23 characters, the length every MQTT 3.1.1 broker accepts.
        client_id=client_id + "p",
        # paho reads the 'username' key; any other spelling is rejected.
        auth={'username': NAME, 'password': PASSWORD},
    )
    print("===============> pub over")
def mqtt_subscribe():
    """Wait for one message on TOPIC and print its topic and payload.

    Calls ``paho.mqtt.subscribe.simple`` with QoS 1, which returns the
    received message, then prints ``<topic> :<payload decoded as UTF-8>``.
    """
    print("===============>sub")
    msg = subscribe.simple(
        TOPIC,
        qos=1,
        hostname=HOST,
        port=PORT,
        # An MQTT broker disconnects an existing client when another one
        # connects with the same identifier, so the subscriber must not
        # reuse the bare module-level id. A one-character suffix keeps the
        # id at 23 characters, the length every MQTT 3.1.1 broker accepts.
        client_id=client_id + "s",
        # paho reads the 'username' key; any other spelling is rejected.
        auth={'username': NAME, 'password': PASSWORD},
    )
    # NOTE(review): the original statement was garbled in extraction;
    # printing the topic before the payload is the most likely intent.
    print(msg.topic + " :" + msg.payload.decode("utf-8"))
if __name__ == '__main__':
    # Run subscriber and publisher in separate processes so the blocking
    # subscribe call does not hold up the publish.
    sub = Process(target=mqtt_subscribe)
    pub = Process(target=mqtt_publish)
    sub.start()
    # Give the subscriber a moment to get going before the message is sent.
    time.sleep(1)
    pub.start()
    # The subscriber process ends when mqtt_subscribe returns.
    sub.join()
    print("main-------")