← Về danh sách bài học | Bài 12/20
🚀 Bài 12: Dự Án - Notification System
🎯 Dự án này bạn sẽ xây dựng:
- Notification System với RabbitMQ + Node.js
- 3 services: API Gateway, Email Worker, SMS Worker
- Fanout exchange broadcast notifications
- Retry, DLQ, monitoring hoàn chỉnh
1. Kiến Trúc Hệ Thống
┌────────────┐     ┌──────────────┐     ┌─────────────────┐
│   Client   │────▶│ API Gateway  │────▶│    RabbitMQ     │
│ (POST /api │     │ (Express.js) │     │                 │
│  /notify)  │     └──────────────┘     │ Exchange:       │
└────────────┘                          │ notifications   │
                                        │ (fanout)        │
                                        └───────┬─────────┘
                                    ┌───────────┼───────────┐
                                    ▼           ▼           ▼
                              ┌──────────┐ ┌──────────┐ ┌──────────┐
                              │  Email   │ │   SMS    │ │   Push   │
                              │  Worker  │ │  Worker  │ │  Worker  │
                              └──────────┘ └──────────┘ └──────────┘
2. Docker Compose
# docker-compose.yml
# Brings up the broker plus the three application services.
# NOTE: the admin/secret123 credentials below are demo values only; supply
# real secrets through an env file or a secret store outside of a tutorial.
version: '3.8'

services:
  # RabbitMQ broker with the management plugin
  # (AMQP on 5672, management UI on 15672).
  rabbitmq:
    image: rabbitmq:3-management
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: secret123
    # The application services below wait for this check to pass.
    healthcheck:
      test: rabbitmq-diagnostics -q ping
      interval: 10s
      timeout: 5s
      retries: 5

  # HTTP entry point; publishes notifications to the broker.
  api-gateway:
    build: ./api-gateway
    ports:
      - "3000:3000"
    depends_on:
      rabbitmq:
        condition: service_healthy
    environment:
      RABBITMQ_URL: amqp://admin:secret123@rabbitmq

  # Consumes notifications and sends e-mails.
  email-worker:
    build: ./email-worker
    depends_on:
      rabbitmq:
        condition: service_healthy
    environment:
      RABBITMQ_URL: amqp://admin:secret123@rabbitmq

  # Consumes notifications and sends SMS messages.
  sms-worker:
    build: ./sms-worker
    depends_on:
      rabbitmq:
        condition: service_healthy
    environment:
      RABBITMQ_URL: amqp://admin:secret123@rabbitmq
3. API Gateway
// api-gateway/server.js
const express = require('express');
const amqp = require('amqplib');
const crypto = require('crypto');
// Express application; JSON request bodies are parsed into req.body.
const app = express();
app.use(express.json());
// AMQP channel used for publishing; assigned by connectRabbitMQ() and
// therefore undefined until the broker connection has been established.
let channel;
/**
 * Connects to the broker at RABBITMQ_URL, opens a channel, declares the
 * durable 'notifications' fanout exchange and stores the channel in the
 * module-level `channel` variable.
 *
 * The channel is only stored after the exchange has been declared, and it is
 * cleared again when the channel or connection closes, so `channel` being
 * truthy always means "ready to publish".
 *
 * @returns {Promise<void>} Resolves once the channel is ready.
 * @throws Rejects if the connection, channel or exchange cannot be set up.
 */
async function connectRabbitMQ() {
  const conn = await amqp.connect(process.env.RABBITMQ_URL);

  // Without an 'error' listener an emitted 'error' event is thrown as an
  // uncaught exception and takes the whole HTTP server down.
  conn.on('error', (err) => {
    console.error('❌ RabbitMQ connection error:', err.message);
  });
  conn.on('close', () => {
    channel = undefined;
    console.error('❌ RabbitMQ connection closed');
  });

  const ch = await conn.createChannel();
  ch.on('error', (err) => {
    console.error('❌ RabbitMQ channel error:', err.message);
  });
  ch.on('close', () => {
    // Only clear the shared reference if it still points at this channel.
    if (channel === ch) {
      channel = undefined;
    }
  });

  await ch.assertExchange('notifications', 'fanout', {
    durable: true
  });

  channel = ch;
  console.log('✅ Connected to RabbitMQ');
}
// POST /api/notify
// Body: { userId, message, type? }. Builds a notification envelope and
// publishes it to the 'notifications' fanout exchange.
// Responses:
//   200 - notification handed to the channel (same payload as before)
//   400 - userId missing or message not a non-empty string
//   503 - no broker channel available, or publishing failed
app.post('/api/notify', (req, res) => {
  // req.body may be absent when the request carried no JSON payload.
  const { userId, message, type } = req.body ?? {};

  const hasUserId = userId !== undefined && userId !== null;
  const hasMessage = typeof message === 'string' && message.trim() !== '';
  if (!hasUserId || !hasMessage) {
    res.status(400).json({
      success: false,
      message: 'userId and a non-empty message are required'
    });
    return;
  }

  if (!channel) {
    res.status(503).json({
      success: false,
      message: 'RabbitMQ is not available'
    });
    return;
  }

  const notification = {
    id: crypto.randomUUID(),
    userId,
    message,
    type: type || 'all',
    timestamp: new Date().toISOString()
  };

  try {
    // Fanout exchanges ignore the routing key, hence the empty string.
    const accepted = channel.publish(
      'notifications',
      '',
      Buffer.from(JSON.stringify(notification)),
      { persistent: true, messageId: notification.id }
    );
    if (!accepted) {
      // The message is still buffered by the client; this only signals
      // back-pressure on the channel's write buffer.
      console.warn('⚠️ RabbitMQ write buffer is full; publish is being buffered');
    }
  } catch (err) {
    // publish() throws synchronously when the channel is already closed.
    console.error('❌ Failed to publish notification:', err.message);
    res.status(503).json({
      success: false,
      message: 'Failed to queue notification'
    });
    return;
  }

  res.json({
    success: true,
    notificationId: notification.id,
    message: 'Notification queued'
  });
});
// Health check: always reports status 'ok' and whether a broker channel
// is currently held.
app.get('/health', (_req, res) => {
  const rabbitmqReady = Boolean(channel);
  res.json({ status: 'ok', rabbitmq: rabbitmqReady });
});
// Start accepting HTTP traffic only once the broker channel is ready; if the
// initial connection fails, log the reason and exit with a non-zero code
// instead of leaving an unhandled promise rejection.
connectRabbitMQ()
  .then(() => {
    app.listen(3000, () => console.log('🚀 API Gateway on port 3000'));
  })
  .catch((err) => {
    console.error('❌ API Gateway failed to start:', err);
    process.exit(1);
  });
4. Email Worker
// email-worker/worker.js
const amqp = require('amqplib');
/**
 * Starts the email worker: declares the topology, then consumes messages
 * from the durable 'email-notifications' queue bound to the 'notifications'
 * fanout exchange.
 *
 * Failure handling:
 *  - malformed JSON can never succeed, so it is dead-lettered immediately;
 *  - a processing failure is republished to the same queue with an
 *    incremented 'x-retry' header, up to MAX_RETRIES times;
 *  - after that the message is rejected without requeue, which routes it to
 *    the 'dlx' exchange with routing key 'email-failed'.
 *
 * @returns {Promise<void>} Resolves once the consumer is registered.
 * @throws Rejects if the connection or any topology declaration fails.
 */
async function startWorker() {
  const MAX_RETRIES = 3;

  const conn = await amqp.connect(process.env.RABBITMQ_URL);
  const ch = await conn.createChannel();
  await ch.assertExchange('notifications', 'fanout', { durable: true });

  // The work queue dead-letters into 'dlx'. That exchange and a queue bound
  // to it must exist, otherwise rejected messages are silently discarded.
  await ch.assertExchange('dlx', 'direct', { durable: true });
  const dlq = await ch.assertQueue('email-notifications-dlq', { durable: true });
  await ch.bindQueue(dlq.queue, 'dlx', 'email-failed');

  // Dedicated queue for the email worker
  const q = await ch.assertQueue('email-notifications', {
    durable: true,
    arguments: {
      'x-dead-letter-exchange': 'dlx',
      'x-dead-letter-routing-key': 'email-failed'
    }
  });
  await ch.bindQueue(q.queue, 'notifications', '');
  await ch.prefetch(5);
  console.log('📧 Email Worker started...');

  await ch.consume(q.queue, async (msg) => {
    // amqplib delivers null when the broker cancels the consumer.
    if (msg === null) {
      console.warn('⚠️ Email consumer was cancelled by the broker');
      return;
    }

    let notification;
    try {
      notification = JSON.parse(msg.content.toString());
    } catch (err) {
      console.error(`❌ Email failed:`, `invalid JSON payload (${err.message})`);
      ch.nack(msg, false, false);
      return;
    }

    const rawRetry = msg.properties.headers?.['x-retry'];
    const retryCount = Number.isInteger(rawRetry) ? rawRetry : 0;

    try {
      // Simulate sending the email
      console.log(`📧 Sending email to user ${notification.userId}: ${notification.message}`);
      await new Promise(r => setTimeout(r, 1000));
      console.log(`✅ Email sent for notification ${notification.id}`);
      ch.ack(msg);
    } catch (err) {
      console.error(`❌ Email failed:`, err.message);
      if (retryCount < MAX_RETRIES) {
        // Retry: republish a copy carrying the incremented counter, keeping
        // the original message id and headers, then settle the original.
        ch.sendToQueue(q.queue, msg.content, {
          persistent: true,
          messageId: msg.properties.messageId,
          headers: { ...msg.properties.headers, 'x-retry': retryCount + 1 }
        });
        ch.ack(msg);
      } else {
        // Retries exhausted: reject without requeue so it is dead-lettered.
        ch.nack(msg, false, false);
      }
    }
  });
}
// Log start-up failures and exit non-zero instead of leaving an unhandled
// promise rejection.
startWorker().catch((err) => {
  console.error('❌ Email Worker failed to start:', err);
  process.exit(1);
});
5. SMS Worker
// sms-worker/worker.js
const amqp = require('amqplib');
/**
 * Starts the SMS worker: consumes messages from the durable
 * 'sms-notifications' queue bound to the 'notifications' fanout exchange.
 *
 * Failure handling:
 *  - malformed JSON can never succeed, so it is rejected without requeue;
 *  - a processing failure is republished with an incremented 'x-retry'
 *    header, up to MAX_RETRIES times, instead of being requeued forever;
 *  - after that the message is rejected without requeue.
 *
 * NOTE: 'sms-notifications' is declared without a dead-letter exchange, so a
 * rejected message is discarded by the broker. Adding dead-letter arguments
 * means re-declaring the queue, because the arguments of an existing queue
 * cannot be changed.
 *
 * @returns {Promise<void>} Resolves once the consumer is registered.
 * @throws Rejects if the connection or any topology declaration fails.
 */
async function startWorker() {
  const MAX_RETRIES = 3;

  const conn = await amqp.connect(process.env.RABBITMQ_URL);
  const ch = await conn.createChannel();
  await ch.assertExchange('notifications', 'fanout', { durable: true });
  const q = await ch.assertQueue('sms-notifications', { durable: true });
  await ch.bindQueue(q.queue, 'notifications', '');
  await ch.prefetch(3);
  console.log('📱 SMS Worker started...');

  await ch.consume(q.queue, async (msg) => {
    // amqplib delivers null when the broker cancels the consumer.
    if (msg === null) {
      console.warn('⚠️ SMS consumer was cancelled by the broker');
      return;
    }

    let notification;
    try {
      notification = JSON.parse(msg.content.toString());
    } catch (err) {
      console.error('❌ SMS failed:', `invalid JSON payload (${err.message})`);
      ch.nack(msg, false, false);
      return;
    }

    const rawRetry = msg.properties.headers?.['x-retry'];
    const retryCount = Number.isInteger(rawRetry) ? rawRetry : 0;

    try {
      console.log(`📱 Sending SMS to user ${notification.userId}`);
      await new Promise(r => setTimeout(r, 500));
      console.log(`✅ SMS sent: ${notification.id}`);
      ch.ack(msg);
    } catch (err) {
      console.error('❌ SMS failed:', err.message);
      if (retryCount < MAX_RETRIES) {
        // Bounded retry: republish a copy with the incremented counter and
        // settle the original, rather than requeueing in a tight loop.
        ch.sendToQueue(q.queue, msg.content, {
          persistent: true,
          messageId: msg.properties.messageId,
          headers: { ...msg.properties.headers, 'x-retry': retryCount + 1 }
        });
        ch.ack(msg);
      } else {
        console.error(`❌ SMS dropped after ${MAX_RETRIES} retries: ${notification.id}`);
        ch.nack(msg, false, false);
      }
    }
  });
}
// Log start-up failures and exit non-zero instead of leaving an unhandled
// promise rejection.
startWorker().catch((err) => {
  console.error('❌ SMS Worker failed to start:', err);
  process.exit(1);
});
6. Testing
# Start the whole system in the background
docker-compose up -d
# Send a notification through the API Gateway
curl -X POST http://localhost:3000/api/notify \
-H "Content-Type: application/json" \
-d '{
"userId": 123,
"message": "Đơn hàng #456 đã được xác nhận!",
"type": "all"
}'
# Expected result:
# API Gateway: ✅ Notification queued
# Email Worker: 📧 Sending email...
# SMS Worker: 📱 Sending SMS...
# Inspect exchanges and queues in the RabbitMQ Management UI
# http://localhost:15672
7. Mở Rộng
💡 Bài tập nâng cao:
• Thêm Push Notification Worker
• Lưu notification history vào database
• Thêm user preferences (chọn channel nhận)
• Implement rate limiting per user
• Thêm template system cho email/SMS
• Monitoring với Prometheus + Grafana
📝 Tóm Tắt Dự Án
- Kiến trúc microservices với Fanout exchange
- API Gateway nhận request → publish notification
- Workers (email, SMS) subscribe và xử lý background
- Retry 3 lần + Dead Letter Queue cho failed messages
- Docker Compose quản lý toàn bộ infrastructure