在 [[RabbitMQ]] 的消息路由体系中,[[Exchange]] 是承上启下的核心枢纽。它不存储消息,只负责路由——接收生产者投递的消息,根据路由规则分发到下游的 [[队列]]。
如果把 RabbitMQ 比作一个邮局分拣中心,[[Exchange]] 就是分拣员。每天,海量的信件(消息)涌入分拣中心,分拣员需要根据信封上的地址信息,将信件投递到正确的邮箱(队列)中。
本文将逐一拆解 RabbitMQ 的四种 [[Exchange]] 类型:[[fanout]]、[[direct]]、[[topic]]、[[headers]]。
✦ fanout:扇出广播
✦ 生活类比:小区大喇叭
物业工作人员拿着大喇叭站在小区广场喊话:”所有人注意啦,明天停水!”不管你是住 1 号楼还是 2 号楼,不管你是业主还是租户,只要你在小区里(绑定了这个交换器),你都能听到这条消息。
✦ 路由规则
[[fanout]] 交换器是四种类型中最简单直接的一种。它完全无视消息的 [[RoutingKey]],收到消息后立即复制 N 份,转发给所有与之绑定的队列。
这种”简单粗暴”的广播模式,在特定场景下反而效率最高——省去了复杂的路由计算。
✦ 生产者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| using System.Text; using RabbitMQ.Client;
ConnectionFactory factory = new ConnectionFactory { HostName = "mq.example.com", VirtualHost = "/", Password = "your_password", UserName = "your_username", Port = 5671, Ssl = new SslOption { Enabled = true, ServerName = "mq.example.com" } };
await using var connection = await factory.CreateConnectionAsync(); await using var channel = await connection.CreateChannelAsync();
const string exchangeName = "my_fanout_exchange"; await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Fanout, durable: true);
const string message = "系统公告:明天凌晨 02:00 进行服务器维护"; var body = Encoding.UTF8.GetBytes(message);
await channel.BasicPublishAsync(exchange: exchangeName, routingKey: string.Empty, body: body);
Console.WriteLine($"[x] 已广播消息:{message}"); Console.WriteLine("[x] 按任意键退出..."); Console.ReadKey();
|
✦ 消费者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events;
ConnectionFactory factory = new ConnectionFactory { HostName = "mq.example.com", VirtualHost = "/", Password = "your_password", UserName = "your_username", Port = 5671, Ssl = new SslOption { Enabled = true, ServerName = "mq.example.com" } };
await using var connection = await factory.CreateConnectionAsync(); await using var channel = await connection.CreateChannelAsync();
const string exchangeName = "my_fanout_exchange"; await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Fanout, durable: true);
var queueDeclareResult = await channel.QueueDeclareAsync( queue: string.Empty, durable: false, exclusive: true, autoDelete: true);
var queueName = queueDeclareResult.QueueName; Console.WriteLine($"[x] 消费者队列已创建:{queueName}");
await channel.QueueBindAsync(queue: queueName, exchange: exchangeName, routingKey: string.Empty);
var consumer = new AsyncEventingBasicConsumer(channel); consumer.ReceivedAsync += async (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); Console.WriteLine($"[x] 收到广播消息:{message}"); await Task.CompletedTask; };
await channel.BasicConsumeAsync(queue: queueName, autoAck: true, consumer: consumer);
Console.WriteLine("[x] 等待广播消息,按任意键退出..."); Console.ReadKey();
|
✦ 典型场景
- 群发系统公告
- 配置热更新广播
- 实时日志流分发
- 缓存失效通知
✦ direct:精确直连
✦ 生活类比:挂号信
你寄了一封挂号信,信封上写着”收件人:张三,房间号:101”。分拣员拿着信,去核对小区里的信箱标签。只有贴着”张三,101”标签的信箱,这封信才会被投进去。其他的信箱,哪怕只差一个字(比如”张三,102”),都收不到。
✦ 路由规则
[[direct]] 交换器执行精确匹配。消息的 [[RoutingKey]] 必须与队列的 [[BindingKey]] 完全一致(字符串相等),消息才会被路由到该队列。
这种严格匹配机制,确保了消息投递的精确性。
✦ 生产者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| using System.Text; using RabbitMQ.Client;
ConnectionFactory factory = new ConnectionFactory { HostName = "mq.example.com", VirtualHost = "/", Password = "your_password", UserName = "your_username", Port = 5671, Ssl = new SslOption { Enabled = true, ServerName = "mq.example.com" } };
await using var connection = await factory.CreateConnectionAsync(); await using var channel = await connection.CreateChannelAsync();
const string exchangeName = "my_direct_exchange"; await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct, durable: true);
const string message = "数据库连接失败,请检查连接字符串"; var body = Encoding.UTF8.GetBytes(message);
const string routingKey = "error"; await channel.BasicPublishAsync(exchange: exchangeName, routingKey: routingKey, body: body);
Console.WriteLine($"[x] 已发送错误日志:{message}"); Console.WriteLine($"[x] 路由键:{routingKey}"); Console.WriteLine("[x] 按任意键退出..."); Console.ReadKey();
|
✦ 消费者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events;
ConnectionFactory factory = new ConnectionFactory { HostName = "mq.example.com", VirtualHost = "/", Password = "your_password", UserName = "your_username", Port = 5671, Ssl = new SslOption { Enabled = true, ServerName = "mq.example.com" } };
await using var connection = await factory.CreateConnectionAsync(); await using var channel = await connection.CreateChannelAsync();
const string exchangeName = "my_direct_exchange"; await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct, durable: true);
const string queueName = "error_log_queue"; await channel.QueueDeclareAsync( queue: queueName, durable: true, exclusive: false, autoDelete: false);
await channel.QueueBindAsync(queue: queueName, exchange: exchangeName, routingKey: "error");
Console.WriteLine($"[x] 队列 {queueName} 已绑定到交换器,监听 RoutingKey: error");
var consumer = new AsyncEventingBasicConsumer(channel); consumer.ReceivedAsync += async (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; Console.WriteLine($"[x] 收到消息 [RoutingKey={routingKey}]:{message}"); await Task.CompletedTask; };
await channel.BasicConsumeAsync(queue: queueName, autoAck: true, consumer: consumer);
Console.WriteLine("[x] 等待消息,按任意键退出..."); Console.ReadKey();
|
✦ 典型场景
- 点对点消息投递
- 根据日志级别过滤
- 任务分发(按任务类型路由)
- 用户通知(按用户 ID 精准投递)
✦ topic:主题匹配
✦ 生活类比:订阅报纸
你去邮局订阅报纸:
- 小王说:”我要订所有关于体育的报纸。”([[BindingKey]]:
*.sport.#)
- 小李说:”我要订所有关于美国的报纸。”([[BindingKey]]:
usa.#)
- 小张说:”我要订所有美国体育的报纸。”([[BindingKey]]:
usa.sport.*)
当邮局收到一份报纸,标题是 usa.sport.football(美国体育足球):
- 小王收到(匹配体育)
- 小李收到(匹配美国)
- 小张收到(匹配美国体育)
✦ 路由规则
[[topic]] 交换器支持模糊匹配。[[RoutingKey]] 和 [[BindingKey]] 必须是由点号 . 分隔的单词列表(如 user.order.create)。交换器使用两个特殊的通配符进行匹配:
| 通配符 |
匹配规则 |
示例 |
* (星号) |
匹配一个单词 |
user.* 匹配 user.login,但不匹配 user.order.create |
# (井号) |
匹配零个或多个单词 |
user.# 匹配 user.login、user.order.create,甚至只匹配 user |
这种灵活的模式匹配机制,非常适合多维度、多层级的业务分类。
✦ 生产者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| using System.Text; using RabbitMQ.Client;
ConnectionFactory factory = new ConnectionFactory { HostName = "mq.example.com", VirtualHost = "/", Password = "your_password", UserName = "your_username", Port = 5671, Ssl = new SslOption { Enabled = true, ServerName = "mq.example.com" } };
await using var connection = await factory.CreateConnectionAsync(); await using var channel = await connection.CreateChannelAsync();
const string exchangeName = "my_topic_exchange"; await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Topic, durable: true);
const string message = "美国 NFL 超级碗决赛即将开始"; var body = Encoding.UTF8.GetBytes(message);
const string routingKey = "usa.sport.football"; await channel.BasicPublishAsync(exchange: exchangeName, routingKey: routingKey, body: body);
Console.WriteLine($"[x] 已发送消息:{message}"); Console.WriteLine($"[x] 路由键:{routingKey}"); Console.WriteLine("[x] 按任意键退出..."); Console.ReadKey();
|
✦ 消费者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events;
ConnectionFactory factory = new ConnectionFactory { HostName = "mq.example.com", VirtualHost = "/", Password = "your_password", UserName = "your_username", Port = 5671, Ssl = new SslOption { Enabled = true, ServerName = "mq.example.com" } };
await using var connection = await factory.CreateConnectionAsync(); await using var channel = await connection.CreateChannelAsync();
const string exchangeName = "my_topic_exchange"; await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Topic, durable: true);
const string queueName = "sport_news_queue"; await channel.QueueDeclareAsync( queue: queueName, durable: true, exclusive: false, autoDelete: false);
const string bindingKey = "*.sport.#"; await channel.QueueBindAsync(queue: queueName, exchange: exchangeName, routingKey: bindingKey);
Console.WriteLine($"[x] 队列 {queueName} 已绑定到交换器,监听模式:{bindingKey}");
var consumer = new AsyncEventingBasicConsumer(channel); consumer.ReceivedAsync += async (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; Console.WriteLine($"[x] 收到消息 [RoutingKey={routingKey}]:{message}"); await Task.CompletedTask; };
await channel.BasicConsumeAsync(queue: queueName, autoAck: true, consumer: consumer);
Console.WriteLine("[x] 等待消息,按任意键退出..."); Console.ReadKey();
|
✦ 典型场景
- 复杂业务消息分类(按模块、按操作类型)
- 多维度日志收集(按服务、按级别、按环境)
- 分布式事件总线
- IoT 设备消息路由(按设备类型、地域、状态)
✦ 生活类比:填表审查
这是一种非常官僚的投递方式。分拣员不看信封上的地址,而是让你填一张表格,表格里有”身高”、”体重”、”年龄”等栏目。只有当信件表格里的数据与信箱要求的表格数据完全一致(或满足特定条件)时,信件才会被投递。
✦ 路由规则
[[headers]] 交换器忽略 [[RoutingKey]],转而对比消息的 headers 属性(键值对)。它可以设置 x-match 参数:
x-match: all:所有 header 键值对都必须匹配
x-match: any:只要有一个 header 键值对匹配即可
✦ 生产者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52
| using System.Text; using RabbitMQ.Client;
ConnectionFactory factory = new ConnectionFactory { HostName = "mq.example.com", VirtualHost = "/", Password = "your_password", UserName = "your_username", Port = 5671, Ssl = new SslOption { Enabled = true, ServerName = "mq.example.com" } };
await using var connection = await factory.CreateConnectionAsync(); await using var channel = await connection.CreateChannelAsync();
const string exchangeName = "my_headers_exchange"; await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Headers, durable: true);
const string message = "这是一份 PDF 格式的财务报告"; var body = Encoding.UTF8.GetBytes(message);
var headers = new Dictionary<string, object?> { ["format"] = "pdf", ["type"] = "report", ["department"] = "finance" };
var properties = new BasicProperties { Headers = headers }; await channel.BasicPublishAsync( exchange: exchangeName, routingKey: string.Empty, mandatory: false, basicProperties: properties, body: body);
Console.WriteLine($"[x] 已发送消息:{message}"); Console.WriteLine($"[x] Headers: format=pdf, type=report, department=finance"); Console.WriteLine("[x] 按任意键退出..."); Console.ReadKey();
|
✦ 消费者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77
| using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events;
ConnectionFactory factory = new ConnectionFactory { HostName = "mq.example.com", VirtualHost = "/", Password = "your_password", UserName = "your_username", Port = 5671, Ssl = new SslOption { Enabled = true, ServerName = "mq.example.com" } };
await using var connection = await factory.CreateConnectionAsync(); await using var channel = await connection.CreateChannelAsync();
const string exchangeName = "my_headers_exchange"; await channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Headers, durable: true);
const string queueName = "pdf_report_queue"; await channel.QueueDeclareAsync( queue: queueName, durable: true, exclusive: false, autoDelete: false);
await channel.QueueBindAsync( queue: queueName, exchange: exchangeName, routingKey: string.Empty, arguments: new Dictionary<string, object?> { ["x-match"] = "all", ["format"] = "pdf", ["type"] = "report" });
Console.WriteLine($"[x] 队列 {queueName} 已绑定到交换器"); Console.WriteLine($"[x] 匹配条件:x-match=all, format=pdf, type=report");
var consumer = new AsyncEventingBasicConsumer(channel); consumer.ReceivedAsync += async (model, ea) => { var body = ea.Body.ToArray(); var message = Encoding.UTF8.GetString(body); if (ea.BasicProperties.Headers != null) { Console.WriteLine("[x] 消息 Headers:"); foreach (var header in ea.BasicProperties.Headers) { var value = header.Value is byte[] bytes ? Encoding.UTF8.GetString(bytes) : header.Value?.ToString(); Console.WriteLine($" {header.Key}: {value}"); } } Console.WriteLine($"[x] 收到消息:{message}"); await Task.CompletedTask; };
await channel.BasicConsumeAsync(queue: queueName, autoAck: true, consumer: consumer);
Console.WriteLine("[x] 等待消息,按任意键退出..."); Console.ReadKey();
|
✦ 典型场景
[[headers]] 交换器在实际工程中极少使用,主要因为:
- 性能较差(需要解析并匹配复杂的键值对)
- 配置繁琐
- 调试困难
仅在极少数需要基于复杂元数据进行过滤的场景中出现,了解即可。
✦ 四种交换器对比
| 类型 |
匹配方式 |
性能 |
典型场景 |
| fanout |
无视 RoutingKey,广播 |
最高 |
公告广播、配置更新 |
| direct |
RoutingKey 精确匹配 |
高 |
点对点消息、日志级别过滤 |
| topic |
RoutingKey 模式匹配 |
中 |
多维度分类、事件总线 |
| headers |
headers 键值对匹配 |
低 |
复杂元数据过滤(极少用) |
✦ 星轨总结
在数字领地的消息流转架构中,[[Exchange]] 是路由决策的大脑。选择合适的交换器类型,直接影响消息投递的精确性与系统性能:
- 需要广播?选 [[fanout]]
- 需要精准投递?选 [[direct]]
- 需要灵活分类?选 [[topic]]
- 需要复杂元数据过滤?慎用 [[headers]]
理解这四种交换器的底层逻辑,才能在架构设计中做出正确的技术选型。