吴忠躺衫网络科技有限公司

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

如何在Python中使用MQTT

瑞科慧聯(RAK) ? 2022-12-22 10:41 ? 次閱讀

Python 是一種跨平臺的計算機程序設計語言,是ABC 語言的替代品,屬于面向對象的動態類型語言。它最初被設計用于編寫自動化腳本,隨著版本的不斷更新和語言新功能的添加,越來越多被用于獨立的、大型項目的開發。

MQTT 是一個物聯網傳輸協議,用于輕量級的發布/訂閱式消息傳輸,旨在為低帶寬和不穩定的網絡環境中的物聯網設備提供可靠的網絡服務。其輕量、簡單、開放和易于實現等特點,使得它適用范圍更加廣泛。

本文主要介紹如何在 Python 項目中使用paho-mqtt客戶端庫 ,實現客戶端與MQTT服務器的連接、訂閱、取消訂閱、收發消息等功能。

一、項目準備

本項目使用 Python 3.10進行開發測試。

用戶可用以下命令來確認 Python的版本:

python3 --version

Python 3.10.9

測試設備:

瑞科慧聯(RAK)網關RAK7268 V2、帶溫濕度傳感器的數據采集器Sensor Hub

二、選擇 MQTT 客戶端庫

paho-mqtt是目前 Python 中使用較多的 MQTT 客戶端庫。它為 Python 2.7 或 3.x 版本以上的客戶端類提供了對 MQTT v3.1 和 v3.1.1 的支持,還提供了一些幫助程序功能。這使得消息發布到 MQTT 服務器變得更簡單。

三、Pip 安裝 Paho MQTT 客戶端

Pip 是 Python 包管理工具。該工具提供了對 Python 包的查找、下載、安裝、卸載的功能。

pip3install paho.mqtt

四、Python MQTT 使用

1、連接 MQTT 服務器

本文將使用瑞科慧聯LoRaWAN?網關提供的內置 MQTT服務,該服務基于 Mosquitto的開源消息代理。服務器接入信息如下:

  • Broker:192.168.230.1
  • TCP Port:1883

2、導入 Paho MQTT客戶端

from paho.mqtt import client as mqtt

3、設置 MQTT Broker 連接參數

設置 MQTT Broker 連接地址,端口以及 topic,同時調用 Pythonrandom.randint函數隨機生成 MQTT 客戶端 id。

MQTT_SERVER_IP ="192.168.230.1"

MQTT_PORT =1883

4、編寫 MQTT 連接函數

編寫連接回調函數 on_connect,該函數將在客戶端連接后會被調用。在該函數中可以依據rc來判斷客戶端是否連接成功。同時可創建一個 MQTT 客戶端連接到broker.emqx.io

defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):

    """連接MQTT服務器"""

    client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))

    mqttClient=mqtt.Client(client_id)

    mqttClient.on_connect=on_connect # 返回連接狀態的回調函數

    mqttClient.on_message=on_message # 返回訂閱消息回調函數

    MQTT_HOST=MQTT_SERVER_IP # MQTT服務器地址

    # MQTT_PORT = MQTT_PORT  # MQTT端口

    mqttClient.username_pw_set("username","password")  # mqtt服務器賬號密碼

    mqttClient.connect(MQTT_HOST,MQTT_PORT,60)

    mqttClient.loop_start()  # 啟用線程連接

    returnmqttClient

5、發布消息

定義一個 while 循環語句,在循環中設置每秒調用 MQTT 客戶端publish函數向/python/mqtt主題發送消息。

ddefon_publish():

    # 發布消息

    msg_count=0

    whileTrue:

        time.sleep(1)

        mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

        topic='application/1/device/0000000000000444/tx'# 發布的主題,訂閱時需要使用這個主題才能訂閱此消息

        msg='{"confirmed": true,"data": "SGVsbG8=","fPort": 10}'

        result=mqttClient.publish(topic,msg)

        status=result[0]

        ifstatus==0:

            print('第{}條消息發送成功'.format(msg_count))

        else:

            print('第{}條消息發送失敗'.format(msg_count))

        msg_count+=1

6、訂閱消息

編寫消息回調函數 on_message,函數將在客戶端從 MQTT Broker 收到消息后被調用,并打印出訂閱的 topic 名稱以及接收到的消息內容。

defon_subscribe():

    """訂閱主題:mqtt/demo"""

    mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

    whileTrue:

        mqttClient.subscribe("application/#",2)

        time.sleep(1)

7、完整代碼

消息訂閱代碼

#!/usr/bin/python

frompaho.mqttimportclientasmqtt

importtime

importjson

# from settings import *

importbase64



"""

網關通過mqtt發出數據

json - ok

probuf - no

"""

MQTT_SERVER_IP="192.168.230.1"

MQTT_PORT=1883

defon_connect(client,userdata,flags,rc):

    """一旦連接成功, 回調此方法"""

    rc_status= ["連接成功","協議版本錯誤","無效的客戶端標識","服務器無法使用","用戶名或密碼錯誤","無授權"]

    print("connect:",rc_status[rc])

defon_message(client,userdata,msg):

    """一旦訂閱到消息, 回調此方法"""

    print("主題"+msg.topic +" 消息"+str(msg.payload.decode('gbk')))

    print("主題"+msg.topic +" 消息"+str(msg.payload.decode()))

    try:

        temp=json.loads(msg.payload.decode())

        # client.disconnect()

        deveui=temp['devEUI']

        print("devEUI: ",deveui)

        data=temp['data']

        print("解碼前的data為: ",data)

        data_decode=base64.b64decode(data).hex()

        print("解碼后的data為: ",data_decode)

        str1=data_decode[4:]

        ifstr1[0:4]=="0167":

            a=int(str1[4:8],16)*0.1 

            print("溫度:",a,"℃")

            ifstr1[8:12]=="0268":

               b=int(str1[12:16],16)

            print("濕度:",b,"%RH")

        elifstr1[0:4]=="0268":

            c=int(str1[4:8],16)

            print("濕度:",c,"%RH")                       

    exceptExceptionase:

        print(e)

defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):

    """連接MQTT服務器"""

    client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))

    mqttClient=mqtt.Client(client_id)

    mqttClient.on_connect=on_connect # 返回連接狀態的回調函數

    mqttClient.on_message=on_message # 返回訂閱消息回調函數

    MQTT_HOST=MQTT_SERVER_IP # MQTT服務器地址

    # MQTT_PORT = MQTT_PORT  # MQTT端口

    mqttClient.username_pw_set("username","password")  # mqtt服務器賬號密碼

    mqttClient.connect(MQTT_HOST,MQTT_PORT,60)

    mqttClient.loop_start()  # 啟用線程連接

    returnmqttClient

defon_subscribe():

    """訂閱主題:mqtt/demo"""

    mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

    whileTrue:

        mqttClient.subscribe("application/#",2)

        # allure.attach("gateway/" + GATEWAY_EUI + "/event/up", name="topic")

        # mqttClient.subscribe("gateway/ac1f09fffe08f099/event/up", 2)

        time.sleep(1)

if__name__=='__main__':

    on_subscribe()

消息發布代碼

#!/usr/bin/python

frompaho.mqttimportclientasmqtt

importtime

importjson

# from settings import *

importbase64



"""

網關通過mqtt發出數據

json - ok

probuf - no

"""

MQTT_SERVER_IP="192.168.230.1"

MQTT_PORT=1883

defon_connect(client,userdata,flags,rc):

    """一旦連接成功, 回調此方法"""

    rc_status= ["連接成功","協議版本錯誤","無效的客戶端標識","服務器無法使用","用戶名或密碼錯誤","無授權"]

    print("connect:",rc_status[rc])

defmqtt_connect(MQTT_SERVER_IP,MQTT_PORT):

    """連接MQTT服務器"""

    client_id=time.strftime('%Y%m%d%H%M%S',time.localtime(time.time()))

    mqttClient=mqtt.Client(client_id)

    mqttClient.on_connect=on_connect # 返回連接狀態的回調函數

    MQTT_HOST=MQTT_SERVER_IP # MQTT服務器地址

    # MQTT_PORT = MQTT_PORT  # MQTT端口

    mqttClient.username_pw_set("username","password")  # mqtt服務器賬號密碼

    mqttClient.connect(MQTT_HOST,MQTT_PORT,60)

    mqttClient.loop_start()  # 啟用線程連接

    returnmqttClient

defon_publish():

    # 發布消息

    msg_count=0

    whileTrue:

        time.sleep(1)

        mqttClient=mqtt_connect(MQTT_SERVER_IP,MQTT_PORT)

        topic='application/x/device/x/tx'# 發布的主題,訂閱時需要使用這個主題才能訂閱此消息

        msg='{"confirmed": true,"data": "SGVsbG8=","fPort": 10}'#需要發布的消息內容

        result=mqttClient.publish(topic,msg)

        status=result[0]

        ifstatus==0:

            print('第{}條消息發送成功'.format(msg_count))

        else:

            print('第{}條消息發送失敗'.format(msg_count))

        msg_count+=1

if__name__=='__main__':

    on_publish()

測試

消息發布

運行 MQTT消息發布代碼,將看到客戶端連接成功,并且成功將消息發布。

pYYBAGOjwVmAR1KUAAApM_Y0F48108.png

消息訂閱

通過瑞科慧聯帶溫濕度傳感器的 Sensor hub進行數據傳輸,訂閱并解析數據結果如下:

poYBAGOjwVmAdS2hAABgCqVnG0E194.png

五、總結

至此,我們完成了使用paho-mqtt客戶端連接到LoRaWAN?網關內置 MQTT服務器,并實現了測試客戶端與 MQTT 服務器的連接、消息發布和訂閱并解析。

與 C ++ 或 Java 之類的高級語言不同,Python 比較適合設備側的業務邏輯實現。使用 Python 可以減少代碼上的邏輯復雜度,降低與設備的交互成本。未來,我們相信在物聯網領域 Python 將會有更廣泛的應用!

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • 物聯網
    +關注

    關注

    2913

    文章

    44923

    瀏覽量

    376986
  • python
    +關注

    關注

    56

    文章

    4807

    瀏覽量

    85037
  • MQTT
    +關注

    關注

    5

    文章

    653

    瀏覽量

    22691
收藏 人收藏

    評論

    相關推薦

    使用Python實現xgboost教程

    使用Python實現XGBoost模型通常涉及以下幾個步驟:數據準備、模型訓練、模型評估和模型預測。以下是一個詳細的教程,指導你如何在Python中使用XGBoost。 1. 安裝XG
    的頭像 發表于 01-19 11:21 ?396次閱讀

    何在Windows中使用MTP協議

    、圖片等)的通信協議,它被廣泛用于Android設備。以下是如何在Windows中使用MTP協議的詳細步驟: 1. 確保設備支持MTP 首先,你需要確認你的設備支持MTP協議。大多數現代Android
    的頭像 發表于 01-03 10:26 ?408次閱讀

    何在Python中使用socket

    和UDP。 2. 創建Socket 在Python中,我們使用 socket 模塊來創建socket。以下是創建一個TCP socket的示例代碼: import socket # 創建一個socket
    的頭像 發表于 11-01 16:10 ?283次閱讀

    何在智能手機系統中使用bq27505

    電子發燒友網站提供《如何在智能手機系統中使用bq27505.pdf》資料免費下載
    發表于 10-17 10:21 ?0次下載
    如<b class='flag-5'>何在</b>智能手機系統<b class='flag-5'>中使</b>用bq27505

    何在MSP430?MCU中使用智能模擬組合

    電子發燒友網站提供《如何在MSP430?MCU中使用智能模擬組合.pdf》資料免費下載
    發表于 09-14 10:19 ?0次下載
    如<b class='flag-5'>何在</b>MSP430?MCU<b class='flag-5'>中使</b>用智能模擬組合

    何在反向降壓-升壓拓撲中使用TPS6290x

    電子發燒友網站提供《如何在反向降壓-升壓拓撲中使用TPS6290x.pdf》資料免費下載
    發表于 09-13 10:07 ?0次下載
    如<b class='flag-5'>何在</b>反向降壓-升壓拓撲<b class='flag-5'>中使</b>用TPS6290x

    何在汽車CAN應用中使用負邊緣觸發觸發器節省電力

    電子發燒友網站提供《如何在汽車CAN應用中使用負邊緣觸發觸發器節省電力.pdf》資料免費下載
    發表于 09-13 10:06 ?0次下載
    如<b class='flag-5'>何在</b>汽車CAN應用<b class='flag-5'>中使</b>用負邊緣觸發觸發器節省電力

    何在新興的低軌衛星應用中使用數字隔離器隔離信號

    電子發燒友網站提供《如何在新興的低軌衛星應用中使用數字隔離器隔離信號.pdf》資料免費下載
    發表于 09-12 09:37 ?0次下載
    如<b class='flag-5'>何在</b>新興的低軌衛星應用<b class='flag-5'>中使</b>用數字隔離器隔離信號

    何在RTOS中使用spi_interface.c?

    何在 RTOS 中使用 spi_interface.c?
    發表于 07-10 06:29

    求助,請問如何在RTOS SDK 1.5的PlatformIO IDE ESP8266實現MQTT

    ESP8266設備連接到 mqtt 代理。但 PlatformIO IDE 內置的 RTOS SDK 1.5 版本不支持 mqtt。此 SDK 沒有 mqtt 示例。所以你能不能讓我知道我如
    發表于 07-08 06:22

    為什么使用MQTT而不是HTTP?

    為什么使用MQTT而不是HTTP? 在探討為何在某些場景下選擇MQTT(Message Queuing Telemetry Transport)而非HTTP(Hypertext Transfer
    的頭像 發表于 06-19 14:26 ?528次閱讀
    為什么使用<b class='flag-5'>MQTT</b>而不是HTTP?

    請問cmakelists中的變量如何在程序中使用?

    大家好, 我有個問題請教,cmakelists.txt中的變量如何在程序中使用?比如以下cmakelists.txt文件中的PROJECT_VER變量,我如何在c程序中使用?試了很多辦
    發表于 06-11 07:34

    工業計算機是什么?如何在不同行業中使用?

    工業電腦是專為在工業環境中使用而設計的計算機。它們可用于各個行業,包括制造、運 輸和能源。它們通常比普通計算機更強大,并且能夠在大多數計算機無法運行的環境中運行。在本文中,我們將更深入地了解什么是工業計算機以及它們如何在不同行業中使
    的頭像 發表于 04-01 15:45 ?940次閱讀
    工業計算機是什么?如<b class='flag-5'>何在</b>不同行業<b class='flag-5'>中使</b>用?

    Raspberry Pi樹莓派使用Python實現MQTT通信設計

    這次的例子,主要講述如何基于PYTHONMQTT 客戶端的使用方法
    的頭像 發表于 03-14 11:45 ?925次閱讀
    Raspberry Pi樹莓派使用<b class='flag-5'>Python</b>實現<b class='flag-5'>MQTT</b>通信設計

    何在測試中使用ChatGPT

    Dimitar Panayotov 在 2023 年 QA Challenge Accepted 大會 上分享了他如何在測試中使用 ChatGPT。
    的頭像 發表于 02-20 13:57 ?802次閱讀
    百家乐官网高人玩法| 澳门百家乐网40125| 大家旺百家乐官网娱乐城| 狮威百家乐官网娱乐| bet365提款要多久| 百家乐怎么开户| 博坊百家乐官网游戏| 威尼斯人娱乐客户端| 希尔顿百家乐官网娱乐城 | 大发888亚洲城娱乐城| BB百家乐官网大转轮| 二爷百家乐的玩法技巧和规则| 阳原县| 澳门百家乐网址| 网上百家乐官网公| 谢通门县| 网页百家乐官网| 真人百家乐官网送钱| 金濠国际网| 百家乐官网英皇娱乐场开户注册| 大发888官方备用| 百家乐庄闲| 网上百家乐官网作弊下载| 大发888开户注册| 任我赢百家乐自动投注系统| 百家乐官网沙| 化隆| 大发888娱乐厂场| 赌场百家乐信誉| 巴黎人百家乐官网的玩法技巧和规则 | 赌场百家乐官网欺诈方法| 大富豪棋牌游戏中心| 百家乐对付抽水| KK百家乐官网的玩法技巧和规则| 新巴尔虎左旗| 德州扑克 在线| 百家乐庄闲庄庄闲| 赌百家乐的体会| 蓝盾百家乐官网庄家利润分| 百家乐官网的如何玩| 娱乐城注册送现金|