← Về danh sách bài họcBài 8/20

🔧 Bài 8: RabbitMQ + Node.js Thực Hành

⏱️ Thời gian đọc: 25 phút | 📚 Độ khó: Trung bình

🎯 Sau bài học này, bạn sẽ:

1. Project Setup

mkdir rabbitmq-demo && cd rabbitmq-demo
npm init -y
npm install amqplib dotenv
// lib/rabbitmq.js - Connection wrapper
const amqp = require('amqplib');

let connection = null;
let channel = null;

async function getChannel() {
    if (channel) return channel;

    connection = await amqp.connect(
        process.env.RABBITMQ_URL || 'amqp://admin:secret123@localhost'
    );

    // Handle connection errors
    connection.on('error', (err) => {
        console.error('RabbitMQ connection error:', err);
        connection = null;
        channel = null;
    });

    connection.on('close', () => {
        console.log('RabbitMQ connection closed');
        connection = null;
        channel = null;
    });

    channel = await connection.createChannel();
    return channel;
}

async function closeConnection() {
    if (channel) await channel.close();
    if (connection) await connection.close();
}

module.exports = { getChannel, closeConnection };

2. Work Queue Pattern

Multiple workers xử lý messages song song, mỗi message chỉ được 1 worker xử lý:

// producer.js - Gửi email tasks
const { getChannel, closeConnection } = require('./lib/rabbitmq');

const QUEUE = 'email-tasks';

async function sendEmailTask(emailData) {
    const ch = await getChannel();

    // Durable queue: tồn tại sau restart
    await ch.assertQueue(QUEUE, { durable: true });

    // Persistent message: lưu vào disk
    ch.sendToQueue(QUEUE, Buffer.from(JSON.stringify(emailData)), {
        persistent: true,
        contentType: 'application/json',
        timestamp: Date.now()
    });

    console.log(`📨 Queued email to: ${emailData.to}`);
}

// Gửi 10 email tasks
async function main() {
    for (let i = 1; i <= 10; i++) {
        await sendEmailTask({
            to: `user${i}@gmail.com`,
            subject: `Welcome User ${i}!`,
            body: `Hello user ${i}, welcome to our platform!`
        });
    }
    await closeConnection();
}

main();
// worker.js - Xử lý email tasks
const { getChannel } = require('./lib/rabbitmq');

const QUEUE = 'email-tasks';

async function startWorker(workerId) {
    const ch = await getChannel();
    await ch.assertQueue(QUEUE, { durable: true });

    // Prefetch: mỗi worker chỉ nhận 1 message tại một thời điểm
    ch.prefetch(1);

    console.log(`[Worker ${workerId}] Waiting for tasks...`);

    ch.consume(QUEUE, async (msg) => {
        const email = JSON.parse(msg.content.toString());
        console.log(`[Worker ${workerId}] Processing: ${email.to}`);

        try {
            // Giả lập gửi email (2 giây)
            await new Promise(r => setTimeout(r, 2000));
            console.log(`[Worker ${workerId}] ✅ Sent to: ${email.to}`);

            // ACK: xác nhận đã xử lý xong
            ch.ack(msg);
        } catch (err) {
            console.error(`[Worker ${workerId}] ❌ Failed:`, err.message);

            // NACK + requeue: đưa lại vào queue
            ch.nack(msg, false, true);
        }
    });
}

startWorker(process.argv[2] || '1');
# Terminal 1: Worker 1
node worker.js 1

# Terminal 2: Worker 2
node worker.js 2

# Terminal 3: Gửi tasks
node producer.js

# Kết quả: Worker 1 và Worker 2 chia nhau xử lý 10 emails

3. ACK/NACK Chi Tiết

ch.consume(QUEUE, async (msg) => {
    try {
        await processMessage(msg);

        // ✅ ACK: message đã xử lý thành công
        ch.ack(msg);

    } catch (err) {
        const retryCount = (msg.properties.headers?.['x-retry-count'] || 0);

        if (retryCount < 3) {
            // 🔄 Retry: gửi lại với retry count tăng
            ch.sendToQueue(QUEUE, msg.content, {
                persistent: true,
                headers: { 'x-retry-count': retryCount + 1 }
            });
            ch.ack(msg); // ACK message cũ
            console.log(`🔄 Retry ${retryCount + 1}/3`);

        } else {
            // ❌ NACK + no requeue → Dead Letter Queue
            ch.nack(msg, false, false);
            console.log('💀 Moved to Dead Letter Queue');
        }
    }
});
⚠️ Quan trọng về ACK:
• Nếu không ACK → message sẽ bị "stuck" (unacked)
• Nếu worker crash trước khi ACK → message tự động requeue
noAck: true → auto ack (nhanh nhưng mất reliability)

4. Prefetch & Fair Dispatch

// Prefetch = 1: Worker chỉ nhận 1 message mỗi lần
// Worker nào xong trước nhận message tiếp (fair dispatch)
ch.prefetch(1);

// Prefetch = 5: Worker nhận 5 messages cùng lúc
// Throughput cao hơn nhưng message phân bổ ít đều hơn
ch.prefetch(5);

// Prefetch = 0: Unlimited (default)
// Worker nhận tất cả messages → không fair
ch.prefetch(0);
💡 Best practice:
• CPU-intensive tasks: prefetch = 1
• I/O-intensive tasks: prefetch = 5-20
• Lightweight tasks: prefetch = 50-100

5. Pub/Sub Ví Dụ Thực Tế

// event-publisher.js
async function publishEvent(eventType, data) {
    const ch = await getChannel();
    await ch.assertExchange('app-events', 'topic', { durable: true });

    const event = {
        type: eventType,
        data,
        timestamp: new Date().toISOString(),
        id: require('crypto').randomUUID()
    };

    ch.publish('app-events', eventType, Buffer.from(JSON.stringify(event)));
    console.log(`📡 Published: ${eventType}`);
}

// Sử dụng
publishEvent('order.created', { orderId: 123, total: 500000 });
publishEvent('user.registered', { userId: 456, email: '[email protected]' });
// event-subscriber.js
async function subscribe(pattern, handler) {
    const ch = await getChannel();
    await ch.assertExchange('app-events', 'topic', { durable: true });

    const q = await ch.assertQueue('', { exclusive: true });
    ch.bindQueue(q.queue, 'app-events', pattern);

    ch.consume(q.queue, (msg) => {
        const event = JSON.parse(msg.content.toString());
        handler(event);
        ch.ack(msg);
    });
}

// Subscribe tất cả order events
subscribe('order.*', (event) => {
    console.log('📦 Order event:', event.type, event.data);
});

// Subscribe tất cả events
subscribe('#', (event) => {
    console.log('📊 Analytics:', event.type);
});

📝 Tóm Tắt