
LakeSoul Flink CDC Whole Database Synchronization Tutorial

LakeSoul Flink CDC Sink supports synchronizing an entire MySQL database to LakeSoul, with automatic table creation, automatic schema change propagation, exactly-once semantics, and more.

For detailed documentation, please refer to LakeSoul Flink CDC Synchronization of Entire MySQL Database.

In this tutorial, we demonstrate end-to-end synchronization of a MySQL database to LakeSoul, covering automatic table creation, DDL changes, and other operations.

1. Prepare the environment

1.1 Start a local MySQL database

It is recommended to use the MySQL Docker image to quickly start a MySQL database instance:

docker run --name lakesoul-test-mysql -e MYSQL_ROOT_PASSWORD=root -e MYSQL_DATABASE=test_cdc -p 3306:3306 -d mysql:8

1.2 Configuring LakeSoul Meta DB and Spark Environment

For this part, please refer to Setup a local test environment

Then start a spark-sql SQL interactive query command line environment:

$SPARK_HOME/bin/spark-sql \
  --conf spark.sql.extensions=com.dmetasoul.lakesoul.sql.LakeSoulSparkSessionExtension \
  --conf spark.sql.catalog.lakesoul=org.apache.spark.sql.lakesoul.catalog.LakeSoulCatalog \
  --conf spark.sql.defaultCatalog=lakesoul \
  --conf spark.sql.warehouse.dir=/tmp/lakesoul \
  --conf spark.dmetasoul.lakesoul.snapshot.cache.expire.seconds=10
tip

This command starts a Spark local job and adds two options:

  1. spark.sql.warehouse.dir=/tmp/lakesoul: the default table storage location in Spark SQL must point to the same directory that the Flink job writes to.
  2. spark.dmetasoul.lakesoul.snapshot.cache.expire.seconds=10: LakeSoul caches metadata information in Spark; a short cache expiration time makes it easier to query the latest data.

After starting the Spark SQL command line, you can execute:

SHOW DATABASES;
SHOW TABLES IN default;

You can see that LakeSoul currently contains only the default database, with no tables in it.

1.3 Create a table in MySQL in advance and write data

  1. Install mycli
    pip install mycli
  2. Start mycli and connect to the MySQL database
    mycli mysql://root@localhost:3306/test_cdc -p root
  3. Create table and write data
    CREATE TABLE mysql_test_1 (id INT PRIMARY KEY, name VARCHAR(255), type SMALLINT);
    INSERT INTO mysql_test_1 VALUES (1, 'Bob', 10);
    SELECT * FROM mysql_test_1;
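
If everything works, mycli prints the row that was just inserted (illustrative output):

+----+------+------+
| id | name | type |
+----+------+------+
| 1  | Bob  | 10   |
+----+------+------+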

2. Start the sync job

2.1 Download Flink and start a local cluster

You can download Flink 1.17 from the Flink download page.

Extract the downloaded Flink package:

tar xf flink-1.17.1-bin-scala_2.12.tgz
export FLINK_HOME=${PWD}/flink-1.17.1

Then start a local Flink Cluster:

$FLINK_HOME/bin/start-cluster.sh

You can open http://localhost:8081 to check that the Flink local cluster has started normally.

2.2 Submit the LakeSoul Flink CDC Sink job

Submit a LakeSoul Flink CDC Sink job to the Flink cluster started above:

$FLINK_HOME/bin/flink run -ys 1 -yjm 1G -ytm 2G \
-c org.apache.flink.lakesoul.entry.MysqlCdc \
lakesoul-flink-2.5.0-flink-1.17.jar \
--source_db.host localhost \
--source_db.port 3306 \
--source_db.db_name test_cdc \
--source_db.user root \
--source_db.password root \
--source.parallelism 1 \
--sink.parallelism 1 \
--warehouse_path file:/tmp/lakesoul \
--flink.checkpoint file:/tmp/flink/chk \
--flink.savepoint file:/tmp/flink/svp \
--job.checkpoint_interval 10000 \
--server_time_zone UTC

The lakesoul-flink jar package can be downloaded from the GitHub Releases page.

Refer to LakeSoul Flink CDC Synchronization of Entire MySQL Database for detailed usage of the Flink job.

On the Flink web UI at http://localhost:8081, click Running Jobs and check whether the LakeSoul job is in the Running state.

You can click into the job page, where you should see that one data record has been synchronized.

2.3 Use Spark SQL to read the synchronized data in the LakeSoul table

Execute in Spark SQL Shell:

SHOW DATABASES;
SHOW TABLES IN test_cdc;
DESC test_cdc.mysql_test_1;
SELECT * FROM test_cdc.mysql_test_1;

From the results of these statements, you can see that a test_cdc database was automatically created in LakeSoul, along with a mysql_test_1 table whose fields and primary key are the same as in MySQL (plus one extra rowKinds column; refer to the description in LakeSoul CDC Table).
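
As a minimal sketch, assuming the usual MySQL-to-Spark type mapping, the DESC output should look roughly like this:

DESC test_cdc.mysql_test_1;
-- id        int
-- name      string
-- type      smallint
-- rowKinds  string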

2.4 Observe the synchronization situation after executing Update in MySQL

Perform the update in mycli:

UPDATE mysql_test_1 SET name='Peter' WHERE id=1;

Then read again in LakeSoul:

SELECT * FROM test_cdc.mysql_test_1;

You can see that the latest data has been read.
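
For a more targeted check, you can narrow the same query to the changed row:

SELECT name FROM test_cdc.mysql_test_1 WHERE id=1;
-- expect: Peter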

2.5 Observe the synchronization after executing DDL in MySQL, and read new and old data

Modify the structure of the table in mycli:

ALTER TABLE mysql_test_1 ADD COLUMN new_col FLOAT;

This adds a new column at the end of the table, with a default value of NULL. Verify the execution result in mycli:
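
DESC mysql_test_1;
-- new_col should appear as FLOAT, nullable, with default NULL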

At this point, the table structure has also been synchronized to LakeSoul, and we can view it in spark-sql:

DESC test_cdc.mysql_test_1;

Reading from LakeSoul at this point, the new column is NULL as well:

SELECT * FROM test_cdc.mysql_test_1;

Insert a new row into MySQL:

INSERT INTO mysql_test_1 VALUES (2, 'Alice', 20, 9.9);

Read again from LakeSoul:
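
SELECT * FROM test_cdc.mysql_test_1;
-- expect two rows: new_col = 9.9 for id = 2 and NULL for id = 1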

Delete a row from MySQL:

DELETE FROM mysql_test_1 WHERE id=1;

Read from LakeSoul:
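
SELECT * FROM test_cdc.mysql_test_1;
-- expect: only the row with id = 2 remains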

As you can see, each read from LakeSoul returns the synchronized result, exactly matching the state in MySQL.

2.6 Observe the synchronization after creating a new table in MySQL

Create a new table in MySQL with a different schema from the previous table:

CREATE TABLE mysql_test_2 (name VARCHAR(100) PRIMARY KEY, phone_no VARCHAR(20));

In LakeSoul, you can see that the new table has been automatically created, and you can view the table structure:
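
SHOW TABLES IN test_cdc;
DESC test_cdc.mysql_test_2;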

Insert a row into the new MySQL table:

INSERT INTO mysql_test_2 VALUES ('Bob', '10010');

LakeSoul has also synchronized the new table successfully, and its data can be read:
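
SELECT * FROM test_cdc.mysql_test_2;
-- expect one row: 'Bob', '10010' (plus the extra rowKinds column)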