Apache Flink is an open-source stream processing framework designed for distributed, real-time data processing with high throughput and high performance. It is widely used in scenarios such as event-driven applications, real-time analytics, data pipelines, and complex event processing. This topic describes how to use the Flink SQL client and the Flink Change Data Capture (CDC) Connector to capture data changes in a source table in real time, join the changes with user information, and write the joined results to another table in OceanBase Cloud.
Prerequisites
Before data synchronization, make sure that the following conditions are met:
- You have created dedicated data synchronization users for the source MySQL-compatible tenant of OceanBase Cloud and the target MySQL-compatible tenant, and granted required privileges to the users.
- You have created the required table schema in the target database of OceanBase Cloud, and the table schema is the same as that of the source database.
- You have installed Flink. For more information, visit the download page of Flink.
- You have downloaded the flink-sql-connector-mysql-cdc dependency file in the JAR format. For more information, see Flink SQL Connector MySQL CDC.
- You have downloaded the flink-connector-jdbc dependency file in the JAR format. For more information, see JDBC SQL Connector.
If you use a Flink cluster, place the JAR files in the lib directory of the cluster. This is typically $FLINK_HOME/lib, where FLINK_HOME is the installation directory of Flink. If you use Flink in standalone mode, make sure that the JAR files are on the classpath. You can specify the classpath at run time with a command-line option such as -classpath or -cp.
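As an alternative for quick experiments, recent versions of the Flink SQL client can load JAR files into the current session with the ADD JAR statement. The following is a minimal sketch under assumptions: the directory /opt/flink/jars is hypothetical, and <version> is a placeholder for the versions that you actually downloaded. Note that the JDBC connector generally also needs a MySQL-compatible JDBC driver JAR on the classpath, which is not shown here.

```sql
-- Run in the Flink SQL client.
-- The paths below are hypothetical; replace <version> with your actual JAR versions.
ADD JAR '/opt/flink/jars/flink-sql-connector-mysql-cdc-<version>.jar';
ADD JAR '/opt/flink/jars/flink-connector-jdbc-<version>.jar';

-- List the JAR files loaded into the current session to confirm that both were added.
SHOW JARS;
```

JAR files added this way apply only to the current SQL client session. Placing the files in $FLINK_HOME/lib remains the more durable option for a long-running cluster.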
Procedure
1. Activate the Binlog service.

   In the left-side navigation pane of the OceanBase Cloud console, choose Instances > Tenant Management and activate the Binlog service. For more information, see Activate the Binlog service.

2. Prepare data.

   The sample data comprises the orders and users tables. The orders table contains order data. The users table contains user information. Create these tables and the order_details target table in OceanBase Cloud, and then insert the sample data.

       -- Create a table named `orders`.
       CREATE TABLE `orders` (
         `order_id` INT PRIMARY KEY,
         `price` DECIMAL(10,2),
         `currency` VARCHAR(3),
         `user_id` INT
       );

       -- Create a table named `users`.
       CREATE TABLE `users` (
         `user_id` INT PRIMARY KEY,
         `user_name` VARCHAR(255),
         `email` VARCHAR(255)
       );

       -- Create a table named `order_details`.
       CREATE TABLE `order_details` (
         `order_id` INT PRIMARY KEY,
         `order_price` DECIMAL(10, 2),
         `currency` VARCHAR(255),
         `user_id` INT,
         `user_name` VARCHAR(255),
         `email` VARCHAR(255)
       );

       -- Insert sample data into the `orders` table.
       INSERT INTO `orders` VALUES (1, 100.00, 'USD', 1);
       INSERT INTO `orders` VALUES (2, 55.50, 'EUR', 2);
       INSERT INTO `orders` VALUES (3, 80.99, 'USD', 3);

       -- Insert sample data into the `users` table.
       INSERT INTO `users` VALUES (1, 'John Doe', 'john.doe@example.com');
       INSERT INTO `users` VALUES (2, 'Jane Smith', 'jane.smith@example.com');
       INSERT INTO `users` VALUES (3, 'Alice Johnson', 'alice.johnson@example.com');

3. In the Flink SQL client, define the source table, dimension table, and target table.

       -- Set the `orders` table as the source table.
       CREATE TABLE ob_orders (
         order_id INT,
         price DECIMAL(10, 2),
         currency STRING,
         user_id INT,
         PRIMARY KEY (order_id) NOT ENFORCED
       ) WITH (
         'connector' = 'mysql-cdc',
         'hostname' = 't5********.aws-ap-southeast-1.oceanbase.cloud',
         'port' = '3306',
         'username' = 'test',
         'password' = 'xxxx',
         'database-name' = 'test2',
         'table-name' = 'orders'
       );

       -- Set the `users` table as the dimension table.
       CREATE TABLE ob_users (
         user_id INT,
         user_name STRING,
         email STRING,
         PRIMARY KEY (user_id) NOT ENFORCED
       ) WITH (
         'connector' = 'jdbc',
         'url' = 'jdbc:mysql://t5********.aws-ap-southeast-1.oceanbase.cloud:3306/test2',
         'table-name' = 'users',
         'username' = 'test',
         'password' = 'xxxx'
       );

       -- Set the `order_details` table as the target table.
       CREATE TABLE ob_order_details (
         order_id INT,
         order_price DECIMAL(10, 2),
         currency STRING,
         user_id INT,
         user_name STRING,
         email STRING,
         PRIMARY KEY (order_id) NOT ENFORCED
       ) WITH (
         'connector' = 'jdbc',
         'url' = 'jdbc:mysql://t5********.aws-ap-southeast-1.oceanbase.cloud:3306/test2',
         'table-name' = 'order_details',
         'username' = 'test',
         'password' = 'xxxx'
       );

4. Perform a join query and write the results into the target table.

       INSERT INTO ob_order_details
       SELECT
         o.order_id,
         o.price,
         o.currency,
         u.user_id,
         u.user_name,
         u.email
       FROM ob_orders AS o
       JOIN ob_users AS u
         ON o.user_id = u.user_id;
The preceding query performs an INNER JOIN on the ob_orders and ob_users tables to match each order with its user by user_id. The result set contains both order information and user information, and is written to the target table ob_order_details. You can execute this SQL statement in the Flink SQL client or in any environment that supports Flink SQL to build an order details table that is updated in real time. When data in the orders table changes, the Flink task based on MySQL CDC automatically synchronizes the changes to the ob_order_details table.
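To check that synchronization works, you can change the source data and then query the target table. The following is a minimal sketch, assuming the statements run directly against the OceanBase Cloud database that holds the sample tables (not in the Flink SQL client) and that the Flink job from the previous step is still running. The new order with order_id 4 and the updated price are illustrative values only.

```sql
-- Run in the OceanBase Cloud database, not in the Flink SQL client.

-- 1. Check the rows that the initial snapshot wrote to the target table.
SELECT order_id, order_price, currency, user_id, user_name, email
FROM order_details
ORDER BY order_id;

-- 2. Change the source table: update one order and insert a new one (illustrative values).
UPDATE orders SET price = 120.00 WHERE order_id = 1;
INSERT INTO orders VALUES (4, 42.00, 'USD', 2);

-- 3. Query the target table again after a short wait.
--    The updated price and the new order are expected to appear once the job has processed the changes.
SELECT order_id, order_price, currency, user_id, user_name, email
FROM order_details
ORDER BY order_id;
```

The delay between the change and its appearance in order_details depends on the job and connector settings, so the second query may need to be repeated.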
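For more information about the parameters of the JDBC connector, see JDBC SQL Connector. For more information about the parameters of the MySQL CDC connector, see Flink SQL Connector MySQL CDC.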
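As an illustration of connector parameters beyond the required ones, the following sketch defines variants of the source and target tables with a few optional settings. The table names ob_orders_tuned and ob_order_details_tuned are hypothetical, and the option values are examples rather than recommendations. Check the connector documentation for your versions to confirm which options are supported and what their defaults are.

```sql
-- Hypothetical variant of the source table with optional MySQL CDC settings.
CREATE TABLE ob_orders_tuned (
  order_id INT,
  price DECIMAL(10, 2),
  currency STRING,
  user_id INT,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 't5********.aws-ap-southeast-1.oceanbase.cloud',
  'port' = '3306',
  'username' = 'test',
  'password' = 'xxxx',
  'database-name' = 'test2',
  'table-name' = 'orders',
  -- Assumed example value: the session time zone of the database server.
  'server-time-zone' = 'UTC',
  -- Read a snapshot of existing rows first, then continue with the binlog.
  'scan.startup.mode' = 'initial'
);

-- Hypothetical variant of the target table with optional JDBC sink settings.
CREATE TABLE ob_order_details_tuned (
  order_id INT,
  order_price DECIMAL(10, 2),
  currency STRING,
  user_id INT,
  user_name STRING,
  email STRING,
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://t5********.aws-ap-southeast-1.oceanbase.cloud:3306/test2',
  'table-name' = 'order_details',
  'username' = 'test',
  'password' = 'xxxx',
  -- Flush buffered rows when this many rows have accumulated.
  'sink.buffer-flush.max-rows' = '100',
  -- Flush buffered rows at least this often.
  'sink.buffer-flush.interval' = '1s',
  -- Number of retries if writing to the database fails.
  'sink.max-retries' = '3'
);
```

Smaller flush thresholds usually reduce the delay before changes appear in the target table, at the cost of more frequent writes to the database.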