-
This article is a quick start; refer to the templates and common configs for more details.
To prepare the Postgres instance, refer to the "pg to pg" tutorial.
- Tested StarRocks versions: 2.5.4 to 3.2.11
docker run -itd --name some-starrocks \
-p 9030:9030 \
-p 8030:8030 \
-p 8040:8040 \
"$STARROCKS_IMAGE"
psql -h 127.0.0.1 -U postgres -d postgres -p 5433 -W
CREATE SCHEMA test_db;
CREATE TABLE test_db.tb_1(id int, value int, primary key(id));
cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
extract_type=struct
db_type=pg
url=postgres://postgres:postgres@127.0.0.1:5433/postgres?options[statement_timeout]=10s
[sinker]
url=mysql://root:@127.0.0.1:9030
sink_type=struct
db_type=starrocks
[filter]
do_dbs=test_db
[parallelizer]
parallel_type=serial
[pipeline]
buffer_size=100
checkpoint_interval_secs=1
EOL
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \
"$APE_DTS_IMAGE" /task_config.ini
mysql -P 9030 -h 127.0.0.1 -u root --prompt="StarRocks > "
SHOW CREATE TABLE test_db.tb_1;
CREATE TABLE `tb_1` (
`id` int(11) NOT NULL COMMENT "",
`value` int(11) NULL COMMENT "",
`_ape_dts_is_deleted` boolean NULL COMMENT "",
`_ape_dts_timestamp` bigint(20) NULL COMMENT ""
) ENGINE=OLAP
PRIMARY KEY(`id`)
DISTRIBUTED BY HASH(`id`)
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"enable_persistent_index" = "true",
"replicated_storage" = "true",
"compression" = "LZ4"
);
psql -h 127.0.0.1 -U postgres -d postgres -p 5433 -W
INSERT INTO test_db.tb_1 VALUES(1,1),(2,2),(3,3),(4,4);
cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
db_type=pg
extract_type=snapshot
url=postgres://postgres:postgres@127.0.0.1:5433/postgres?options[statement_timeout]=10s
[sinker]
db_type=starrocks
sink_type=write
url=mysql://root:@127.0.0.1:9030
stream_load_url=mysql://root:@127.0.0.1:8040
batch_size=5000
[filter]
do_dbs=test_db
do_events=insert
[parallelizer]
parallel_type=snapshot
parallel_size=8
[pipeline]
buffer_size=16000
checkpoint_interval_secs=1
EOL
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \
"$APE_DTS_IMAGE" /task_config.ini
mysql -P 9030 -h 127.0.0.1 -u root --prompt="StarRocks > "
SELECT * FROM test_db.tb_1;
+------+-------+---------------------+--------------------+
| id | value | _ape_dts_is_deleted | _ape_dts_timestamp |
+------+-------+---------------------+--------------------+
| 1 | 1 | NULL | 1731665154965 |
| 2 | 2 | NULL | 1731665159858 |
| 3 | 3 | NULL | 1731665159880 |
| 4 | 4 | NULL | 1731665159880 |
+------+-------+---------------------+--------------------+
psql -h 127.0.0.1 -U postgres -d postgres -p 5433 -W
SELECT pg_drop_replication_slot('ape_test') FROM pg_replication_slots WHERE slot_name = 'ape_test';
cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
db_type=pg
extract_type=cdc
url=postgres://postgres:postgres@127.0.0.1:5433/postgres?options[statement_timeout]=10s
slot_name=ape_test
[filter]
do_dbs=test_db
do_events=insert,update,delete
[sinker]
db_type=starrocks
sink_type=write
url=mysql://root:@127.0.0.1:9030
stream_load_url=mysql://root:@127.0.0.1:8040
hard_delete=true
batch_size=5000
[parallelizer]
parallel_type=rdb_merge
parallel_size=8
[pipeline]
buffer_size=16000
checkpoint_interval_secs=1
EOL
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \
"$APE_DTS_IMAGE" /task_config.ini
psql -h 127.0.0.1 -U postgres -d postgres -p 5433 -W
DELETE FROM test_db.tb_1 WHERE id=1;
UPDATE test_db.tb_1 SET value=2000000 WHERE id=2;
INSERT INTO test_db.tb_1 VALUES(5,5);
mysql -P 9030 -h 127.0.0.1 -u root --prompt="StarRocks > "
SELECT * FROM test_db.tb_1;
+------+---------+---------------------+--------------------+
| id | value | _ape_dts_is_deleted | _ape_dts_timestamp |
+------+---------+---------------------+--------------------+
| 2 | 2000000 | NULL | 1731665679461 |
| 3 | 3 | NULL | 1731665609225 |
| 4 | 4 | NULL | 1731665609236 |
| 5 | 5 | NULL | 1731665679572 |
+------+---------+---------------------+--------------------+
cat <<EOL > /tmp/ape_dts/task_config.ini
[extractor]
db_type=pg
extract_type=cdc
url=postgres://postgres:postgres@127.0.0.1:5433/postgres?options[statement_timeout]=10s
slot_name=ape_test
[filter]
do_dbs=test_db
do_events=insert,update,delete
[sinker]
db_type=starrocks
sink_type=write
url=mysql://root:@127.0.0.1:9030
stream_load_url=mysql://root:@127.0.0.1:8040
batch_size=5000
[parallelizer]
parallel_type=table
parallel_size=8
[pipeline]
buffer_size=16000
checkpoint_interval_secs=1
EOL
docker run --rm --network host \
-v "/tmp/ape_dts/task_config.ini:/task_config.ini" \
"$APE_DTS_IMAGE" /task_config.ini
psql -h 127.0.0.1 -U postgres -d postgres -p 5433 -W
DELETE FROM test_db.tb_1 WHERE id=3;
mysql -P 9030 -h 127.0.0.1 -u root --prompt="StarRocks > "
SELECT * FROM test_db.tb_1;
+------+---------+---------------------+--------------------+
| id | value | _ape_dts_is_deleted | _ape_dts_timestamp |
+------+---------+---------------------+--------------------+
| 2 | 2000000 | NULL | 1731665679461 |
| 3 | 3 | 1 | 1731665747381 |
| 4 | 4 | NULL | 1731665609236 |
| 5 | 5 | NULL | 1731665679572 |
+------+---------+---------------------+--------------------+
Refer to mysql to starrocks
- Create a table in Postgres
CREATE SCHEMA test_db;
-- Source table for the data type mapping example: one column per Postgres type
-- spelling (canonical names and their aliases), so each spelling is exercised
-- when ape_dts migrates the structure to StarRocks.
-- Column names and order are kept unchanged so they still line up with the
-- generated StarRocks DDL shown after this statement.
CREATE TABLE test_db.full_column_type (
    id serial PRIMARY KEY,

    -- char
    char_col char,
    char_col_2 char(255),
    character_col character,
    character_col_2 character(255),

    -- varchar
    varchar_col varchar,
    varchar_col_2 varchar(255),
    character_varying_col character varying,
    character_varying_col_2 character varying(255),
    bpchar_col bpchar,
    bpchar_col_2 bpchar(10),
    text_col text,

    -- float
    real_col real,
    float4_col float4,

    -- double
    double_precision_col double precision,
    float8_col float8,

    -- decimal
    numeric_col numeric,
    numeric_col_2 numeric(10, 2),
    decimal_col decimal,
    decimal_col_2 decimal(10, 2),

    -- small int
    smallint_col smallint,
    int2_col int2,
    smallserial_col smallserial,
    -- serial2 is the alias of smallserial; use it so the alias itself is covered
    serial2_col serial2,

    -- int
    integer_col integer,
    int_col int,
    -- int4 is the alias of integer; use it so the alias itself is covered
    int4_col int4,
    serial_col serial,
    serial4_col serial4,

    -- bigint
    bigint_col bigint,
    int8_col int8,
    bigserial_col bigserial,
    serial8_col serial8,

    -- bit
    bit_col bit,
    bit_col_2 bit(10),
    bit_varying_col bit varying,
    bit_varying_col_2 bit varying(10),
    varbit_col varbit,
    varbit_col_2 varbit(10),

    -- time (without time zone)
    time_col time,
    time_col_2 time(6),
    time_col_3 time without time zone,
    time_col_4 time(6) without time zone,

    -- timetz (time with time zone)
    timez_col timetz,
    timez_col_2 timetz(6),
    timez_col_3 time with time zone,
    timez_col_4 time(6) with time zone,

    -- timestamp (without time zone)
    timestamp_col timestamp,
    timestamp_col_2 timestamp(6),
    timestamp_col_3 timestamp without time zone,
    timestamp_col_4 timestamp(6) without time zone,

    -- timestamptz (timestamp with time zone)
    timestampz_col timestamptz,
    timestampz_col_2 timestamptz(6),
    timestampz_col_3 timestamp with time zone,
    timestampz_col_4 timestamp(6) with time zone,

    -- date
    date_col date,

    -- binary
    bytea_col bytea,

    -- bool
    boolean_col boolean,
    bool_col bool,

    -- json
    json_col json,
    jsonb_col jsonb,

    -- interval
    interval_col interval,
    interval_col_2 interval(3),

    -- array
    array_float4_col float4[],
    array_float8_col float8[],
    array_int2_col int2[],
    array_int4_col int4[],
    array_int8_col bigint[],
    array_int8_col_2 int8[],
    array_text_col text[],
    array_boolean_col boolean[],
    array_boolean_col_2 bool[],
    array_date_col date[],
    array_timestamp_col timestamp[],
    array_timestamp_col_2 timestamp(6)[],
    array_timestamptz_col timestamptz[],
    array_timestamptz_col_2 timestamptz(6)[],

    -- others
    box_col box,
    cidr_col cidr,
    circle_col circle,
    inet_col inet,
    line_col line,
    lseg_col lseg,
    macaddr_col macaddr,
    macaddr8_col macaddr8,
    money_col money,
    path_col path,
    pg_lsn_col pg_lsn,
    pg_snapshot_col pg_snapshot,
    polygon_col polygon,
    point_col point,
    tsquery_col tsquery,
    tsvector_col tsvector,
    txid_snapshot_col txid_snapshot,
    uuid_col uuid,
    xml_col xml
);
- The SQL that ape_dts generates and executes in StarRocks when migrating structures:
-- StarRocks DDL for the Postgres table test_db.full_column_type defined above.
-- The "--" lines are reader annotations that group the columns by their source
-- Postgres type; the statement text itself is unchanged.
CREATE TABLE IF NOT EXISTS `test_db`.`full_column_type` (
-- Primary key (serial in Postgres) -> INT NOT NULL.
`id` INT NOT NULL,
-- char / character / varchar / character varying / bpchar / text -> STRING
-- (declared lengths such as char(255) are not carried over).
`char_col` STRING,
`char_col_2` STRING,
`character_col` STRING,
`character_col_2` STRING,
`varchar_col` STRING,
`varchar_col_2` STRING,
`character_varying_col` STRING,
`character_varying_col_2` STRING,
`bpchar_col` STRING,
`bpchar_col_2` STRING,
`text_col` STRING,
-- real / float4 -> FLOAT.
`real_col` FLOAT,
`float4_col` FLOAT,
-- double precision / float8 -> DOUBLE.
`double_precision_col` DOUBLE,
`float8_col` DOUBLE,
-- numeric / decimal -> DECIMAL(38, 9), including the columns declared as
-- numeric(10, 2) / decimal(10, 2): the source precision and scale are not kept.
`numeric_col` DECIMAL(38, 9),
`numeric_col_2` DECIMAL(38, 9),
`decimal_col` DECIMAL(38, 9),
`decimal_col_2` DECIMAL(38, 9),
-- smallint family -> SMALLINT; the serial variants come out NOT NULL.
`smallint_col` SMALLINT,
`int2_col` SMALLINT,
`smallserial_col` SMALLINT NOT NULL,
`serial2_col` SMALLINT NOT NULL,
-- integer family -> INT; the serial variants come out NOT NULL.
`integer_col` INT,
`int_col` INT,
`int4_col` INT,
`serial_col` INT NOT NULL,
`serial4_col` INT NOT NULL,
-- bigint family -> BIGINT; the serial variants come out NOT NULL.
`bigint_col` BIGINT,
`int8_col` BIGINT,
`bigserial_col` BIGINT NOT NULL,
`serial8_col` BIGINT NOT NULL,
-- bit / bit varying / varbit -> STRING.
`bit_col` STRING,
`bit_col_2` STRING,
`bit_varying_col` STRING,
`bit_varying_col_2` STRING,
`varbit_col` STRING,
`varbit_col_2` STRING,
-- time (with or without time zone) -> VARCHAR(255), not a temporal type.
`time_col` VARCHAR(255),
`time_col_2` VARCHAR(255),
`time_col_3` VARCHAR(255),
`time_col_4` VARCHAR(255),
`timez_col` VARCHAR(255),
`timez_col_2` VARCHAR(255),
`timez_col_3` VARCHAR(255),
`timez_col_4` VARCHAR(255),
-- timestamp and timestamptz variants both -> DATETIME.
`timestamp_col` DATETIME,
`timestamp_col_2` DATETIME,
`timestamp_col_3` DATETIME,
`timestamp_col_4` DATETIME,
`timestampz_col` DATETIME,
`timestampz_col_2` DATETIME,
`timestampz_col_3` DATETIME,
`timestampz_col_4` DATETIME,
-- date -> DATE, bytea -> VARBINARY.
`date_col` DATE,
`bytea_col` VARBINARY,
-- boolean / bool -> BOOLEAN.
`boolean_col` BOOLEAN,
`bool_col` BOOLEAN,
-- json and jsonb both -> JSON.
`json_col` JSON,
`jsonb_col` JSON,
-- interval -> VARCHAR(255).
`interval_col` VARCHAR(255),
`interval_col_2` VARCHAR(255),
-- Every array type -> STRING (no element typing on the StarRocks side).
`array_float4_col` STRING,
`array_float8_col` STRING,
`array_int2_col` STRING,
`array_int4_col` STRING,
`array_int8_col` STRING,
`array_int8_col_2` STRING,
`array_text_col` STRING,
`array_boolean_col` STRING,
`array_boolean_col_2` STRING,
`array_date_col` STRING,
`array_timestamp_col` STRING,
`array_timestamp_col_2` STRING,
`array_timestamptz_col` STRING,
`array_timestamptz_col_2` STRING,
-- Geometric, network, money, LSN/snapshot, text-search, uuid and xml types -> STRING.
`box_col` STRING,
`cidr_col` STRING,
`circle_col` STRING,
`inet_col` STRING,
`line_col` STRING,
`lseg_col` STRING,
`macaddr_col` STRING,
`macaddr8_col` STRING,
`money_col` STRING,
`path_col` STRING,
`pg_lsn_col` STRING,
`pg_snapshot_col` STRING,
`polygon_col` STRING,
`point_col` STRING,
`tsquery_col` STRING,
`tsvector_col` STRING,
`txid_snapshot_col` STRING,
`uuid_col` STRING,
`xml_col` STRING,
-- Extra columns that do not exist in the source table. In the CDC examples in
-- this document, `_ape_dts_is_deleted` is 1 for a soft-deleted row and NULL
-- otherwise; `_ape_dts_timestamp` holds a 13-digit integer (looks like epoch
-- milliseconds -- confirm against the ape_dts docs).
`_ape_dts_is_deleted` BOOLEAN,
`_ape_dts_timestamp` BIGINT
-- Same key as the source table: primary key on `id`, hash-distributed by `id`.
) PRIMARY KEY (`id`) DISTRIBUTED BY HASH(`id`);
Refer to mysql to starrocks
Refer to mysql to starrocks
Currently, DDL events are ignored; we may support them in the future.