Từ khái niệm Event Loop cơ bản đến xây dựng hệ thống phân tán với RabbitMQ và Apache Kafka
Message Queue (MQ) là cơ chế giao tiếp bất đồng bộ giữa các thành phần trong hệ thống phân tán. Thay vì gọi trực tiếp (synchronous), các service gửi message vào hàng đợi và bên nhận xử lý khi sẵn sàng.
Event Loop là cơ chế xử lý bất đồng bộ trong JavaScript/Node.js. Hiểu Event Loop là nền tảng để hiểu cách Message Queue hoạt động.
// Minh họa Event Loop
console.log('1. Synchronous');
setTimeout(() => {
console.log('3. Macrotask (setTimeout)');
}, 0);
Promise.resolve().then(() => {
console.log('2. Microtask (Promise)');
});
// Output:
// 1. Synchronous
// 2. Microtask (Promise)
// 3. Macrotask (setTimeout)
┌──────────┐ ┌─────────────┐ ┌──────────┐
│ Producer │────▶│ Message │────▶│ Consumer │
│ (Gửi) │ │ Queue │ │ (Nhận) │
└──────────┘ └─────────────┘ └──────────┘
│ │
Gửi message Xử lý message
vào queue từ queue
Producer gửi message → Queue lưu trữ → Consumer nhận và xử lý. Đơn giản nhưng rất mạnh mẽ!
| Tiêu chí | 🐰 RabbitMQ | 🦅 Kafka |
|---|---|---|
| Mô hình | Message Broker (AMQP) | Distributed Log |
| Throughput | ~50K msg/s | ~1M msg/s |
| Lưu trữ | Xóa sau khi consume | Giữ lại theo retention |
| Routing | Linh hoạt (Exchange) | Đơn giản (Topic) |
| Use case | Task queue, RPC | Event streaming, Log |
| Ordering | Per queue | Per partition |
| Replay | Không | Có (offset reset) |
🔄 Bài 1: Event Loop là gì? - Cách
JavaScript xử lý bất đồng bộ
📨 Bài 2: Message Queue là gì? -
Khái niệm & Use Cases
🔀 Bài 3: Các mô hình
Messaging - Point-to-Point, Pub/Sub, Fan-out
⚖️ Bài 4: So sánh Message
Broker - RabbitMQ vs Kafka vs Redis vs SQS
🏗️ Bài 5: Kiến trúc
Event-Driven - Microservices với Message Queue
🐰 Bài 6: RabbitMQ là gì? Cài
đặt & Cấu hình
🔗 Bài 7: Exchange, Queue &
Binding - Routing trong RabbitMQ
💻 Bài 8: Producer &
Consumer với Node.js
🎛️ Bài 9: RabbitMQ Patterns -
Dead Letter, Priority, TTL
🏢 Bài 10: RabbitMQ Clustering &
High Availability
📊 Bài 11: Monitoring &
Troubleshooting RabbitMQ
🚀 Bài 12: Dự án thực tế - Hệ
thống gửi Email với RabbitMQ
🦅 Bài 13: Kafka là gì? Cài đặt &
Kiến trúc
💻 Bài 14: Kafka Producer &
Consumer API
📦 Bài 15: Topic, Partition &
Replication
👥 Bài 16: Consumer Group &
Offset Management
🌊 Bài 17: Kafka Streams - Stream
Processing
🔌 Bài 18: Kafka Connect & Schema
Registry
🏭 Bài 19: Kafka trong Production -
Tuning & Security
🚀 Bài 20: Dự án thực tế - Hệ
thống Event Tracking
# Chạy RabbitMQ với Management UI
docker run -d --name rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
rabbitmq:3-management
# Truy cập Management UI
# http://localhost:15672
# Username: guest | Password: guest
const amqp = require('amqplib');
async function sendMessage() {
// Kết nối RabbitMQ
const conn = await amqp.connect('amqp://localhost');
const channel = await conn.createChannel();
const queue = 'hello';
const message = 'Xin chào từ RabbitMQ! 🐰';
// Đảm bảo queue tồn tại
await channel.assertQueue(queue, { durable: true });
// Gửi message
channel.sendToQueue(queue, Buffer.from(message), {
persistent: true // Message không mất khi restart
});
console.log(`[✓] Đã gửi: "${message}"`);
setTimeout(() => {
conn.close();
process.exit(0);
}, 500);
}
sendMessage();
const amqp = require('amqplib');
async function receiveMessage() {
const conn = await amqp.connect('amqp://localhost');
const channel = await conn.createChannel();
const queue = 'hello';
await channel.assertQueue(queue, { durable: true });
// Chỉ nhận 1 message tại 1 thời điểm
channel.prefetch(1);
console.log('[*] Đang chờ message...');
channel.consume(queue, (msg) => {
const content = msg.content.toString();
console.log(`[✓] Nhận được: "${content}"`);
// Xử lý xong → acknowledge
channel.ack(msg);
});
}
receiveMessage();
# docker-compose.yml
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka:7.5.0
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
});
const producer = kafka.producer();
async function sendEvent() {
await producer.connect();
await producer.send({
topic: 'user-events',
messages: [
{
key: 'user-123',
value: JSON.stringify({
event: 'page_view',
url: '/products/iphone',
timestamp: Date.now()
})
}
]
});
console.log('[✓] Event đã được gửi!');
await producer.disconnect();
}
sendEvent();
const { Kafka } = require('kafkajs');
const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:9092']
});
const consumer = kafka.consumer({
groupId: 'analytics-group'
});
async function consumeEvents() {
await consumer.connect();
await consumer.subscribe({
topic: 'user-events',
fromBeginning: true // Đọc từ đầu
});
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const event = JSON.parse(message.value.toString());
console.log(`[Partition ${partition}]`, event);
}
});
}
consumeEvents();