← Về danh sách bài họcBài 8/20
🔧 Bài 8: RabbitMQ + Node.js Thực Hành
🎯 Sau bài học này, bạn sẽ:
- Xây dựng Work Queue pattern hoàn chỉnh
- Hiểu ACK/NACK và message reliability
- Sử dụng Prefetch để kiểm soát throughput
- Xử lý retry và error handling
1. Project Setup
mkdir rabbitmq-demo && cd rabbitmq-demo
npm init -y
npm install amqplib dotenv
// lib/rabbitmq.js - Connection wrapper
const amqp = require('amqplib');
let connection = null;
let channel = null;
async function getChannel() {
if (channel) return channel;
connection = await amqp.connect(
process.env.RABBITMQ_URL || 'amqp://admin:secret123@localhost'
);
// Handle connection errors
connection.on('error', (err) => {
console.error('RabbitMQ connection error:', err);
connection = null;
channel = null;
});
connection.on('close', () => {
console.log('RabbitMQ connection closed');
connection = null;
channel = null;
});
channel = await connection.createChannel();
return channel;
}
async function closeConnection() {
if (channel) await channel.close();
if (connection) await connection.close();
}
module.exports = { getChannel, closeConnection };
2. Work Queue Pattern
Multiple workers xử lý messages song song, mỗi message chỉ được 1 worker xử lý:
// producer.js - Gửi email tasks
const { getChannel, closeConnection } = require('./lib/rabbitmq');
const QUEUE = 'email-tasks';
async function sendEmailTask(emailData) {
const ch = await getChannel();
// Durable queue: tồn tại sau restart
await ch.assertQueue(QUEUE, { durable: true });
// Persistent message: lưu vào disk
ch.sendToQueue(QUEUE, Buffer.from(JSON.stringify(emailData)), {
persistent: true,
contentType: 'application/json',
timestamp: Date.now()
});
console.log(`📨 Queued email to: ${emailData.to}`);
}
// Gửi 10 email tasks
async function main() {
for (let i = 1; i <= 10; i++) {
await sendEmailTask({
to: `user${i}@gmail.com`,
subject: `Welcome User ${i}!`,
body: `Hello user ${i}, welcome to our platform!`
});
}
await closeConnection();
}
main();
// worker.js - Xử lý email tasks
const { getChannel } = require('./lib/rabbitmq');
const QUEUE = 'email-tasks';
async function startWorker(workerId) {
const ch = await getChannel();
await ch.assertQueue(QUEUE, { durable: true });
// Prefetch: mỗi worker chỉ nhận 1 message tại một thời điểm
ch.prefetch(1);
console.log(`[Worker ${workerId}] Waiting for tasks...`);
ch.consume(QUEUE, async (msg) => {
const email = JSON.parse(msg.content.toString());
console.log(`[Worker ${workerId}] Processing: ${email.to}`);
try {
// Giả lập gửi email (2 giây)
await new Promise(r => setTimeout(r, 2000));
console.log(`[Worker ${workerId}] ✅ Sent to: ${email.to}`);
// ACK: xác nhận đã xử lý xong
ch.ack(msg);
} catch (err) {
console.error(`[Worker ${workerId}] ❌ Failed:`, err.message);
// NACK + requeue: đưa lại vào queue
ch.nack(msg, false, true);
}
});
}
startWorker(process.argv[2] || '1');
# Terminal 1: Worker 1
node worker.js 1
# Terminal 2: Worker 2
node worker.js 2
# Terminal 3: Gửi tasks
node producer.js
# Kết quả: Worker 1 và Worker 2 chia nhau xử lý 10 emails
3. ACK/NACK Chi Tiết
ch.consume(QUEUE, async (msg) => {
try {
await processMessage(msg);
// ✅ ACK: message đã xử lý thành công
ch.ack(msg);
} catch (err) {
const retryCount = (msg.properties.headers?.['x-retry-count'] || 0);
if (retryCount < 3) {
// 🔄 Retry: gửi lại với retry count tăng
ch.sendToQueue(QUEUE, msg.content, {
persistent: true,
headers: { 'x-retry-count': retryCount + 1 }
});
ch.ack(msg); // ACK message cũ
console.log(`🔄 Retry ${retryCount + 1}/3`);
} else {
// ❌ NACK + no requeue → Dead Letter Queue
ch.nack(msg, false, false);
console.log('💀 Moved to Dead Letter Queue');
}
}
});
⚠️ Quan trọng về ACK:
• Nếu không ACK → message sẽ bị "stuck" (unacked)
• Nếu worker crash trước khi ACK → message tự động requeue
•
• Nếu không ACK → message sẽ bị "stuck" (unacked)
• Nếu worker crash trước khi ACK → message tự động requeue
•
noAck: true → auto ack (nhanh nhưng mất reliability)
4. Prefetch & Fair Dispatch
// Prefetch = 1: Worker chỉ nhận 1 message mỗi lần
// Worker nào xong trước nhận message tiếp (fair dispatch)
ch.prefetch(1);
// Prefetch = 5: Worker nhận 5 messages cùng lúc
// Throughput cao hơn nhưng message phân bổ ít đều hơn
ch.prefetch(5);
// Prefetch = 0: Unlimited (default)
// Worker nhận tất cả messages → không fair
ch.prefetch(0);
💡 Best practice:
• CPU-intensive tasks: prefetch = 1
• I/O-intensive tasks: prefetch = 5-20
• Lightweight tasks: prefetch = 50-100
• CPU-intensive tasks: prefetch = 1
• I/O-intensive tasks: prefetch = 5-20
• Lightweight tasks: prefetch = 50-100
5. Pub/Sub Ví Dụ Thực Tế
// event-publisher.js
async function publishEvent(eventType, data) {
const ch = await getChannel();
await ch.assertExchange('app-events', 'topic', { durable: true });
const event = {
type: eventType,
data,
timestamp: new Date().toISOString(),
id: require('crypto').randomUUID()
};
ch.publish('app-events', eventType, Buffer.from(JSON.stringify(event)));
console.log(`📡 Published: ${eventType}`);
}
// Sử dụng
publishEvent('order.created', { orderId: 123, total: 500000 });
publishEvent('user.registered', { userId: 456, email: '[email protected]' });
// event-subscriber.js
async function subscribe(pattern, handler) {
const ch = await getChannel();
await ch.assertExchange('app-events', 'topic', { durable: true });
const q = await ch.assertQueue('', { exclusive: true });
ch.bindQueue(q.queue, 'app-events', pattern);
ch.consume(q.queue, (msg) => {
const event = JSON.parse(msg.content.toString());
handler(event);
ch.ack(msg);
});
}
// Subscribe tất cả order events
subscribe('order.*', (event) => {
console.log('📦 Order event:', event.type, event.data);
});
// Subscribe tất cả events
subscribe('#', (event) => {
console.log('📊 Analytics:', event.type);
});
📝 Tóm Tắt
- Work Queue: Multiple workers xử lý song song
- ACK/NACK: Xác nhận xử lý message, retry khi lỗi
- Prefetch: Kiểm soát số messages mỗi worker nhận
- Persistent: durable queue + persistent message = không mất data
- Pub/Sub: Topic exchange cho event-driven architecture