Apache Flink is an open-source stream processing framework designed for distributed, high-throughput, and high-performance real-time data processing scenarios. OceanBase provides a Flink connector based on JDBC, which allows you to efficiently write data to OceanBase in Flink using the standard JDBC protocol. This connector supports MySQL compatible mode and Oracle compatible mode, making it suitable for real-time data synchronization and streaming computation result writing.
Features
Flink Connector OceanBase (JDBC connector) writes data to OceanBase through the JDBC driver and has the following core features:
- Standard JDBC protocol: It uses the mature JDBC standard protocol to communicate with OceanBase, ensuring good compatibility and ease of integration.
- Dual mode support: It supports both MySQL and Oracle compatible modes of OceanBase.
- Flexible driver selection:
- In MySQL compatible mode, you can use the MySQL JDBC driver or the OceanBase JDBC driver.
- In Oracle compatible mode, you must use the OceanBase JDBC driver.
- Connection pool management: It is based on the Druid connection pool and supports connection reuse and parameter optimization.
- Batch write optimization: It supports buffering and batch commit to improve write throughput.
Scenarios: Real-time data synchronization (CDC, streaming computation results).
Software requirements and dependencies
| Software | Version requirement | Description |
|---|---|---|
| Apache Flink | 1.15 or later | A recent stable release is recommended for better performance and stability. |
| JDK | 8 or later | — |
| OceanBase | V3.x, V4.x, or later | OceanBase Database supports the MySQL and Oracle modes. Make sure that you have installed OceanBase Database and created a MySQL/Oracle-compatible mode tenant. |
JDBC driver
Based on the OceanBase mode you use, prepare the corresponding JDBC driver:
MySQL compatible mode
You can choose to use one of the following drivers:
MySQL JDBC driver (We recommend that you use MySQL Connector/J 8.0 or later.)
- Download address: https://dev.mysql.com/downloads/connector/j/
OceanBase JDBC driver
- Dependency name: `com.oceanbase:oceanbase-client:2.4.9`
- Download address: https://mvnrepository.com/artifact/com.oceanbase/oceanbase-client
Note
You must place the downloaded driver JAR file in the `lib` directory of Flink. For more information, see the following sections.
Oracle compatible mode (OceanBase Database Enterprise Edition)
You must use the OceanBase JDBC driver:
- Dependency name: `com.oceanbase:oceanbase-client:2.4.9`
- Download address: https://mvnrepository.com/artifact/com.oceanbase/oceanbase-client
Procedure
Step 1: Deploy a local Flink cluster (standalone)
Download Apache Flink
Go to the Flink official download page and select Stable Release (we recommend 1.15 or later, such as 1.18 or 1.19).
For example (Linux/macOS terminal commands):
```shell
# Download Flink (example: 1.19.3)
wget https://archive.apache.org/dist/flink/flink-1.19.3/flink-1.19.3-bin-scala_2.12.tgz
# Extract the file
tar -xzf flink-1.19.3-bin-scala_2.12.tgz
# Go to the directory
cd flink-1.19.3
# Set the environment variable (optional but recommended)
export FLINK_HOME=$(pwd)
```
Start the local Flink cluster
In the Flink directory, run the following command:
```shell
# Start the cluster (including JobManager and TaskManager)
$FLINK_HOME/bin/start-cluster.sh
```
After the cluster is started, you will see an output similar to the following:
```
Starting cluster.
Starting standalonesession daemon on host your-hostname.
Starting taskexecutor daemon on host your-hostname.
```
If the cluster fails to start, you can check the logs:
```shell
# View JobManager logs
tail -f $FLINK_HOME/log/flink-*-standalonesession-*.log
# View TaskManager logs
tail -f $FLINK_HOME/log/flink-*-taskexecutor-*.log
```
Verify that Flink is running
Open your browser and go to http://localhost:8081. You will see the Flink Dashboard, which displays 1 TaskManager and at least 1 available slot.
Step 2: Download and deploy the JDBC connector
Download the JDBC-based connector JAR file `flink-sql-connector-oceanbase-${version}.jar`:
- Download URL: https://repo1.maven.org/maven2/com/oceanbase/flink-sql-connector-oceanbase/
- Select a version (for example, 1.6.0) and download the corresponding JAR file.
Notice
The file name must be `flink-sql-connector-oceanbase-<version>.jar`. This is the key for Flink SQL to recognize the connector.

Copy the JAR file to the `lib` directory of Flink:

```shell
# Assume the downloaded file is in the current directory, and FLINK_HOME is the Flink installation directory
cp flink-sql-connector-oceanbase-1.6.0.jar $FLINK_HOME/lib/
```
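As a quick sanity check before moving on, you can verify that the file name you placed in `$FLINK_HOME/lib` matches the pattern Flink SQL looks for. The `is_connector_jar` helper below is purely illustrative (it is not part of Flink or the connector):

```shell
# Hypothetical helper: check that a file name matches the
# flink-sql-connector-oceanbase-<version>.jar pattern that Flink SQL expects.
is_connector_jar() {
  case "$1" in
    flink-sql-connector-oceanbase-*.jar) echo "ok: $1" ;;
    *) echo "unexpected name: $1" ;;
  esac
}
is_connector_jar "flink-sql-connector-oceanbase-1.6.0.jar"
# prints: ok: flink-sql-connector-oceanbase-1.6.0.jar
```

If you renamed the JAR file or downloaded a differently packaged artifact, the check fails and Flink SQL may not discover the connector.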
Step 3: Prepare and deploy the JDBC driver
Based on the OceanBase mode you are using, download and place the corresponding JDBC driver:
Method 1: MySQL compatible mode
You can use either of the following drivers:
MySQL JDBC Driver (we recommend 8.0 or later)
- Download URL: https://dev.mysql.com/downloads/connector/j/
- Select the Platform Independent version (ZIP or TAR.GZ)
- After decompressing, locate the `mysql-connector-j-x.x.x.jar` file
OceanBase JDBC Driver
- Download URL: https://mvnrepository.com/artifact/com.oceanbase/oceanbase-client
- Select the latest version and download the JAR file
Copy the downloaded driver JAR file to the `lib` directory of Flink:

```shell
# Example: Copy the MySQL JDBC driver
cp mysql-connector-j-8.0.33.jar $FLINK_HOME/lib/
```
Method 2: Oracle compatible mode
You must use the OceanBase JDBC driver:
- Download URL: https://mvnrepository.com/artifact/com.oceanbase/oceanbase-client
- Select the latest version (for example, 2.4.9) and download the JAR file
```shell
# Copy the OceanBase JDBC driver
cp oceanbase-client-2.4.9.jar $FLINK_HOME/lib/
```
Restart the Flink cluster (to make the JAR file effective)
```shell
# Stop the cluster first
$FLINK_HOME/bin/stop-cluster.sh
# Then start the cluster
$FLINK_HOME/bin/start-cluster.sh
```
Step 4: Obtain the database connection information
Contact the deployment personnel or administrator of OceanBase Database to obtain the database connection information and log in. For example:
```shell
obclient -h$host -P$port -u$user_name -p$password -D$database_name
```
Parameter description:
- `$host`: the IP address for connecting to OceanBase Database. If you connect through OceanBase Database Proxy (ODP), use the IP address of the ODP. If you connect directly, use the IP address of an OBServer node.
- `$port`: the port for connecting to OceanBase Database. If you connect through ODP, the default port is 2883, which can be customized when ODP is deployed. If you connect directly, the default port is 2881, which can be customized when OceanBase Database is deployed.
- `$database_name`: the name of the database to be accessed.

  Notice
  The user for connecting to a tenant must have the `CREATE`, `INSERT`, `DROP`, and `SELECT` privileges on the database. For more information about user privileges, see Privilege types in MySQL-compatible mode.

- `$user_name`: the tenant account. If you connect through ODP, the account is in the `username@tenant name#cluster name` or `cluster name:tenant name:username` format. If you connect directly, the account is in the `username@tenant name` format.
- `$password`: the password of the account.
For more information about the connection string, see Connect to an OceanBase tenant by using OBClient.
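The connection details above are also what you will later assemble into the JDBC URL of the Flink table definition. The sketch below is illustrative only (the `default_port` and `jdbc_url` helper names are ours, not part of the connector); it just shows how the default port follows from the connection method:

```shell
# Illustrative only: ODP proxies listen on 2883 by default,
# while direct OBServer connections use 2881.
default_port() {
  if [ "$1" = "odp" ]; then echo 2883; else echo 2881; fi
}
# Assemble the JDBC URL for MySQL compatible mode. args: host port database
jdbc_url() {
  echo "jdbc:mysql://$1:$2/$3"
}
jdbc_url 127.0.0.1 "$(default_port direct)" test
# prints: jdbc:mysql://127.0.0.1:2881/test
```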
Verify network connectivity
Ensure that the environment where the Flink job is running can access OceanBase Database:
Test command:

```shell
# Test the connectivity to the connection port of OceanBase Database
nc -zv <oceanbase-host> <port>
```

- `<oceanbase-host>`: the IP address for connecting to OceanBase Database. If you connect through ODP, use the IP address of the ODP. If you connect directly, use the IP address of an OBServer node.
- `<port>`: the port for connecting to OceanBase Database. If you connect through ODP, the default port is 2883, which can be customized when ODP is deployed. If you connect directly, the default port is 2881, which can be customized when OceanBase Database is deployed.
Step 5: Prepare the target table in OceanBase Database
Create the target table t_sink in the test database in MySQL compatible mode of OceanBase Database:
```sql
USE test;
CREATE TABLE `t_sink`
(
  `id` int(10) NOT NULL,
  `username` varchar(20) DEFAULT NULL,
  `score` int(10) DEFAULT NULL,
  PRIMARY KEY (`id`)
);
```
Step 6: Start the Flink SQL Client and test
Execute SQL statements and write data by using the SQL Client:
```shell
# Go to the installation directory of Flink (if you have not done so)
cd $FLINK_HOME
# Start the SQL Client
./bin/sql-client.sh
```
After the SQL Client is started, you will see the interactive command-line interface of Flink SQL Client:
```
...(Flink SQL Client ASCII-art logo)...

Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.

Flink SQL>
```
Now you can execute SQL statements in the SQL Client.
Step 7: Configure the JDBC connection and write data
MySQL compatible mode
In SQL Client, create a Flink target table and map it to the t_sink table in OceanBase Database:
```sql
CREATE TABLE t_sink
(
    id INT,
    username VARCHAR,
    score INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase',
    'url' = 'jdbc:mysql://127.0.0.1:2881/test',  -- Example only. Fill in the correct URL based on your actual environment.
    'schema-name' = 'test',
    'table-name' = 't_sink',
    'username' = 'root@test#obcluster',          -- Example only. Fill in the actual tenant account from the connection string.
    'password' = 'your_password',                -- Fill in the actual password from the connection string.
    'druid-properties' = 'druid.initialSize=10;druid.maxActive=100;',
    'buffer-flush.interval' = '1s',
    'buffer-flush.buffer-size' = '5000',
    'max-retries' = '3'
);
```
Configuration description:
- `url`: the JDBC connection address of OceanBase Database, in the format of `jdbc:mysql://host:port/database`.
- `schema-name` and `table-name`: the names of the target database and table.
- `username` and `password`: the username and password of OceanBase Database.
- Other parameters are used to optimize performance and reliability.
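Note that `druid-properties` packs several Druid connection pool settings into a single semicolon-separated string. As a small illustrative check (not connector code), splitting the string shows the individual `key=value` pairs that the pool receives:

```shell
# The druid-properties value is a semicolon-separated list of pool settings;
# split it to see the individual key=value pairs.
props='druid.initialSize=10;druid.maxActive=100'
printf '%s\n' "$props" | tr ';' '\n'
# prints:
# druid.initialSize=10
# druid.maxActive=100
```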
After the table is created, insert test data:
```sql
INSERT INTO t_sink
VALUES (1, 'Tom', 99),
       (2, 'Jerry', 88),
       (1, 'Tom', 89);
```
After the execution is successful, you will see an output similar to the following:
```
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
```
Verify the data write
Query the OceanBase Database to verify whether the data is successfully written:
```shell
# Connect to OceanBase Database in MySQL compatible mode. The following is an example. Fill in the correct connection information based on the connection string.
mysql -h127.0.0.1 -P2881 -uroot@test#obcluster -p
```

```sql
-- Query data
USE test;
SELECT * FROM t_sink;
```
Expected result:
```
+----+----------+-------+
| id | username | score |
+----+----------+-------+
|  1 | Tom      |    89 |
|  2 | Jerry    |    88 |
+----+----------+-------+
2 rows in set
```
Note
- Due to primary key conflicts, the record with `id=1` is updated to the latest value (Tom, 89).
- The connector communicates with OceanBase Database through the JDBC protocol and uses standard UPSERT semantics.
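The UPSERT outcome can be mimicked outside the database: for duplicate primary keys, the last write wins. The one-liner below only illustrates that end result on the three inserted rows; it is not how the connector is implemented:

```shell
# Simulate upsert-by-primary-key on the three inserted rows:
# awk keeps the last record seen for each id, matching the query result above.
printf '%s\n' '1,Tom,99' '2,Jerry,88' '1,Tom,89' |
  awk -F, '{row[$1] = $0} END {for (k in row) print row[k]}' | sort
# prints:
# 1,Tom,89
# 2,Jerry,88
```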
Oracle compatible mode
For OceanBase Database Enterprise Edition in Oracle compatible mode, you must set the `url` and `driver-class-name` parameters to match the OceanBase JDBC driver.
Execute the following statement in SQL Client:
```sql
CREATE TABLE t_sink
(
    id INT,
    username VARCHAR,
    score INT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase',
    'url' = 'jdbc:oceanbase://127.0.0.1:2881/SYS',  -- Example only. Fill in the correct URL based on your actual environment.
    'driver-class-name' = 'com.oceanbase.jdbc.Driver',
    'schema-name' = 'SYS',
    'table-name' = 'T_SINK',
    'username' = 'SYS@test#obcluster',              -- Example only. Fill in the actual tenant account from the connection string.
    'password' = 'pswd',                            -- Fill in the actual password from the connection string.
    'druid-properties' = 'druid.initialSize=10;druid.maxActive=100;',
    'buffer-flush.interval' = '1s',
    'buffer-flush.buffer-size' = '5000',
    'max-retries' = '3'
);
```
Oracle-compatible mode configuration points:
- `url`: use the `jdbc:oceanbase://` protocol.
- `driver-class-name`: must be set to `com.oceanbase.jdbc.Driver`.
- `schema-name` and `table-name`: in Oracle-compatible mode, uppercase is typically used.
Insert test data:
```sql
INSERT INTO t_sink
VALUES (1, 'Tom', 99),
       (2, 'Jerry', 88),
       (1, 'Tom', 89);
```
Verify the data write
```shell
# Connect to OceanBase Database in Oracle compatible mode. The following is an example. Fill in the correct connection information based on the connection string.
obclient -h127.0.0.1 -P2881 -uSYS@test#obcluster -p
```

```sql
-- Query data
SELECT * FROM T_SINK;
```
Note
Oracle-compatible mode also supports UPSERT semantics. When a primary key conflict occurs, the record will be automatically updated to the latest value.
Step 8: Stop and clean up (optional)
After the test is completed, you can stop the Flink cluster:
```shell
# Stop the Flink cluster
cd $FLINK_HOME
./bin/stop-cluster.sh
```
Exit SQL Client:
```sql
-- Execute the following statement in SQL Client
QUIT;
```
JDBC connector configuration
The following table describes the main configurations of the JDBC connector:
| Parameter | Required | Default value | Type | Description |
|---|---|---|---|---|
| `url` | Yes | - | String | The JDBC URL of the database. |
| `username` | Yes | - | String | The username for the database connection. |
| `password` | Yes | - | String | The password for the database connection. |
| `schema-name` | Yes | - | String | The name of the schema or database to connect to. |
| `table-name` | Yes | - | String | The name of the table. |
| `driver-class-name` | No | `com.mysql.cj.jdbc.Driver` | String | The name of the JDBC driver class. For MySQL compatible mode, you can use `com.mysql.cj.jdbc.Driver` (default value) or `com.oceanbase.jdbc.Driver`. For Oracle compatible mode, you must use `com.oceanbase.jdbc.Driver`. |
| `druid-properties` | No | - | String | The properties of the Druid connection pool. Separate multiple values with semicolons (;). |
| `sync-write` | No | `false` | Boolean | Specifies whether to enable synchronous writes. If you set this parameter to `true`, records are written directly to the database without buffering. |
| `buffer-flush.interval` | No | `1s` | Duration | The interval for flushing the buffer. If you set this parameter to `0`, periodic flushing is disabled. |
| `buffer-flush.buffer-size` | No | `1000` | Integer | The size of the buffer, in records. |
| `max-retries` | No | `3` | Integer | The maximum number of retries for failed operations. |
| `memstore-check.enabled` | No | `true` | Boolean | Specifies whether to enable memory checks. |
| `memstore-check.threshold` | No | `0.9` | Double | The threshold of memory usage as a ratio of the maximum memory limit. |
| `memstore-check.interval` | No | `30s` | Duration | The interval for checking memory usage. |
| `partition.enabled` | No | `false` | Boolean | Specifies whether to enable partition computing and write data to the corresponding partitions. This option takes effect only when both `sync-write` and `direct-load.enabled` are set to `false`. |
Note
This connector is based on the JDBC protocol and uses the Druid connection pool to manage database connections.
Scenarios
Real-time data synchronization
You can use the JDBC connector to write data in real time and take advantage of asynchronous batch writes to improve throughput. The JDBC connector uses an internal buffering mechanism to batch submit multiple records to OceanBase Database, making it suitable for continuous real-time data stream writes.
Typical scenarios:
- Database synchronization with CDC: The JDBC connector can be used with CDC data sources, such as MySQL CDC, to synchronize data between databases in real time. As a sink, the JDBC connector writes the change data captured by CDC to OceanBase Database using the standard JDBC protocol.
- Writing streaming computation results: You can write the results of Flink streaming computations to OceanBase Database in real time, supporting complex data processing and transformation.
- Real-time business data synchronization: You can continuously synchronize data changes from business systems to OceanBase Database for real-time updates in data warehouses and real-time reporting.
Best practices for using Flink Connector
Adjust parallelism to improve performance
Adjusting the parallelism of Flink tasks can significantly enhance write performance. Higher parallelism means more simultaneous connections to OceanBase, resulting in higher throughput.
Configuration method: Set the global parallelism in the Flink SQL Client:
```sql
SET 'parallelism.default' = '8';
```
Optimization suggestions:
- Set the parallelism based on the data volume and OceanBase cluster performance.
- Monitor the load on OceanBase to avoid excessive write pressure.
Optimize batch write performance
Adjust the `buffer-flush.buffer-size` and `buffer-flush.interval` parameters to balance latency and throughput:
- `buffer-flush.buffer-size`: the buffer size, 1000 records by default.
- `buffer-flush.interval`: the buffer flush interval, 1 second by default.

In high-throughput scenarios, you can increase `buffer-flush.buffer-size` and extend `buffer-flush.interval`.
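As a rough back-of-envelope check when tuning these two parameters (assuming the example values used earlier in this guide, 5000 records per flush and a 1-second interval), interval-triggered flushes alone bound the batched write rate per parallel task:

```shell
# Upper bound from periodic flushes only; size-triggered flushes
# (when the buffer fills before the interval elapses) can add more.
BUFFER_SIZE=5000   # buffer-flush.buffer-size
INTERVAL_S=1       # buffer-flush.interval, in seconds
echo "$((BUFFER_SIZE / INTERVAL_S)) records/second per task"
# prints: 5000 records/second per task
```

Multiply by the task parallelism to estimate the aggregate write rate the OceanBase cluster must absorb.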
Ensure reliability
- Set the `max-retries` parameter to control the number of retries on failure (default: 3).
- Enable `memstore-check.enabled` to prevent OceanBase memory overflow (enabled by default).
- Use `sync-write=true` to ensure real-time data visibility, at the cost of performance (each record is submitted directly via JDBC).
Monitoring recommendations
- Monitor the backpressure in Flink tasks to identify performance bottlenecks.
- Pay attention to OceanBase's write performance metrics (TPS, QPS, response time, etc.).
- Monitor data skew across parallel tasks to ensure load balancing.