← Về danh sách bài họcBài 9/20

⚡ Bài 9: RabbitMQ Advanced Patterns

⏱️ Thời gian đọc: 25 phút | 📚 Độ khó: Nâng cao

🎯 Sau bài học này, bạn sẽ:

1. Retry Với Exponential Backoff

Khi message xử lý lỗi, thay vì retry ngay (có thể gây overload), ta dùng delayed retry với thời gian tăng dần.

// retry-pattern.js
async function setupRetrySystem(ch) {
    // Main exchange
    await ch.assertExchange('main', 'direct', { durable: true });

    // Retry exchanges (delay levels)
    const delays = [5000, 15000, 60000]; // 5s, 15s, 60s

    for (const delay of delays) {
        await ch.assertQueue(`retry-${delay}ms`, {
            durable: true,
            arguments: {
                'x-message-ttl': delay,
                'x-dead-letter-exchange': 'main',
                'x-dead-letter-routing-key': 'process'
            }
        });
    }

    // Dead Letter Queue (final failure)
    await ch.assertQueue('dead-letters', { durable: true });

    // Main processing queue
    await ch.assertQueue('process-queue', { durable: true });
    ch.bindQueue('process-queue', 'main', 'process');
}

// Consumer với retry logic
ch.consume('process-queue', async (msg) => {
    const retryCount = msg.properties.headers?.['x-retry'] || 0;
    const delays = [5000, 15000, 60000];

    try {
        await processMessage(msg);
        ch.ack(msg);
    } catch (err) {
        ch.ack(msg); // ACK message gốc

        if (retryCount < delays.length) {
            const delay = delays[retryCount];
            ch.sendToQueue(`retry-${delay}ms`, msg.content, {
                persistent: true,
                headers: { 'x-retry': retryCount + 1 }
            });
            console.log(`🔄 Retry ${retryCount + 1} after ${delay}ms`);
        } else {
            ch.sendToQueue('dead-letters', msg.content, {
                persistent: true,
                headers: { error: err.message }
            });
            console.log('💀 Max retries reached → DLQ');
        }
    }
});

2. Priority Queue

// Priority Queue: message ưu tiên cao được xử lý trước
await ch.assertQueue('tasks', {
    durable: true,
    arguments: { 'x-max-priority': 10 }
});

// Gửi messages với priority khác nhau
ch.sendToQueue('tasks', Buffer.from('Low priority task'), {
    priority: 1
});
ch.sendToQueue('tasks', Buffer.from('HIGH priority task'), {
    priority: 9
});
ch.sendToQueue('tasks', Buffer.from('Normal task'), {
    priority: 5
});

// Consumer nhận: HIGH → Normal → Low
⚠️ Lưu ý Priority Queue:
• Ảnh hưởng performance (RabbitMQ phải sort)
• Max 255 levels, khuyến nghị ≤ 10
• Chỉ hiệu quả khi queue có messages backlog

3. Delayed Message (Scheduled)

// Phương pháp 1: TTL + DLX (không cần plugin)
async function scheduleMessage(ch, queue, data, delayMs) {
    const delayQueue = `${queue}.delay.${delayMs}`;

    await ch.assertQueue(delayQueue, {
        durable: true,
        arguments: {
            'x-message-ttl': delayMs,
            'x-dead-letter-exchange': '',
            'x-dead-letter-routing-key': queue
        }
    });

    ch.sendToQueue(delayQueue, Buffer.from(JSON.stringify(data)), {
        persistent: true
    });

    console.log(`⏰ Scheduled for ${delayMs}ms later`);
}

// Sử dụng: Gửi reminder sau 24 giờ
scheduleMessage(ch, 'reminders', {
    userId: 123,
    message: 'Đơn hàng của bạn sắp hết hạn!'
}, 24 * 60 * 60 * 1000);

4. Idempotency (Xử lý message trùng lặp)

// Idempotent consumer: xử lý message trùng lặp an toàn
const processedIds = new Set(); // Production: dùng Redis

ch.consume('orders', async (msg) => {
    const data = JSON.parse(msg.content.toString());
    const messageId = msg.properties.messageId;

    // Check đã xử lý chưa
    if (processedIds.has(messageId)) {
        console.log(`⏭️ Skipping duplicate: ${messageId}`);
        ch.ack(msg);
        return;
    }

    try {
        await processOrder(data);
        processedIds.add(messageId);
        ch.ack(msg);
    } catch (err) {
        ch.nack(msg, false, true);
    }
});

// Producer: luôn gửi kèm messageId unique
ch.sendToQueue('orders', Buffer.from(JSON.stringify(order)), {
    messageId: require('crypto').randomUUID(),
    persistent: true
});
💡 Production idempotency:
• Dùng Redis SET lưu processed message IDs
• Set TTL cho key (VD: 24h) để tránh memory leak
• Hoặc dùng database unique constraint trên messageId

📝 Tóm Tắt