Lesson 18/20
🔌 Lesson 18: Kafka Connect
🎯 After this lesson, you will be able to:
- Understand the Kafka Connect architecture
- Configure a Source Connector (DB → Kafka)
- Configure a Sink Connector (Kafka → DB / search index)
- Integrate MySQL, MongoDB and Elasticsearch
1. What Is Kafka Connect?
Kafka Connect Pipeline:
  Source Connector           Kafka             Sink Connector
┌──────────────────┐      ┌──────────┐      ┌──────────────────┐
│ MySQL            │─────▶│  Topics  │─────▶│ Elasticsearch    │
│ MongoDB          │      │          │      │ PostgreSQL       │
│ S3               │      │          │      │ Redis            │
│ REST API         │      │          │      │ S3               │
└──────────────────┘      └──────────┘      └──────────────────┘
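• No producer/consumer code to write
• Connectors are configured declaratively with JSON sent to the REST API
• Supports distributed mode: several workers share the connector tasks, which gives scalability and fault tolerance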
2. Docker Setup
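# docker-compose.yml
version: '3.8'
services:
  kafka:
    image: confluentinc/cp-kafka:7.5.0
    ports:
      - "9092:9092"
    environment:
      # Single-node KRaft mode: this node is both broker and controller
      KAFKA_NODE_ID: 1
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka:29093
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:29093
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      CLUSTER_ID: 'MkU3OEVBNTcwNTJENDM2Qk'
  kafka-connect:
    image: confluentinc/cp-kafka-connect:7.5.0
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: kafka:9092
      CONNECT_REST_PORT: 8083
      # Required by the image; must be resolvable by the other containers
      CONNECT_REST_ADVERTISED_HOST_NAME: kafka-connect
      CONNECT_GROUP_ID: kafka-connect
      CONNECT_CONFIG_STORAGE_TOPIC: connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: connect-status
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
    # The image does not ship the Debezium MySQL or Elasticsearch connectors,
    # so install them from Confluent Hub before starting the worker
    command:
      - bash
      - -c
      - |
        confluent-hub install --no-prompt debezium/debezium-connector-mysql:latest
        confluent-hub install --no-prompt confluentinc/kafka-connect-elasticsearch:latest
        /etc/confluent/docker/run
    depends_on:
      - kafka
  mysql:
    image: mysql:8
    environment:
      MYSQL_ROOT_PASSWORD: root
      MYSQL_DATABASE: myapp
    ports:
      - "3306:3306"
  # Target of the sink connector in section 4
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.17.10
    environment:
      discovery.type: single-node
      ES_JAVA_OPTS: "-Xms512m -Xmx512m"
    ports:
      - "9200:9200"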
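The worker installs plugins at startup, so the REST API on port 8083 can take a while to come up. Below is a minimal Python sketch (standard library only) that polls the worker's root endpoint `GET /` until it answers. That endpoint returns the worker's version information. The helper name `wait_for_connect`, the retry and timeout values, and the injectable `fetch`/`sleep` parameters are illustrative assumptions and are not part of Kafka Connect.

```python
import json
import time
import urllib.request
from typing import Callable, Optional

CONNECT_URL = "http://localhost:8083"  # assumption: port mapping from the Docker setup


def http_get_json(url: str, timeout: float = 5.0) -> dict:
    """GET a URL and decode its JSON body."""
    with urllib.request.urlopen(url, timeout=timeout) as resp:
        return json.loads(resp.read().decode("utf-8"))


def wait_for_connect(
    base_url: str = CONNECT_URL,
    attempts: int = 30,
    delay: float = 2.0,
    fetch: Callable[[str], dict] = http_get_json,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[dict]:
    """Poll GET {base_url}/ until the worker answers; return its info, or None if it never does."""
    for _ in range(attempts):
        try:
            # The root endpoint returns worker info: version, commit, kafka_cluster_id
            return fetch(base_url.rstrip("/") + "/")
        except OSError:
            # URLError, connection refused and timeouts are all OSError subclasses
            sleep(delay)
    return None
```

Call `wait_for_connect()` before sending the curl commands. A `None` result means the worker did not become reachable within roughly a minute (30 attempts, 2 s apart).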
3. Source Connector (MySQL → Kafka)
# Create the source connector via the REST API.
# database.server.id is the replica ID Debezium uses when reading the binlog.
# It must be unique across all MySQL servers and replication clients;
# MySQL 8 itself defaults to server_id = 1, so avoid using 1 here.
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "mysql-source",
    "config": {
      "connector.class": "io.debezium.connector.mysql.MySqlConnector",
      "database.hostname": "mysql",
      "database.port": "3306",
      "database.user": "root",
      "database.password": "root",
      "database.server.id": "184054",
      "topic.prefix": "myapp",
      "database.include.list": "myapp",
      "table.include.list": "myapp.orders,myapp.users",
      "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
      "schema.history.internal.kafka.topic": "schema-changes"
    }
  }'
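The same request can also be scripted. This is a minimal Python sketch that assumes the worker is reachable at localhost:8083. It targets `PUT /connectors/{name}/config`, which creates the connector if it does not exist and updates its configuration otherwise. That makes reruns safe. By contrast, `POST /connectors` fails with 409 Conflict once the name is taken. The helper names `build_upsert_request` and `upsert_connector` are hypothetical.

```python
import json
import urllib.parse
import urllib.request


def build_upsert_request(base_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build PUT /connectors/{name}/config: create the connector or replace its config."""
    url = f"{base_url.rstrip('/')}/connectors/{urllib.parse.quote(name, safe='')}/config"
    # The body is the flat config map, without the {"name": ..., "config": ...} wrapper
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        url, data=body, method="PUT", headers={"Content-Type": "application/json"}
    )


def upsert_connector(base_url: str, name: str, config: dict) -> dict:
    """Send the request; the worker answers 201 (created) or 200 (updated) with connector info."""
    req = build_upsert_request(base_url, name, config)
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.loads(resp.read().decode("utf-8"))
```

For example, call `upsert_connector("http://localhost:8083", "mysql-source", config)` with `config` set to the `"config"` object from the curl command.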
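📌 Debezium (CDC, change data capture) reads the MySQL binlog and captures every INSERT, UPDATE and DELETE into Kafka topics. Each table gets its own topic, named <topic.prefix>.<database>.<table>. With the configuration above, the orders table is therefore published to myapp.myapp.orders.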
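Each CDC message value is a change-event envelope with `before`, `after`, `source`, `op` and `ts_ms` fields. The `op` field is `c` (create), `u` (update), `d` (delete) or `r` (read, i.e. a snapshot row). With the worker's JsonConverter and its default `schemas.enable=true`, the envelope sits under a `payload` key next to a `schema` key. After a delete, Debezium also emits a tombstone (null value) by default. Below is a minimal Python sketch that decodes such a value and reports the operation and resulting row state. The function `describe_change` and the sample rows in the test are hypothetical.

```python
import json
from typing import Optional, Tuple

# Debezium "op" codes used in change-event envelopes
OPERATIONS = {"c": "create", "u": "update", "d": "delete", "r": "read (snapshot)"}


def describe_change(raw_value: Optional[bytes]) -> Tuple[str, Optional[dict]]:
    """Return (operation, row state) for one Debezium message value."""
    if raw_value is None:
        # Tombstone emitted after a delete (tombstones.on.delete=true by default)
        return "tombstone", None
    envelope = json.loads(raw_value)
    # JsonConverter with schemas.enable=true wraps the data as {"schema": ..., "payload": ...}
    if "schema" in envelope and "payload" in envelope:
        envelope = envelope["payload"]
    op = OPERATIONS.get(envelope["op"])
    if op is None:
        raise ValueError(f"unexpected op code: {envelope['op']!r}")
    # Deletes carry the old row in "before" and null in "after"
    row = envelope["before"] if envelope["op"] == "d" else envelope["after"]
    return op, row
```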
4. Sink Connector (Kafka → Elasticsearch)
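# Create the sink connector.
# Each topic is written to an Elasticsearch index with the same name.
# With key.ignore=true the document ID is topic+partition+offset, so every
# change event becomes a new document. behavior.on.null.values=ignore skips
# the null values produced by DELETE events (after = null) and tombstones.
curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "elasticsearch-sink",
    "config": {
      "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
      "topics": "myapp.myapp.orders",
      "connection.url": "http://elasticsearch:9200",
      "key.ignore": "true",
      "schema.ignore": "true",
      "behavior.on.null.values": "ignore",
      "transforms": "extractValue",
      "transforms.extractValue.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
      "transforms.extractValue.field": "after"
    }
  }'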
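Below is a small in-memory Python model of what the sink does with each record. It assumes `key.ignore=true` and `behavior.on.null.values=ignore`. `ExtractField$Value` keeps only the `after` field, null values are skipped, and the document ID is built from the record's topic, partition and offset. The model shows why an UPDATE adds a second document instead of overwriting the first. It is a simplified sketch. The `SinkRecord` type, the `index_records` helper and the `+`-joined ID format are assumptions modeled on the connector's documented topic+partition+offset scheme.

```python
from typing import Dict, Iterable, NamedTuple, Optional


class SinkRecord(NamedTuple):
    """Simplified Kafka record as seen by the sink connector (hypothetical type)."""
    topic: str
    partition: int
    offset: int
    value: Optional[dict]  # Debezium envelope, or None for a tombstone


def extract_after(value: Optional[dict]) -> Optional[dict]:
    """Mimic ExtractField$Value with field=after: keep only value["after"]."""
    return None if value is None else value.get("after")


def index_records(records: Iterable[SinkRecord]) -> Dict[str, Dict[str, dict]]:
    """Return {index: {doc_id: document}} as the sink would write them (simplified)."""
    indices: Dict[str, Dict[str, dict]] = {}
    for rec in records:
        doc = extract_after(rec.value)
        if doc is None:
            continue  # behavior.on.null.values=ignore: deletes and tombstones are skipped
        doc_id = f"{rec.topic}+{rec.partition}+{rec.offset}"  # key.ignore=true
        indices.setdefault(rec.topic, {})[doc_id] = doc  # index name = topic name
    return indices
```

To make updates overwrite one document per row, the sink would need to use the record key as the ID (`key.ignore=false`). Debezium's key is a struct such as {"id": 1}, so it would likely need to be reduced to a single field first, for example with `ExtractField$Key`.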
5. Managing Connectors
# List connectors
curl http://localhost:8083/connectors
# Check a connector's status (connector and task states)
curl http://localhost:8083/connectors/mysql-source/status
# Pause a connector
curl -X PUT http://localhost:8083/connectors/mysql-source/pause
# Resume a connector
curl -X PUT http://localhost:8083/connectors/mysql-source/resume
# Delete a connector
curl -X DELETE http://localhost:8083/connectors/mysql-source
# List the connector plugins installed on the worker
curl http://localhost:8083/connector-plugins
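The same endpoints can be wrapped in a minimal Python sketch (standard library only). Besides mirroring the curl commands, it shows how to read the status response. In that response, `connector.state` and each `tasks[].state` take values such as RUNNING, PAUSED or FAILED. The `ConnectClient` class and the `failed_tasks` helper are hypothetical names. The default base URL assumes the port mapping from the Docker setup.

```python
import json
import urllib.parse
import urllib.request
from typing import Any, List, Optional


class ConnectClient:
    """Thin wrapper over the Kafka Connect REST API (hypothetical helper)."""

    def __init__(self, base_url: str = "http://localhost:8083") -> None:
        self.base_url = base_url.rstrip("/")

    def request(self, method: str, path: str) -> urllib.request.Request:
        """Build (but do not send) a request such as GET /connectors."""
        return urllib.request.Request(self.base_url + path, method=method)

    def connector_path(self, name: str, suffix: str = "") -> str:
        return f"/connectors/{urllib.parse.quote(name, safe='')}{suffix}"

    def _send(self, method: str, path: str) -> Optional[Any]:
        with urllib.request.urlopen(self.request(method, path), timeout=10) as resp:
            body = resp.read()
        return json.loads(body) if body else None  # pause/resume/delete return no body

    def list_connectors(self) -> List[str]:
        return self._send("GET", "/connectors")

    def status(self, name: str) -> dict:
        return self._send("GET", self.connector_path(name, "/status"))

    def pause(self, name: str) -> None:
        self._send("PUT", self.connector_path(name, "/pause"))

    def resume(self, name: str) -> None:
        self._send("PUT", self.connector_path(name, "/resume"))

    def delete(self, name: str) -> None:
        self._send("DELETE", self.connector_path(name))

    def plugins(self) -> List[dict]:
        return self._send("GET", "/connector-plugins")


def failed_tasks(status: dict) -> List[int]:
    """IDs of tasks whose state is FAILED in a /connectors/{name}/status response."""
    return [t["id"] for t in status.get("tasks", []) if t.get("state") == "FAILED"]
```

For example, `ConnectClient().status("mysql-source")` returns the same JSON as the status curl command. Calling `failed_tasks` on that result lists the tasks worth inspecting; a failed task's entry also carries a `trace` field with the stack trace.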
📝 Summary
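- Kafka Connect: a framework for moving data between Kafka and other systems without writing producer/consumer code
- Source Connector: external system → Kafka (e.g. Debezium CDC)
- Sink Connector: Kafka → external system (e.g. Elasticsearch)
- Managed through the REST API (port 8083)
- Distributed mode spreads connector tasks across workers for scalability and high availability (HA)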