🌊 Lesson 17: Kafka Streams & Stream Processing
🎯 After this lesson, you will:
- Understand stream processing vs batch processing
- Build a real-time pipeline with Kafka
- Apply windowing and aggregation patterns
- Understand exactly-once semantics
1. What Is Stream Processing?
📌 Batch vs Stream:
• Batch: collect data first, process it later (hourly/daily). Example: daily reports
• Stream: process data in real time as it arrives. Example: fraud detection, live dashboards
Stream Processing Pipeline:
Source (Kafka) → Transform → Aggregate → Sink (Kafka/DB)
Example: e-commerce real-time analytics
┌─────────┐ ┌───────────┐ ┌────────────┐ ┌──────────┐
│ orders │───▶│ Filter │───▶│ Aggregate │───▶│ metrics │
│ topic │ │ (> 100K) │ │ (per hour) │ │ topic │
└─────────┘ └───────────┘ └────────────┘ └──────────┘
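Before any Kafka wiring, the Filter → Aggregate stages in the diagram can be sketched as pure functions over an array of orders. The 100K threshold and hourly windows mirror the diagram; the `timestamp` field and `aggregateByHour` name are illustrative assumptions:

```javascript
// Sketch of the Filter → Aggregate stages as plain functions (no Kafka yet).
const HOUR_MS = 60 * 60 * 1000;

function aggregateByHour(orders) {
  return orders
    .filter((o) => o.total > 100000)      // Filter stage: drop small orders
    .reduce((acc, o) => {                 // Aggregate stage: per-hour buckets
      const windowKey = Math.floor(o.timestamp / HOUR_MS);
      const w = acc.get(windowKey) || { count: 0, total: 0 };
      w.count += 1;
      w.total += o.total;
      acc.set(windowKey, w);
      return acc;
    }, new Map());
}
```

The Kafka pipeline below computes the same thing, just incrementally, one message at a time, instead of over a finished array.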
2. Real-time Pipeline with Node.js
// stream-processor.js
const { Kafka } = require('kafkajs');

const kafka = new Kafka({ clientId: 'stream-processor', brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'stream-processors' });
const producer = kafka.producer();

// Windowed aggregation state
const windows = new Map(); // windowKey → { count, total }
const WINDOW_SIZE = 60000; // 1 minute

async function processStream() {
  await consumer.connect();
  await producer.connect();
  await consumer.subscribe({ topic: 'orders' });

  await consumer.run({
    eachMessage: async ({ message }) => {
      const order = JSON.parse(message.value.toString());

      // 1. Filter: only process orders > 100K
      if (order.total < 100000) return;

      // 2. Transform: enrich the order
      // (could be published to its own topic — see the enrichment pattern below)
      const enriched = {
        ...order,
        category: categorize(order.total),
        processedAt: new Date().toISOString()
      };

      // 3. Windowed aggregation
      const windowKey = Math.floor(Date.now() / WINDOW_SIZE);
      if (!windows.has(windowKey)) {
        windows.set(windowKey, { count: 0, total: 0 });
      }
      const window = windows.get(windowKey);
      window.count++;
      window.total += order.total;

      // 4. Emit to the output topic
      await producer.send({
        topic: 'order-metrics',
        messages: [{
          key: `window-${windowKey}`,
          value: JSON.stringify({
            window: windowKey,
            orderCount: window.count,
            totalRevenue: window.total,
            avgOrderValue: window.total / window.count
          })
        }]
      });
    }
  });
}

function categorize(total) {
  if (total >= 10000000) return 'premium';
  if (total >= 1000000) return 'standard';
  return 'basic';
}

processStream().catch(console.error);
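One caveat: the in-memory `windows` map above grows without bound, since entries for past minutes are never deleted. A minimal eviction sketch, reusing the same `WINDOW_SIZE` (the retention count of 5 is an arbitrary assumption):

```javascript
const WINDOW_SIZE = 60000;   // 1 minute, as in the processor above
const RETAINED_WINDOWS = 5;  // arbitrary: keep only the last 5 closed windows

// Delete window entries older than the retention horizon.
// Call periodically (e.g. from setInterval) or after each message.
function evictOldWindows(windows, now = Date.now()) {
  const oldest = Math.floor(now / WINDOW_SIZE) - RETAINED_WINDOWS;
  for (const key of windows.keys()) {
    if (key < oldest) windows.delete(key);
  }
}
```

Note this state is also lost on restart; durable windowed state is what dedicated frameworks like Kafka Streams manage for you.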
3. Event Enrichment Pattern
// Enrich events by joining them with other data
const userCache = new Map();

async function enrichOrder(order) {
  // Look up the user from the cache, falling back to the DB
  let user = userCache.get(order.userId);
  if (!user) {
    user = await fetchUserFromDB(order.userId);
    userCache.set(order.userId, user);
  }
  return {
    ...order,
    userName: user.name,
    userTier: user.tier,
    userCity: user.city
  };
}
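The `userCache` above never invalidates, so a user's tier or city change would stay invisible to the stream forever. One sketch of a fix is a small TTL cache (the class name and 5-minute TTL are assumptions, not part of kafkajs):

```javascript
// Tiny TTL cache: entries expire after ttlMs, so stale user data
// is eventually re-fetched from the DB.
class TtlCache {
  constructor(ttlMs) {
    this.ttlMs = ttlMs;
    this.entries = new Map(); // key → { value, expiresAt }
  }
  get(key, now = Date.now()) {
    const e = this.entries.get(key);
    if (!e) return undefined;
    if (e.expiresAt <= now) {   // expired: drop the entry and report a miss
      this.entries.delete(key);
      return undefined;
    }
    return e.value;
  }
  set(key, value, now = Date.now()) {
    this.entries.set(key, { value, expiresAt: now + this.ttlMs });
  }
}

// Drop-in replacement for the enrichment cache:
// const userCache = new TtlCache(5 * 60 * 1000); // 5-minute TTL
```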
// Stream: orders → enriched-orders
// (runs inside an async setup function — CommonJS has no top-level await)
await consumer.run({
  eachMessage: async ({ message }) => {
    const order = JSON.parse(message.value.toString());
    const enriched = await enrichOrder(order);
    await producer.send({
      topic: 'enriched-orders',
      messages: [{
        key: message.key?.toString(),
        value: JSON.stringify(enriched)
      }]
    });
  }
});
4. Exactly-Once Semantics
// Kafka transactions: exactly-once guarantees
const producer = kafka.producer({
  transactionalId: 'my-transactional-producer',
  maxInFlightRequests: 1,
  idempotent: true
});
await producer.connect();

const transaction = await producer.transaction();
try {
  // Send messages within the transaction
  await transaction.send({
    topic: 'output-topic',
    messages: [{ value: 'processed data' }]
  });

  // Commit the consumer offset in the same transaction
  await transaction.sendOffsets({
    consumerGroupId: 'my-group',
    topics: [{
      topic: 'input-topic',
      partitions: [{ partition: 0, offset: '42' }]
    }]
  });

  await transaction.commit();
} catch (err) {
  await transaction.abort();
  throw err;
}
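Transactions only deliver exactly-once if downstream consumers skip messages from aborted transactions. In KafkaJS this is controlled by the `readUncommitted` consumer option, the equivalent of Kafka's `isolation.level=read_committed`; a config sketch (the group id is hypothetical):

```javascript
// Consumer that only sees committed transactional messages.
// readUncommitted defaults to false, i.e. read-committed; it is
// spelled out here for clarity.
const consumer = kafka.consumer({
  groupId: 'metrics-readers',
  readUncommitted: false
});
```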
💡 Exactly-once use cases:
• Financial transactions
• Inventory management
• Critical event processing where duplicates cannot be tolerated
📝 Summary
- Stream processing: process data in real time as it arrives
- Pipeline: Source → Transform → Aggregate → Sink
- Windowing: aggregate data over time windows
- Enrichment: join stream data with external data
- Exactly-once: Kafka transactions for critical data