OceanBase Database

SQL - V4.4.2

    Use Flink CDC to migrate data from OceanBase Database to a MySQL database

    Last Updated: 2026-04-02 06:23:56
    What is on this page
    Prepare the environment
    Configure OceanBase Binlog Service
    Flink environment setup
    Prepare data
    Prepare data in the OceanBase database
    Prepare data in the MySQL database
    Start the Flink cluster and Flink SQL CLI
    Enable checkpointing
    Create OceanBase CDC tables
    Create MySQL CDC tables
    Write the data to the MySQL database on the Flink SQL CLI
    Check the associated data written to the MySQL database


    CDC Connectors for Apache Flink (Flink CDC for short) is a set of source connectors for Apache Flink that can read both historical data and incremental changes from most databases in real time. With Flink CDC, you can synchronize the full and incremental data of different databases to message queues and data warehouses, or import database data into a data lake or data warehouse for real-time data integration. Flink CDC also supports data processing: through the Flink SQL client, you can associate, widen, and aggregate database data in real time and write the results to various stores. Change Data Capture (CDC) itself is a technique for monitoring and capturing changes in a database. The captured change data has many uses: for example, you can build historical archives or near-real-time caches from it, or feed it to message queues (MQs) for analysis and auditing.
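    Conceptually, a CDC consumer replays an ordered stream of row-change events (insert, update, delete, each keyed by primary key) against a downstream copy of the table. The following toy sketch is illustrative only (it does not use the Flink CDC API) and shows just that replay logic:

```python
# Toy illustration of CDC replay: apply an ordered stream of change
# events to an in-memory copy of a table keyed by primary key.
# This does NOT use the Flink CDC API; it only shows the core idea.

def apply_events(table, events):
    """Replay ordered CDC events (op, primary_key, row) against 'table'."""
    for op, key, row in events:
        if op in ("insert", "update"):
            table[key] = row          # upsert the new row image
        elif op == "delete":
            table.pop(key, None)      # remove the row if present
    return table

events = [
    ("insert", 1, {"col2": "a", "col3": 86}),
    ("insert", 2, {"col2": "b", "col3": 886}),
    ("update", 1, {"col2": "a2", "col3": 86}),
    ("delete", 2, None),
]
print(apply_events({}, events))  # {1: {'col2': 'a2', 'col3': 86}}
```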

    This topic describes how to use Flink CDC to migrate data from OceanBase Database to a MySQL database.

    Prepare the environment

    Configure OceanBase Binlog Service

    Refer to Create a binlog cluster to install and deploy OceanBase Binlog Service.

    Flink environment setup

    Download Flink and the required dependency packages.

    1. Download Flink 1.15.3 and decompress it to the flink-1.15.3 directory.

    2. Download the following dependency packages and place them in the flink-1.15.3/lib/ directory.

      • flink-sql-connector-oceanbase-cdc-2.2.0.jar

      • flink-connector-jdbc-1.15.3.jar

      • mysql-connector-java-5.1.47.jar
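
    Missing or misplaced jars are a common cause of ClassNotFoundException at job-submission time. The helper below (a hypothetical convenience script, not part of Flink) sanity-checks that the three jars listed above landed in the lib/ directory:

```python
import os

# Dependency jars this tutorial places in flink-1.15.3/lib/.
REQUIRED_JARS = [
    "flink-sql-connector-oceanbase-cdc-2.2.0.jar",
    "flink-connector-jdbc-1.15.3.jar",
    "mysql-connector-java-5.1.47.jar",
]

def missing_jars(lib_dir, required=REQUIRED_JARS):
    """Return the required jars that are not present in lib_dir."""
    present = set(os.listdir(lib_dir))
    return [jar for jar in required if jar not in present]

# Usage (the path is an assumption about where Flink was unpacked):
#   missing_jars("flink-1.15.3/lib")  -> [] when all dependencies are in place
```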

    Prepare data

    Prepare data in the OceanBase database

    Prepare test data in the OceanBase database as the source data to be imported to the MySQL database.

    1. Log in to the OceanBase database.

      Log in to the mysql001 tenant as the root user.

      [xxx@xxx /home/admin]
      $obclient -h10.10.10.2 -P2881 -uroot@mysql001 -p****** -A
      Welcome to the OceanBase.  Commands end with ; or \g.
      Your OceanBase connection id is 3221536981
      Server version: OceanBase 4.0.0.0 (r100000302022111120-7cef93737c5cd03331b5f29130c6e80ac950d33b) (Built Nov 11 2022 20:38:33)
      
      Copyright (c) 2000, 2018, OceanBase and/or its affiliates. All rights reserved.
      
      Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
      
      obclient [(none)]>
      
    2. Create a database named test_ob_to_mysql. Then, create the tbl1 and tbl2 tables and insert data into them.

      obclient [(none)]> CREATE DATABASE test_ob_to_mysql;
      Query OK, 1 row affected
      
      obclient [(none)]> USE test_ob_to_mysql;
      Database changed
      obclient [test_ob_to_mysql]> CREATE TABLE tbl1(col1 INT PRIMARY KEY, col2 VARCHAR(20),col3 INT);
      Query OK, 0 rows affected
      
      obclient [test_ob_to_mysql]> INSERT INTO tbl1 VALUES(1,'China',86),(2,'Taiwan',886),(3,'Hong Kong',852),(4,'Macao',853),(5,'North Korea',850);
      Query OK, 5 rows affected
      Records: 5  Duplicates: 0  Warnings: 0
      
      obclient [test_ob_to_mysql]> CREATE TABLE tbl2(col1 INT PRIMARY KEY,col2 VARCHAR(20));
      Query OK, 0 rows affected
      
      obclient [test_ob_to_mysql]> INSERT INTO tbl2 VALUES(86,'+86'),(886,'+886'),(852,'+852'),(853,'+853'),(850,'+850');
      Query OK, 5 rows affected
      Records: 5  Duplicates: 0  Warnings: 0
      

    Prepare data in the MySQL database

    Create a table for storing the source data in the MySQL database.

    1. Log in to the MySQL database.

      [xxx@xxx /home/admin]
      $mysql -hxxx.xxx.xxx.xxx -P3306 -uroot -p******
      <output omitted>
      
      MySQL [(none)]>
      
    2. Create a database named test_ob_to_mysql and a table named ob_tbl1_and_tbl2.

      MySQL [(none)]> CREATE DATABASE test_ob_to_mysql;
      Query OK, 1 row affected
      
      MySQL [(none)]> USE test_ob_to_mysql;
      Database changed
      MySQL [test_ob_to_mysql]> CREATE TABLE ob_tbl1_and_tbl2(col1 INT PRIMARY KEY,col2 INT,col3 VARCHAR(20),col4 VARCHAR(20));
      Query OK, 0 rows affected
      

    Start the Flink cluster and Flink SQL CLI

    1. Run the following command to go to the Flink home directory.

      [xxx@xxx /FLINK_HOME]
      #cd flink-1.15.3
      
    2. Run the following command to start the Flink cluster.

      [xxx@xxx /FLINK_HOME/flink-1.15.3]
      #./bin/start-cluster.sh
      

      After the Flink cluster is started, you can access the web UI of Flink at http://localhost:8081/, as shown in the following figure.

      Flink_Web_UI

      Note

      If bash: ./bin/start-cluster.sh: Permission denied is displayed after you run the ./bin/start-cluster.sh command, the files in the flink-1.15.3 directory lack execute permission. Change their permissions from -rw-rw-r-- to -rwxrwxrwx.

      Here is an example:

      
       [xxx@xxx /.../flink-1.15.3]
       # chmod -R 777 /FLINK_HOME/flink-1.15.3/*
       
    3. Run the following command to start Flink SQL CLI.

      [xxx@xxx /FLINK_HOME/flink-1.15.3]
      #./bin/sql-client.sh
      

      After the CLI is started, the page shown in the following figure appears.

      Flink_SQL_CLI

    Enable checkpointing

    On the Flink SQL CLI, enable checkpointing and set the checkpointing interval to 3s.

    Flink SQL> SET execution.checkpointing.interval = 3s;
    [INFO] Session property has been set.
    

    Create OceanBase CDC tables

    On the Flink SQL CLI, create CDC tables that correspond to the tbl1 and tbl2 tables in the test_ob_to_mysql database in OceanBase Database, so that the data in those tables can be synchronized.

    Flink SQL> CREATE TABLE ob_tbl1 (
        col1 INT PRIMARY KEY,
        col2 VARCHAR(20),
        col3 INT)
        WITH ('connector' = 'oceanbase-cdc',
        'scan.startup.mode' = 'initial',
        'tenant-name' = 'mysql001',
        'username' = 'root@mysql001',
        'password' = '******',
        'database-name' = 'test_ob_to_mysql',
        'table-name' = 'tbl1',
        'hostname' = '10.10.10.2',
        'port' = '2881',
        'rootserver-list' = '10.10.10.2:2882:2881',
        'logproxy.host' = '10.10.10.2',
        'logproxy.port' = '2983');
    [INFO] Execute statement succeed.
    
    Flink SQL> CREATE TABLE ob_tbl2 (col1 INT PRIMARY KEY,
        col2 VARCHAR(20))
        WITH ('connector' = 'oceanbase-cdc',
        'scan.startup.mode' = 'initial',
        'tenant-name' = 'mysql001',
        'username' = 'root@mysql001',
        'password' = '******',
        'database-name' = 'test_ob_to_mysql',
        'table-name' = 'tbl2',
        'hostname' = '10.10.10.2',
        'port' = '2881',
        'rootserver-list' = '10.10.10.2:2882:2881',
        'logproxy.host' = '10.10.10.2',
        'logproxy.port' = '2983');
    [INFO] Execute statement succeed.
    

    For more information about the WITH options of the OceanBase CDC connector, see OceanBase CDC Connector.

    Create MySQL CDC tables

    On the Flink SQL CLI, create the ob_tbl1_and_tbl2 table that corresponds to the target table in the MySQL database, so that the associated data can be written to it.

    Flink SQL> CREATE TABLE ob_tbl1_and_tbl2(
        col1 INT PRIMARY KEY,
        col2 INT,col3 VARCHAR(20),
        col4 VARCHAR(20))
        WITH ('connector' = 'jdbc',
        'url' = 'jdbc:mysql://xxx.xxx.xxx.xxx:3306/test_ob_to_mysql',
        'username' = 'root',
        'password' = '******',
        'table-name' = 'ob_tbl1_and_tbl2');
    [INFO] Execute statement succeed.
    

    For more information about the WITH options of the JDBC SQL connector, see JDBC SQL Connector.

    Write the data to the MySQL database on the Flink SQL CLI

    Use Flink SQL to associate tables tbl1 and tbl2 and write the associated data to the MySQL database.

    Flink SQL> INSERT INTO ob_tbl1_and_tbl2 
        SELECT t1.col1,t1.col3,t1.col2,t2.col2 
        FROM ob_tbl1 t1,ob_tbl2 t2 
        WHERE t1.col3=t2.col1;
    [INFO] Submitting SQL update statement to the cluster...
    Loading class `com.mysql.jdbc.Driver`. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver`. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: 9cd180a65cb4e2c4d1a5a91465aa38a3
    

    Note

    This example uses MySQL Connector/J 5.1.47, whose driver class is com.mysql.jdbc.Driver. To use the new driver class (com.mysql.cj.jdbc.Driver), use MySQL Connector/J 8.x instead.
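
    The result of the join submitted above can be previewed locally before involving Flink at all. The sketch below uses Python's built-in sqlite3 module as a stand-in for both databases, purely to show which associated rows the job will produce from the test data:

```python
import sqlite3

# Recreate the two source tables in an in-memory SQLite database
# (SQLite stands in for OceanBase here; this is only a preview).
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE tbl1(col1 INT PRIMARY KEY, col2 VARCHAR(20), col3 INT)")
cur.execute("CREATE TABLE tbl2(col1 INT PRIMARY KEY, col2 VARCHAR(20))")
cur.executemany("INSERT INTO tbl1 VALUES(?,?,?)",
                [(1, "China", 86), (2, "Taiwan", 886), (3, "Hong Kong", 852),
                 (4, "Macao", 853), (5, "North Korea", 850)])
cur.executemany("INSERT INTO tbl2 VALUES(?,?)",
                [(86, "+86"), (886, "+886"), (852, "+852"),
                 (853, "+853"), (850, "+850")])

# Same projection and join condition as the Flink INSERT statement.
rows = cur.execute("""SELECT t1.col1, t1.col3, t1.col2, t2.col2
                      FROM tbl1 t1, tbl2 t2
                      WHERE t1.col3 = t2.col1
                      ORDER BY t1.col1""").fetchall()
for r in rows:
    print(r)  # (1, 86, 'China', '+86') ... (5, 850, 'North Korea', '+850')
```

    The five printed tuples match the rows that the next section reads back from ob_tbl1_and_tbl2.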

    Check the associated data written to the MySQL database

    Log in to the MySQL database and check the data of the ob_tbl1_and_tbl2 table in the test_ob_to_mysql database.

    MySQL [test_ob_to_mysql]> SELECT * FROM ob_tbl1_and_tbl2;
    +------+------+-------------+------+
    | col1 | col2 | col3        | col4 |
    +------+------+-------------+------+
    |    1 |   86 | China       | +86  |
    |    2 |  886 | Taiwan      | +886 |
    |    3 |  852 | Hong Kong   | +852 |
    |    4 |  853 | Macao       | +853 |
    |    5 |  850 | North Korea | +850 |
    +------+------+-------------+------+
    5 rows in set
    
