Flink CDC (CDC Connectors for Apache Flink) is a set of source connectors for Apache Flink that read both historical data and incremental changes from many databases in real time. With Flink CDC, you can synchronize full and incremental data from databases to message queues, data lakes, and data warehouses for real-time data integration. Flink CDC also supports data processing: in its SQL client you can join, widen, and aggregate data in real time and write the results to various storage systems.

Change Data Capture (CDC) monitors and captures changes in databases. The captured data can be used for many purposes, such as building historical databases, serving as near-real-time caches, and feeding message queues (MQ) from which users consume data for analytics and auditing.
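The idea of reading historical data first and then following incremental changes can be pictured with a small, library-free sketch. It is not Flink CDC's record format or API: the `(operation, key, row)` event shape and the `apply_changes` helper are assumptions made only for illustration.

```python
from typing import Dict, Iterable, Optional, Tuple

# Hypothetical change event: (operation, primary_key, row); row is None for deletes.
ChangeEvent = Tuple[str, int, Optional[dict]]


def apply_changes(snapshot: Dict[int, dict], events: Iterable[ChangeEvent]) -> Dict[int, dict]:
    """Replay change events on top of a full snapshot and return the new state."""
    # Copy the snapshot so the caller's data is left untouched.
    replica = {key: dict(row) for key, row in snapshot.items()}
    for operation, key, row in events:
        if operation in ("insert", "update"):
            replica[key] = dict(row)
        elif operation == "delete":
            replica.pop(key, None)
        else:
            raise ValueError(f"unknown operation: {operation}")
    return replica


# Full (historical) data, followed by incremental changes captured afterwards.
snapshot = {1: {"name": "a"}, 2: {"name": "b"}}
events = [
    ("update", 1, {"name": "a2"}),
    ("delete", 2, None),
    ("insert", 3, {"name": "c"}),
]
replica = apply_changes(snapshot, events)
print(replica)
```

A downstream cache or warehouse table kept in sync by CDC ends up in the same state as `replica`: the snapshot rows with every later insert, update, and delete applied in order.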
## Environment preparation
### Configure OceanBase Binlog Service
Follow the instructions in "Create a Binlog cluster" to install and deploy OceanBase Binlog Service.
### Set up the Flink environment
Download Flink and the required dependency packages:
Download Flink from the download page. The version used in this topic is Flink 1.15.3, which is decompressed and stored in the flink-1.15.3 directory.
Download the dependency packages listed below and put them in the flink-1.15.3/lib/ directory.
## Prepare the data
### Prepare data in OceanBase Database
Prepare test data in OceanBase Database. This data is the source data that will be synchronized to the MySQL database.
Log in to OceanBase Database.
Log in to the mysql001 tenant of the cluster as the root user.
[xxx@xxx /home/admin] $obclient -h10.10.10.2 -P2881 -uroot@mysql001 -p****** -A
Welcome to the OceanBase. Commands end with ; or \g.
Your OceanBase connection id is 3221536981
Server version: OceanBase 4.0.0.0 (r100000302022111120-7cef93737c5cd03331b5f29130c6e80ac950d33b) (Built Nov 11 2022 20:38:33)
Copyright (c) 2000, 2018, OceanBase and/or its affiliates. All rights reserved.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
obclient [(none)]>

Create a database named test_ob_to_mysql, tables named tbl1 and tbl2, and insert data.
obclient [(none)]> CREATE DATABASE test_ob_to_mysql;
Query OK, 1 row affected

obclient [(none)]> USE test_ob_to_mysql;
Database changed

obclient [test_ob_to_mysql]> CREATE TABLE tbl1(col1 INT PRIMARY KEY, col2 VARCHAR(20), col3 INT);
Query OK, 0 rows affected

obclient [test_ob_to_mysql]> INSERT INTO tbl1 VALUES(1,'China',86),(2,'Taiwan',886),(3,'Hong Kong',852),(4,'Macao',853),(5,'North Korea',850);
Query OK, 5 rows affected
Records: 5 Duplicates: 0 Warnings: 0

obclient [test_ob_to_mysql]> CREATE TABLE tbl2(col1 INT PRIMARY KEY, col2 VARCHAR(20));
Query OK, 0 rows affected

obclient [test_ob_to_mysql]> INSERT INTO tbl2 VALUES(86,'+86'),(886,'+886'),(852,'+852'),(853,'+853'),(850,'+850');
Query OK, 5 rows affected
Records: 5 Duplicates: 0 Warnings: 0
### Prepare data in the MySQL database
Create a table in the MySQL database to store the source data.
Log in to the MySQL database.
[xxx@xxx /home/admin] $mysql -hxxx.xxx.xxx.xxx -P3306 -uroot -p******
<Omit echo information>
MySQL [(none)]>

Create a database named test_ob_to_mysql and a table named ob_tbl1_and_tbl2.
MySQL [(none)]> CREATE DATABASE test_ob_to_mysql;
Query OK, 1 row affected

MySQL [(none)]> USE test_ob_to_mysql;
Database changed

MySQL [test_ob_to_mysql]> CREATE TABLE ob_tbl1_and_tbl2(col1 INT PRIMARY KEY, col2 INT, col3 VARCHAR(20), col4 VARCHAR(20));
Query OK, 0 rows affected
## Start a Flink cluster and the Flink SQL CLI
Run the following command to navigate to the Flink directory.
[xxx@xxx /FLINK_HOME] #cd flink-1.15.3

Run the following command to start the Flink cluster.
[xxx@xxx /FLINK_HOME/flink-1.15.3] #./bin/start-cluster.sh

If the cluster is started successfully, you can access the Flink Web UI at http://localhost:8081/, as shown in the following figure:

Note
If the ./bin/start-cluster.sh command is executed and the following error message is returned: bash: ./bin/start-cluster.sh: Permission denied, change the permissions of all files with the -rw-rw-r-- permission in the flink-1.15.3 directory to -rwxrwxrwx. Here is an example:
[xxx@xxx /.../flink-1.15.3] # chmod -R 777 /FLINK_HOME/flink-1.15.3/*

Run the following command to start the Flink SQL CLI.
[xxx@xxx /FLINK_HOME/flink-1.15.3] #./bin/sql-client.sh

After the Flink SQL CLI is started, you will see the page as shown in the following figure:

## Set a checkpoint
Enable the checkpoint feature in the Flink SQL CLI and set the checkpoint interval to 3 seconds.
Flink SQL> SET execution.checkpointing.interval = 3s;
[INFO] Session property has been set.
## Create an OceanBase CDC table
In the Flink SQL CLI, create the ob_tbl1 and ob_tbl2 tables, which correspond to the tbl1 and tbl2 tables in the test_ob_to_mysql database of OceanBase Database and serve as the sources for data synchronization.
Flink SQL> CREATE TABLE ob_tbl1 (
    col1 INT PRIMARY KEY NOT ENFORCED,
    col2 VARCHAR(20),
    col3 INT)
    WITH ('connector' = 'oceanbase-cdc',
    'scan.startup.mode' = 'initial',
    'tenant-name' = 'mysql001',
    'username' = 'root@mysql001',
    'password' = '******',
    'database-name' = 'test_ob_to_mysql',
    'table-name' = 'tbl1',
    'hostname' = '10.10.10.2',
    'port' = '2881',
    'rootserver-list' = '10.10.10.2:2882:2881',
    'logproxy.host' = '10.10.10.2',
    'logproxy.port' = '2983');
[INFO] Execute statement succeed.
Flink SQL> CREATE TABLE ob_tbl2 (
    col1 INT PRIMARY KEY NOT ENFORCED,
    col2 VARCHAR(20))
    WITH ('connector' = 'oceanbase-cdc',
    'scan.startup.mode' = 'initial',
    'tenant-name' = 'mysql001',
    'username' = 'root@mysql001',
    'password' = '******',
    'database-name' = 'test_ob_to_mysql',
    'table-name' = 'tbl2',
    'hostname' = '10.10.10.2',
    'port' = '2881',
    'rootserver-list' = '10.10.10.2:2882:2881',
    'logproxy.host' = '10.10.10.2',
    'logproxy.port' = '2983');
[INFO] Execute statement succeed.
## Create a MySQL sink table
In the Flink SQL CLI, create the ob_tbl1_and_tbl2 table. This table uses the JDBC connector to write the synchronized data to the MySQL database.
Flink SQL> CREATE TABLE ob_tbl1_and_tbl2(
    col1 INT PRIMARY KEY NOT ENFORCED,
    col2 INT,
    col3 VARCHAR(20),
    col4 VARCHAR(20))
    WITH ('connector' = 'jdbc',
    'url' = 'jdbc:mysql://xxx.xxx.xxx.xxx:3306/test_ob_to_mysql',
    'username' = 'root',
    'password' = '******',
    'table-name' = 'ob_tbl1_and_tbl2');
[INFO] Execute statement succeed.
For more information about the WITH options of the JDBC SQL Connector, see JDBC SQL Connector.
## Write data to the MySQL database from the Flink SQL CLI
Join the ob_tbl1 and ob_tbl2 tables on ob_tbl1.col3 = ob_tbl2.col1 and write the joined rows to the MySQL database.
Flink SQL> INSERT INTO ob_tbl1_and_tbl2
    SELECT t1.col1, t1.col3, t1.col2, t2.col2
    FROM ob_tbl1 t1, ob_tbl2 t2
    WHERE t1.col3 = t2.col1;
[INFO] Submitting SQL update statement to the cluster...
Loading class `com.mysql.jdbc.Driver`. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver`. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 9cd180a65cb4e2c4d1a5a91465aa38a3
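Besides the Web UI, the state of the submitted job can be checked through the Flink REST API, which is served on the same address as the Web UI. The sketch below assumes the cluster from this topic is reachable at http://localhost:8081 and reads the `/jobs/overview` endpoint. The helper names `fetch_overview` and `job_states` are hypothetical, and the sample payload is illustrative rather than captured output.

```python
import json
from urllib.request import urlopen


def fetch_overview(base_url: str = "http://localhost:8081") -> dict:
    """Fetch the job overview from the Flink REST API (requires a running cluster)."""
    with urlopen(f"{base_url}/jobs/overview", timeout=5) as response:
        return json.load(response)


def job_states(overview: dict) -> dict:
    """Map each job ID in a /jobs/overview response to its state."""
    return {job["jid"]: job["state"] for job in overview.get("jobs", [])}


# Illustrative payload shaped like a /jobs/overview response; not real output.
sample_overview = {
    "jobs": [
        {"jid": "9cd180a65cb4e2c4d1a5a91465aa38a3", "state": "RUNNING"},
    ]
}
states = job_states(sample_overview)
print(states)
```

Against a live cluster, `job_states(fetch_overview())` returns the current states. The synchronization job is a streaming job, so it is expected to stay in the RUNNING state and keep forwarding changes until it is cancelled.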
## View data written to the MySQL database
Log in to the MySQL database and view the data of the ob_tbl1_and_tbl2 table in the test_ob_to_mysql database.
MySQL [test_ob_to_mysql]> SELECT * FROM ob_tbl1_and_tbl2;
+------+------+-------------+------+
| col1 | col2 | col3        | col4 |
+------+------+-------------+------+
|    1 |   86 | China       | +86  |
|    2 |  886 | Taiwan      | +886 |
|    3 |  852 | Hong Kong   | +852 |
|    4 |  853 | Macao       | +853 |
|    5 |  850 | North Korea | +850 |
+------+------+-------------+------+
5 rows in set
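As a cross-check of the result above, the same join can be reproduced locally on the test data inserted into tbl1 and tbl2. This is only a plain-Python sketch of what the INSERT ... SELECT statement computes, not how Flink executes it; the `join_rows` helper is hypothetical.

```python
# Test data from this topic: tbl1(col1, col2, col3) and tbl2(col1, col2).
tbl1 = [
    (1, "China", 86),
    (2, "Taiwan", 886),
    (3, "Hong Kong", 852),
    (4, "Macao", 853),
    (5, "North Korea", 850),
]
tbl2 = [(86, "+86"), (886, "+886"), (852, "+852"), (853, "+853"), (850, "+850")]


def join_rows(left, right):
    """Emulate SELECT t1.col1, t1.col3, t1.col2, t2.col2 ... WHERE t1.col3 = t2.col1."""
    # tbl2.col1 is a primary key, so each code maps to exactly one prefix.
    prefix_by_code = dict(right)
    return [
        (col1, col3, col2, prefix_by_code[col3])
        for col1, col2, col3 in left
        if col3 in prefix_by_code
    ]


result = join_rows(tbl1, tbl2)
for row in result:
    print(row)
```

Note the column order in the SELECT list: t1.col3 (the numeric code) lands in col2 of the sink table and t1.col2 (the name) lands in col3, which is why the sink table declares col2 as INT and col3 as VARCHAR(20).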