To consume the data written to Kafka, refer to Send data to Kafka for consumers.
This article is a quick start guide; refer to templates and common configs for more details.
To prepare the source MySQL instance, refer to mysql to mysql.
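If you do not already have a source instance, a minimal sketch is below. The container name some-mysql-src is illustrative, $MYSQL_IMAGE is assumed to point to an official MySQL image (in the same style as the other image variables used in this tutorial), and the binlog options are what the cdc task later needs.

# hypothetical source instance matching the credentials and port used below
docker run --name some-mysql-src \
-p 3307:3306 \
-e MYSQL_ROOT_PASSWORD=123456 \
-d "$MYSQL_IMAGE" \
--server-id=1 \
--log-bin=mysql-bin \
--binlog-format=ROW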
- start zookeeper
rm -rf /tmp/ape_dts/kafka/zookeeper_data
mkdir -p /tmp/ape_dts/kafka/zookeeper_data
docker run --name some-zookeeper \
-p 2181:2181 \
-v "/tmp/ape_dts/kafka/zookeeper_data:/bitnami" \
-e ALLOW_ANONYMOUS_LOGIN=yes \
-d "$ZOOKEEPER_IMAGE"
- start kafka
rm -rf /tmp/ape_dts/kafka/kafka_data
mkdir -p /tmp/ape_dts/kafka/kafka_data
docker run --name some-kafka \
-p 9092:9092 \
-p 9093:9093 \
-v "/tmp/ape_dts/kafka/kafka_data:/bitnami/kafka" \
-e KAFKA_CFG_ZOOKEEPER_CONNECT=host.docker.internal:2181 \
-e ALLOW_PLAINTEXT_LISTENER=yes \
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT \
-e KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093 \
-e KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://127.0.0.1:9092,EXTERNAL://127.0.0.1:9093 \
-e KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT \
-d "$KAFKA_IMAGE"
- create test topic
docker exec -it some-kafka /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic test --bootstrap-server localhost:9093
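Optionally, verify the topic exists (kafka-topics.sh ships in the same Bitnami image):

docker exec -it some-kafka /opt/bitnami/kafka/bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9093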
- prepare source data
mysql -h127.0.0.1 -uroot -p123456 -P3307
CREATE DATABASE test_db;
CREATE TABLE test_db.tb_1(id int, value int, primary key(id));
INSERT INTO test_db.tb_1 VALUES(1,1),(2,2),(3,3),(4,4);
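This gives the snapshot task 4 rows to migrate; a quick query confirms the source data:

SELECT * FROM test_db.tb_1;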
- create the config for the snapshot task
cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
db_type=mysql
extract_type=snapshot
url=mysql://root:[email protected]:3307?ssl-mode=disabled
[sinker]
db_type=kafka
sink_type=write
url=127.0.0.1:9093
with_field_defs=true
[filter]
do_dbs=test_db
do_events=insert
[router]
topic_map=*.*:test
[parallelizer]
parallel_type=snapshot
parallel_size=8
[pipeline]
buffer_size=16000
checkpoint_interval_secs=1
EOL
- start the snapshot task
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \
"$APE_DTS_IMAGE" /task_config.ini
- create the config for the cdc task
cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
db_type=mysql
extract_type=cdc
server_id=2000
url=mysql://root:[email protected]:3307?ssl-mode=disabled
[filter]
do_dbs=test_db,test_db_2
do_events=insert,update,delete
do_ddls=*
[router]
topic_map=*.*:test
[sinker]
db_type=kafka
sink_type=write
url=127.0.0.1:9093
with_field_defs=true
[parallelizer]
parallel_type=serial
parallel_size=1
[pipeline]
buffer_size=16000
checkpoint_interval_secs=1
EOL
- start the cdc task
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \
"$APE_DTS_IMAGE" /task_config.ini
- change source data
mysql -h127.0.0.1 -uroot -p123456 -P3307
CREATE DATABASE test_db_2;
CREATE TABLE test_db_2.tb_2(id int, value int, primary key(id));
INSERT INTO test_db_2.tb_2 VALUES(1,1);
UPDATE test_db_2.tb_2 SET value=100000 WHERE id=1;
DELETE FROM test_db_2.tb_2;