#作者:闫乾苓
系列前几篇:
《RabbitMQ 从入门到精通:从工作模式到集群部署实战(一)》:link
《RabbitMQ 从入门到精通:从工作模式到集群部署实战(二)》: link
《RabbitMQ 从入门到精通:从工作模式到集群部署实战(三)》:link
文章目录
- 高可用测试
- classic队列测试
- 工作原理
- 仲裁队列测试
- 工作原理
高可用测试
测试集群是使用RabbitMQ Cluster Kubernetes Operator 部署的3节点集群
对常用的classic、quorum两种类型的队列进行测试,使用2个python脚本作为消息的Producer和Consumer客户端连接到服务器集群,停止集群中1个节点,不影响消息的生产和消费。
classic队列测试
工作原理
RabbitMQ的经典队列(Classic Queue)镜像复制是一种高可用性特性,它通过在集群中的多个节点上复制队列的内容来提供数据冗余,从而确保在节点故障时队列及其消息仍然可以从其他节点恢复,提高了系统的可靠性和容错能力。以下是RabbitMQ经典队列镜像复制的工作原理:
镜像队列的组成
- 主节点(Leader):
负责处理所有的写入操作和部分读取操作。
将所有写入的消息同步给跟随者节点。 - 跟随者节点(Followers):
复制主节点的数据,以提供冗余和容错能力。
在主节点失效时,可以参与新的领导者选举,选出一个新的领导者继续提供服务。
镜像复制过程
- 消息写入:
- 当生产者向镜像队列发送消息时,消息首先被写入主节点上的队列。
- 主节点随后将消息同步到跟随者节点。这个过程通常是异步的,以保证性能,但可以通过配置来调整同步策略。
- 如果配置为要求确认(acknowledgment)模式,则主节点只有在收到所有跟随者的确认后才会认为消息已被成功存储。
- 消息读取:
- 消费者可以从任何节点上的镜像队列中消费消息。
- 默认情况下,消费者连接到哪个节点就从那个节点消费。
- 在主节点失效的情况下,消费者会被重定向到新的领导者节点。
镜像队列的类型
- 单活镜像队列:
在单活镜像队列中,只有一个节点处于活动状态,其他节点处于备份状态。
活动节点负责处理消息的生产和消费,而备份节点则负责复制活动节点的数据。 - 多活镜像队列:
在多活镜像队列中,所有节点都处于活动状态,都可以处理消息的生产和消费。
这种方式可以提高系统的吞吐量和可用性。
镜像队列的故障切换与恢复
- 故障切换:
当主节点失效时,剩余的跟随者节点之间会进行新的领导者选举,选出一个新的领导者继续提供服务。
镜像队列可以自动切换到另一个副本,确保服务的连续性。 - 恢复:
在主节点恢复后,它可以重新加入集群并成为跟随者节点。
跟随者节点会与新的领导者节点进行同步,以确保数据的一致性。
镜像队列的配置与管理
- 配置镜像策略:
使用策略(Policy)来配置镜像策略。
策略使用正则表达式来配置需要应用镜像策略的队列名称,以及在参数中配置镜像队列的具体参数。 - 管理UI:
RabbitMQ提供了管理UI,可以通过该UI来查看和管理镜像队列的状态、配置和性能。
测试脚本
脚本需要安装pika
pip3 install pika
Producer写入classic队列脚本
Exchange的Routing key和队列名一样
import pika
import time
import random
import string
# RabbitMQ连接参数
rabbitmq_host = '192.168.123.242' # 替换为你的RabbitMQ服务器地址
rabbitmq_port = 16027 # RabbitMQ端口
rabbitmq_vhost = '/' # 虚拟主机
rabbitmq_username = 'admin' # RabbitMQ用户名
rabbitmq_password = 'Admin.123' # RabbitMQ密码
rabbitmq_queue = 'queue_classic_02' # 目标队列名
rabbitmq_exchange = 'exchange-02' # 自定义exchange名
num_messages = 1000000 # 要发送的消息数量
message_size_kb = 1 # 每条消息的大小(KB)
message_size_bytes = message_size_kb * 1024 # 每条消息的大小(字节)
rate_limit_msgs_per_sec = 500 # 每秒发送的消息数量(速率限制)
# 计算发送每条消息后的暂停时间(秒)
sleep_time_between_messages = 1 / rate_limit_msgs_per_sec
# 建立连接和通道
credentials = pika.PlainCredentials(rabbitmq_username, rabbitmq_password)
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=rabbitmq_host,
port=rabbitmq_port,
virtual_host=rabbitmq_vhost,
credentials=credentials
))
channel = connection.channel()
# 声明自定义的exchange(如果需要持久化,设置durable=True)
channel.exchange_declare(exchange=rabbitmq_exchange, exchange_type='direct', durable=True)
# 确保队列存在(如果需要持久化,也设置durable=True)
channel.queue_declare(queue=rabbitmq_queue, durable=True)
# 生成随机字符串的函数,长度为指定的字节数
def generate_random_message(size_bytes):
letters = string.ascii_letters + string.digits # 字母和数字
return ''.join(random.choice(letters) for _ in range((size_bytes * 8) // len(letters))).encode('utf-8')[:size_bytes]
# 批量发送消息,并增加速率控制
for i in range(num_messages):
message = generate_random_message(message_size_bytes)
routing_key = rabbitmq_queue # 使用队列名作为routing key(对于direct类型的exchange)
# 设置消息的properties,包括delivery_mode=2(持久化消息)
properties = pika.BasicProperties(delivery_mode=2) # 设置消息的持久化
channel.basic_publish(
exchange=rabbitmq_exchange,
routing_key=routing_key,
body=message,
properties=properties # 将properties传递给basic_publish方法
)
print(f"Sent message {i+1}")
# 根据速率限制暂停一段时间
time.sleep(sleep_time_between_messages)
# 关闭连接
connection.close()
Consumer消费classic队列脚本
import pika
import time
# RabbitMQ 连接参数
rabbitmq_host = '192.168.123.242'
rabbitmq_port = 16027 # 默认RabbitMQ端口,如果使用了非默认端口,请修改
rabbitmq_user = 'admin' # 默认RabbitMQ用户,如果使用了其他用户,请修改
rabbitmq_password = 'Admin.123' # 默认RabbitMQ密码,如果使用了其他密码,请修改
queue_name = 'queue_classic_02' # 您的队列名
rate_limit = 500 # 每秒处理消息的最大数量(这里作为示例,您可以根据需要调整)
# 创建连接参数
credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_password)
parameters = pika.ConnectionParameters(host=rabbitmq_host, port=rabbitmq_port, credentials=credentials)
# 连接到 RabbitMQ 服务器
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# 确保队列存在(这一步是可选的,因为如果在队列不存在时尝试消费,RabbitMQ会自动创建它,但声明队列可以确保它具有预期的属性)
channel.queue_declare(queue=queue_name, durable=True)
# 消费消息的回调函数
def callback(ch, method, properties, body):
# 打印接收到的消息
print(f" [x] Received message: {body.decode('utf-8')}")
# 根据速率限制调整睡眠时间
# 注意:这里的速率限制是基于消息的处理时间,而不是消息的接收时间
# 如果处理时间很短,但实际希望减慢消费速度,可以保留这个sleep
# 如果处理时间已经足够长,或者希望尽可能快地处理消息,可以移除这个sleep
time.sleep(1 / rate_limit)
# 手动确认消息(如果队列配置为手动消息确认)
# 注意:这一步是在启用了QoS(basic_qos)之后才有意义的
ch.basic_ack(delivery_tag=method.delivery_tag)
# 设置QoS(Quality of Service),限制未确认消息的数量
channel.basic_qos(prefetch_count=1)
# 监听队列,并将回调函数设置为callback
channel.basic_consume(queue=queue_name, on_message_callback=callback)
print(f" [*] Waiting for messages in {queue_name}. To exit press CTRL+C")
try:
# 开始消费消息,这是一个阻塞调用
channel.start_consuming()
except KeyboardInterrupt:
print(" [*] Interrupted by user. Shutting down.")
finally:
# 确保在退出时关闭连接
connection.close()
消息数据丢失测试
ha-mode: exactly镜像复制策略测试:
创建queue
rabbitmqadmin declare queue name=queue_classic_01 durable=true
创建exchange及bingding
rabbitmqadmin declare exchange name=exchange-01 type=direct durable=true
rabbitmqadmin declare binding source=exchange-01 destination=queue_classic_01 routing_key=queue_classic_01
设置镜像复制策略
rabbitmqadmin declare policy name=ha-three-replicas pattern="^queue_classic_01$" definition='{"ha-mode":"exactly","ha-params":3,"ha-sync-mode":"automatic"}' apply-to=queues
使用脚本写如10000条数据(需要开启消息内容持久化参数,delivery_mode=2)
手动删除1个pod,模拟集群节点宕机
Pod重建后,队列中的消息仍存在
继续进行删除2个pod测试
从结果看,删除2个pod的情况下,数据丢失了。
ha-mode: all镜像复制策略测试:
创建queue
rabbitmqadmin declare queue name=queue_classic_02 durable=true
创建exchange及bingding
rabbitmqadmin declare exchange name=exchange-02 type=direct durable=true
rabbitmqadmin declare binding source=exchange-02 destination=queue_classic_02 routing_key=queue_classic_02
设置镜像复制策略(允许个节点宕机)
rabbitmqadmin declare policy name=ha-three-replicas2 pattern="^queue_classic_02$" definition='{"ha-mode":"all","ha-sync-mode":"automatic"}' apply-to=queues
使用脚本写如10000条数据(需要开启消息内容持久化参数,delivery_mode=2)
删除2个pod,模拟2个节点宕机
Pod 重新后,数据仍然在
镜像策略对队列数据冗余的影响
镜像复制策略ha-mode: exactly
测试一是使用以下策略:
rabbitmqadmin declare policy name=ha-three-replicas pattern="^queue_classic_01$" definition='{"ha-mode":"exactly","ha-params":3,"ha-sync-mode":"automatic"}' apply-to=queues
- ha-mode: exactly:这表示队列的镜像将精确复制到指定数量的节点上。
- ha-params: 3:这指定了镜像的数量为3。
- ha-sync-mode: automatic:这表示队列的镜像将自动同步到指定的节点上
数据丢失原因分析:
如果集群中只有3个节点:
- 由于设置了ha-params: 3,这意味着队列queue_classic_01的镜像需要被复制到3个节点上。
- 在一个只有3个节点的集群中,如果2个节点宕机,那么只剩下1个节点存活。
- 由于没有足够的存活节点来存储3个镜像,因此这个队列的镜像将无法完全复制,并且会导致数据丢失(或者更准确地说,是数据将不再具有冗余性,因为只有一个存活节点上有数据)。
如果集群中有超过3个节点:
- 在这种情况下,即使2个节点宕机,仍然有足够的存活节点来存储3个镜像。
- 因此,数据不会丢失,因为至少有一个存活节点上保存有队列的完整数据,并且其他存活节点上也可能有该数据的镜像。
镜像复制策略ha-mode: all
ha-mode: all:这表示队列的镜像将被复制到集群中的所有节点。
ha-sync-mode: automatic:这表示队列的镜像将自动同步到所有节点上。
现在,我们来分析在3节点的RabbitMQ集群中,如果2个节点宕机,数据的情况:
队列镜像的复制:
由于 ha-mode 被设置为 “all”,队列 queue_classic_01 的镜像将被复制到集群中的所有3个节点上。
节点宕机的影响:
- 在一个3节点的集群中,如果2个节点宕机,那么只剩下1个节点存活。
- 但是,由于队列的镜像已经被复制到所有3个节点,因此即使2个节点宕机,剩下的1个节点上仍然保存有队列的完整数据。
数据的丢失:
- 在这种情况下,数据不会丢失,因为至少有一个存活节点上保存有队列的完整数据。
- 然而,队列的可用性可能会受到影响,因为RabbitMQ通常需要在多个节点上维护队列的镜像以确保高可用性。在只有一个存活节点的情况下,如果该节点也宕机,那么数据将无法访问,直到至少一个其他节点恢复并重新加入集群。
综上所述,在3节点的RabbitMQ集群中,如果您设置了 ha-mode: all 并且2个节点宕机,数据不会丢失,因为至少有一个存活节点上保存有队列的完整数据。
但是,请注意,这种配置下的队列可用性可能会受到影响,特别是在只有一个存活节点的情况下。为了保持高可用性,建议确保集群中有足够数量的健康节点,或者考虑使用其他容错和恢复策略。
pod节点宕机数量测试
同时运行2个python脚本连接Rabbitmq服务的k8 集群Node IP和NodePort,分别用于生产和消费消息,观察使用ha-mode:all 参数时,服务和数据的可用性。
通过web UI管理界面可以看到连接的是哪个Rabbitmq的pod节点
删除1个pod, 只要是生产和消费消息客户端正在连接的pod,生产和消费均不受影响。
Web管理界面可以看到节点故障及恢复的过程
同时删除2个pod节点,只有在删除的pod节点不是队列的leader,并且是生产或者消费客户端的连接的pod节点不是leader,同时删除2个pod节点, 生产和消费服务也不会中断。
如果删除的pod节点是leader节点,或者生产或者消费客户端连接的pod节点,相应的连接会中断,但重新连接后,不影响生产和消费数据。
仲裁队列测试
工作原理
RabbitMQ的Quorum Queue(仲裁队列)是其提供的一种高可用队列实现,旨在解决镜像队列的性能和同步问题。以下是RabbitMQ仲裁队列的工作原理:
基本概念
仲裁队列:RabbitMQ从3.8.0版本开始引入仲裁队列功能,作为镜像队列的替代方案。仲裁队列具有队列复制的能力,可以保障数据的高可用和安全性。
Raft算法:仲裁队列使用Raft算法实现了持久的、复制的FIFO(先进先出)队列。Raft算法是一种用于管理复制日志的共识算法,它提供了数据一致性和容错性。
工作原理
- 队列复制:
仲裁队列会在RabbitMQ节点间进行队列数据的复制。
当一个节点宕机时,由于数据已经复制到其他节点,因此队列仍然可以提供服务。 - 消息写入:
在仲裁队列中,消息要有集群中多半节点同意后,才会被写入队列。
这种写入机制保证了消息在集群内部不会丢失。 - 消息读取:
消费者可以从仲裁队列中读取消息,并进行相应的处理。
读取过程不会影响队列的复制和写入操作。 - 容错与安全性:
仲裁队列通过复制和Raft算法保证了数据的高可用性和安全性。
即使部分节点宕机,只要剩余节点数量超过半数,仲裁队列仍然可以正常工作。 - 性能优化:
仲裁队列使用了Raft协议,相比镜像队列的算法更有效率,可以提供更好的消息吞吐量。
同时,仲裁队列也支持节点的滚动升级,提高了系统的可维护性。
使用场景与限制 - 使用场景:
仲裁队列适用于长期存在的队列,以及对容错和数据安全方面要求较高的场景。
它不适合用于临时使用的队列,如transient临时队列、exclusive独占队列,或者经常会修改和删除的队列。 - 限制:
仲裁队列当前会将所有消息始终保存在内存中,直到达到内存使用极限。因此,在内存使用量较高时,可能会导致集群不可用。
仲裁队列的磁盘和内存配置与普通队列不同,写入放大可能导致更大的磁盘使用。因此,在使用仲裁队列时,需要进行合理的规划和监控。
综上所述,RabbitMQ的仲裁队列通过复制和Raft算法实现了数据的高可用性和安全性。然而,在使用时也需要注意其限制和性能特点,以确保系统的稳定运行。
测试脚本
Producer
Exchange的Routing key和队列名一样
import pika
import time
import random
import string
# RabbitMQ连接参数
rabbitmq_host = '192.168.123.242' # 替换为你的RabbitMQ服务器地址
rabbitmq_port = 16027 # RabbitMQ端口(注意:这通常不是默认端口,确保这是正确的)
rabbitmq_vhost = '/' # 虚拟主机
rabbitmq_username = 'admin' # RabbitMQ用户名
rabbitmq_password = 'Admin.123' # RabbitMQ密码
rabbitmq_queue = 'queue_quorum_01' # 目标队列名(注意:这里应该是队列名,而不是“quorum队列名”,除非你有特别的定义)
rabbitmq_exchange = 'exchange_quorum_01' # 自定义exchange名
num_messages = 10000 # 要发送的消息数量
message_size_kb = 1 # 每条消息的大小(KB)
message_size_bytes = message_size_kb * 1024 # 每条消息的大小(字节)
# 建立连接和通道
credentials = pika.PlainCredentials(rabbitmq_username, rabbitmq_password)
connection = pika.BlockingConnection(pika.ConnectionParameters(
host=rabbitmq_host,
port=rabbitmq_port,
virtual_host=rabbitmq_vhost,
credentials=credentials
))
channel = connection.channel()
# 声明自定义的exchange(如果需要持久化,设置durable=True)
channel.exchange_declare(exchange=rabbitmq_exchange, exchange_type='direct', durable=True)
# 确保队列存在(对于RabbitMQ的普通队列,你需要声明它;但对于Quorum队列,这通常是由RabbitMQ集群管理的)
# 注意:如果你的队列是Quorum队列,并且已经由RabbitMQ集群正确配置,那么你可能不需要再次声明它。
# 但是,如果你想让这个队列也接收来自新exchange的消息,你可能需要重新绑定它(这取决于你的配置)。
# 这里我们假设队列已经存在并且配置正确。
# 生成随机字符串的函数,长度为指定的字节数
def generate_random_message(size_bytes):
letters = string.ascii_letters + string.digits # 字母和数字
# 注意:由于我们后面进行了切片操作[:size_bytes],所以最终长度会是正确的
# 但是,这种方法可能不是最高效的,因为它首先生成了一个更大的字符串,然后又切掉了多余的部分。
# 一个更优化的方法是直接生成指定大小的字节数组。
# 不过,为了保持与原始脚本的一致性,我们在这里还是使用原始方法。
return ''.join(random.choice(letters) for _ in range((size_bytes * 8) // len(letters))).encode('utf-8')[:size_bytes]
# 批量发送消息
for i in range(num_messages):
message = generate_random_message(message_size_bytes)
routing_key = rabbitmq_queue # 使用队列名作为routing key(对于direct类型的exchange)
# 设置消息的properties,包括delivery_mode=2(持久化消息)
properties = pika.BasicProperties(delivery_mode=2) # 设置消息的持久化
channel.basic_publish(
exchange=rabbitmq_exchange,
routing_key=routing_key,
body=message,
properties=properties # 将properties传递给basic_publish方法
)
print(f"Sent message {i+1}")
# 关闭连接
connection.close()
consumer
import pika
import time
# 连接参数
rabbitmq_host = '192.168.123.242' # 替换为你的RabbitMQ服务器IP
rabbitmq_port = 16027 # 替换为你的RabbitMQ服务器端口,如果使用了默认端口则无需修改
rabbitmq_user = 'admin' # 替换为你的RabbitMQ账号
rabbitmq_password = 'Admin.123' # 替换为你的RabbitMQ密码
# 队列名(已经存在的Quorum队列)
queue_name = 'queue_quorum_01'
# 连接到RabbitMQ服务器
credentials = pika.PlainCredentials(rabbitmq_user, rabbitmq_password)
parameters = pika.ConnectionParameters(
host=rabbitmq_host,
port=rabbitmq_port,
credentials=credentials
)
connection = pika.BlockingConnection(parameters)
channel = connection.channel()
# 定义一个回调函数来处理从队列中接收到的消息
def callback(ch, method, properties, body):
# 假设消息是字符串(不是JSON),因为生产者发送的是字符串
message = body.decode('utf-8') # 解码为字符串
print(f"Received message: {message}")
# 模拟消息处理时间
time.sleep(0.01) # 假设处理每条消息需要1秒
# 在这里处理消息
# 告诉RabbitMQ使用callback函数来处理从指定队列中接收到的消息
# 设置QoS(Quality of Service),每次只处理一条消息,直到手动ack
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=False)
# 告诉RabbitMQ我们完成了消息的设置,现在我们可以手动ack消息了
def manual_ack(delivery_tag):
channel.basic_ack(delivery_tag=delivery_tag)
# 包装callback函数以支持手动ack
def wrapped_callback(ch, method, properties, body):
callback(ch, method, properties, body)
manual_ack(method.delivery_tag)
# 替换原有的callback为支持QoS的callback
channel.basic_consume(queue=queue_name, on_message_callback=wrapped_callback)
print(f'Waiting for messages in {queue_name}. To exit press CTRL+C')
channel.start_consuming()
高可用测试
创建queue
rabbitmqadmin declare queue name=queue_quorum_01 durable=true arguments='{"x-queue-type": "quorum"}'
创建exchange及bingding
rabbitmqadmin declare exchange name=exchange_quorum_01 type=direct durable=true
rabbitmqadmin declare binding source=exchange_quorum_01 destination=queue_quorum_01 routing_key=queue_quorum_01
手动删除1个pod节点,模拟3节点集群中1个节点宕机的故障,只要客户端连接的不是被停止的pod节点,客户端生产和消费都是正常的。通过web管理界面看,队列的状态是:running
此时如果继续删除第2个pod节点,模拟3节点集群中2个节点宕机的故障,在k8s中使用operator部署的RabbitMQ集群,在手动执行删除第2个pod是,命令将被挂起(无反应)直到operator通过内容部控制机制将第1个删除的pod重启成功,才会继续执行第2个pod的删除操作。此时Rabbimq服务是正常状态(如果客户端连接的是被删除pod节点,连接会被断开,重连后连接被svc 负载到其他pod节点,可以正常读写数据)。这应该是RabbitMQ operator控制的效果,在3节点的集群中,确保同时只能1个pod节点宕机,服务不受影响。
另外在裸金属部署的3节点RabbitMQ集群中进行了类似测试,使用“rabbitmq stop_app”同时停止2个节点(非读写客户端正在连接的节点),此时RabbitMQ处于“minority”(少数)状态,这正是quorum队列需要超过半数节点正常才能正常工作的工作机制。
此时,写客户端(producer)的连接状态虽然为‘running’但实际测试是没有数据写入到服务器,读客户端(consumer)的连接状态为“flow”,也不能从服务器获取数据。