← Về danh sách bài họcBài 13/20

🦅 Bài 13: Apache Kafka - Giới Thiệu & Cài Đặt

⏱️ Thời gian đọc: 22 phút | 📚 Độ khó: Trung bình

🎯 Sau bài học này, bạn sẽ:

1. Kafka Là Gì?

Apache Kafka là một distributed event streaming platform được phát triển bởi LinkedIn, sau đó open-source. Kafka không phải là message queue truyền thống — nó là distributed commit log.

📌 Kafka khác Message Queue truyền thống:
• MQ: Broker xóa message sau khi consumer xử lý
• Kafka: Giữ lại message trên disk theo retention policy (mặc định 7 ngày)
• Kafka cho phép replay message bằng cách reset offset
• Consumer tự quản lý vị trí đọc (offset)

2. Kiến Trúc Core

┌─────────────────────────────────────────────────────┐
│                  Kafka Cluster                       │
│                                                     │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐ │
│  │  Broker 1   │  │  Broker 2   │  │  Broker 3   │ │
│  │             │  │             │  │             │ │
│  │ Topic: logs │  │ Topic: logs │  │ Topic: logs │ │
│  │ Partition 0 │  │ Partition 1 │  │ Partition 2 │ │
│  │ (Leader)    │  │ (Leader)    │  │ (Leader)    │ │
│  │ Partition 1 │  │ Partition 2 │  │ Partition 0 │ │
│  │ (Replica)   │  │ (Replica)   │  │ (Replica)   │ │
│  └─────────────┘  └─────────────┘  └─────────────┘ │
└─────────────────────────────────────────────────────┘

Topic "logs" với 3 partitions:
Partition 0: [msg0, msg3, msg6, msg9, ...]
Partition 1: [msg1, msg4, msg7, msg10, ...]
Partition 2: [msg2, msg5, msg8, msg11, ...]

Mỗi partition là một append-only log với offset tăng dần

3. Cài Đặt Với Docker (KRaft Mode)

Từ Kafka 3.3+, bạn có thể chạy KRaft mode — không cần ZooKeeper!

# docker-compose.yml
version: '3.8'
services:
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    hostname: kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:29093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:29093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_LOG_RETENTION_HOURS: 168
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
    volumes:
      - kafka_data:/var/lib/kafka/data

  kafka-ui:
    image: provectuslabs/kafka-ui:latest
    ports:
      - "8080:8080"
    environment:
      KAFKA_CLUSTERS_0_NAME: local
      KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS: kafka:9092
    depends_on:
      - kafka

volumes:
  kafka_data:
# Khởi động
docker-compose up -d

# Kafka UI: http://localhost:8080

4. Kafka CLI Tools

# Tạo topic
docker exec kafka kafka-topics \
  --create \
  --topic my-events \
  --partitions 3 \
  --replication-factor 1 \
  --bootstrap-server localhost:9092

# Liệt kê topics
docker exec kafka kafka-topics \
  --list \
  --bootstrap-server localhost:9092

# Mô tả topic
docker exec kafka kafka-topics \
  --describe \
  --topic my-events \
  --bootstrap-server localhost:9092

# Produce messages
docker exec -it kafka kafka-console-producer \
  --topic my-events \
  --bootstrap-server localhost:9092
# > Hello Kafka!
# > Message 2

# Consume messages (từ đầu)
docker exec kafka kafka-console-consumer \
  --topic my-events \
  --from-beginning \
  --bootstrap-server localhost:9092

5. Hello World với Node.js

npm install kafkajs
// producer.js
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
    clientId: 'my-app',
    brokers: ['localhost:9092']
});

const producer = kafka.producer();

async function sendMessages() {
    await producer.connect();

    await producer.send({
        topic: 'my-events',
        messages: [
            { key: 'user-1', value: JSON.stringify({ action: 'login' }) },
            { key: 'user-2', value: JSON.stringify({ action: 'purchase' }) },
        ]
    });

    console.log('✅ Messages sent!');
    await producer.disconnect();
}

sendMessages();
// consumer.js
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
    clientId: 'my-app',
    brokers: ['localhost:9092']
});

const consumer = kafka.consumer({ groupId: 'my-group' });

async function consume() {
    await consumer.connect();
    await consumer.subscribe({ topic: 'my-events', fromBeginning: true });

    await consumer.run({
        eachMessage: async ({ topic, partition, message }) => {
            console.log({
                topic,
                partition,
                offset: message.offset,
                key: message.key?.toString(),
                value: message.value.toString()
            });
        }
    });
}

consume();

📝 Tóm Tắt