Skip to content

RabbitMQ 面试题集

RabbitMQ 消息队列核心知识点与高频面试题

A. 面试宝典

基础题

1. RabbitMQ 核心概念

┌─────────────────────────────────────────────────────────┐
│                    RabbitMQ 架构                         │
├─────────────────────────────────────────────────────────┤
│                                                          │
│  Producer                                                │
│     │                                                    │
│     ▼                                                    │
│  ┌──────────────────────────────────────────────┐       │
│  │               Exchange                        │       │
│  │    (Direct/Fanout/Topic/Headers)             │       │
│  └──────────────────────┬───────────────────────┘       │
│            ┌────────────┼────────────┐                  │
│            ▼            ▼            ▼                  │
│       ┌────────┐  ┌────────┐  ┌────────┐              │
│       │ Queue1 │  │ Queue2 │  │ Queue3 │              │
│       └───┬────┘  └───┬────┘  └───┬────┘              │
│           ▼           ▼           ▼                    │
│       Consumer1   Consumer2   Consumer3                │
│                                                          │
└─────────────────────────────────────────────────────────┘
概念说明
Producer消息生产者
Consumer消息消费者
Exchange交换机,路由消息
Queue队列,存储消息
Binding绑定,Exchange 和 Queue 的关联
Routing Key路由键,消息路由规则
Virtual Host虚拟主机,隔离环境

2. Exchange 类型

python
# Direct Exchange - 精确匹配 routing key
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
channel.basic_publish(exchange='direct_logs', routing_key='error', body=message)

# Fanout Exchange - 广播到所有绑定队列
channel.exchange_declare(exchange='logs', exchange_type='fanout')
channel.basic_publish(exchange='logs', routing_key='', body=message)

# Topic Exchange - 模式匹配 routing key
# * 匹配一个单词,# 匹配零或多个单词
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key='*.error')
channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key='user.#')

# Headers Exchange - 根据消息头匹配
channel.exchange_declare(exchange='headers_logs', exchange_type='headers')
类型路由规则使用场景
Direct精确匹配 routing key点对点通信
Fanout广播到所有队列广播消息
Topic模式匹配 routing key多条件路由
Headers匹配消息头复杂路由规则

3. 消息确认机制

python
# 生产者确认 (Publisher Confirms)
channel.confirm_delivery()

try:
    channel.basic_publish(exchange='', routing_key='queue', body=message)
    print("Message published")
except pika.exceptions.UnroutableError:
    print("Message could not be routed")

# 消费者确认 (Consumer Acknowledgements)
def callback(ch, method, properties, body):
    try:
        process_message(body)
        ch.basic_ack(delivery_tag=method.delivery_tag)  # 确认
    except Exception:
        ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)  # 拒绝并重新入队

channel.basic_consume(queue='queue', on_message_callback=callback, auto_ack=False)

# 消息持久化
# 1. 队列持久化
channel.queue_declare(queue='queue', durable=True)

# 2. 消息持久化
channel.basic_publish(
    exchange='',
    routing_key='queue',
    body=message,
    properties=pika.BasicProperties(delivery_mode=2)  # 持久化
)

4. 消息可靠性保证

┌─────────────────────────────────────────────────────────┐
│                    可靠性机制                            │
├─────────────────────────────────────────────────────────┤
│                                                          │
│  生产端:                                                │
│  1. Publisher Confirms - 确认消息到达 Exchange          │
│  2. Return Listener - 消息无法路由时回调                 │
│                                                          │
│  Broker 端:                                            │
│  1. 队列持久化 (durable=True)                           │
│  2. 消息持久化 (delivery_mode=2)                        │
│  3. 镜像队列 (HA)                                       │
│                                                          │
│  消费端:                                                │
│  1. 手动 ACK (auto_ack=False)                          │
│  2. 幂等性处理                                          │
│  3. 死信队列                                            │
│                                                          │
└─────────────────────────────────────────────────────────┘

5. 死信队列 (DLX)

python
# 死信队列配置
# 消息成为死信的条件:
# 1. 消息被拒绝 (basic.reject/basic.nack) 且 requeue=false
# 2. 消息过期 (TTL)
# 3. 队列达到最大长度

# 声明死信交换机和队列
channel.exchange_declare(exchange='dlx_exchange', exchange_type='direct')
channel.queue_declare(queue='dlx_queue')
channel.queue_bind(exchange='dlx_exchange', queue='dlx_queue', routing_key='dlx')

# 声明业务队列,绑定死信交换机
args = {
    'x-dead-letter-exchange': 'dlx_exchange',
    'x-dead-letter-routing-key': 'dlx',
    'x-message-ttl': 60000  # 消息过期时间 60 秒
}
channel.queue_declare(queue='business_queue', arguments=args)

延迟队列实现:

python
# 利用 TTL + DLX 实现延迟队列
# 1. 消息发送到延迟队列(设置 TTL)
# 2. 消息过期后进入死信队列
# 3. 消费者消费死信队列

# 或使用 rabbitmq_delayed_message_exchange 插件
channel.exchange_declare(
    exchange='delayed_exchange',
    exchange_type='x-delayed-message',
    arguments={'x-delayed-type': 'direct'}
)

# 发送延迟消息
channel.basic_publish(
    exchange='delayed_exchange',
    routing_key='delay_key',
    body=message,
    properties=pika.BasicProperties(headers={'x-delay': 5000})  # 延迟 5 秒
)

进阶题

6. 高可用集群

┌─────────────────────────────────────────────────────────┐
│                    集群模式                              │
├─────────────────────────────────────────────────────────┤
│                                                          │
│  普通集群:                                              │
│  - 元数据同步,消息不同步                                │
│  - 访问非消息所在节点需要转发                            │
│                                                          │
│  镜像队列(HA):                                        │
│  - 消息同步到多个节点                                    │
│  - 主节点故障自动切换                                    │
│                                                          │
│  ┌─────────┐    ┌─────────┐    ┌─────────┐             │
│  │  Node1  │◄──►│  Node2  │◄──►│  Node3  │             │
│  │ (Master)│    │ (Mirror)│    │ (Mirror)│             │
│  └─────────┘    └─────────┘    └─────────┘             │
│                                                          │
└─────────────────────────────────────────────────────────┘
bash
# 配置镜像队列策略
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all","ha-sync-mode":"automatic"}'

# 策略参数
# ha-mode: all(所有节点)/ exactly(指定数量)/ nodes(指定节点)
# ha-sync-mode: manual(手动同步)/ automatic(自动同步)

7. 消息顺序性

python
# 保证消息顺序的方法:
# 1. 单队列单消费者
# 2. 按业务 ID 路由到同一队列

# 示例:按用户 ID 分发到固定队列
def get_queue_name(user_id, queue_count=4):
    return f"order_queue_{hash(user_id) % queue_count}"

# 发送消息
queue = get_queue_name(user_id)
channel.basic_publish(exchange='', routing_key=queue, body=message)

避坑指南

错误回答正确理解
"RabbitMQ 默认持久化"需要显式设置队列和消息持久化
"消费者 ACK 就不会丢消息"还需要生产者确认和持久化
"镜像队列无性能损耗"同步会降低吞吐量
"消息队列天然保证顺序"多消费者会打乱顺序
"死信队列自动创建"需要手动配置 DLX

B. 实战文档

常用命令

bash
# 用户管理
rabbitmqctl add_user admin password
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

# 队列管理
rabbitmqctl list_queues
rabbitmqctl list_queues name messages consumers
rabbitmqctl purge_queue queue_name
rabbitmqctl delete_queue queue_name

# 集群管理
rabbitmqctl cluster_status
rabbitmqctl join_cluster rabbit@node1
rabbitmqctl forget_cluster_node rabbit@node2

# 策略管理
rabbitmqctl list_policies
rabbitmqctl set_policy name pattern definition
rabbitmqctl clear_policy name

# 监控
rabbitmqctl list_connections
rabbitmqctl list_channels

Python 示例

python
import pika

# 连接
connection = pika.BlockingConnection(
    pika.ConnectionParameters('localhost')
)
channel = connection.channel()

# 声明队列
channel.queue_declare(queue='hello', durable=True)

# 发送消息
channel.basic_publish(
    exchange='',
    routing_key='hello',
    body='Hello World!',
    properties=pika.BasicProperties(delivery_mode=2)
)

# 消费消息
def callback(ch, method, properties, body):
    print(f"Received {body}")
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='hello', on_message_callback=callback)
channel.start_consuming()

基于 VitePress 构建