Flink CDC (CDC Connectors for Apache Flink) is a set of source connectors for Apache Flink. The connectors read both historical data and incremental changes from a range of databases in real time, so you can use Flink CDC to synchronize full and incremental data from databases to message queues, data lakes, and data warehouses. Flink CDC also supports data processing: with the Flink SQL client, you can perform real-time joins, table widening, and aggregations, and write the results to various storage systems.
Change Data Capture (CDC) monitors and captures changes in databases. The captured data can serve many purposes, such as building historical databases, maintaining near-real-time caches, and feeding message queues (MQ), from which users can consume data for analytics and auditing.
This topic describes how to use Flink CDC to migrate data from OceanBase Database to a MySQL database.
Environment preparation
Configure OceanBase Binlog Service
Follow the instructions in Create a Binlog cluster to install and deploy OceanBase Binlog Service.
Set up the Flink environment
Download Flink and the required dependency packages:
Download Flink from the download address. This topic uses Flink 1.15.3, which is decompressed and stored in the flink-1.15.3 directory.
Download the required dependency packages and put them in the flink-1.15.3/lib/ directory.
Prepare data
Prepare data in OceanBase Database
Prepare test data in OceanBase Database as the source data for importing to MySQL Database.
Log in to OceanBase Database.
Log in to the mysql001 tenant of the cluster as the root user.
[xxx@xxx /home/admin] $obclient -h10.10.10.2 -P2881 -uroot@mysql001 -p****** -A
Welcome to the OceanBase. Commands end with ; or \g.
Your OceanBase connection id is 3221536981
Server version: OceanBase 4.0.0.0 (r100000302022111120-7cef93737c5cd03331b5f29130c6e80ac950d33b) (Built Nov 11 2022 20:38:33)
Copyright (c) 2000, 2018, OceanBase and/or its affiliates. All rights reserved.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
obclient [(none)]>
Create a database named test_ob_to_mysql, create tables named tbl1 and tbl2, and insert data into the tables.
obclient [(none)]> CREATE DATABASE test_ob_to_mysql;
Query OK, 1 row affected
obclient [(none)]> USE test_ob_to_mysql;
Database changed
obclient [test_ob_to_mysql]> CREATE TABLE tbl1(col1 INT PRIMARY KEY, col2 VARCHAR(20),col3 INT);
Query OK, 0 rows affected
obclient [test_ob_to_mysql]> INSERT INTO tbl1 VALUES(1,'China',86),(2,'Taiwan',886),(3,'Hong Kong',852),(4,'Macao',853),(5,'North Korea',850);
Query OK, 5 rows affected
Records: 5 Duplicates: 0 Warnings: 0
obclient [test_ob_to_mysql]> CREATE TABLE tbl2(col1 INT PRIMARY KEY,col2 VARCHAR(20));
Query OK, 0 rows affected
obclient [test_ob_to_mysql]> INSERT INTO tbl2 VALUES(86,'+86'),(886,'+886'),(852,'+852'),(853,'+853'),(850,'+850');
Query OK, 5 rows affected
Records: 5 Duplicates: 0 Warnings: 0
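Before wiring up Flink, it can help to preview the rows that the join used later in this topic is expected to produce. The query below is a sketch, not part of the original procedure: it runs the same join directly in OceanBase Database against tbl1 and tbl2, with the columns in the same order as the Flink INSERT statement.

```sql
-- Run in obclient, in the test_ob_to_mysql database.
-- Column order matches the later Flink INSERT:
-- tbl1.col1, tbl1.col3, tbl1.col2, tbl2.col2.
SELECT t1.col1, t1.col3, t1.col2, t2.col2
FROM tbl1 t1
JOIN tbl2 t2 ON t1.col3 = t2.col1
ORDER BY t1.col1;
```

With the sample data above, every col3 value in tbl1 has a matching col1 value in tbl2, so the query should return five rows.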
Prepare data in MySQL Database
Create a table in MySQL Database to store the source data.
Enter MySQL Database.
[xxx@xxx /home/admin] $mysql -hxxx.xxx.xxx.xxx -P3306 -uroot -p******
<Omit echo information>
MySQL [(none)]>
Create a database named test_ob_to_mysql and a table named ob_tbl1_and_tbl2.
MySQL [(none)]> CREATE DATABASE test_ob_to_mysql;
Query OK, 1 row affected
MySQL [(none)]> USE test_ob_to_mysql;
Database changed
MySQL [test_ob_to_mysql]> CREATE TABLE ob_tbl1_and_tbl2(col1 INT PRIMARY KEY,col2 INT,col3 VARCHAR(20),col4 VARCHAR(20));
Query OK, 0 rows affected
Start a Flink cluster and the Flink SQL CLI
Run the following command to navigate to the Flink directory.
[xxx@xxx /FLINK_HOME] #cd flink-1.15.3
Run the following command to start the Flink cluster.
[xxx@xxx /FLINK_HOME/flink-1.15.3] #./bin/start-cluster.sh
If the cluster starts successfully, you can access the Flink Web UI at http://localhost:8081/, as shown in the following figure:
Note
If you encounter the following error after you run the ./bin/start-cluster.sh command:
bash: ./bin/start-cluster.sh: Permission denied
change the permissions of all files that have the -rw-rw-r-- permission in the flink-1.15.3 directory to -rwxrwxrwx. Here is an example:
[xxx@xxx /.../flink-1.15.3] # chmod -R 777 /FLINK_HOME/flink-1.15.3/*
Run the following command to start the Flink SQL CLI.
[xxx@xxx /FLINK_HOME/flink-1.15.3] #./bin/sql-client.sh
After the Flink SQL CLI starts successfully, you will see the following page:

Enable checkpointing
Enable checkpointing in the Flink SQL CLI and set the checkpoint interval to 3 seconds.
Flink SQL> SET execution.checkpointing.interval = 3s;
[INFO] Session property has been set.
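The Flink SQL client also accepts a quoted key/value form of SET, and SET without arguments lists the properties of the current session. The following sketch, which assumes the Flink 1.15 SQL client used in this topic, applies the same interval in quoted form and then lists the session properties so that you can confirm the value.

```sql
-- Comment lines are annotations for the reader; at the Flink SQL> prompt,
-- enter only the statements.

-- Quoted form of the same checkpoint interval setting.
SET 'execution.checkpointing.interval' = '3s';

-- List all properties of the current session to check the value.
SET;
```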
Create OceanBase CDC tables
Create the OceanBase CDC source tables in the Flink SQL CLI. The ob_tbl1 and ob_tbl2 tables map to the tbl1 and tbl2 tables in the test_ob_to_mysql database of OceanBase Database, so that Flink can read their existing data and subsequent changes. Flink does not enforce primary key constraints, so declare each primary key as NOT ENFORCED.
Flink SQL> CREATE TABLE ob_tbl1 (
    col1 INT PRIMARY KEY NOT ENFORCED,
    col2 VARCHAR(20),
    col3 INT)
    WITH ('connector' = 'oceanbase-cdc',
    'scan.startup.mode' = 'initial',
    'tenant-name' = 'mysql001',
    'username' = 'root@mysql001',
    'password' = '******',
    'database-name' = 'test_ob_to_mysql',
    'table-name' = 'tbl1',
    'hostname' = '10.10.10.2',
    'port' = '2881',
    'rootserver-list' = '10.10.10.2:2882:2881',
    'logproxy.host' = '10.10.10.2',
    'logproxy.port' = '2983');
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE ob_tbl2 (
    col1 INT PRIMARY KEY NOT ENFORCED,
    col2 VARCHAR(20))
    WITH ('connector' = 'oceanbase-cdc',
    'scan.startup.mode' = 'initial',
    'tenant-name' = 'mysql001',
    'username' = 'root@mysql001',
    'password' = '******',
    'database-name' = 'test_ob_to_mysql',
    'table-name' = 'tbl2',
    'hostname' = '10.10.10.2',
    'port' = '2881',
    'rootserver-list' = '10.10.10.2:2882:2881',
    'logproxy.host' = '10.10.10.2',
    'logproxy.port' = '2983');
[INFO] Execute statement succeed.
For more information about the WITH options for OceanBase CDC, see OceanBase CDC Connector.
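To confirm that the two source tables are registered and that the connector can reach OceanBase Database, you can inspect them from the Flink SQL CLI before defining the sink. The statements below are a sketch that uses standard Flink SQL commands and are not part of the original procedure. The SELECT starts a streaming job that should show the snapshot rows first and then keep running, so exit the result view when you are done.

```sql
-- List the tables registered in the current catalog and database.
SHOW TABLES;

-- Show the column definitions of one source table.
DESCRIBE ob_tbl1;

-- Preview the changelog read from OceanBase Database (runs as a streaming job).
SELECT * FROM ob_tbl1;
```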
Create a sink table for MySQL
Create a table named ob_tbl1_and_tbl2 in the Flink SQL CLI. This table uses the JDBC connector to write the synchronized data to the ob_tbl1_and_tbl2 table in the MySQL database. As with the source tables, declare the primary key as NOT ENFORCED.
Flink SQL> CREATE TABLE ob_tbl1_and_tbl2(
    col1 INT PRIMARY KEY NOT ENFORCED,
    col2 INT,
    col3 VARCHAR(20),
    col4 VARCHAR(20))
    WITH ('connector' = 'jdbc',
    'url' = 'jdbc:mysql://xxx.xxx.xxx.xxx:3306/test_ob_to_mysql',
    'username' = 'root',
    'password' = '******',
    'table-name' = 'ob_tbl1_and_tbl2');
[INFO] Execute statement succeed.
For more information about the WITH options of the JDBC SQL Connector, see JDBC SQL Connector.
Write data to the MySQL database from the Flink SQL CLI
Join the ob_tbl1 and ob_tbl2 tables on ob_tbl1.col3 = ob_tbl2.col1 and write the joined rows to the MySQL database through the ob_tbl1_and_tbl2 table.
Flink SQL> INSERT INTO ob_tbl1_and_tbl2
SELECT t1.col1,t1.col3,t1.col2,t2.col2
FROM ob_tbl1 t1,ob_tbl2 t2
WHERE t1.col3=t2.col1;
[INFO] Submitting SQL update statement to the cluster...
Loading class `com.mysql.jdbc.Driver`. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver`. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 9cd180a65cb4e2c4d1a5a91465aa38a3
Note
The MySQL driver (com.mysql.jdbc.Driver) used in the test case of this topic is from MySQL Connector/J 5.1.47. For new projects, it is recommended to use the driver (com.mysql.cj.jdbc.Driver) from MySQL Connector/J 8.x.
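If you use MySQL Connector/J 8.x, the driver class can be named explicitly through the driver option of the JDBC connector. The following is a minimal sketch, assuming that the Connector/J 8.x JAR is in the flink-1.15.3/lib/ directory. The table name ob_tbl1_and_tbl2_cj is hypothetical and used only for illustration; it points at the same MySQL table.

```sql
CREATE TABLE ob_tbl1_and_tbl2_cj (
    col1 INT,
    col2 INT,
    col3 VARCHAR(20),
    col4 VARCHAR(20),
    -- Flink accepts primary keys only in NOT ENFORCED mode.
    PRIMARY KEY (col1) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    -- Explicit driver class from MySQL Connector/J 8.x (assumed to be on the classpath).
    'driver' = 'com.mysql.cj.jdbc.Driver',
    'url' = 'jdbc:mysql://xxx.xxx.xxx.xxx:3306/test_ob_to_mysql',
    'username' = 'root',
    'password' = '******',
    'table-name' = 'ob_tbl1_and_tbl2'
);
```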
Check the data written to the MySQL database
Log in to the MySQL database and query the ob_tbl1_and_tbl2 table in the test_ob_to_mysql database for data.
MySQL [test_ob_to_mysql]> SELECT * FROM ob_tbl1_and_tbl2;
+------+------+-------------+------+
| col1 | col2 | col3        | col4 |
+------+------+-------------+------+
|    1 |   86 | China       | +86  |
|    2 |  886 | Taiwan      | +886 |
|    3 |  852 | Hong Kong   | +852 |
|    4 |  853 | Macao       | +853 |
|    5 |  850 | North Korea | +850 |
+------+------+-------------+------+
5 rows in set
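Because the job keeps running after the initial snapshot, later changes in OceanBase Database should also reach MySQL. The following sketch checks this with a made-up row (country 'Japan', code 81) that is not part of the original test data. It assumes that the Flink job submitted above is still running.

```sql
-- Step 1: run in obclient, in the test_ob_to_mysql database of OceanBase Database.
-- Both rows are needed so that the join condition tbl1.col3 = tbl2.col1 matches.
INSERT INTO tbl2 VALUES (81, '+81');
INSERT INTO tbl1 VALUES (6, 'Japan', 81);

-- Step 2: run in the MySQL client, in the test_ob_to_mysql database, after a few seconds.
SELECT * FROM ob_tbl1_and_tbl2 WHERE col1 = 6;
```

If the new row does not appear, check the status of the job in the Flink Web UI.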