Python 是一種跨平臺的計算機程序設計語言,是ABC 語言的替代品,屬于面向對象的動態類型語言。它最初被設計用于編寫自動化腳本,隨著版本的不斷更新和語言新功能的添加,越來越多被用于獨立的、大型項目的開發。
MQTT 是一個物聯網傳輸協議,用于輕量級的發布/訂閱式消息傳輸,旨在為低帶寬和不穩定的網絡環境中的物聯網設備提供可靠的網絡服務。其輕量、簡單、開放和易于實現等特點,使得它適用范圍更加廣泛。
本文主要介紹如何在 Python 項目中使用paho-mqtt客戶端庫 ,實現客戶端與MQTT服務器的連接、訂閱、取消訂閱、收發消息等功能。
一、項目準備
本項目使用 Python 3.10進行開發測試。
用戶可用以下命令來確認 Python的版本:
python3 --version
Python 3.10.9
測試設備:
瑞科慧聯(RAK)網關RAK7268 V2、帶溫濕度傳感器的數據采集器Sensor Hub
二、選擇 MQTT 客戶端庫
paho-mqtt是目前 Python 中使用較多的 MQTT 客戶端庫。它為 Python 2.7 或 3.x 版本以上的客戶端類提供了對 MQTT v3.1 和 v3.1.1 的支持,還提供了一些幫助程序功能。這使得消息發布到 MQTT 服務器變得更簡單。
三、Pip 安裝 Paho MQTT 客戶端
Pip 是 Python 包管理工具。該工具提供了對 Python 包的查找、下載、安裝、卸載的功能。
pip3install paho.mqtt
四、Python MQTT 使用
1、連接 MQTT 服務器
本文將使用瑞科慧聯LoRaWAN?網關提供的內置 MQTT服務,該服務基于 Mosquitto的開源消息代理。服務器接入信息如下:
- Broker:192.168.230.1
- TCP Port:1883
2、導入 Paho MQTT客戶端
from paho.mqtt import client as mqtt
3、設置 MQTT Broker 連接參數
設置 MQTT Broker 連接地址,端口以及 topic,同時調用 Pythonrandom.randint函數隨機生成 MQTT 客戶端 id。
MQTT_SERVER_IP ="192.168.230.1"
MQTT_PORT =1883
4、編寫 MQTT 連接函數
編寫連接回調函數 on_connect,該函數將在客戶端連接后會被調用。在該函數中可以依據rc來判斷客戶端是否連接成功。同時可創建一個 MQTT 客戶端連接到broker.emqx.io。
defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):
"""連接MQTT服務器"""
client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))
mqttClient=mqtt.Client(client_id)
mqttClient.on_connect=on_connect # 返回連接狀態的回調函數
mqttClient.on_message=on_message # 返回訂閱消息回調函數
MQTT_HOST=MQTT_SERVER_IP # MQTT服務器地址
# MQTT_PORT = MQTT_PORT # MQTT端口
mqttClient.username_pw_set("username","password") # mqtt服務器賬號密碼
mqttClient.connect(MQTT_HOST,MQTT_PORT,60)
mqttClient.loop_start() # 啟用線程連接
returnmqttClient
5、發布消息
定義一個 while 循環語句,在循環中設置每秒調用 MQTT 客戶端publish函數向/python/mqtt主題發送消息。
ddefon_publish():
# 發布消息
msg_count=0
whileTrue:
time.sleep(1)
mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)
topic='application/1/device/0000000000000444/tx'# 發布的主題,訂閱時需要使用這個主題才能訂閱此消息
msg='{"confirmed": true,"data": "SGVsbG8=","fPort": 10}'
result=mqttClient.publish(topic,msg)
status=result[0]
ifstatus==0:
print('第{}條消息發送成功'.format(msg_count))
else:
print('第{}條消息發送失敗'.format(msg_count))
msg_count+=1
6、訂閱消息
編寫消息回調函數 on_message,函數將在客戶端從 MQTT Broker 收到消息后被調用,并打印出訂閱的 topic 名稱以及接收到的消息內容。
defon_subscribe():
"""訂閱主題:mqtt/demo"""
mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)
whileTrue:
mqttClient.subscribe("application/#",2)
time.sleep(1)
7、完整代碼
消息訂閱代碼
#!/usr/bin/python
frompaho.mqttimportclientasmqtt
importtime
importjson
# from settings import *
importbase64
"""
網關通過mqtt發出數據
json - ok
probuf - no
"""
MQTT_SERVER_IP="192.168.230.1"
MQTT_PORT=1883
defon_connect(client,userdata,flags,rc):
"""一旦連接成功, 回調此方法"""
rc_status= ["連接成功","協議版本錯誤","無效的客戶端標識","服務器無法使用","用戶名或密碼錯誤","無授權"]
print("connect:",rc_status[rc])
defon_message(client,userdata,msg):
"""一旦訂閱到消息, 回調此方法"""
print("主題"+msg.topic +" 消息"+str(msg.payload.decode('gbk')))
print("主題"+msg.topic +" 消息"+str(msg.payload.decode()))
try:
temp=json.loads(msg.payload.decode())
# client.disconnect()
deveui=temp['devEUI']
print("devEUI: ",deveui)
data=temp['data']
print("解碼前的data為: ",data)
data_decode=base64.b64decode(data).hex()
print("解碼后的data為: ",data_decode)
str1=data_decode[4:]
ifstr1[0:4]=="0167":
a=int(str1[4:8],16)*0.1
print("溫度:",a,"℃")
ifstr1[8:12]=="0268":
b=int(str1[12:16],16)
print("濕度:",b,"%RH")
elifstr1[0:4]=="0268":
c=int(str1[4:8],16)
print("濕度:",c,"%RH")
exceptExceptionase:
print(e)
defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):
"""連接MQTT服務器"""
client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))
mqttClient=mqtt.Client(client_id)
mqttClient.on_connect=on_connect # 返回連接狀態的回調函數
mqttClient.on_message=on_message # 返回訂閱消息回調函數
MQTT_HOST=MQTT_SERVER_IP # MQTT服務器地址
# MQTT_PORT = MQTT_PORT # MQTT端口
mqttClient.username_pw_set("username","password") # mqtt服務器賬號密碼
mqttClient.connect(MQTT_HOST,MQTT_PORT,60)
mqttClient.loop_start() # 啟用線程連接
returnmqttClient
defon_subscribe():
"""訂閱主題:mqtt/demo"""
mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)
whileTrue:
mqttClient.subscribe("application/#",2)
# allure.attach("gateway/" + GATEWAY_EUI + "/event/up", name="topic")
# mqttClient.subscribe("gateway/ac1f09fffe08f099/event/up", 2)
time.sleep(1)
if__name__=='__main__':
on_subscribe()
消息發布代碼
#!/usr/bin/python
frompaho.mqttimportclientasmqtt
importtime
importjson
# from settings import *
importbase64
"""
網關通過mqtt發出數據
json - ok
probuf - no
"""
MQTT_SERVER_IP="192.168.230.1"
MQTT_PORT=1883
defon_connect(client,userdata,flags,rc):
"""一旦連接成功, 回調此方法"""
rc_status= ["連接成功","協議版本錯誤","無效的客戶端標識","服務器無法使用","用戶名或密碼錯誤","無授權"]
print("connect:",rc_status[rc])
defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):
"""連接MQTT服務器"""
client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))
mqttClient=mqtt.Client(client_id)
mqttClient.on_connect=on_connect # 返回連接狀態的回調函數
MQTT_HOST=MQTT_SERVER_IP # MQTT服務器地址
# MQTT_PORT = MQTT_PORT # MQTT端口
mqttClient.username_pw_set("username","password") # mqtt服務器賬號密碼
mqttClient.connect(MQTT_HOST,MQTT_PORT,60)
mqttClient.loop_start() # 啟用線程連接
returnmqttClient
defon_publish():
# 發布消息
msg_count=0
whileTrue:
time.sleep(1)
mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)
topic='application/x/device/x/tx'# 發布的主題,訂閱時需要使用這個主題才能訂閱此消息
msg='{"confirmed": true,"data": "SGVsbG8=","fPort": 10}'#需要發布的消息內容
result=mqttClient.publish(topic,msg)
status=result[0]
ifstatus==0:
print('第{}條消息發送成功'.format(msg_count))
else:
print('第{}條消息發送失敗'.format(msg_count))
msg_count+=1
if__name__=='__main__':
on_publish()
測試
消息發布
運行 MQTT消息發布代碼,將看到客戶端連接成功,并且成功將消息發布。
消息訂閱
通過瑞科慧聯帶溫濕度傳感器的 Sensor hub進行數據傳輸,訂閱并解析數據結果如下:
五、總結
至此,我們完成了使用paho-mqtt客戶端連接到LoRaWAN?網關內置 MQTT服務器,并實現了測試客戶端與 MQTT 服務器的連接、消息發布和訂閱并解析。
與 C ++ 或 Java 之類的高級語言不同,Python 比較適合設備側的業務邏輯實現。使用 Python 可以減少代碼上的邏輯復雜度,降低與設備的交互成本。未來,我們相信在物聯網領域 Python 將會有更廣泛的應用!
-
物聯網
+關注
關注
2913文章
44923瀏覽量
376986 -
python
+關注
關注
56文章
4807瀏覽量
85037 -
MQTT
+關注
關注
5文章
653瀏覽量
22691
發布評論請先 登錄
相關推薦
評論