吴忠躺衫网络科技有限公司

0
  • 聊天消息
  • 系統消息
  • 評論與回復
登錄后你可以
  • 下載海量資料
  • 學習在線課程
  • 觀看技術視頻
  • 寫文章/發帖/加入社區
會員中心
創作中心

完善資料讓更多小伙伴認識你,還能領取20積分哦,立即完善>

3天內不再提示

RabbitMq入門教程

馬哥Linux運維 ? 來源:稀土掘金技術社區 ? 2023-12-04 11:10 ? 次閱讀

RabbitMQ 簡介

RabbitMQ是一個開源的,在AMQP基礎上完整的,可復用的企業消息系統。

支持主流的操作系統Linux、Windows、MacOX等

多種開發語言支持,JavaPython、Ruby、.NET、PHP、C/C++node.js

AMQP,即 Advanced Message Queuing Protocol(高級消息隊列協議),是一個網絡協議,是應用層協議的一個開放標準,為面向消息的中間件設計。基于此協議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產品,不同的開發語言等條件的限制。2006年,AMQP 規范發布。

2007年,Rabbit 技術公司基于 AMQP 標準開發的 RabbitMQ 1.0 發布。RabbitMQ 采用 Erlang 語言開發。Erlang 語言由 Ericson 設計,專門為開發高并發和分布式系統的一種語言,在電信領域使用廣泛。

RabbitMQ基本概念

RabbitMQ 基礎架構:

059a20de-91c6-11ee-939d-92fbcf53809c.jpg

Broker

接收和分發消息的應用,RabbitMQ Server就是 Message Broker

Virtual host

虛擬主機,出于多租戶和安全因素設計的,把 AMQP 的基本組件劃分到一個虛擬的分組中,類似于網絡中的 namespace 概念。當多個不同的用戶使用同一個 RabbitMQ server 提供的服務時,可以劃分出多個vhost,每個用戶在自己的 vhost 創建 exchange/queue 等,每一個虛擬主機都有AMQP的全套基礎組件,并且可以針對每個虛擬主機進行權限以及數據分配,并且不同虛擬主機之間是完全隔離的。

Connection

客戶端與RabbitMQ進行交互,首先就需要建立一個TPC連接。RabbitMQ為了減少性能開銷,也會在一個Connection中建立多個Channel,這樣便于客戶端進行多線程連接,這些連接會復用同一個Connection的TCP通道,提高性能。

Channel

客戶端與RabbitMQ建立了連接,就會分配一個AMQP信道 Channel。每個信道都會被分配一個唯一的ID。

Exchange

消息隊列交換機,消息發送到RabbitMQ中后,會首先進入一個交換機,然后由交換機負責將數據轉發到不同的隊列中。RabbitMQ中有多種不同類型的交換機來支持不同的路由策略。

交換機多用來與生產者打交道。生產者發送的消息通過Exchange交換機分配到各個不同的Queue隊列上,而對于消息消費者來說,通常只需要關注自己的隊列就可以了。

Queue

消息隊列,隊列是實際保存數據的最小單位。隊列結構天生就具有FIFO的順序。

Producer

消息生產者,即生產方客戶端,生產方客戶端將消息發送

Consumer

消息消費者,即消費方客戶端,接收MQ轉發的消息。

消息發送者的固定步驟

1.創建消息生產者producer,并制定生產者組名

2.指定Nameserver地址

3.啟動producer

4.創建消息對象,指定主題Topic、Tag和消息體

5.發送消息

6.關閉生產者producer

消息消費者的固定步驟

1.創建消費者Consumer,制定消費者組名

2.指定Nameserver地址

3.訂閱主題Topic和Tag

4.設置回調函數,處理消息

5.啟動消費者consumer

編程模型

引入依賴



    com.rabbitmq
    amqp-client
    5.9.0

創建連接獲取Channel


ConnectionFactory factory = new ConnectionFactory()
factory.setHost(HOST_NAME)
factory.setPort(HOST_PORT)
factory.setUsername(USER_NAME)
factory.setPassword(PASSWORD)
factory.setVirtualHost(VIRTUAL_HOST)
Connection connection = factory.newConnection()
Channel channel = connection.createChannel()

聲明Exchange(可選)

channel.exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,Map arguments) throws IOException;

Exchange有四種類型: fanout、 topic 、headers 、direct

聲明queue


 channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map arguments);

durable 表示是否持久化。Durable選項表示會將隊列的消息寫入硬盤,這樣服務重啟后這些消息就不會丟失。

聲明Exchange與Queue的綁定關系(可選)


channel.queueBind(String queue, String exchange, String routingKey) throws IOException;

聲明了Exchange和Queue,那么就還需要聲明Exchange與Queue的綁定關系Binding。有了這些Binding,Exchange才可以知道Producer發送過來的消息將要分發到哪些Queue上。

這些Binding涉及到消息的不同分發邏輯,與Exchange和Queue一樣,如果Broker上沒有建立綁定關系,那么RabbitMQ會按照客戶端的聲明,創建這些綁定關系。

發送消息

channel.basicPublish(String exchange, String routingKey, BasicProperties props,message.getBytes("UTF-8")) ;

其中Exchange如果不需要,傳個空字符串。

props的這些配置項,可以用RabbitMQ中提供的一個Builder對象來構建。


  AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder()
  //對應頁面上的Properties部分,傳入一些預定的參數值。
  builder.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode())
  builder.priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority())
  //builder.headers(headers)
  builder.build()
  AMQP.BasicProperties prop = builder.build()

MessageProperties.PERSISTENT_TEXT_PLAIN是RabbitMQ提供的持久化消息的默認配置。

消費消息

被動消費模式

Consumer等待rabbitMQ 服務器將message推送過來再消費。

channel.basicConsume(String queue, boolean autoAck, Consumer callback);

主動消費模式

Comsumer主動到rabbitMQ服務器上去拉取messge進行消費。

GetResponse response = channel.basicGet(QUEUE_NAME, boolean autoAck)

消費消息確認

自動ACK:autoAck為true,消息一旦被接收,消費者自動發送ACK,如果消費失敗了,后續也無法再消費了

手動ACK:autoAck為false,消息接收后,不會發送ACK,需要手動調用 channel.basicAck 來通知服務器已經消費了該message.這樣即使Consumer在執行message過程中出問題了,也不會造成消息丟失

釋放資源

channel.close()
conection.clouse()

消息模型

簡單模式

最直接的方式,P端發送一個消息到一個指定的queue,中間不需要任何exchange規則。C端按queue方式進行消費。

05b189f4-91c6-11ee-939d-92fbcf53809c.jpg

在上圖的模型中,有以下概念:

P:生產者,也就是要發送消息的程序

C:消費者:消息的接受者。

queue:消息隊列,圖中紅色部分。可以緩存消息;生產者向其中投遞消息,消費者從其中取出消息。

一個生產者、一個消費者,不需要設置交換機(使用默認的交換機)。

producer:


channel.queueDeclare(QUEUE_NAME,false,false,false,null);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));

consumer:


channel.queueDeclare(QUEUE_NAME, false, false, false, null);

Work queues 工作隊列模式

05b84078-91c6-11ee-939d-92fbcf53809c.jpg

Work Queues:與簡單模式相比,多了一個或一些消費端,多個消費端共同消費同一個隊列中的消息。一個消息只會被一個消費者消費

一個生產者、多個消費者(競爭關系),不需要設置交換機(使用默認的交換機)。

應用場景:對于任務過重或任務較多情況使用工作隊列可以提高任務處理的速度。


channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); 
channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,
        message.getBytes("UTF-8"));

Consumer: 每次拉取一條消息。


channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
channel.basicQos(1);
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

Publish/Subscribe 發布訂閱

05ca0952-91c6-11ee-939d-92fbcf53809c.jpg

exchange type是 fanout

在訂閱模型中,多了一個 Exchange 角色,而且過程略有變化:

P:生產者,也就是要發送消息的程序,但是不再發送到隊列中,而是發給X(交換機)

C:消費者,消息的接收者

Queue:消息隊列,接收消息、緩存消息Exchange:交換機(X)。一方面,接收生產者發送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。

Exchange有常見以下3種類型:

Fanout:廣播,將消息交給所有綁定到交換機的隊列,交換機需要與隊列進行綁定,綁定之后;一個消息可以被多個消費者都收到。

Direct:定向,把消息交給符合指定routing key 的隊列

Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列

Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與 Exchange 綁定,或者沒有符合路由規則的隊列,那么消息會丟失!producer只負責發送消息,至于消息進入哪個queue,由exchange來分配。

使用場景:

所有消費者獲得相同的消息,例如天氣預報。

生產者:


channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));

消費者:


channel.exchangeDeclare(EXCHANGE_NAME, "fanout")
String queueName = channel.queueDeclare().getQueue()
channel.queueBind(queueName, EXCHANGE_NAME, "")

發布訂閱模式與工作隊列模式的區別:

工作隊列模式不用定義交換機,而發布/訂閱模式需要定義交換機

發布/訂閱模式的生產方是面向交換機發送消息,工作隊列模式的生產方是面向隊列發送消息(底層使用默認交換機)

發布/訂閱模式需要設置隊列和交換機的綁定,工作隊列模式不需要設置,實際上工作隊列模式會將隊列綁 定到默認的交換機

Rout 路由模式

exchange typ 是 direct

05d162f6-91c6-11ee-939d-92fbcf53809c.jpg

P:生產者,向 Exchange 發送消息,發送消息時,會指定一個routing key

X:Exchange(交換機),接收生產者的消息,然后把消息遞交給與 routing key 完全匹配的隊列

C1:消費者,其所在隊列指定了需要 routing key 為 error 的消息

C2:消費者,其所在隊列指定了需要 routing key 為 info、error、warning 的消息

路由模式要求隊列在綁定交換機時要指定 routing key,消息會轉發到符合 routing key 的隊列。

生產者:


channel.exchangeDeclare(EXCHANGE_NAME, "direct");
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));

消費者:


channel.exchangeDeclare(EXCHANGE_NAME, "direct")
channel.queueBind(queueName, EXCHANGE_NAME, routingKey1)
channel.queueBind(queueName, EXCHANGE_NAME, routingKey2)
channel.basicConsume(queueName, true, consumer)

Topics 通配符模式

exchange type 是 topic

05e4c09e-91c6-11ee-939d-92fbcf53809c.jpg

紅色 Queue:綁定的是 usa.# ,因此凡是以 usa. 開頭的 routing key 都會被匹配到

黃色 Queue:綁定的是 #.news ,因此凡是以 .news 結尾的 routing key 都會被匹配

對routingKey進行了模糊匹配單詞之間用,隔開,* 代表一個具體的單詞。# 代表0個或多個單詞

Topic 主題模式可以實現 Pub/Sub 發布與訂閱模式和 Routing 路由模式的功能,只是 Topic 在配置routing key 的時候可以使用通配符,顯得更加靈活。

Producer:


channel.exchangeDeclare(EXCHANGE_NAME, "topic");
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));

Receiver:


channel.exchangeDeclare(EXCHANGE_NAME, "topic")
channel.queueBind(queueName, EXCHANGE_NAME, routingKey1)
channel.queueBind(queueName, EXCHANGE_NAME, routingKey2)
channel.basicConsume(queueName, true, consumer)

發送消息確認

發送的消息如果沒有被消費者及時消費有可能會導致消息丟失。

發送者確認模式默認是不開啟的,所以如果需要開啟發送者確認模式,需要手動在channel中進行聲明。


channel.confirmSelect()

使用異步確認消息保證消息在生產端不丟失。

Producer在channel中注冊監聽器來對消息進行確認。核心代碼:


channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2)

ConfirmCallback,監聽器接口,里面只有一個方法:

void handle(long sequenceNumber, boolean multiple) throws IOException; 

這方法中的兩個參數,

sequenceNumer:這個是一個唯一的序列號,代表一個唯一的消息。在RabbitMQ中,他的消息體只是一個二進制數組,默認消息是沒有序列號的。那么在回調的時候,Producer怎么知道是哪一條消息成功或者失敗呢?RabbitMQ提供了一個方法int sequenceNumber = channel.getNextPublishSeqNo();來生成一個全局遞增的序列號,這個序列號將會分配給新發送的那一條消息。然后應用程序需要自己來將這個序列號與消息對應起來。沒錯!是的!需要客戶端自己去做對應!

multiple:這個是一個Boolean型的參數。如果是false,就表示這一次只確認了當前一條消息。如果是true,就表示RabbitMQ這一次確認了一批消息,在sequenceNumber之前的所有消息都已經確認完成了。

SpringBoot集成RabbitMQ

添加依賴


       
            org.springframework.boot
            spring-boot-starter-amqp
        

配置文件


server:
  port: 8081
spring:
  application:
    name: test-rabbitmq-producer
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtualHost: /

rabbitMQ配置類

配置Exchange、Queue、及綁定交換機,下面配置Topic交換機。


package com.example.config;
 
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
 


 * RabbitmqConfig
 */
@Configuration
public class RabbitmqConfig {
 
    public static final String QUEUE_INFORM_EMAIL = "queue_inform_email";
    public static final String QUEUE_INFORM_SMS = "queue_inform_sms";
    public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform";
    public static final String ROUTINGKEY_EMAIL="inform.#.email.#";
    public static final String ROUTINGKEY_SMS="inform.#.sms.#";
 
    
    @Bean(EXCHANGE_TOPICS_INFORM)
    public Exchange EXCHANGE_TOPICS_INFORM(){
        
        return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build();
    }
 
    
    @Bean(QUEUE_INFORM_EMAIL)
    public Queue QUEUE_INFORM_EMAIL(){
        return new Queue(QUEUE_INFORM_EMAIL);
    }
    
    @Bean(QUEUE_INFORM_SMS)
    public Queue QUEUE_INFORM_SMS(){
        return new Queue(QUEUE_INFORM_SMS);
    }
 
    
    @Bean
    public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue,
                                              @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
    }
    
    @Bean
    public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue,
                                          @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
    }
 
}

發送消息


String message = "hello world"
rabbitTemplate.convertAndSend(RabbitmqTopicConfig.EXCHANGE_TOPICS_INFORM, "inform.email", message)

消費消息

消費者都是通過@RabbitListener注解來聲明。在@RabbitMQListener注解中包含了非常多對Queue進行定制的屬性,大部分的屬性都是有默認值的。

    
    @RabbitListener(queues = {RabbitmqTopicConfig.QUEUE_INFORM_EMAIL})
    public void receive_email(Object msg, Message message, Channel channel){
        System.out.println("QUEUE_INFORM_EMAIL msg"+msg);
    }
    
    @RabbitListener(queues = {RabbitmqTopicConfig.QUEUE_INFORM_SMS})
    public void receive_sms(Object msg, Message message, Channel channel){
        System.out.println("QUEUE_INFORM_SMS msg"+msg);
    }

審核編輯:湯梓紅

聲明:本文內容及配圖由入駐作者撰寫或者入駐合作網站授權轉載。文章觀點僅代表作者本人,不代表電子發燒友網立場。文章及其配圖僅供工程師學習之用,如有內容侵權或者其他違規問題,請聯系本站處理。 舉報投訴
  • Linux
    +關注

    關注

    87

    文章

    11345

    瀏覽量

    210386
  • 編程
    +關注

    關注

    88

    文章

    3637

    瀏覽量

    93981
  • 開源
    +關注

    關注

    3

    文章

    3402

    瀏覽量

    42711
  • rabbitmq
    +關注

    關注

    0

    文章

    18

    瀏覽量

    1042

原文標題:RabbitMq 入門教程看這一篇就夠了!

文章出處:【微信號:magedu-Linux,微信公眾號:馬哥Linux運維】歡迎添加關注!文章轉載請注明出處。

收藏 人收藏

    評論

    相關推薦

    單片機入門教程

    單片機入門教程
    發表于 03-21 20:27 ?425次下載

    studio使用入門教程

    studio使用入門教程
    發表于 01-09 10:44 ?0次下載

    硬件工程師入門教程

    硬件工程師入門教程硬件工程師入門教程硬件工程師入門教程硬件工程師入門教程硬件工程師入門教程
    發表于 01-05 15:53 ?258次下載

    C語言入門教程

    很好的C語言入門教程,可以肯定的說這個教程只是為初學或入門者準備的
    發表于 01-22 14:46 ?7次下載

    Java經典入門教程

    Java經典入門教程,PDF格式,經典教程。
    發表于 03-14 11:16 ?0次下載

    protel99se入門教程

    protel99se入門教程,單片機入門教程
    發表于 05-09 10:59 ?63次下載

    Verilog HDL 華為入門教程

    Verilog HDL 華為入門教程
    發表于 06-03 16:57 ?45次下載

    51單片機c51語言入門教程C語言入門教程

    51單片機c51語言入門教程,C語言入門教程
    發表于 08-29 15:02 ?32次下載

    AD6.0初級入門教程

    AD6.0初級入門教程
    發表于 12-09 16:25 ?0次下載

    伺服系統入門教程

    伺服電機入門教程
    發表于 01-14 02:25 ?41次下載

    arduino入門教程 非常適合入門

    arduino入門教程 非常適合入門
    發表于 09-21 09:20 ?67次下載
    arduino<b class='flag-5'>入門教程</b> 非常適合<b class='flag-5'>入門</b>

    新手Android編程入門教程

    新手Android編程入門教程
    發表于 10-24 08:58 ?9次下載
    新手Android編程<b class='flag-5'>入門教程</b>

    Python經典入門教程

    Python的經典入門教程資料分享。
    發表于 06-01 10:25 ?117次下載

    硬件入門教程

    硬件入門教程
    發表于 07-04 14:49 ?77次下載

    PADS詳細入門教程

    PADS 詳細入門教程
    發表于 09-28 09:59 ?105次下載
    南京百家乐赌博现场被抓| 大发888娱乐场18| 百家乐官网对子计算方法| 24山安葬择日吉凶| 顶级赌场是骗人的吗| 三公百家乐官网在线哪里可以玩| V博百家乐的玩法技巧和规则 | 娱乐城在线| 华硕百家乐官网的玩法技巧和规则| 大发888娱乐场c17| 新梦想百家乐官网的玩法技巧和规则 | 百家乐转盘技巧| 赌博游戏机破解方法| 百家乐玩法百科| www.18lk.com| 百家乐官网庄家优势| 破解百家乐游戏机| 百家乐官网大眼仔用法| 百家乐娱乐场开户注册| 半圆百家乐官网桌子| 百家乐赌博娱乐| 百家乐官网视频游戏会员| 威尼斯人娱乐城赌博网站| 赌场百家乐官网是如何| bet365手机| 美国百家乐怎么玩| 页游| 百家乐小九梭哈| 澳门百家乐官网网上| 米其林百家乐的玩法技巧和规则| 赌场百家乐官网赌场| 大发888游戏平台电子| 百家乐官网77scs官网| 江城足球网| 欢乐博百家乐娱乐城| 百家乐官网娱乐城棋牌| 机械手百家乐的玩法技巧和规则 | 百家乐网盛世三国| 百家乐官网科学| 施秉县| 百家乐庄闲庄庄闲|