RabbitMQ介绍及部署

介绍

  • RabbitMQ是采用Erlang编程语言实现的高级消息队列协议AMQP(Advanced Message Queuing Protocol)的开源消息队列中间件
  • 作用
    • 应用解耦
    • 流量削峰
    • 异步处理
    • 缓存存储
    • 消息通信
    • 提高系统扩展性

特点

  1. 可靠性:通过持久化和传输确认等来确保消息传递的可靠性
  2. 扩展性:多个RabbitMQ节点可以组成集群
  3. 高可用:队列可以在RabbitMQ集群中设置镜像,如此一来即使部分节点挂掉,队列仍然可以使用
  4. 多种协议支持:原生支持AMQP,也能支持STOMP、MQTT等协议
  5. 丰富的客户端:常用的编程语言都有客户端
  6. 管理界面:自带一个webUI界面
  7. 插件机制:RabbitMQ自己提供了多种插件,可以按需扩展Plugins

基本概念

生产消费模型,用于实现消息的接收、存储、转发
示意图

  • Producer(生产者):消息的生产方,投递方。
  • Consumer(消费者):消息的消费者。
  • RabbitMQ Broker(RabbitMQ代理):RabbitMQ服务节点。在单机环境中,就是代表RabbitMQ服务器。
  • Queue(队列):在RabbitMQ中Queue是存储消息数据的唯一形式。
  • Binding(绑定):在RabbitMQ中的Binding是Exchange将message路由给Queue所需遵循的规则。如果要指定“交换机E将消息路由给队列Q”,那么Q就需要与E进行绑定。绑定操作需要定义一个可选的路由键(routing key)属性给某些类型的交换机。路由键的意义在于从发送给交换机的众多消息中选择出某些消息,将其路由给绑定的队列。
  • RoutingKey(路由键):消息投递给交换机,通常会指定一个RoutingKey,通过这个路由键来明确消息的路由规则。RoutingKey 通常是生产者和消费者有协商一致的key策略,消费者就可以合法从生产者手中获取数据。这个RoutingKey主要当Exchange交换机模式为设定为direct和topic模式的时候使用,fanout模式不使用RoutingKey。
  • Exchange(交换机):生产者将消息发送给交换机,再由交换机将消息路由到对应的队列中。交换机有四种类型:fanout、direct、topic、headers

Exchange

常用类型:fanout、direct、topic

fanout

  1. fanout:扇形交换机
  2. 其将发送带该类型交换机的消息路由到所有与该交换机绑定的队列中,如同一个扇形一样扩散给各个队列
  3. fanout类型的交换机会忽略RoutingKey的存在,将消息直接广播到绑定的所有队列中
    扇形示意

direct

  1. direct理解为直连交换机
  2. 根据消息携带的RoutingKey将消息投递到相应的队列中
  3. direct类型的交换机(exchange)是RabbitMQ Broker的默认类型,它有一个特别的属性对一些简单的应用来说是非常有用的,在使用这个类型的Exchange时,可以不必指定routing key的名字,在此类型下创建的Queue有一个默认的routing key,这个routing key一般同Queue同名
    直连示意

topic

  1. 把topic理解为主题交换机
  2. topic交换机在RoutingKey和BindKey匹配规则上更加灵活,同样是将消息路由到RoutingKey和BindKey相匹配的队列中,但是匹配规则有如下特点
  3. RoutingKey是一个使用.分割的字符串,例如:go.log.info、java.log.error
  4. BindKey也是一个使用.分割的字符串,但是在BindKey中可以使用两种特殊字符*和#用于匹配一个单词,#用于匹配多规格单词(零个或多个单词)
  5. RoutingKey和BindKey是一种“模糊匹配”,那么一个消息可能会被发送到一个或者多个队列中
  6. 无法匹配的消息将会被丢弃或者返回给生产者
    topic示意

工作流程

消费生产流程

  1. 消息生产者与RabbitMQ Broker建立一个连接,建立连接之后开启一个信道channel
  2. 声明一个交换机,并设置与其相关的属性(交换机类型、持久化等)
  3. 声明一个队列,并设置其相关属性(排他性、持久化、自动删除等)
  4. 通过路由键将交换机和队列绑定
  5. 消息生产者发送消息给RabbitMQ Broker,消息中包含了路由键、交换机等信息,交换机根据接收的路由键查找匹配的队列
  6. 查找匹配成功,将消息存储到队列中
  7. 查找匹配失败,根据生产者配置的属性选择丢弃或者退回给生产者
  8. 关闭信道channel,关闭连接

消息消费流程

  1. 消费者与RabbitMQ Broker建立连接,连接建立之后开启一个channel
  2. 消费者向RabbitMQ Broker请求消费者相应队列中的消息
  3. 等待RabbitMQ Broker回应并投递相应队列中的消息,消费者接收消息
  4. 消费者确认接收消息(ACK),RabbitMQ Broker删除已经确认的消息
  5. 关闭信道channel,关闭连接

部署流程

1
docker pull rabbitmq

pull示意

1
docker images

查看镜像示意

1
docker run

运行示意

1
docker ps

查看进程

1
docker run -id --hostname myrabbit --name rabbitmq1 -p 15672:15672 -p 5672:5672 rabbitmq(安装mq WebUI界面插件)

查看结果

  • 进入UI界面 ip+端口 默认账户名密码guest
    查看结果

  • 点击channel如果报错,重新进入mq镜像修改
    查看结果

  • 正常进入channel
    查看结果

Python使用MQ demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import threading
import time
import pika

"MQ demo"
class MqOperator(HeartBeat):

def __init__(self, username, pwd, host):
self.__user_pwd = pika.PlainCredentials(username, pwd)
self.__s_conn = pika.BlockingConnection(pika.ConnectionParameters(host,
heartbeat=60,
socket_timeout=5,
credentials=self.__user_pwd))
super(MqOperator, self).__init__(connection=self.__s_conn)

# self.host = host
# 创建链接
# self.__s_conn = pika.BlockingConnection(pika.ConnectionParameters(self.host,
# heartbeat=60,
# socket_timeout=5,
# credentials=self.__user_pwd))
self.__channel = self.__s_conn.channel()

# 生产者
def product_message(self, queue_name, routing_key, body, exchange=''):
try:

self.__channel.queue_declare(queue=queue_name) # 声明队列
self.__channel.basic_publish(exchange=exchange, # 交换机
routing_key=routing_key, # 路由键 写明将消息发往哪个队列,本例是将消息发往队列hello
body=body,
properties=pika.BasicProperties(delivery_mode=1)) # 生产者要发送的消息
print(f"【生产者】往【{queue_name}】队列发送了【{body}】")
except:
print("error")
finally:
# self.__s_conn.close() # 发送完消息关闭链接
pass

# 消费者
def consumer_message(self, queue_name):
try:
# self.__channel.queue_declare(queue=queue_name,durable=True) # 声明队列
self.__channel.basic_consume(on_message_callback=self.callback, queue=queue_name, auto_ack=False)
self.start()
self.startHeartBeat()
self.__channel.start_consuming()
except:
print("connect error")
finally:
# self.__s_conn.close() # 接收完消息关闭链接
pass

# 消费时回调函数
def callback(self, channel, method, properties, body):

# body = body.decode('urf-8')
# try:
# print(f"消费数据:{body}")
# except Exception as e:
# print(f"error:{e}")
print(f"消费者 recv:{body}")
import time
time.sleep(1)
print("ok")
channel.basic_ack(delivery_tag=method.delivery_tag)

MQ-WebUI界面

  • 新建队列

  • 点击队列进入队列内,

  • 发送消息

  • 消息发送成功,查看队列消息

  • 拿demo消费,查看消费内容

  • 以上一个简单的生产消费流程走完