RabbitMQ 簡介
RabbitMQ是一個開源的,在AMQP基礎上完整的,可復用的企業消息系統。
支持主流的操作系統,Linux、Windows、MacOX等
多種開發語言支持,Java、Python、Ruby、.NET、PHP、C/C++、node.js等
AMQP,即 Advanced Message Queuing Protocol(高級消息隊列協議),是一個網絡協議,是應用層協議的一個開放標準,為面向消息的中間件設計。基于此協議的客戶端與消息中間件可傳遞消息,并不受客戶端/中間件不同產品,不同的開發語言等條件的限制。2006年,AMQP 規范發布。
2007年,Rabbit 技術公司基于 AMQP 標準開發的 RabbitMQ 1.0 發布。RabbitMQ 采用 Erlang 語言開發。Erlang 語言由 Ericson 設計,專門為開發高并發和分布式系統的一種語言,在電信領域使用廣泛。
RabbitMQ基本概念
RabbitMQ 基礎架構:
Broker
接收和分發消息的應用,RabbitMQ Server就是 Message Broker
Virtual host
虛擬主機,出于多租戶和安全因素設計的,把 AMQP 的基本組件劃分到一個虛擬的分組中,類似于網絡中的 namespace 概念。當多個不同的用戶使用同一個 RabbitMQ server 提供的服務時,可以劃分出多個vhost,每個用戶在自己的 vhost 創建 exchange/queue 等,每一個虛擬主機都有AMQP的全套基礎組件,并且可以針對每個虛擬主機進行權限以及數據分配,并且不同虛擬主機之間是完全隔離的。
Connection
客戶端與RabbitMQ進行交互,首先就需要建立一個TPC連接。RabbitMQ為了減少性能開銷,也會在一個Connection中建立多個Channel,這樣便于客戶端進行多線程連接,這些連接會復用同一個Connection的TCP通道,提高性能。
Channel
客戶端與RabbitMQ建立了連接,就會分配一個AMQP信道 Channel。每個信道都會被分配一個唯一的ID。
Exchange
消息隊列交換機,消息發送到RabbitMQ中后,會首先進入一個交換機,然后由交換機負責將數據轉發到不同的隊列中。RabbitMQ中有多種不同類型的交換機來支持不同的路由策略。
交換機多用來與生產者打交道。生產者發送的消息通過Exchange交換機分配到各個不同的Queue隊列上,而對于消息消費者來說,通常只需要關注自己的隊列就可以了。
Queue
消息隊列,隊列是實際保存數據的最小單位。隊列結構天生就具有FIFO的順序。
Producer
消息生產者,即生產方客戶端,生產方客戶端將消息發送
Consumer
消息消費者,即消費方客戶端,接收MQ轉發的消息。
消息發送者的固定步驟
1.創建消息生產者producer,并制定生產者組名
2.指定Nameserver地址
3.啟動producer
4.創建消息對象,指定主題Topic、Tag和消息體
5.發送消息
6.關閉生產者producer
消息消費者的固定步驟
1.創建消費者Consumer,制定消費者組名
2.指定Nameserver地址
3.訂閱主題Topic和Tag
4.設置回調函數,處理消息
5.啟動消費者consumer
編程模型
引入依賴
com.rabbitmq amqp-client 5.9.0
創建連接獲取Channel
ConnectionFactory factory = new ConnectionFactory() factory.setHost(HOST_NAME) factory.setPort(HOST_PORT) factory.setUsername(USER_NAME) factory.setPassword(PASSWORD) factory.setVirtualHost(VIRTUAL_HOST) Connection connection = factory.newConnection() Channel channel = connection.createChannel()
聲明Exchange(可選)
channel.exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,Maparguments) throws IOException;
Exchange有四種類型: fanout、 topic 、headers 、direct
聲明queue
channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Maparguments);
durable 表示是否持久化。Durable選項表示會將隊列的消息寫入硬盤,這樣服務重啟后這些消息就不會丟失。
聲明Exchange與Queue的綁定關系(可選)
channel.queueBind(String queue, String exchange, String routingKey) throws IOException;
聲明了Exchange和Queue,那么就還需要聲明Exchange與Queue的綁定關系Binding。有了這些Binding,Exchange才可以知道Producer發送過來的消息將要分發到哪些Queue上。
這些Binding涉及到消息的不同分發邏輯,與Exchange和Queue一樣,如果Broker上沒有建立綁定關系,那么RabbitMQ會按照客戶端的聲明,創建這些綁定關系。
發送消息
channel.basicPublish(String exchange, String routingKey, BasicProperties props,message.getBytes("UTF-8")) ;
其中Exchange如果不需要,傳個空字符串。
props的這些配置項,可以用RabbitMQ中提供的一個Builder對象來構建。
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder() //對應頁面上的Properties部分,傳入一些預定的參數值。 builder.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode()) builder.priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority()) //builder.headers(headers) builder.build() AMQP.BasicProperties prop = builder.build()
MessageProperties.PERSISTENT_TEXT_PLAIN是RabbitMQ提供的持久化消息的默認配置。
消費消息
被動消費模式
Consumer等待rabbitMQ 服務器將message推送過來再消費。
channel.basicConsume(String queue, boolean autoAck, Consumer callback);
主動消費模式
Comsumer主動到rabbitMQ服務器上去拉取messge進行消費。
GetResponse response = channel.basicGet(QUEUE_NAME, boolean autoAck)
消費消息確認
自動ACK:autoAck為true,消息一旦被接收,消費者自動發送ACK,如果消費失敗了,后續也無法再消費了
手動ACK:autoAck為false,消息接收后,不會發送ACK,需要手動調用 channel.basicAck 來通知服務器已經消費了該message.這樣即使Consumer在執行message過程中出問題了,也不會造成消息丟失。
釋放資源
channel.close() conection.clouse()
消息模型
簡單模式
最直接的方式,P端發送一個消息到一個指定的queue,中間不需要任何exchange規則。C端按queue方式進行消費。
在上圖的模型中,有以下概念:
P:生產者,也就是要發送消息的程序
C:消費者:消息的接受者。
queue:消息隊列,圖中紅色部分。可以緩存消息;生產者向其中投遞消息,消費者從其中取出消息。
一個生產者、一個消費者,不需要設置交換機(使用默認的交換機)。
producer:
channel.queueDeclare(QUEUE_NAME,false,false,false,null); channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
consumer:
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
Work queues 工作隊列模式
Work Queues:與簡單模式相比,多了一個或一些消費端,多個消費端共同消費同一個隊列中的消息。一個消息只會被一個消費者消費。
一個生產者、多個消費者(競爭關系),不需要設置交換機(使用默認的交換機)。
應用場景:對于任務過重或任務較多情況使用工作隊列可以提高任務處理的速度。
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
Consumer: 每次拉取一條消息。
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); channel.basicQos(1); channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
Publish/Subscribe 發布訂閱
exchange type是 fanout。
在訂閱模型中,多了一個 Exchange 角色,而且過程略有變化:
P:生產者,也就是要發送消息的程序,但是不再發送到隊列中,而是發給X(交換機)
C:消費者,消息的接收者
Queue:消息隊列,接收消息、緩存消息Exchange:交換機(X)。一方面,接收生產者發送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。
Exchange有常見以下3種類型:
Fanout:廣播,將消息交給所有綁定到交換機的隊列,交換機需要與隊列進行綁定,綁定之后;一個消息可以被多個消費者都收到。
Direct:定向,把消息交給符合指定routing key 的隊列
Topic:通配符,把消息交給符合routing pattern(路由模式) 的隊列
Exchange(交換機)只負責轉發消息,不具備存儲消息的能力,因此如果沒有任何隊列與 Exchange 綁定,或者沒有符合路由規則的隊列,那么消息會丟失!producer只負責發送消息,至于消息進入哪個queue,由exchange來分配。
使用場景:
所有消費者獲得相同的消息,例如天氣預報。
生產者:
channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
消費者:
channel.exchangeDeclare(EXCHANGE_NAME, "fanout") String queueName = channel.queueDeclare().getQueue() channel.queueBind(queueName, EXCHANGE_NAME, "")
發布訂閱模式與工作隊列模式的區別:
工作隊列模式不用定義交換機,而發布/訂閱模式需要定義交換機
發布/訂閱模式的生產方是面向交換機發送消息,工作隊列模式的生產方是面向隊列發送消息(底層使用默認交換機)
發布/訂閱模式需要設置隊列和交換機的綁定,工作隊列模式不需要設置,實際上工作隊列模式會將隊列綁 定到默認的交換機
Rout 路由模式
exchange typ 是 direct。
P:生產者,向 Exchange 發送消息,發送消息時,會指定一個routing key
X:Exchange(交換機),接收生產者的消息,然后把消息遞交給與 routing key 完全匹配的隊列
C1:消費者,其所在隊列指定了需要 routing key 為 error 的消息
C2:消費者,其所在隊列指定了需要 routing key 為 info、error、warning 的消息
路由模式要求隊列在綁定交換機時要指定 routing key,消息會轉發到符合 routing key 的隊列。
生產者:
channel.exchangeDeclare(EXCHANGE_NAME, "direct"); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
消費者:
channel.exchangeDeclare(EXCHANGE_NAME, "direct") channel.queueBind(queueName, EXCHANGE_NAME, routingKey1) channel.queueBind(queueName, EXCHANGE_NAME, routingKey2) channel.basicConsume(queueName, true, consumer)
Topics 通配符模式
exchange type 是 topic
紅色 Queue:綁定的是 usa.# ,因此凡是以 usa. 開頭的 routing key 都會被匹配到
黃色 Queue:綁定的是 #.news ,因此凡是以 .news 結尾的 routing key 都會被匹配
對routingKey進行了模糊匹配單詞之間用,隔開,* 代表一個具體的單詞。# 代表0個或多個單詞
Topic 主題模式可以實現 Pub/Sub 發布與訂閱模式和 Routing 路由模式的功能,只是 Topic 在配置routing key 的時候可以使用通配符,顯得更加靈活。
Producer:
channel.exchangeDeclare(EXCHANGE_NAME, "topic"); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes("UTF-8"));
Receiver:
channel.exchangeDeclare(EXCHANGE_NAME, "topic") channel.queueBind(queueName, EXCHANGE_NAME, routingKey1) channel.queueBind(queueName, EXCHANGE_NAME, routingKey2) channel.basicConsume(queueName, true, consumer)
發送消息確認
發送的消息如果沒有被消費者及時消費有可能會導致消息丟失。
發送者確認模式默認是不開啟的,所以如果需要開啟發送者確認模式,需要手動在channel中進行聲明。
channel.confirmSelect()
使用異步確認消息保證消息在生產端不丟失。
Producer在channel中注冊監聽器來對消息進行確認。核心代碼:
channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2)
ConfirmCallback,監聽器接口,里面只有一個方法:
void handle(long sequenceNumber, boolean multiple) throws IOException;
這方法中的兩個參數,
sequenceNumer:這個是一個唯一的序列號,代表一個唯一的消息。在RabbitMQ中,他的消息體只是一個二進制數組,默認消息是沒有序列號的。那么在回調的時候,Producer怎么知道是哪一條消息成功或者失敗呢?RabbitMQ提供了一個方法int sequenceNumber = channel.getNextPublishSeqNo();來生成一個全局遞增的序列號,這個序列號將會分配給新發送的那一條消息。然后應用程序需要自己來將這個序列號與消息對應起來。沒錯!是的!需要客戶端自己去做對應!
multiple:這個是一個Boolean型的參數。如果是false,就表示這一次只確認了當前一條消息。如果是true,就表示RabbitMQ這一次確認了一批消息,在sequenceNumber之前的所有消息都已經確認完成了。
SpringBoot集成RabbitMQ
添加依賴
org.springframework.boot spring-boot-starter-amqp
配置文件
server: port: 8081 spring: application: name: test-rabbitmq-producer rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtualHost: /
rabbitMQ配置類
配置Exchange、Queue、及綁定交換機,下面配置Topic交換機。
package com.example.config; import org.springframework.amqp.core.*; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; * RabbitmqConfig */ @Configuration public class RabbitmqConfig { public static final String QUEUE_INFORM_EMAIL = "queue_inform_email"; public static final String QUEUE_INFORM_SMS = "queue_inform_sms"; public static final String EXCHANGE_TOPICS_INFORM="exchange_topics_inform"; public static final String ROUTINGKEY_EMAIL="inform.#.email.#"; public static final String ROUTINGKEY_SMS="inform.#.sms.#"; @Bean(EXCHANGE_TOPICS_INFORM) public Exchange EXCHANGE_TOPICS_INFORM(){ return ExchangeBuilder.topicExchange(EXCHANGE_TOPICS_INFORM).durable(true).build(); } @Bean(QUEUE_INFORM_EMAIL) public Queue QUEUE_INFORM_EMAIL(){ return new Queue(QUEUE_INFORM_EMAIL); } @Bean(QUEUE_INFORM_SMS) public Queue QUEUE_INFORM_SMS(){ return new Queue(QUEUE_INFORM_SMS); } @Bean public Binding BINDING_QUEUE_INFORM_EMAIL(@Qualifier(QUEUE_INFORM_EMAIL) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs(); } @Bean public Binding BINDING_ROUTINGKEY_SMS(@Qualifier(QUEUE_INFORM_SMS) Queue queue, @Qualifier(EXCHANGE_TOPICS_INFORM) Exchange exchange){ return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs(); } }
發送消息
String message = "hello world" rabbitTemplate.convertAndSend(RabbitmqTopicConfig.EXCHANGE_TOPICS_INFORM, "inform.email", message)
消費消息
消費者都是通過@RabbitListener注解來聲明。在@RabbitMQListener注解中包含了非常多對Queue進行定制的屬性,大部分的屬性都是有默認值的。
@RabbitListener(queues = {RabbitmqTopicConfig.QUEUE_INFORM_EMAIL}) public void receive_email(Object msg, Message message, Channel channel){ System.out.println("QUEUE_INFORM_EMAIL msg"+msg); } @RabbitListener(queues = {RabbitmqTopicConfig.QUEUE_INFORM_SMS}) public void receive_sms(Object msg, Message message, Channel channel){ System.out.println("QUEUE_INFORM_SMS msg"+msg); }
-
Linux
+關注
關注
87文章
11345瀏覽量
210386 -
編程
+關注
關注
88文章
3637瀏覽量
93981 -
開源
+關注
關注
3文章
3402瀏覽量
42711 -
rabbitmq
+關注
關注
0文章
18瀏覽量
1042
原文標題:RabbitMq 入門教程看這一篇就夠了!
文章出處:【微信號:magedu-Linux,微信公眾號:馬哥Linux運維】歡迎添加關注!文章轉載請注明出處。
發布評論請先 登錄
相關推薦
評論