Flink CDC is a set of Change Data Capture (CDC) source connectors for Apache Flink. It reads both historical data and incremental data from many databases in real time, and can synchronize the full and incremental data of a database to a message queue or a data warehouse. This makes Flink CDC suitable for real-time data integration, for example, importing the data of a database to a data lake or warehouse in real time. Flink CDC also supports data processing: with the SQL client, you can join, widen, and aggregate the data in a database in real time and write the result to various storage media.
CDC monitors and captures the changes in a database. The captured change data has many uses. For example, you can use it to build historical databases or near-real-time caches. You can also publish it to message queues (MQs) for analysis and auditing.
This topic describes how to use Flink CDC to migrate data from OceanBase Database to a MySQL database.
Prepare the installation environment
Configure the oblogproxy service for OceanBase Database
oblogproxy is a proxy service for managing incremental logs of OceanBase Database. oblogproxy allows applications to access and manage real-time incremental data logs of OceanBase Database. It also supports subscriptions to incremental logs in isolated networks.
In the sys tenant of the OceanBase cluster, create a user with a password for oblogproxy.
Here is an example:
obclient [(none)]> SHOW TENANT;
+---------------------+
| Current_tenant_name |
+---------------------+
| sys                 |
+---------------------+
1 row in set
obclient [(none)]> CREATE USER sys_user001 IDENTIFIED BY '******';
Query OK, 0 rows affected
obclient [(none)]> GRANT ALL PRIVILEGES ON *.* TO sys_user001 WITH GRANT OPTION;
Query OK, 0 rows affected
Install and deploy oblogproxy.
Configure the Flink environment
Download Flink and the required dependency packages.
Download Flink 1.15.3 and decompress it to the flink-1.15.3 directory.
Download the following dependency packages and place them in the flink-1.15.3/lib/ directory.
Prepare data
Prepare data in the OceanBase database
Prepare test data in the OceanBase database as the source data to be imported to the MySQL database.
Log on to the OceanBase database.
[xxx@xxx /home/admin] $obclient -h10.10.10.2 -P2881 -uroot@mysql001 -p -A
Enter password:
Welcome to the OceanBase. Commands end with ; or \g.
Your OceanBase connection id is 3221536981
Server version: OceanBase 4.0.0.0 (r100000302022111120-7cef93737c5cd03331b5f29130c6e80ac950d33b) (Built Nov 11 2022 20:38:33)
Copyright (c) 2000, 2018, OceanBase and/or its affiliates. All rights reserved.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
obclient [(none)]>
Create a database named test_ob_to_mysql. Then, create the tbl1 and tbl2 tables and insert data in the tables.
obclient [(none)]> CREATE DATABASE test_ob_to_mysql;
Query OK, 1 row affected
obclient [(none)]> USE test_ob_to_mysql;
Database changed
obclient [test_ob_to_mysql]> CREATE TABLE tbl1(col1 INT PRIMARY KEY, col2 VARCHAR(20),col3 INT);
Query OK, 0 rows affected
obclient [test_ob_to_mysql]> INSERT INTO tbl1 VALUES(1,'China',86),(2,'Taiwan',886),(3,'Hong Kong',852),(4,'Macao',853),(5,'North Korea',850);
Query OK, 5 rows affected
Records: 5 Duplicates: 0 Warnings: 0
obclient [test_ob_to_mysql]> CREATE TABLE tbl2(col1 INT PRIMARY KEY,col2 VARCHAR(20));
Query OK, 0 rows affected
obclient [test_ob_to_mysql]> INSERT INTO tbl2 VALUES(86,'+86'),(886,'+886'),(852,'+852'),(853,'+853'),(850,'+850');
Query OK, 5 rows affected
Records: 5 Duplicates: 0 Warnings: 0
Prepare data in the MySQL database
Create a table for storing the source data in the MySQL database.
Log on to the MySQL database.
[xxx@xxx /home/admin] $mysql -hxxx.xxx.xxx.xxx -P3306 -uroot -p
Enter password:
<Omit echo information>
MySQL [(none)]>
Create a database named test_ob_to_mysql and a table named ob_tbl1_and_tbl2.
MySQL [(none)]> CREATE DATABASE test_ob_to_mysql;
Query OK, 1 row affected
MySQL [(none)]> USE test_ob_to_mysql;
Database changed
MySQL [test_ob_to_mysql]> CREATE TABLE ob_tbl1_and_tbl2(col1 INT PRIMARY KEY,col2 INT,col3 VARCHAR(20),col4 VARCHAR(20));
Query OK, 0 rows affected
Start the Flink cluster and Flink SQL CLI
Run the following command to go to the Flink home directory.
[xxx@xxx /FLINK_HOME] #cd flink-1.15.3
Run the following command to start the Flink cluster.
[xxx@xxx /FLINK_HOME/flink-1.15.3] #./bin/start-cluster.sh
After the Flink cluster is started, you can access the web UI of Flink at http://localhost:8081/, as shown in the following figure.
Note
If bash: ./bin/start-cluster.sh: Permission denied is displayed after you run the ./bin/start-cluster.sh command, you must change the permissions on all files in the flink-1.15.3 directory from -rw-rw-r-- to -rwxrwxrwx.
Here is an example:
[xxx@xxx /.../flink-1.15.3] # chmod -R 777 /FLINK_HOME/flink-1.15.3/*
Run the following command to start Flink SQL CLI.
[xxx@xxx /FLINK_HOME/flink-1.15.3] #./bin/sql-client.sh
After the CLI is started, the page shown in the following figure appears.

Enable checkpointing
On the Flink SQL CLI, enable checkpointing and set the checkpointing interval to 3s.
Flink SQL> SET execution.checkpointing.interval = 3s;
[INFO] Session property has been set.
Create OceanBase CDC tables
On the Flink SQL CLI, create the ob_tbl1 and ob_tbl2 tables. They map to the tbl1 and tbl2 tables in the test_ob_to_mysql OceanBase database and are used to read the data to be synchronized from those tables.
Flink SQL> CREATE TABLE ob_tbl1 (
col1 INT PRIMARY KEY NOT ENFORCED,
col2 VARCHAR(20),
col3 INT)
WITH ('connector' = 'oceanbase-cdc',
'scan.startup.mode' = 'initial',
'tenant-name' = 'mysql001',
'username' = 'root@mysql001',
'password' = '******',
'database-name' = 'test_ob_to_mysql',
'table-name' = 'tbl1',
'hostname' = '10.10.10.2',
'port' = '2881',
'rootserver-list' = '10.10.10.2:2882:2881',
'logproxy.host' = '10.10.10.2',
'logproxy.port' = '2983');
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE ob_tbl2 (
col1 INT PRIMARY KEY NOT ENFORCED,
col2 VARCHAR(20))
WITH ('connector' = 'oceanbase-cdc',
'scan.startup.mode' = 'initial',
'tenant-name' = 'mysql001',
'username' = 'root@mysql001',
'password' = '******',
'database-name' = 'test_ob_to_mysql',
'table-name' = 'tbl2',
'hostname' = '10.10.10.2',
'port' = '2881',
'rootserver-list' = '10.10.10.2:2882:2881',
'logproxy.host' = '10.10.10.2',
'logproxy.port' = '2983');
[INFO] Execute statement succeed.
For more information about the WITH options of the OceanBase CDC connector, see OceanBase CDC Connector.
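The two definitions above differ only in the Flink table name, the column list, and the 'table-name' option. As a rough illustration of that structure, and not as part of Flink CDC or of this procedure, the following Python sketch renders both statements from one shared option map. The names COMMON_OPTIONS and render_cdc_table are hypothetical; the option keys and values are copied from the statements above.

```python
# Options shared by both OceanBase CDC tables (values taken from the example above).
COMMON_OPTIONS = {
    "connector": "oceanbase-cdc",
    "scan.startup.mode": "initial",
    "tenant-name": "mysql001",
    "username": "root@mysql001",
    "password": "******",
    "database-name": "test_ob_to_mysql",
    "hostname": "10.10.10.2",
    "port": "2881",
    "rootserver-list": "10.10.10.2:2882:2881",
    "logproxy.host": "10.10.10.2",
    "logproxy.port": "2983",
}


def render_cdc_table(flink_table, source_table, columns):
    """Render a CREATE TABLE statement for one source table (hypothetical helper).

    The sketch does not escape single quotes inside option values.
    """
    options = {**COMMON_OPTIONS, "table-name": source_table}
    column_sql = ",\n  ".join(columns)
    option_sql = ",\n  ".join(f"'{key}' = '{value}'" for key, value in options.items())
    return f"CREATE TABLE {flink_table} (\n  {column_sql}\n) WITH (\n  {option_sql}\n);"


# Flink SQL supports primary key constraints only in NOT ENFORCED mode.
ddl_tbl1 = render_cdc_table(
    "ob_tbl1", "tbl1",
    ["col1 INT PRIMARY KEY NOT ENFORCED", "col2 VARCHAR(20)", "col3 INT"],
)
ddl_tbl2 = render_cdc_table(
    "ob_tbl2", "tbl2",
    ["col1 INT PRIMARY KEY NOT ENFORCED", "col2 VARCHAR(20)"],
)
print(ddl_tbl1)
print(ddl_tbl2)
```

The printed statements can be pasted into the Flink SQL CLI. Keeping the connection options in one place makes it harder for the two definitions to drift apart when a host or port changes.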
Create the MySQL sink table
On the Flink SQL CLI, create the ob_tbl1_and_tbl2 table. It maps to the table of the same name in the MySQL database and is used to write the synchronized data to the MySQL database.
Flink SQL> CREATE TABLE ob_tbl1_and_tbl2(
col1 INT PRIMARY KEY NOT ENFORCED,
col2 INT,
col3 VARCHAR(20),
col4 VARCHAR(20))
WITH ('connector' = 'jdbc',
'url' = 'jdbc:mysql://xxx.xxx.xxx.xxx:3306/test_ob_to_mysql',
'username' = 'root',
'password' = '******',
'table-name' = 'ob_tbl1_and_tbl2');
[INFO] Execute statement succeed.
For more information about the WITH options of the JDBC SQL connector, see JDBC SQL Connector.
Write the data to the MySQL database on the Flink SQL CLI
Use Flink SQL to join the tbl1 and tbl2 tables on tbl1.col3 = tbl2.col1 and write the joined data to the MySQL database.
Flink SQL> INSERT INTO ob_tbl1_and_tbl2
SELECT t1.col1,t1.col3,t1.col2,t2.col2
FROM ob_tbl1 t1,ob_tbl2 t2
WHERE t1.col3=t2.col1;
[INFO] Submitting SQL update statement to the cluster...
Loading class `com.mysql.jdbc.Driver`. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver`. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 9cd180a65cb4e2c4d1a5a91465aa38a3
Note
This example uses MySQL Connector/J 5.1.47, whose driver class is com.mysql.jdbc.Driver. To use the new driver class (com.mysql.cj.jdbc.Driver), use MySQL Connector/J 8.x.
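The Job ID printed after submission can also be used to check the job outside the web UI. The following sketch assumes that the Flink REST API is served at the same address as the web UI (http://localhost:8081) and that the job details endpoint, /jobs/<job-id>, returns a JSON document with a state field. The function names fetch_json and job_state are hypothetical, and error handling is omitted.

```python
import json
from urllib.request import urlopen


def fetch_json(url, timeout=5.0):
    """GET a URL and decode the JSON response body."""
    with urlopen(url, timeout=timeout) as response:
        return json.load(response)


def job_state(job_id, base_url="http://localhost:8081", fetch=fetch_json):
    """Return the state reported for a job, for example "RUNNING".

    `fetch` is injectable so that the lookup can be exercised without a cluster.
    """
    details = fetch(f"{base_url}/jobs/{job_id}")
    return details["state"]


# Example call against a running cluster, using the Job ID shown above:
#   print(job_state("9cd180a65cb4e2c4d1a5a91465aa38a3"))
```

Because the job reads from CDC sources, it is expected to keep running after the initial data has been copied. A state other than RUNNING usually means that the job should be inspected in the web UI.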
Check the associated data written to the MySQL database
Log on to the MySQL database and check the data of the ob_tbl1_and_tbl2 table in the test_ob_to_mysql database.
MySQL [test_ob_to_mysql]> SELECT * FROM ob_tbl1_and_tbl2;
+------+------+-------------+------+
| col1 | col2 | col3        | col4 |
+------+------+-------------+------+
|    1 |   86 | China       | +86  |
|    2 |  886 | Taiwan      | +886 |
|    3 |  852 | Hong Kong   | +852 |
|    4 |  853 | Macao       | +853 |
|    5 |  850 | North Korea | +850 |
+------+------+-------------+------+
5 rows in set
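To see why the result has this column order, the join can be reproduced locally. The sketch below is only an illustration: it uses an in-memory SQLite database from the Python standard library as a stand-in for both OceanBase Database and MySQL, and it does not involve Flink or CDC. The table definitions, the sample rows, and the SELECT statement are copied from the steps above; the ORDER BY clause is added here only to make the output order deterministic.

```python
import sqlite3

# In-memory SQLite database standing in for the source tables (assumption for illustration).
conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE tbl1(col1 INT PRIMARY KEY, col2 VARCHAR(20), col3 INT);
    CREATE TABLE tbl2(col1 INT PRIMARY KEY, col2 VARCHAR(20));
    INSERT INTO tbl1 VALUES (1,'China',86),(2,'Taiwan',886),(3,'Hong Kong',852),
                            (4,'Macao',853),(5,'North Korea',850);
    INSERT INTO tbl2 VALUES (86,'+86'),(886,'+886'),(852,'+852'),(853,'+853'),(850,'+850');
    """
)

# Same projection and join condition as the Flink SQL INSERT statement:
# sink col1 <- t1.col1, col2 <- t1.col3, col3 <- t1.col2, col4 <- t2.col2.
rows = conn.execute(
    """
    SELECT t1.col1, t1.col3, t1.col2, t2.col2
    FROM tbl1 t1, tbl2 t2
    WHERE t1.col3 = t2.col1
    ORDER BY t1.col1
    """
).fetchall()

for row in rows:
    print(row)

conn.close()
```

Each printed tuple corresponds to one row of ob_tbl1_and_tbl2. Note that t1.col3 and t1.col2 are swapped in the SELECT list, so the numeric code lands in the sink's col2 and the name in its col3.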