CDC Connectors for Apache Flink (Flink CDC for short) is a set of source connectors for Apache Flink. Change Data Capture (CDC) monitors and captures changes in databases, and Flink CDC can read both existing data and incremental changes from many databases in real time. You can use Flink CDC to synchronize full and incremental data from different databases to message queues and data warehouses, or for real-time data integration that imports database data into a data lake or data warehouse. Flink CDC also supports data processing: you can use the Flink SQL client to join, widen, and aggregate database data in real time and write the results to various storage systems. CDC data has many other uses. For example, you can use it to build historical databases or maintain near real-time caches, or deliver it to message queues (MQs) so that downstream consumers can use it for analysis and auditing.
This topic describes how to use Flink CDC to migrate data from OceanBase Database to a MySQL database.
Prepare the installation environment
Configure OceanBase Binlog Service
See Create a binlog cluster to install and deploy OceanBase Binlog Service.
Configure the Flink environment
Download Flink and the required dependency packages:
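Download Flink from the download page. This topic uses Flink 1.15.3, decompressed to the flink-1.15.3 directory.
Download the dependency packages required by the connectors used in this topic and place them in the flink-1.15.3/lib/ directory: the OceanBase CDC SQL connector JAR, the Flink JDBC connector JAR, and the MySQL JDBC driver JAR (this example uses MySQL Connector/J 5.1.47).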
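Before you start the cluster, you can confirm that the JAR files are in place. The following is a minimal Python sketch, assuming the JAR files follow the naming patterns in REQUIRED_JARS; these patterns are assumptions based on common release file names, not a list taken from this topic, so adjust them to the files you actually downloaded. The helper missing_jars is hypothetical.

```python
import fnmatch
import sys
from pathlib import Path

# Assumed file-name patterns for the three JARs this topic needs.
# Adjust them to match the exact files you downloaded.
REQUIRED_JARS = {
    "OceanBase CDC SQL connector": "flink-sql-connector-oceanbase-cdc-*.jar",
    "Flink JDBC connector": "flink-connector-jdbc-*.jar",
    "MySQL JDBC driver": "mysql-connector-java-*.jar",
}


def missing_jars(lib_dir: str) -> list:
    """Return the descriptions of required JARs that are not in lib_dir."""
    names = [p.name for p in Path(lib_dir).glob("*.jar")]
    return [
        desc
        for desc, pattern in REQUIRED_JARS.items()
        if not any(fnmatch.fnmatch(name, pattern) for name in names)
    ]


if __name__ == "__main__":
    # Defaults to the lib directory used in this topic.
    lib = sys.argv[1] if len(sys.argv) > 1 else "flink-1.15.3/lib"
    missing = missing_jars(lib)
    if missing:
        print("Missing in", lib + ":", ", ".join(missing))
        sys.exit(1)
    print("All required JARs found in", lib)
```

If saved as a file such as check_jars.py (a hypothetical name), running it with the lib directory as its argument lists any missing connector and exits with status 1.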
Prepare data
Prepare data in OceanBase Database
Prepare test data in OceanBase Database as the source data to import into the MySQL database.
Log in to OceanBase Database.
Log in to the mysql001 tenant as the root user.
[xxx@xxx /home/admin] $obclient -h10.10.10.2 -P2881 -uroot@mysql001 -p****** -A
Welcome to the OceanBase.  Commands end with ; or \g.
Your OceanBase connection id is 3221536981
Server version: OceanBase 4.0.0.0 (r100000302022111120-7cef93737c5cd03331b5f29130c6e80ac950d33b) (Built Nov 11 2022 20:38:33)
Copyright (c) 2000, 2018, OceanBase and/or its affiliates. All rights reserved.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
obclient [(none)]>
Create a database named test_ob_to_mysql. Then, create the tbl1 and tbl2 tables and insert data into them.
obclient [(none)]> CREATE DATABASE test_ob_to_mysql;
Query OK, 1 row affected
obclient [(none)]> USE test_ob_to_mysql;
Database changed
obclient [test_ob_to_mysql]> CREATE TABLE tbl1(col1 INT PRIMARY KEY, col2 VARCHAR(20),col3 INT);
Query OK, 0 rows affected
obclient [test_ob_to_mysql]> INSERT INTO tbl1 VALUES(1,'China',86),(2,'Taiwan',886),(3,'Hong Kong',852),(4,'Macao',853),(5,'North Korea',850);
Query OK, 5 rows affected
Records: 5  Duplicates: 0  Warnings: 0
obclient [test_ob_to_mysql]> CREATE TABLE tbl2(col1 INT PRIMARY KEY,col2 VARCHAR(20));
Query OK, 0 rows affected
obclient [test_ob_to_mysql]> INSERT INTO tbl2 VALUES(86,'+86'),(886,'+886'),(852,'+852'),(853,'+853'),(850,'+850');
Query OK, 5 rows affected
Records: 5  Duplicates: 0  Warnings: 0
Prepare data in the MySQL database
Create a table in the MySQL database to store the data synchronized from OceanBase Database.
Log in to the MySQL database.
[xxx@xxx /home/admin] $mysql -hxxx.xxx.xxx.xxx -P3306 -uroot -p******
<Omit echo information>
MySQL [(none)]>
Create a database named test_ob_to_mysql and a table named ob_tbl1_and_tbl2.
MySQL [(none)]> CREATE DATABASE test_ob_to_mysql;
Query OK, 1 row affected
MySQL [(none)]> USE test_ob_to_mysql;
Database changed
MySQL [test_ob_to_mysql]> CREATE TABLE ob_tbl1_and_tbl2(col1 INT PRIMARY KEY,col2 INT,col3 VARCHAR(20),col4 VARCHAR(20));
Query OK, 0 rows affected
Start a Flink cluster and Flink SQL CLI
Run the following command to go to the Flink directory.
[xxx@xxx /FLINK_HOME] #cd flink-1.15.3
Run the following command to start the Flink cluster.
[xxx@xxx /FLINK_HOME/flink-1.15.3] #./bin/start-cluster.sh
After the Flink cluster is started, you can access the Flink web UI at http://localhost:8081/, as shown in the following figure.
Note
After you run the ./bin/start-cluster.sh command, if bash: ./bin/start-cluster.sh: Permission denied is displayed, grant execute permission on the files in the flink-1.15.3 directory, for example by changing their permissions from -rw-rw-r-- to -rwxrwxrwx:
[xxx@xxx /.../flink-1.15.3] # chmod -R 777 /FLINK_HOME/flink-1.15.3/*
Run the following command to start the Flink SQL CLI.
[xxx@xxx /FLINK_HOME/flink-1.15.3] #./bin/sql-client.sh
After the CLI is started, the page shown in the following figure appears.

Set checkpoints
In the Flink SQL CLI, enable checkpointing and set the checkpointing interval to 3s.
Flink SQL> SET execution.checkpointing.interval = 3s;
[INFO] Session property has been set.
Create OceanBase CDC tables
In the Flink SQL CLI, create a Flink table for each source table in OceanBase Database. The ob_tbl1 and ob_tbl2 tables below map to the tbl1 and tbl2 tables in the test_ob_to_mysql database and read their data through the OceanBase CDC connector.
Flink SQL> CREATE TABLE ob_tbl1 (
col1 INT PRIMARY KEY NOT ENFORCED,
col2 VARCHAR(20),
col3 INT)
WITH ('connector' = 'oceanbase-cdc',
'scan.startup.mode' = 'initial',
'tenant-name' = 'mysql001',
'username' = 'root@mysql001',
'password' = '******',
'database-name' = 'test_ob_to_mysql',
'table-name' = 'tbl1',
'hostname' = '10.10.10.2',
'port' = '2881',
'rootserver-list' = '10.10.10.2:2882:2881',
'logproxy.host' = '10.10.10.2',
'logproxy.port' = '2983');
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE ob_tbl2 (
col1 INT PRIMARY KEY NOT ENFORCED,
col2 VARCHAR(20))
WITH ('connector' = 'oceanbase-cdc',
'scan.startup.mode' = 'initial',
'tenant-name' = 'mysql001',
'username' = 'root@mysql001',
'password' = '******',
'database-name' = 'test_ob_to_mysql',
'table-name' = 'tbl2',
'hostname' = '10.10.10.2',
'port' = '2881',
'rootserver-list' = '10.10.10.2:2882:2881',
'logproxy.host' = '10.10.10.2',
'logproxy.port' = '2983');
[INFO] Execute statement succeed.
For more information about the WITH options of the OceanBase CDC Connector, see OceanBase CDC Connector.
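The two CREATE TABLE statements differ only in the Flink table name, the source table-name option, and the column list; every other WITH option is shared. The following Python sketch makes that explicit by rendering the DDL from one set of shared options. The helper oceanbase_cdc_ddl and the SHARED_OPTIONS dictionary are hypothetical; the option values are the ones used above. Flink SQL accepts primary keys only in NOT ENFORCED mode, so the sketch declares the key that way.

```python
# Shared WITH options for every OceanBase CDC table in this topic
# (values copied from the statements above).
SHARED_OPTIONS = {
    "connector": "oceanbase-cdc",
    "scan.startup.mode": "initial",
    "tenant-name": "mysql001",
    "username": "root@mysql001",
    "password": "******",
    "database-name": "test_ob_to_mysql",
    "hostname": "10.10.10.2",
    "port": "2881",
    # Format is ip:rpc_port:sql_port, so 2882 is the RPC port and 2881 the SQL port.
    "rootserver-list": "10.10.10.2:2882:2881",
    "logproxy.host": "10.10.10.2",
    "logproxy.port": "2983",
}


def oceanbase_cdc_ddl(flink_table: str, source_table: str, columns: list) -> str:
    """Render a Flink SQL CREATE TABLE statement for one OceanBase source table."""
    # Only table-name varies per table; everything else comes from SHARED_OPTIONS.
    options = {**SHARED_OPTIONS, "table-name": source_table}
    with_clause = ",\n".join(f"  '{key}' = '{value}'" for key, value in options.items())
    column_list = ",\n".join(f"  {column}" for column in columns)
    return f"CREATE TABLE {flink_table} (\n{column_list}\n) WITH (\n{with_clause}\n);"


if __name__ == "__main__":
    print(oceanbase_cdc_ddl(
        "ob_tbl2", "tbl2",
        ["col1 INT PRIMARY KEY NOT ENFORCED", "col2 VARCHAR(20)"],
    ))
```

Generating the statements this way is only a convenience when many tables share the same tenant and log proxy; pasting the rendered text into the Flink SQL CLI is equivalent to typing it by hand.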
Create a MySQL sink table
In the Flink SQL CLI, create the ob_tbl1_and_tbl2 table, which uses the JDBC connector to write the synchronized data to the ob_tbl1_and_tbl2 table in the MySQL database.
Flink SQL> CREATE TABLE ob_tbl1_and_tbl2(
col1 INT PRIMARY KEY NOT ENFORCED,
col2 INT,
col3 VARCHAR(20),
col4 VARCHAR(20))
WITH ('connector' = 'jdbc',
'url' = 'jdbc:mysql://xxx.xxx.xxx.xxx:3306/test_ob_to_mysql',
'username' = 'root',
'password' = '******',
'table-name' = 'ob_tbl1_and_tbl2');
[INFO] Execute statement succeed.
For more information about the WITH options of JDBC SQL Connector, see JDBC SQL Connector.
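Because ob_tbl1_and_tbl2 declares a primary key, the Flink JDBC connector writes to MySQL in upsert mode (for MySQL, INSERT ... ON DUPLICATE KEY UPDATE) and deletes rows by key; without a primary key it would only append. This keeps the MySQL table consistent when the CDC sources emit updates and deletes. The following Python sketch is a simplified model of that behavior, not the connector's implementation: the table is a dict keyed on col1, and change kinds use Flink's short RowKind notation (+I, -U, +U, -D). The function apply_changelog is hypothetical.

```python
from typing import Dict, Iterable, Tuple

Row = Tuple[int, int, str, str]  # (col1, col2, col3, col4) of ob_tbl1_and_tbl2


def apply_changelog(table: Dict[int, Row], events: Iterable[Tuple[str, Row]]) -> Dict[int, Row]:
    """Apply (kind, row) change events to a table keyed on col1, in place."""
    for kind, row in events:
        key = row[0]  # col1 is the primary key
        if kind in ("+I", "+U"):
            # Upsert: insert a new key or overwrite the existing row.
            table[key] = row
        elif kind == "-D":
            table.pop(key, None)
        elif kind == "-U":
            # An upsert sink does not need UPDATE_BEFORE rows; the +U that
            # follows overwrites the row by key, so ignore it here.
            continue
        else:
            raise ValueError(f"unknown change kind: {kind}")
    return table


if __name__ == "__main__":
    mysql_table: Dict[int, Row] = {}
    apply_changelog(mysql_table, [
        ("+I", (1, 86, "China", "+86")),
        ("+I", (2, 886, "Taiwan", "+886")),
        ("-D", (2, 886, "Taiwan", "+886")),
    ])
    print(mysql_table)
```

Keying every write on col1 is what makes replays idempotent: applying the same +I twice leaves one row, which is why the MySQL table must have the same primary key as the Flink sink table.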
Write data to the MySQL database in the Flink SQL CLI
Use Flink SQL to join ob_tbl1 and ob_tbl2 on t1.col3 = t2.col1 and write the joined rows to the ob_tbl1_and_tbl2 table in the MySQL database.
Flink SQL> INSERT INTO ob_tbl1_and_tbl2
> SELECT t1.col1,t1.col3,t1.col2,t2.col2
> FROM ob_tbl1 t1,ob_tbl2 t2
> WHERE t1.col3=t2.col1;
[INFO] Submitting SQL update statement to the cluster...
Loading class `com.mysql.jdbc.Driver`. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver`. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 9cd180a65cb4e2c4d1a5a91465aa38a3
Note
The MySQL driver (com.mysql.jdbc.Driver) used in this example is MySQL Connector/J 5.1.47. To use a newer MySQL driver (com.mysql.cj.jdbc.Driver), use MySQL Connector/J 8.x.
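The INSERT statement above is a regular inner join on t1.col3 = t2.col1 that writes columns in the order t1.col1, t1.col3, t1.col2, t2.col2. The following plain-Python sketch (the join_rows helper is hypothetical) computes the same join once over the test rows inserted into tbl1 and tbl2, which helps predict what the MySQL table should contain. Unlike this one-shot computation, the Flink job keeps the result up to date as incremental changes arrive.

```python
# Test rows inserted into OceanBase Database earlier in this topic.
TBL1 = [(1, "China", 86), (2, "Taiwan", 886), (3, "Hong Kong", 852),
        (4, "Macao", 853), (5, "North Korea", 850)]  # (col1, col2, col3)
TBL2 = [(86, "+86"), (886, "+886"), (852, "+852"),
        (853, "+853"), (850, "+850")]  # (col1, col2)


def join_rows(tbl1, tbl2):
    """Inner-join tbl1 and tbl2 on tbl1.col3 = tbl2.col1."""
    # Index tbl2 by its primary key so each tbl1 row is matched by lookup.
    by_code = {code: prefix for code, prefix in tbl2}
    result = []
    for col1, name, code in tbl1:
        if code in by_code:  # WHERE t1.col3 = t2.col1
            # SELECT t1.col1, t1.col3, t1.col2, t2.col2
            result.append((col1, code, name, by_code[code]))
    return result


if __name__ == "__main__":
    for row in join_rows(TBL1, TBL2):
        print(row)
```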
Check the associated data written to the MySQL database
Log in to the MySQL database and check the data of the ob_tbl1_and_tbl2 table in the test_ob_to_mysql database.
MySQL [test_ob_to_mysql]> SELECT * FROM ob_tbl1_and_tbl2;
+------+------+-------------+------+
| col1 | col2 | col3        | col4 |
+------+------+-------------+------+
|    1 |   86 | China       | +86  |
|    2 |  886 | Taiwan      | +886 |
|    3 |  852 | Hong Kong   | +852 |
|    4 |  853 | Macao       | +853 |
|    5 |  850 | North Korea | +850 |
+------+------+-------------+------+
5 rows in set
