← Về danh sách bài học · Bài 20/20

🚀 Bài 20: Dự Án - Real-time E-commerce Analytics

⏱️ Thời gian đọc: 35 phút | 📚 Độ khó: Dự án

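🎯 Dự án cuối khóa - Bạn sẽ xây dựng một nền tảng phân tích đơn hàng e-commerce theo thời gian thực: producer giả lập đơn hàng, các stream processor (tính doanh thu, phát hiện gian lận) lưu trạng thái trên Redis, và Dashboard API đẩy số liệu real-time qua WebSocket.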

1. Kiến Trúc Hệ Thống

┌─────────────────────────────────────────────────────────────┐
│                  E-commerce Analytics Platform               │
│                                                             │
│  ┌────────────┐    ┌──────────┐    ┌──────────────────────┐ │
│  │ Order      │───▶│  Kafka   │───▶│ Stream Processors    │ │
│  │ Simulator  │    │          │    │                      │ │
│  │ (Producer) │    │ Topics:  │    │ • Revenue Calculator │ │
│  └────────────┘    │ • orders │    │ • Fraud Detector     │ │
│                    │ • metrics│    │ • Trend Analyzer     │ │
│  ┌────────────┐    │ • alerts │    └──────────┬───────────┘ │
│  │ Dashboard  │◀───│          │               │             │
│  │ API        │    └──────────┘    ┌──────────▼───────────┐ │
│  │ (Express + │                    │ Redis (Real-time     │ │
│  │  WebSocket)│◀───────────────────│  State Store)        │ │
│  └────────────┘                    └──────────────────────┘ │
└─────────────────────────────────────────────────────────────┘

2. Docker Compose

# docker-compose.yml
version: '3.8'
services:
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    # Fixed name so `docker exec kafka ...` works regardless of the compose project name.
    container_name: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:29093
      # PLAINTEXT (localhost:9092) serves the Node.js apps running on the host.
      # INTERNAL (kafka:29092) serves containers on the compose network such as
      # kafka-ui: for them "localhost" is their own container, not the broker.
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:29093,INTERNAL://0.0.0.0:29092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,INTERNAL://kafka:29092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT
      # Single broker: internal topics must use replication factor 1
      # (the transaction log settings are needed for transactional producers).
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    depends_on:
      - kafka
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      # Use the in-network listener; localhost:9092 is unreachable from this container.
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:29092

3. Order Simulator (Producer)

// order-simulator.js
const { Kafka } = require('kafkajs');
const crypto = require('crypto');

// Kafka client and producer for the order simulator (broker published on localhost:9092).
const kafka = new Kafka({
    clientId: 'order-simulator',
    brokers: ['localhost:9092'],
});
const producer = kafka.producer();

// Catalog the simulator samples from, as [name, unit price] pairs
// (presumably VND — prices are printed in millions, "M").
const products = [
    ['Laptop', 25000000],
    ['Điện thoại', 15000000],
    ['Tai nghe', 2000000],
    ['Chuột gaming', 800000],
    ['Bàn phím', 1500000],
    ['Màn hình', 8000000],
].map(([name, price]) => ({ name, price }));

// Delivery cities; /api/stats/cities in the dashboard hard-codes the same list.
const cities = ['HCM', 'Hà Nội', 'Đà Nẵng', 'Cần Thơ', 'Hải Phòng'];

/**
 * Builds one random order event from the `products` and `cities` catalogs.
 *
 * The returned order is always priced consistently
 * (total = unitPrice * quantity). `isSuspicious` is set for ~5% of orders;
 * it does not change the total here — the caller inflates `total` for those
 * orders. A legitimate order can still exceed 50M (e.g. 3 laptops = 75M).
 *
 * @param {() => number} [random=Math.random] - source of uniform numbers in
 *   [0, 1). Inject a stub or seeded generator for reproducible orders.
 * @returns {object} order event, ready to be JSON-serialized.
 */
function generateOrder(random = Math.random) {
    const pick = (items) => items[Math.floor(random() * items.length)];

    const product = pick(products);
    const quantity = Math.floor(random() * 3) + 1;

    return {
        orderId: crypto.randomUUID(),
        userId: `user-${Math.floor(random() * 1000)}`,
        product: product.name,
        quantity,
        unitPrice: product.price,
        total: product.price * quantity,
        city: pick(cities),
        timestamp: new Date().toISOString(),
        // Flags ~5% of orders for the caller to turn into fraudulent ones.
        isSuspicious: random() < 0.05
    };
}

/**
 * Connects the producer, then publishes one random order to the `orders`
 * topic every 0.5–2 s until the process exits.
 *
 * The next send is scheduled only after the previous one finishes, with a
 * freshly drawn delay each time. This is a chained setTimeout rather than a
 * setInterval whose delay would be fixed at startup. So the pace really
 * varies and sends never overlap. A failed send is logged and the loop keeps
 * going instead of surfacing as an unhandled promise rejection.
 *
 * @returns {Promise<void>} resolves once connected and the loop is scheduled;
 *   rejects if `producer.connect()` fails.
 */
async function simulate() {
    await producer.connect();
    console.log('🛒 Order Simulator started...');

    const scheduleNext = () => {
        // Draw a fresh 0.5–2 s delay for every order.
        setTimeout(sendOne, 500 + Math.random() * 1500);
    };

    const sendOne = async () => {
        try {
            const order = generateOrder();

            // Fraud: inflate the order to an abnormally high value (100M–150M).
            if (order.isSuspicious) {
                order.total = 100000000 + Math.floor(Math.random() * 50000000);
            }

            await producer.send({
                topic: 'orders',
                messages: [{
                    key: order.userId,
                    value: JSON.stringify(order),
                    headers: { source: 'web' }
                }]
            });

            console.log(`📦 Order: ${order.product} x${order.quantity} = ${(order.total/1000000).toFixed(1)}M - ${order.city}`);
        } catch (err) {
            console.error('❌ Failed to send order:', err);
        } finally {
            scheduleNext();
        }
    };

    scheduleNext();
}

// Exit with a clear message if startup (e.g. producer.connect) fails.
simulate().catch((err) => {
    console.error('❌ Order simulator failed to start:', err);
    process.exit(1);
});

4. Stream Processors

Revenue Calculator

// revenue-processor.js
const { Kafka } = require('kafkajs');
const Redis = require('ioredis');

// Kafka client: consumes `orders` in its own group and produces to `metrics`.
const kafka = new Kafka({
    clientId: 'revenue-processor',
    brokers: ['localhost:9092'],
});
const consumer = kafka.consumer({ groupId: 'revenue-calculators' });
const producer = kafka.producer();
// No options: uses the ioredis default connection (compose publishes Redis on 6379).
const redis = new Redis();

/**
 * Parses one `orders` message for the revenue processor.
 *
 * Returns null (after logging) for records that cannot be aggregated: a
 * missing or non-JSON value, a `total` that is not a safe integer (Redis
 * INCRBY only accepts integers), or an unparseable `timestamp`.
 *
 * @param {{ value: (Buffer|null), offset: string }} message - kafkajs message.
 * @returns {object|null} the parsed order, or null to skip the message.
 */
function parseOrderMessage(message) {
    let order;
    try {
        order = JSON.parse(message.value?.toString());
    } catch (err) {
        console.error(`⚠️ Skipping non-JSON order at offset ${message.offset}: ${err.message}`);
        return null;
    }
    if (!Number.isSafeInteger(order?.total) || Number.isNaN(Date.parse(order.timestamp))) {
        console.error(`⚠️ Skipping malformed order at offset ${message.offset}`);
        return null;
    }
    return order;
}

/**
 * Consumes `orders`, updates the Redis revenue/order counters and publishes a
 * `revenue_update` snapshot to the `metrics` topic for every order.
 *
 * The six counter updates run in one MULTI/EXEC transaction. The snapshot is
 * built from the values returned by those INCR/INCRBY commands, not from
 * separate GETs. So it always shows the state right after this order, even
 * when several processor instances share the consumer group. Malformed
 * messages are logged and skipped so one bad record cannot block the
 * partition.
 */
async function processRevenue() {
    await consumer.connect();
    await producer.connect();
    await consumer.subscribe({ topic: 'orders' });

    console.log('💰 Revenue Processor started...');

    await consumer.run({
        eachMessage: async ({ message }) => {
            const order = parseOrderMessage(message);
            if (order === null) return;

            // Hourly bucket from the order's own timestamp, e.g. "2024-05-01T13".
            const hour = new Date(order.timestamp).toISOString().slice(0, 13);

            // Update Redis counters atomically; no other client's commands interleave.
            const replies = await redis.multi()
                .incrby('revenue:total', order.total)
                .incrby(`revenue:hourly:${hour}`, order.total)
                .incr('orders:total')
                .incr(`orders:hourly:${hour}`)
                .incrby(`revenue:city:${order.city}`, order.total)
                .incr(`orders:product:${order.product}`)
                .exec();

            // ioredis resolves exec() to [[err, value], ...] in command order.
            const failed = replies.find(([err]) => err);
            if (failed) {
                // Not rethrown: a redelivery would re-apply the counters that did succeed.
                console.error(`❌ Redis update failed for order ${order.orderId}:`, failed[0]);
                return;
            }
            const [[, totalRevenue], [, hourlyRevenue], [, totalOrders]] = replies;

            // Emit metrics to metrics topic
            const metrics = {
                type: 'revenue_update',
                totalRevenue,
                totalOrders,
                hourlyRevenue,
                timestamp: new Date().toISOString()
            };

            await producer.send({
                topic: 'metrics',
                messages: [{ value: JSON.stringify(metrics) }]
            });
        }
    });
}

// Exit with a clear message if connecting/subscribing fails at startup.
processRevenue().catch((err) => {
    console.error('❌ Revenue processor failed to start:', err);
    process.exit(1);
});

Fraud Detector

// fraud-detector.js
const { Kafka } = require('kafkajs');
const Redis = require('ioredis');

// Kafka client: consumes `orders` in its own group ("fraud-detectors") and
// produces to `alerts`.
const kafka = new Kafka({
    clientId: 'fraud-detector',
    brokers: ['localhost:9092'],
});
const consumer = kafka.consumer({ groupId: 'fraud-detectors' });
const producer = kafka.producer();
// No options: uses the ioredis default connection (compose publishes Redis on 6379).
const redis = new Redis();

// Orders whose total is above this value (50M) raise a HIGH_VALUE alert.
const FRAUD_THRESHOLD = 50 * 1000 * 1000;
// More orders than this from one user within one minute raise a VELOCITY alert.
const MAX_ORDERS_PER_MINUTE = 10;

/**
 * Parses one `orders` message for the fraud detector.
 *
 * Returns null (after logging) when the value is not JSON or lacks the fields
 * the rules need: a string `userId`, a finite numeric `total` and a parseable
 * `timestamp`.
 *
 * @param {{ value: (Buffer|null), offset: string }} message - kafkajs message.
 * @returns {object|null} the parsed order, or null to skip the message.
 */
function parseOrderMessage(message) {
    let order;
    try {
        order = JSON.parse(message.value?.toString());
    } catch (err) {
        console.error(`⚠️ Skipping non-JSON order at offset ${message.offset}: ${err.message}`);
        return null;
    }
    const isValid = typeof order?.userId === 'string'
        && Number.isFinite(order.total)
        && !Number.isNaN(Date.parse(order.timestamp));
    if (!isValid) {
        console.error(`⚠️ Skipping malformed order at offset ${message.offset}`);
        return null;
    }
    return order;
}

/**
 * Consumes `orders` and publishes to `alerts` one alert per rule an order trips:
 *   - HIGH_VALUE: total above FRAUD_THRESHOLD;
 *   - VELOCITY:   more than MAX_ORDERS_PER_MINUTE orders from the same user
 *                 in the same minute.
 *
 * The velocity window is keyed on the order's own timestamp, not the wall
 * clock. Replaying a backlog (e.g. after a restart) therefore does not pile
 * old orders into the current minute and raise false alerts.
 */
async function detectFraud() {
    await consumer.connect();
    await producer.connect();
    await consumer.subscribe({ topic: 'orders' });

    console.log('🔍 Fraud Detector started...');

    await consumer.run({
        eachMessage: async ({ message }) => {
            const order = parseOrderMessage(message);
            if (order === null) return;

            const alerts = [];

            // Rule 1: High value order
            if (order.total > FRAUD_THRESHOLD) {
                alerts.push({
                    type: 'HIGH_VALUE',
                    message: `Order ${order.orderId}: ${(order.total/1000000).toFixed(1)}M exceeds threshold`,
                    severity: 'HIGH'
                });
            }

            // Rule 2: Too many orders from the same user within one (event-time) minute
            const minute = new Date(order.timestamp).toISOString().slice(0, 16);
            const userKey = `orders:${order.userId}:${minute}`;
            // INCR and EXPIRE go in one MULTI so a crash between them cannot
            // leave a counter without a TTL.
            const [[incrErr, orderCount]] = await redis.multi()
                .incr(userKey)
                .expire(userKey, 120)
                .exec();
            if (incrErr) throw incrErr;

            if (orderCount > MAX_ORDERS_PER_MINUTE) {
                alerts.push({
                    type: 'VELOCITY',
                    message: `User ${order.userId}: ${orderCount} orders in 1 minute`,
                    severity: 'CRITICAL'
                });
            }

            if (alerts.length === 0) return;

            // Emit all alerts for this order in a single produce request.
            const alertedAt = new Date().toISOString();
            await producer.send({
                topic: 'alerts',
                messages: alerts.map((alert) => ({
                    key: order.userId,
                    value: JSON.stringify({
                        ...alert,
                        orderId: order.orderId,
                        userId: order.userId,
                        timestamp: alertedAt
                    })
                }))
            });
            for (const alert of alerts) {
                console.log(`🚨 ALERT [${alert.severity}]: ${alert.message}`);
            }
        }
    });
}

// Exit with a clear message if connecting/subscribing fails at startup.
detectFraud().catch((err) => {
    console.error('❌ Fraud detector failed to start:', err);
    process.exit(1);
});

5. Dashboard API

// dashboard-api.js
const express = require('express');
const { createServer } = require('http');
const { Server } = require('socket.io');
const { Kafka } = require('kafkajs');
const Redis = require('ioredis');

// HTTP server shared by the Express REST routes and Socket.IO.
const app = express();
const httpServer = createServer(app);
// Accepts WebSocket clients from any origin — tighten this outside local demos.
const io = new Server(httpServer, {
    cors: { origin: '*' },
});
const redis = new Redis();

// Kafka consumer that relays `metrics` and `alerts` to WebSocket clients.
const kafka = new Kafka({
    clientId: 'dashboard',
    brokers: ['localhost:9092'],
});
const consumer = kafka.consumer({ groupId: 'dashboard-consumers' });

// REST API: current revenue/order totals plus the average order value.
// Express 4 does not forward rejections from async handlers to its error
// handler, so Redis failures are caught here and answered with a 500 instead
// of leaving the request hanging.
app.get('/api/stats', async (req, res) => {
    try {
        const [revenue, orders] = await redis.mget('revenue:total', 'orders:total');
        // Missing keys (nothing processed yet) come back as null -> 0.
        const totalRevenue = Number.parseInt(revenue ?? '0', 10);
        const totalOrders = Number.parseInt(orders ?? '0', 10);
        res.json({
            totalRevenue,
            totalOrders,
            avgOrderValue: totalOrders > 0 ? Math.round(totalRevenue / totalOrders) : 0
        });
    } catch (err) {
        console.error('❌ GET /api/stats failed:', err);
        res.status(500).json({ error: 'Failed to load stats' });
    }
});

// REST API: revenue per city, highest first.
// The city list must match the one the order simulator draws from. All
// counters are read in one MGET, and errors become a 500 (see /api/stats).
app.get('/api/stats/cities', async (req, res) => {
    const cities = ['HCM', 'Hà Nội', 'Đà Nẵng', 'Cần Thơ', 'Hải Phòng'];
    try {
        const revenues = await redis.mget(...cities.map((city) => `revenue:city:${city}`));
        const data = cities.map((city, i) => ({
            city,
            // A city with no orders yet has no key -> null -> 0.
            revenue: Number.parseInt(revenues[i] ?? '0', 10)
        }));
        res.json(data.sort((a, b) => b.revenue - a.revenue));
    } catch (err) {
        console.error('❌ GET /api/stats/cities failed:', err);
        res.status(500).json({ error: 'Failed to load city stats' });
    }
});

/**
 * WebSocket: real-time updates.
 *
 * Subscribes to `metrics` and `alerts` and re-emits every message to all
 * connected Socket.IO clients, using the topic name as the event name.
 * Messages that are not valid JSON (including null values) are logged and
 * skipped instead of crashing the consumer.
 */
async function startRealtimeStream() {
    await consumer.connect();
    await consumer.subscribe({ topics: ['metrics', 'alerts'] });

    await consumer.run({
        eachMessage: async ({ topic, message }) => {
            let data;
            try {
                data = JSON.parse(message.value?.toString());
            } catch (err) {
                console.error(`⚠️ Skipping malformed ${topic} message at offset ${message.offset}: ${err.message}`);
                return;
            }
            io.emit(topic, data);
        }
    });
}

// Log each WebSocket client as it connects and disconnects.
io.on('connection', function onConnection(socket) {
    const { id } = socket;
    console.log(`👤 Client connected: ${id}`);

    socket.on('disconnect', () => {
        console.log(`👋 Client disconnected`);
    });
});

// Start the Kafka -> WebSocket relay; if it cannot start, exit with a clear
// message rather than an unhandled promise rejection.
startRealtimeStream().catch((err) => {
    console.error('❌ Real-time stream failed to start:', err);
    process.exit(1);
});
httpServer.listen(3001, () => console.log('📊 Dashboard API on port 3001'));

6. Chạy Dự Án

# 0. Install the Node.js dependencies used by the scripts
npm install kafkajs ioredis express socket.io

# 1. Start the infrastructure
docker-compose up -d

# 2. Create the topics (give the broker a few seconds to start first).
#    `docker-compose exec` addresses the service by name, so it works whatever
#    the container is called; --if-not-exists makes this step safe to re-run.
docker-compose exec kafka kafka-topics --create --if-not-exists --topic orders --partitions 3 --bootstrap-server localhost:9092
docker-compose exec kafka kafka-topics --create --if-not-exists --topic metrics --partitions 1 --bootstrap-server localhost:9092
docker-compose exec kafka kafka-topics --create --if-not-exists --topic alerts --partitions 1 --bootstrap-server localhost:9092

# 3. Start the processors (each in its own terminal)
node revenue-processor.js
node fraud-detector.js
node dashboard-api.js

# 4. Start the order simulator
node order-simulator.js

# 5. Check the results
curl http://localhost:3001/api/stats
curl http://localhost:3001/api/stats/cities

# Kafka UI: http://localhost:8080

7. Mở Rộng

💡 Bài tập nâng cao:
• Thêm React dashboard UI với real-time charts
• Implement Kafka Connect để sync data vào Elasticsearch
• Thêm email/SMS alerts khi phát hiện fraud
• Implement exactly-once processing với Kafka transactions
• Scale: Chạy nhiều instance của mỗi processor (cùng consumer group để chia partition)
• Thêm A/B testing stream processor

🎉 Hoàn Thành Khóa Học!

Chúc mừng bạn đã hoàn thành khóa học Message Queue, RabbitMQ & Kafka! Bạn đã có kiến thức nền tảng vững chắc về message queue, RabbitMQ và Kafka.

Tiếp tục thực hành và đừng ngại thử nghiệm trong các dự án thực tế! 🚀