← Về danh sách bài họcBài 12/20

🚀 Bài 12: Dự Án - Notification System

⏱️ Thời gian đọc: 30 phút | 📚 Độ khó: Dự án

🎯 Dự án này bạn sẽ xây dựng:

1. Kiến Trúc Hệ Thống

┌────────────┐     ┌──────────────┐     ┌─────────────────┐
│   Client   │────▶│ API Gateway  │────▶│   RabbitMQ      │
│ (POST /api │     │ (Express.js) │     │                 │
│  /notify)  │     └──────────────┘     │ Exchange:       │
└────────────┘                          │  notifications  │
                                        │  (fanout)       │
                                        └───────┬─────────┘
                                    ┌───────────┼───────────┐
                                    ▼           ▼           ▼
                              ┌──────────┐ ┌──────────┐ ┌──────────┐
                              │ Email    │ │ SMS      │ │ Push     │
                              │ Worker   │ │ Worker   │ │ Worker   │
                              └──────────┘ └──────────┘ └──────────┘

2. Docker Compose

# docker-compose.yml
version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: secret123
    healthcheck:
      test: rabbitmq-diagnostics -q ping
      interval: 10s
      timeout: 5s
      retries: 5

  api-gateway:
    build: ./api-gateway
    ports:
      - "3000:3000"
    depends_on:
      rabbitmq:
        condition: service_healthy
    environment:
      RABBITMQ_URL: amqp://admin:secret123@rabbitmq

  email-worker:
    build: ./email-worker
    depends_on:
      rabbitmq:
        condition: service_healthy
    environment:
      RABBITMQ_URL: amqp://admin:secret123@rabbitmq

  sms-worker:
    build: ./sms-worker
    depends_on:
      rabbitmq:
        condition: service_healthy
    environment:
      RABBITMQ_URL: amqp://admin:secret123@rabbitmq

3. API Gateway

// api-gateway/server.js
const express = require('express');
const amqp = require('amqplib');
const crypto = require('crypto');

const app = express();
app.use(express.json());

let channel;

async function connectRabbitMQ() {
    const conn = await amqp.connect(process.env.RABBITMQ_URL);
    channel = await conn.createChannel();
    await channel.assertExchange('notifications', 'fanout', {
        durable: true
    });
    console.log('✅ Connected to RabbitMQ');
}

// POST /api/notify
app.post('/api/notify', async (req, res) => {
    const { userId, message, type } = req.body;

    const notification = {
        id: crypto.randomUUID(),
        userId,
        message,
        type: type || 'all',
        timestamp: new Date().toISOString()
    };

    channel.publish(
        'notifications',
        '',
        Buffer.from(JSON.stringify(notification)),
        { persistent: true, messageId: notification.id }
    );

    res.json({
        success: true,
        notificationId: notification.id,
        message: 'Notification queued'
    });
});

// Health check
app.get('/health', (req, res) => {
    res.json({ status: 'ok', rabbitmq: !!channel });
});

connectRabbitMQ().then(() => {
    app.listen(3000, () => console.log('🚀 API Gateway on port 3000'));
});

4. Email Worker

// email-worker/worker.js
const amqp = require('amqplib');

async function startWorker() {
    const conn = await amqp.connect(process.env.RABBITMQ_URL);
    const ch = await conn.createChannel();

    await ch.assertExchange('notifications', 'fanout', { durable: true });

    // Tạo queue riêng cho email worker
    const q = await ch.assertQueue('email-notifications', {
        durable: true,
        arguments: {
            'x-dead-letter-exchange': 'dlx',
            'x-dead-letter-routing-key': 'email-failed'
        }
    });

    ch.bindQueue(q.queue, 'notifications', '');
    ch.prefetch(5);

    console.log('📧 Email Worker started...');

    ch.consume(q.queue, async (msg) => {
        const notification = JSON.parse(msg.content.toString());
        const retryCount = msg.properties.headers?.['x-retry'] || 0;

        try {
            // Giả lập gửi email
            console.log(`📧 Sending email to user ${notification.userId}: ${notification.message}`);
            await new Promise(r => setTimeout(r, 1000));

            console.log(`✅ Email sent for notification ${notification.id}`);
            ch.ack(msg);

        } catch (err) {
            console.error(`❌ Email failed:`, err.message);

            if (retryCount < 3) {
                // Retry
                ch.sendToQueue(q.queue, msg.content, {
                    persistent: true,
                    headers: { 'x-retry': retryCount + 1 }
                });
                ch.ack(msg);
            } else {
                // DLQ
                ch.nack(msg, false, false);
            }
        }
    });
}

startWorker();

5. SMS Worker

// sms-worker/worker.js
const amqp = require('amqplib');

async function startWorker() {
    const conn = await amqp.connect(process.env.RABBITMQ_URL);
    const ch = await conn.createChannel();

    await ch.assertExchange('notifications', 'fanout', { durable: true });

    const q = await ch.assertQueue('sms-notifications', { durable: true });
    ch.bindQueue(q.queue, 'notifications', '');
    ch.prefetch(3);

    console.log('📱 SMS Worker started...');

    ch.consume(q.queue, async (msg) => {
        const notification = JSON.parse(msg.content.toString());

        try {
            console.log(`📱 Sending SMS to user ${notification.userId}`);
            await new Promise(r => setTimeout(r, 500));
            console.log(`✅ SMS sent: ${notification.id}`);
            ch.ack(msg);
        } catch (err) {
            ch.nack(msg, false, true);
        }
    });
}

startWorker();

6. Testing

# Khởi động toàn bộ hệ thống
docker-compose up -d

# Gửi notification
curl -X POST http://localhost:3000/api/notify \
  -H "Content-Type: application/json" \
  -d '{
    "userId": 123,
    "message": "Đơn hàng #456 đã được xác nhận!",
    "type": "all"
  }'

# Kết quả:
# API Gateway: ✅ Notification queued
# Email Worker: 📧 Sending email...
# SMS Worker: 📱 Sending SMS...

# Kiểm tra RabbitMQ Management UI
# http://localhost:15672

7. Mở Rộng

💡 Bài tập nâng cao:
• Thêm Push Notification Worker
• Lưu notification history vào database
• Thêm user preferences (chọn channel nhận)
• Implement rate limiting per user
• Thêm template system cho email/SMS
• Monitoring với Prometheus + Grafana

📝 Tóm Tắt Dự Án