Lesson 18/20

🔌 Lesson 18: Kafka Connect

⏱️ Reading time: 22 minutes | 📚 Difficulty: Advanced

🎯 After this lesson, you will:

1. What Is Kafka Connect?

Kafka Connect Pipeline:

   Source Connector           Kafka            Sink Connector
┌──────────────────┐    ┌──────────┐    ┌──────────────────┐
│     MySQL        │───▶│  Topics  │───▶│  Elasticsearch   │
│     MongoDB      │    │          │    │  PostgreSQL      │
│     S3           │    │          │    │  Redis           │
│     REST API     │    │          │    │  S3              │
└──────────────────┘    └──────────┘    └──────────────────┘

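• No producer/consumer code to write: source connectors pull data from external systems into Kafka topics, and sink connectors push data from topics into external systems
• Each connector is defined by a JSON configuration sent to the Connect REST API
• Supports distributed mode (alongside single-process standalone mode): several workers share the connectors' tasks and rebalance them when a worker joins or fails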
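Because a connector is just a JSON document submitted to the Connect REST API, any HTTP client can create one, not only curl. The sketch below builds the same {"name": ..., "config": {...}} body that this lesson's curl commands send and wraps it in a POST /connectors request using only the Python standard library. The helper names build_connector_payload and create_connector_request are hypothetical, and the worker URL http://localhost:8083 assumes the Docker setup from the next section.

```python
import json
import urllib.request


def build_connector_payload(name: str, config: dict) -> bytes:
    """Serialize a connector definition in the shape POST /connectors expects."""
    if "connector.class" not in config:
        raise ValueError("config must contain 'connector.class'")
    # This lesson passes every config value as a JSON string, so normalize to str.
    body = {"name": name, "config": {k: str(v) for k, v in config.items()}}
    return json.dumps(body).encode("utf-8")


def create_connector_request(base_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build (but do not send) the HTTP request that creates a connector."""
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/connectors",
        data=build_connector_payload(name, config),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


if __name__ == "__main__":
    req = create_connector_request(
        "http://localhost:8083",
        "mysql-source",
        {"connector.class": "io.debezium.connector.mysql.MySqlConnector",
         "database.port": 3306},
    )
    print(req.get_method(), req.full_url)
    # To actually create the connector against a running worker:
    # with urllib.request.urlopen(req) as resp:
    #     print(resp.status, json.loads(resp.read()))
```

Building the request separately from sending it keeps the payload easy to inspect; urllib.request.urlopen(req) only succeeds once a Connect worker is reachable at that URL.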

2. Docker Setup

# docker-compose.yml
version: '3.8'
services:
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    ports:
      - "9092:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:29093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:29093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'

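  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.5.0
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_PORT: 8083
      # Required by the cp-kafka-connect image: the hostname other workers use to reach this one
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
    # The image does not bundle the Debezium or Elasticsearch connectors, so install them before starting the worker
    command:
      - bash
      - -c
      - |
        confluent-hub install --no-prompt debezium/debezium-connector-mysql:latest
        confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:latest
        /etc/confluent/docker/run
    depends_on:
      - kafka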

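  mysql:
    image: mysql:8
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: myapp
    ports:
      - "3306:3306"

  # Target of the sink connector in section 4
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.10
    environment:
      discovery.type: single-node
      xpack.security.enabled: "false"
      ES_JAVA_OPTS: -Xms512m -Xmx512m
    ports:
      - "9200:9200"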
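Kafka Connect can take a while to start, and the REST calls in the following sections fail until the worker is listening. A worker answers GET / with a small JSON document that includes a version field, so a script can poll that endpoint before creating connectors. The sketch below assumes the http://localhost:8083 port mapping above. wait_for_connect is a hypothetical helper, and the HTTP call is passed in as fetch so the retry logic can be exercised without a running cluster.

```python
import json
import time
import urllib.request
from typing import Callable, Optional


def http_get_json(url: str) -> dict:
    """Fetch a URL and decode its JSON body (raises OSError on connection problems)."""
    with urllib.request.urlopen(url, timeout=5) as resp:
        return json.loads(resp.read())


def wait_for_connect(
    base_url: str = "http://localhost:8083",
    fetch: Callable[[str], dict] = http_get_json,
    attempts: int = 30,
    delay_s: float = 2.0,
) -> Optional[str]:
    """Poll GET / until the worker responds; return its version, or None on timeout."""
    for _ in range(attempts):
        try:
            info = fetch(base_url.rstrip("/") + "/")
            return info.get("version")
        except OSError:
            # URLError, HTTPError and ConnectionRefusedError all subclass OSError:
            # the worker is not ready yet, so wait and retry.
            time.sleep(delay_s)
    return None
```

Calling print(wait_for_connect()) before the curl commands below is enough; a None result means the worker did not come up within roughly attempts × delay_s seconds.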

3. Source Connector (MySQL → Kafka)

# Create the source connector via the REST API
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "mysql-source",
    "config": {
        "connector.class": "io.debezium.connector.mysql.MySqlConnector",
        "database.hostname": "mysql",
        "database.port": "3306",
        "database.user": "root",
        "database.password": "root",
        "database.server.id": "184054",
        "topic.prefix": "myapp",
        "database.include.list": "myapp",
        "table.include.list": "myapp.orders,myapp.users",
        "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
        "schema.history.internal.kafka.topic": "schema-changes"
    }
}'
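📌 Debezium (CDC) reads the MySQL binlog and publishes every row-level INSERT, UPDATE and DELETE to Kafka. Each table gets its own topic named <topic.prefix>.<database>.<table>, e.g. myapp.myapp.orders for the orders table here. database.server.id is the ID Debezium uses when it connects as a replica, so it must differ from every server ID in the MySQL cluster, including the server's own server_id (1 by default in MySQL 8).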
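Debezium's MySQL connector names each table's topic <topic.prefix>.<database>.<table>. Each message value carries an envelope whose op field says what happened (c insert, u update, d delete, r snapshot read) alongside before and after row images. The sketch below models both in plain Python. debezium_topic and describe_change are hypothetical helpers, and the sample event is hand-written (with the JSON converter's schema wrapper already stripped), not captured from a real connector.

```python
from typing import Optional

# Debezium operation codes found in the "op" field of a change event.
OPS = {"c": "INSERT", "u": "UPDATE", "d": "DELETE", "r": "SNAPSHOT READ"}


def debezium_topic(prefix: str, database: str, table: str) -> str:
    """Topic name the Debezium MySQL connector uses for one table."""
    return f"{prefix}.{database}.{table}"


def describe_change(payload: dict) -> str:
    """Summarize a change event payload (the value without the schema wrapper)."""
    op = OPS.get(payload.get("op", ""), "UNKNOWN")
    before: Optional[dict] = payload.get("before")
    after: Optional[dict] = payload.get("after")
    row = after if after is not None else before  # a DELETE only has "before"
    return f"{op} id={row.get('id') if row else None}"


if __name__ == "__main__":
    print(debezium_topic("myapp", "myapp", "orders"))  # myapp.myapp.orders
    update_event = {
        "op": "u",
        "before": {"id": 42, "status": "PENDING"},
        "after": {"id": 42, "status": "PAID"},
        "source": {"db": "myapp", "table": "orders"},
    }
    print(describe_change(update_event))  # UPDATE id=42
```

The delete case is the one to watch downstream: its after image is null, which matters for any sink that reads only that field.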

4. Sink Connector (Kafka → Elasticsearch)

# Create the sink connector
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "elasticsearch-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "topics": "myapp.myapp.orders",
        "connection.url": "http://elasticsearch:9200",
        "key.ignore": "true",
        "schema.ignore": "true",
        "behavior.on.null.values": "ignore",
        "transforms": "extractValue",
        "transforms.extractValue.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
        "transforms.extractValue.field": "after"
    }
}'
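The ExtractField$Value transform replaces each record's value with a single field of it, here after, so Elasticsearch indexes the new row instead of the whole Debezium envelope. The function below is a plain-Python model of that behaviour for map-like values, written only to make the data flow visible. It is not Kafka Connect's implementation, and extract_field is a hypothetical name. It also shows why delete events matter: their after field is null, so the sink receives a record with a null value.

```python
from typing import Any, Optional


def extract_field(value: Optional[dict], field: str) -> Any:
    """Model of ExtractField$Value for map-like values: keep only one field.

    In this model a null record value (e.g. a tombstone) stays null.
    """
    if value is None:
        return None
    return value.get(field)


if __name__ == "__main__":
    insert_event = {"op": "c", "before": None, "after": {"id": 1, "status": "NEW"}}
    delete_event = {"op": "d", "before": {"id": 1, "status": "NEW"}, "after": None}
    print(extract_field(insert_event, "after"))  # {'id': 1, 'status': 'NEW'}
    print(extract_field(delete_event, "after"))  # None
```

Because key.ignore is true, the sink derives document IDs from the topic, partition and offset. With this configuration every change event (including an UPDATE) is therefore indexed as a new document rather than overwriting the previous one.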

5. Managing Connectors

# List connectors
curl http://localhost:8083/connectors

# Show connector and task status
curl http://localhost:8083/connectors/mysql-source/status

# Pause connector
curl -X PUT http://localhost:8083/connectors/mysql-source/pause

# Resume connector
curl -X PUT http://localhost:8083/connectors/mysql-source/resume

# Delete connector
curl -X DELETE http://localhost:8083/connectors/mysql-source

# List the connector plugins installed on the worker
curl http://localhost:8083/connector-plugins
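The status endpoint returns JSON with the connector's state and one entry per task, each with its own state (for example RUNNING, PAUSED or FAILED, where a failed task also carries a trace field with the error). A connector can report RUNNING while one of its tasks has failed, so monitoring should inspect the tasks too. failed_tasks and is_healthy are hypothetical helpers, and the sample response is hand-written in the documented shape, not captured output.

```python
import json
from typing import List


def failed_tasks(status: dict) -> List[int]:
    """IDs of tasks whose state is FAILED in a /connectors/<name>/status response."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") == "FAILED"]


def is_healthy(status: dict) -> bool:
    """True only if the connector and every one of its tasks are RUNNING.

    A connector with no tasks yet is treated as not healthy.
    """
    tasks = status.get("tasks", [])
    return (
        status.get("connector", {}).get("state") == "RUNNING"
        and len(tasks) > 0
        and all(t.get("state") == "RUNNING" for t in tasks)
    )


if __name__ == "__main__":
    sample = json.loads("""
    {"name": "mysql-source",
     "connector": {"state": "RUNNING", "worker_id": "kafka-connect:8083"},
     "tasks": [{"id": 0, "state": "FAILED", "worker_id": "kafka-connect:8083",
                "trace": "(stack trace)"}],
     "type": "source"}
    """)
    print(is_healthy(sample), failed_tasks(sample))  # False [0]
```

Against a live worker, decode the body of http://localhost:8083/connectors/mysql-source/status (for example with urllib.request.urlopen and json.loads) and pass the resulting dict to is_healthy.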

📝 Summary