🔀 Bài 3: Các Mô Hình Messaging
🎯 Sau bài học này, bạn sẽ:
- Hiểu 5 mô hình messaging phổ biến
- Phân biệt Point-to-Point và Publish/Subscribe
- Biết Fan-out, Request-Reply, và Priority Queue
- Chọn đúng mô hình cho từng bài toán
1. Point-to-Point (Một-Một)
Mô hình đơn giản nhất: một message chỉ được xử lý bởi một consumer duy nhất. Nếu có nhiều consumer cùng listen trên một queue, mỗi message chỉ đến một consumer (round-robin).
┌────────────┐
┌────────────▶│ Consumer 1 │ ← Nhận msg A
│ └────────────┘
┌──────────┐ │ msg A,B,C
│ Producer │──┤ ┌──────┐ ┌────────────┐
└──────────┘ └─▶│Queue │──▶│ Consumer 2 │ ← Nhận msg B
└──────┘ └────────────┘
┌────────────┐
────▶│ Consumer 3 │ ← Nhận msg C
└────────────┘
→ Mỗi message CHỈ được xử lý bởi 1 consumer (Work Queue)
• Task queue (xử lý background jobs)
• Gửi email, resize ảnh, export PDF
• Load balancing giữa nhiều workers
2. Publish/Subscribe (Một-Nhiều)
Producer (Publisher) gửi message đến một topic/exchange, và tất cả subscribers đều nhận được bản copy của message đó.
┌────────────┐ ┌─────────────┐
┌───▶│ Queue 1 │──▶│ Email Svc │
│ └────────────┘ └─────────────┘
┌───────────┐ msg ┌────┴────┐
│ Publisher │──────▶│Exchange │ ┌────────────┐ ┌─────────────┐
└───────────┘ │(Fanout) ├──▶│ Queue 2 │──▶│ SMS Svc │
└────┬────┘ └────────────┘ └─────────────┘
│ ┌────────────┐ ┌─────────────┐
└───▶│ Queue 3 │──▶│ Push Notif │
└────────────┘ └─────────────┘
→ TẤT CẢ subscribers đều nhận được message
// Ví dụ: Khi user đăng ký mới
const event = {
type: 'USER_REGISTERED',
data: { userId: 123, email: '[email protected]' }
};
// Publisher gửi 1 event
await exchange.publish('user-events', event);
// Subscriber 1: Gửi welcome email
// Subscriber 2: Tạo default settings
// Subscriber 3: Gửi notification cho admin
// → Tất cả đều nhận được event này!
• Notification system (email + SMS + push)
• Event broadcasting
• Logging/monitoring (nhiều service cùng subscribe)
3. Fan-out
Fan-out là biến thể của Pub/Sub, message được duplicate và gửi đến tất cả queue được bind. Trong RabbitMQ, đây là Fanout Exchange.
// RabbitMQ Fanout Exchange
const amqp = require('amqplib');
async function setupFanout() {
const conn = await amqp.connect('amqp://localhost');
const ch = await conn.createChannel();
// Tạo fanout exchange
await ch.assertExchange('notifications', 'fanout', {
durable: true
});
// Tạo queue cho mỗi service
const emailQueue = await ch.assertQueue('email-notifications');
const smsQueue = await ch.assertQueue('sms-notifications');
const pushQueue = await ch.assertQueue('push-notifications');
// Bind tất cả queue vào exchange
ch.bindQueue(emailQueue.queue, 'notifications', '');
ch.bindQueue(smsQueue.queue, 'notifications', '');
ch.bindQueue(pushQueue.queue, 'notifications', '');
// Publish → tất cả queue đều nhận được
ch.publish('notifications', '', Buffer.from(
JSON.stringify({ message: 'New order #123!' })
));
}
4. Request-Reply (RPC)
Client gửi request message kèm reply-to queue → Server xử lý và gửi response về reply queue. Đây là pattern RPC (Remote Procedure Call) qua Message Queue.
┌────────┐ Request + replyTo ┌─────────┐
│ Client │────────────────────▶│ Queue │──▶ Server xử lý
│ │ └─────────┘
│ │◀──────────────────── Reply Queue ◀── Server trả kết quả
└────────┘ Response
// RPC Client
async function callRPC(data) {
const ch = await conn.createChannel();
// Tạo temporary reply queue
const replyQueue = await ch.assertQueue('', {
exclusive: true // Tự xóa khi disconnect
});
const correlationId = generateUUID();
return new Promise((resolve) => {
// Lắng nghe reply
ch.consume(replyQueue.queue, (msg) => {
if (msg.properties.correlationId === correlationId) {
resolve(JSON.parse(msg.content.toString()));
}
}, { noAck: true });
// Gửi request
ch.sendToQueue('rpc-queue', Buffer.from(JSON.stringify(data)), {
correlationId,
replyTo: replyQueue.queue
});
});
}
// Sử dụng
const result = await callRPC({ action: 'calculate', x: 10, y: 20 });
console.log('Result:', result); // { sum: 30 }
Chỉ dùng khi cần: reliability (retry tự động), load balancing giữa workers, hoặc khi direct HTTP call không phù hợp. Nếu chỉ cần request-response đơn giản, dùng HTTP/gRPC sẽ đơn giản hơn.
5. Topic-based Routing
Message được route dựa trên routing key khớp với pattern. Consumer chỉ nhận message của topic mình quan tâm.
Producer gửi:
routing_key = "order.created" → Queue A ✓, Queue B ✓
routing_key = "order.cancelled" → Queue A ✓, Queue B ✗
routing_key = "user.registered" → Queue A ✗, Queue C ✓
Queue A: bind pattern "order.*" → Nhận mọi event về order
Queue B: bind pattern "order.created" → Chỉ nhận order created
Queue C: bind pattern "user.*" → Nhận mọi event về user
// Topic Exchange trong RabbitMQ
await ch.assertExchange('app-events', 'topic');
// Order service: subscribe tất cả event về order
ch.bindQueue('order-queue', 'app-events', 'order.*');
// Analytics: subscribe TẤT CẢ events
ch.bindQueue('analytics-queue', 'app-events', '#');
// Billing: chỉ subscribe order.created
ch.bindQueue('billing-queue', 'app-events', 'order.created');
// Publish events
ch.publish('app-events', 'order.created', Buffer.from('...'));
ch.publish('app-events', 'order.shipped', Buffer.from('...'));
ch.publish('app-events', 'user.registered', Buffer.from('...'));
6. So Sánh Các Mô Hình
| Mô hình | Số Consumer nhận | Use case | Broker hỗ trợ |
|---|---|---|---|
| Point-to-Point | 1 consumer | Task queue, background jobs | RabbitMQ, SQS |
| Pub/Sub | Tất cả subscribers | Notifications, events | RabbitMQ, Kafka, Redis |
| Fan-out | Tất cả queues | Broadcasting | RabbitMQ |
| Request-Reply | 1 (RPC) | Remote procedure call | RabbitMQ |
| Topic Routing | Subscribers khớp pattern | Event routing linh hoạt | RabbitMQ, Kafka |
📝 Tóm Tắt Bài Học
- Point-to-Point: 1 message → 1 consumer (work queue)
- Pub/Sub: 1 message → tất cả subscribers
- Fan-out: Broadcast message đến tất cả queues
- Request-Reply: RPC pattern qua message queue
- Topic Routing: Route dựa trên pattern matching