Apache Flink is an open-source stream processing framework for distributed, high-throughput, high-performance real-time data processing. OceanBase provides a JDBC-based Flink connector that lets Flink write data to OceanBase efficiently over the standard JDBC protocol. The connector supports both MySQL compatible mode and Oracle compatible mode. This makes it suitable for real-time data synchronization and for writing the results of streaming computations.
Connector features
Flink Connector OceanBase (JDBC Connector) writes data to OceanBase by using the JDBC driver. The connector has the following features:
- Standard JDBC protocol: The connector communicates with OceanBase by using the mature JDBC protocol. This protocol has good compatibility and is easy to integrate.
- Dual modes: The connector supports both MySQL and Oracle modes of OceanBase.
- Flexible driver selection:
  - In MySQL mode, you can use the MySQL JDBC driver or the OceanBase JDBC driver.
  - In Oracle mode, you must use the OceanBase JDBC driver.
- Connection pool management: The connector uses the Druid connection pool to support connection reuse and parameter tuning.
- Batch write optimization: The connector supports buffering and batch submission to improve the write throughput.
Scenarios: Real-time data synchronization, such as CDC synchronization and writing the results of streaming computations.
Software requirements and dependencies
| Software | Version requirements | Description |
|---|---|---|
| Apache Flink | 1.15 or a later version | We recommend that you use Flink 1.15 or later for better performance and stability. |
| JDK | JDK 8 or a later version | - |
| OceanBase Database | V3.x, V4.x, or a later version | OceanBase Database supports MySQL and Oracle modes. Make sure that you have installed OceanBase Database and created a MySQL or Oracle mode tenant. |
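Before you continue, you can check the Flink version requirement from a script. The sketch below assumes that `flink --version` prints a line of the form `Version: 1.19.3, Commit ID: ...`. The helper names `version_ge` and `parse_flink_version` are illustrative and are not part of Flink.

```shell
#!/bin/sh
# version_ge A B: exit status 0 when dotted version A >= dotted version B.
# Comparison is numeric per component, so 1.9 < 1.15 as expected.
version_ge() {
  awk -v a="$1" -v b="$2" 'BEGIN {
    na = split(a, x, "."); nb = split(b, y, ".")
    n = (na > nb) ? na : nb
    for (i = 1; i <= n; i++) {
      xi = (i <= na) ? x[i] + 0 : 0   # missing components count as 0
      yi = (i <= nb) ? y[i] + 0 : 0
      if (xi > yi) exit 0
      if (xi < yi) exit 1
    }
    exit 0                              # versions are equal
  }'
}

# parse_flink_version: read `flink --version` output on stdin and print the
# version number (assumed format: "Version: 1.19.3, Commit ID: ...").
parse_flink_version() {
  sed -n 's/^Version: \([^,]*\),.*/\1/p'
}

# Usage (FLINK_HOME must point at an unpacked Flink distribution):
#   v=$("$FLINK_HOME/bin/flink" --version 2>/dev/null | parse_flink_version)
#   version_ge "$v" 1.15 && echo "Flink $v meets the requirement" || echo "Flink $v is too old"
```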
JDBC drivers
Depending on the OceanBase Database mode you use, prepare the corresponding JDBC driver:
MySQL compatible mode
You can choose one of the following drivers:
MySQL JDBC driver (we recommend MySQL Connector/J 8.0 or later)
- Download address: https://dev.mysql.com/downloads/connector/j/
OceanBase JDBC driver
- Dependency name: com.oceanbase:oceanbase-client:2.4.9
- Download address: https://mvnrepository.com/artifact/com.oceanbase/oceanbase-client
Note
After you download the driver JAR file, you must put it in the lib directory of Flink. For more information, see the following sections.
Oracle compatible mode (OceanBase Database Enterprise Edition)
You must use the OceanBase JDBC driver:
- Dependency name: com.oceanbase:oceanbase-client:2.4.9
- Download address: https://mvnrepository.com/artifact/com.oceanbase/oceanbase-client
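The dependency names above are Maven coordinates in the form `groupId:artifactId:version`. The following is a sketch that assumes the standard Maven Central repository layout. It uses a hypothetical helper, `maven_url`, to turn such coordinates into a direct JAR download URL, which is convenient on servers where you only have a terminal.

```shell
#!/bin/sh
# maven_url GROUP:ARTIFACT:VERSION: print the Maven Central URL of the JAR,
# following the layout <group path>/<artifact>/<version>/<artifact>-<version>.jar.
maven_url() {
  case $1 in
    *:*:*) ;;
    *) echo "expected GROUP:ARTIFACT:VERSION, got: $1" >&2; return 2 ;;
  esac
  group=${1%%:*}          # text before the first colon
  rest=${1#*:}            # text after the first colon
  artifact=${rest%%:*}
  version=${rest#*:}
  group_path=$(printf '%s' "$group" | tr '.' '/')
  printf 'https://repo1.maven.org/maven2/%s/%s/%s/%s-%s.jar\n' \
    "$group_path" "$artifact" "$version" "$artifact" "$version"
}

# Usage:
#   wget "$(maven_url com.oceanbase:oceanbase-client:2.4.9)"
#   cp oceanbase-client-2.4.9.jar "$FLINK_HOME/lib/"
```

The same helper works for the connector package, for example `com.oceanbase:flink-sql-connector-oceanbase:<version>`.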
Procedure
Step 1: Quickly deploy a local Flink cluster (standalone)
Download Apache Flink
Go to the official Flink download page and select a stable release (we recommend 1.15 or later, such as 1.18 or 1.19).
For example (Linux/macOS terminal command):
# Download Flink (example: 1.19.3)
wget https://archive.apache.org/dist/flink/flink-1.19.3/flink-1.19.3-bin-scala_2.12.tgz
# Extract the file
tar -xzf flink-1.19.3-bin-scala_2.12.tgz
# Go to the directory
cd flink-1.19.3
# Set the environment variable (optional but recommended)
export FLINK_HOME=$(pwd)
Start the local Flink cluster
In the Flink directory, execute the following command:
# Start the cluster (JobManager + TaskManager)
$FLINK_HOME/bin/start-cluster.sh
After the cluster is started, you will see output similar to the following:
Starting cluster.
Starting standalonesession daemon on host your-hostname.
Starting taskexecutor daemon on host your-hostname.
If the cluster fails to start, check the logs:
# View the JobManager log
tail -f $FLINK_HOME/log/flink-*-standalonesession-*.log
# View the TaskManager log
tail -f $FLINK_HOME/log/flink-*-taskexecutor-*.log
Verify that Flink is running
Open your browser and go to: http://localhost:8081. You will see the Flink Dashboard, which shows 1 TaskManager and at least 1 available slot.
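You can also check the cluster from a terminal, for example on a server without a browser, by querying the Flink REST API that the Dashboard uses. This sketch assumes that the `/overview` endpoint returns single-line JSON containing the `taskmanagers` and `slots-available` fields. The helper names are hypothetical.

```shell
#!/bin/sh
# flink_overview_field FIELD: read the JSON from the /overview REST endpoint on
# stdin and print the numeric value of FIELD (for example, taskmanagers).
flink_overview_field() {
  sed -n "s/.*\"$1\":\([0-9][0-9]*\).*/\1/p"
}

# cluster_ready: succeed when at least one TaskManager and one free slot are reported.
cluster_ready() {
  json=$(cat)
  tms=$(printf '%s\n' "$json" | flink_overview_field taskmanagers)
  slots=$(printf '%s\n' "$json" | flink_overview_field slots-available)
  [ "${tms:-0}" -ge 1 ] && [ "${slots:-0}" -ge 1 ]
}

# Usage:
#   curl -s http://localhost:8081/overview | cluster_ready && echo "Flink is ready"
```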
Step 2: Download and deploy the JDBC connector
Download the JDBC-based connector JAR package
Download the flink-sql-connector-oceanbase-${version}.jar file:
- Download URL: https://repo1.maven.org/maven2/com/oceanbase/flink-sql-connector-oceanbase/
- Select a version (such as 1.6.0) and download the corresponding JAR file
Notice
Make sure that the downloaded file is flink-sql-connector-oceanbase-<version>.jar. This is the SQL connector package from which Flink SQL loads the 'oceanbase' connector.
Copy the JAR file to the lib directory of Flink:
# Assuming the downloaded file is in the current directory, and FLINK_HOME is the Flink installation directory
cp flink-sql-connector-oceanbase-1.6.0.jar $FLINK_HOME/lib/
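After copying, you can run a quick sanity check. The following sketch uses a hypothetical helper, `check_connector_jar`, to confirm that exactly one `flink-sql-connector-oceanbase-*.jar` is in the lib directory. If you keep several connector versions side by side, conflicting classes can end up on the classpath.

```shell
#!/bin/sh
# check_connector_jar [LIB_DIR]: verify that exactly one OceanBase SQL connector
# JAR is present (LIB_DIR defaults to $FLINK_HOME/lib).
check_connector_jar() {
  lib_dir=${1:-"$FLINK_HOME/lib"}
  count=0
  for jar in "$lib_dir"/flink-sql-connector-oceanbase-*.jar; do
    [ -e "$jar" ] || continue   # the glob matched nothing
    echo "found: $(basename "$jar")"
    count=$((count + 1))
  done
  if [ "$count" -eq 0 ]; then
    echo "no flink-sql-connector-oceanbase-*.jar in $lib_dir" >&2
    return 1
  elif [ "$count" -gt 1 ]; then
    echo "multiple connector versions in $lib_dir; keep only one" >&2
    return 1
  fi
}

# Usage:
#   check_connector_jar "$FLINK_HOME/lib"
```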
Step 3: Prepare and deploy the JDBC driver
Based on the OceanBase mode you are using, download and place the corresponding JDBC driver:
Method 1: MySQL compatible mode
You can choose either of the following drivers:
MySQL JDBC driver (recommended 8.0 or later)
- Download URL: https://dev.mysql.com/downloads/connector/j/
- Select the Platform Independent version (ZIP or TAR.GZ)
- After decompressing the archive, find the mysql-connector-j-x.x.x.jar file.
OceanBase JDBC driver
- Download URL: https://mvnrepository.com/artifact/com.oceanbase/oceanbase-client
- Select the latest version and download the JAR file
Copy the downloaded driver JAR file to the lib directory of Flink:
# Example: Copy the MySQL JDBC driver
cp mysql-connector-j-8.0.33.jar $FLINK_HOME/lib/
Method 2: Oracle compatible mode
You must use the OceanBase JDBC driver:
- Download URL: https://mvnrepository.com/artifact/com.oceanbase/oceanbase-client
- Select the latest version (such as 2.4.9) and download the JAR file
# Copy the OceanBase JDBC driver
cp oceanbase-client-2.4.9.jar $FLINK_HOME/lib/
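Before restarting, you can confirm that the driver that matches your mode is in place. This sketch encodes the rule from this step: MySQL compatible mode accepts either driver, and Oracle compatible mode requires oceanbase-client. The helper name is hypothetical. The JAR name patterns are assumptions based on the file names used above plus the older `mysql-connector-java-*.jar` name used by Connector/J releases before 8.0.31.

```shell
#!/bin/sh
# check_driver LIB_DIR MODE: MODE is "mysql" or "oracle".
# Exit status 0 if a suitable JDBC driver JAR is present, 1 if not, 2 on bad MODE.
check_driver() {
  lib_dir=$1
  mode=$2
  has_ob=no
  has_mysql=no
  for jar in "$lib_dir"/oceanbase-client-*.jar; do
    if [ -e "$jar" ]; then has_ob=yes; fi
  done
  for jar in "$lib_dir"/mysql-connector-j-*.jar "$lib_dir"/mysql-connector-java-*.jar; do
    if [ -e "$jar" ]; then has_mysql=yes; fi
  done
  case $mode in
    mysql)  [ "$has_ob" = yes ] || [ "$has_mysql" = yes ] ;;
    oracle) [ "$has_ob" = yes ] ;;
    *)      echo "mode must be mysql or oracle, got: $mode" >&2; return 2 ;;
  esac
}

# Usage:
#   check_driver "$FLINK_HOME/lib" oracle || echo "copy oceanbase-client-*.jar first"
```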
Restart the Flink cluster so that it loads the new JAR files:
# Stop the cluster first
$FLINK_HOME/bin/stop-cluster.sh
# Then start the cluster
$FLINK_HOME/bin/start-cluster.sh
Step 4: Obtain the database connection information
Contact your OceanBase Database administrator, or the person who deployed it, to obtain the connection information. Then log in to check it. For example:
obclient -h$host -P$port -u$user_name -p$password -D$database_name
Parameter description:
- $host: the IP address for connecting to OceanBase Database. For a connection through OceanBase Database Proxy (ODP), use the IP address of the ODP. For a direct connection, use the IP address of the OBServer node.
- $port: the port for connecting to OceanBase Database. For a connection through ODP, the default port is 2883, which can be customized when ODP is deployed. For a direct connection, the default port is 2881, which can be customized when OceanBase Database is deployed.
- $database_name: the name of the database to be accessed.
- $user_name: the tenant connection account. For a connection through ODP, the common formats are username@tenant name#cluster name and cluster name:tenant name:username. For a direct connection, the format is username@tenant name.
- $password: the password of the account.
Notice
The user for connecting to the tenant must have the CREATE, INSERT, DROP, and SELECT privileges on the database. For more information about user privileges, see Privilege types in MySQL mode.
For more information about the connection string, see Connect to an OceanBase tenant by using OBClient.
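You reuse these values when you fill in the Flink table options. The following hypothetical sketch assembles two things: the JDBC URL formats used in Step 7 (`jdbc:mysql://` for MySQL compatible mode and `jdbc:oceanbase://` for Oracle compatible mode), and the username formats described above. It covers only the `username@tenant name#cluster name` ODP form, not `cluster name:tenant name:username`.

```shell
#!/bin/sh
# jdbc_url MODE HOST PORT DATABASE: print a JDBC URL in the formats used in this guide.
jdbc_url() {
  case $1 in
    mysql)  printf 'jdbc:mysql://%s:%s/%s\n' "$2" "$3" "$4" ;;
    oracle) printf 'jdbc:oceanbase://%s:%s/%s\n' "$2" "$3" "$4" ;;
    *)      echo "mode must be mysql or oracle" >&2; return 2 ;;
  esac
}

# ob_username USER TENANT [CLUSTER]: user@tenant for a direct connection,
# user@tenant#cluster when a cluster name is given (connection through ODP).
ob_username() {
  if [ -n "$3" ]; then
    printf '%s@%s#%s\n' "$1" "$2" "$3"
  else
    printf '%s@%s\n' "$1" "$2"
  fi
}

# Usage:
#   jdbc_url mysql "$host" "$port" "$database_name"
#   ob_username root test obcluster
```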
Check the network connectivity
Make sure that the environment where the Flink job is running can access OceanBase Database:
Test command:
# Test the connectivity to the SQL port (the port used in the JDBC URL)
nc -zv <oceanbase-host> <port>
- <oceanbase-host>: the IP address for connecting to OceanBase Database, which is the value of $host. For a connection through ODP, use the IP address of the ODP. For a direct connection, use the IP address of the OBServer node.
- <port>: the port for connecting to OceanBase Database, which is the value of $port. For a connection through ODP, the default port is 2883, which can be customized when ODP is deployed. For a direct connection, the default port is 2881, which can be customized when OceanBase Database is deployed.
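If `nc` is not installed, bash can open a TCP connection through its `/dev/tcp` pseudo-device. The `check_port` function below is a hypothetical helper that tries `nc` first and falls back to bash. Option support varies between netcat implementations, so a failure from an `nc` that lacks `-z` could be misread as an unreachable port.

```shell
#!/bin/sh
# check_port HOST PORT: exit status 0 if a TCP connection to HOST:PORT succeeds,
# non-zero if it fails, and 2 if an argument is missing.
check_port() {
  if [ -z "$1" ] || [ -z "$2" ]; then
    echo "usage: check_port <host> <port>" >&2
    return 2
  fi
  if command -v nc >/dev/null 2>&1; then
    # -z: only probe the port; -w 5: give up after 5 seconds
    nc -z -w 5 "$1" "$2" 2>/dev/null
  elif command -v timeout >/dev/null 2>&1; then
    # bash opens a TCP socket when redirecting to /dev/tcp/HOST/PORT
    timeout 5 bash -c "exec 3<>/dev/tcp/$1/$2" 2>/dev/null
  else
    bash -c "exec 3<>/dev/tcp/$1/$2" 2>/dev/null
  fi
}

# Usage (replace the example values with your $host and $port):
#   check_port 127.0.0.1 2881 && echo "reachable" || echo "not reachable"
```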
Step 5: Prepare the target table in OceanBase Database
Create the t_sink table in the test database in MySQL compatible mode of OceanBase Database:
USE test;
CREATE TABLE `t_sink`
(
`id` int(10) NOT NULL,
`username` varchar(20) DEFAULT NULL,
`score` int(10) DEFAULT NULL,
PRIMARY KEY (`id`)
);
Step 6: Start the Flink SQL Client and test
Start the Flink SQL Client, which you use to execute SQL statements and write data:
# Go to the installation directory of Flink (if not already there)
cd $FLINK_HOME
# Start the SQL Client
./bin/sql-client.sh
After the SQL Client starts, it prints an ASCII-art banner followed by a welcome message and the interactive prompt of the Flink SQL Client:
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
Flink SQL>
Now you can execute SQL statements in the SQL Client.
Step 7: Configure the JDBC connection and write data
MySQL compatible mode
Create a Flink target table in the SQL Client and map it to the t_sink table in OceanBase:
CREATE TABLE t_sink
(
id INT,
username VARCHAR,
score INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase',
'url' = 'jdbc:mysql://127.0.0.1:2881/test', ------Example only. Please fill in the correct URL based on your actual environment.
'schema-name' = 'test',
'table-name' = 't_sink',
'username' = 'root@test#obcluster', ------Example only. Please fill in the actual tenant name obtained from the connection string.
'password' = 'your_password', ------Please fill in the actual password obtained from the connection string.
'druid-properties' = 'druid.initialSize=10;druid.maxActive=100;',
'buffer-flush.interval' = '1s',
'buffer-flush.buffer-size' = '5000',
'max-retries' = '3'
);
Configuration description:
- url: The JDBC connection address of OceanBase, in the format jdbc:mysql://host:port/database.
- schema-name and table-name: The names of the target database and table.
- username and password: The username and password of the OceanBase database.
- Other parameters tune performance and reliability. They are described in JDBC connector configuration.
After the table is created, insert test data:
INSERT INTO t_sink
VALUES (1, 'Tom', 99),
(2, 'Jerry', 88),
(1, 'Tom', 89);
If the execution is successful, you will see output similar to the following:
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
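The SQL Client can also run the same statements non-interactively from a script file with `./bin/sql-client.sh -f <file>`. The sketch below generates the DDL from this step and takes the connection settings from the hypothetical environment variables `OB_URL`, `OB_USER`, and `OB_PASSWORD`, so you do not type credentials into the statement by hand. It does not escape quotes, so a password that contains `'` would break the generated SQL.

```shell
#!/bin/sh
# t_sink_ddl: print the Flink SQL DDL for t_sink, filling in the connection
# options from OB_URL, OB_USER and OB_PASSWORD (all three must be set).
t_sink_ddl() {
  : "${OB_URL:?set OB_URL}" "${OB_USER:?set OB_USER}" "${OB_PASSWORD:?set OB_PASSWORD}"
  cat <<EOF
CREATE TABLE t_sink
(
  id INT,
  username VARCHAR,
  score INT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'oceanbase',
  'url' = '$OB_URL',
  'schema-name' = 'test',
  'table-name' = 't_sink',
  'username' = '$OB_USER',
  'password' = '$OB_PASSWORD',
  'druid-properties' = 'druid.initialSize=10;druid.maxActive=100;',
  'buffer-flush.interval' = '1s',
  'buffer-flush.buffer-size' = '5000',
  'max-retries' = '3'
);
EOF
}

# Usage (values are examples only):
#   export OB_URL='jdbc:mysql://127.0.0.1:2881/test' OB_USER='root@test#obcluster' OB_PASSWORD='your_password'
#   umask 077   # the generated file contains the password; keep it private
#   { t_sink_ddl; echo "INSERT INTO t_sink VALUES (1, 'Tom', 99), (2, 'Jerry', 88), (1, 'Tom', 89);"; } > t_sink.sql
#   "$FLINK_HOME/bin/sql-client.sh" -f t_sink.sql
```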
Verify the data write
Query the OceanBase database to verify whether the data is successfully written:
# Connect to the OceanBase MySQL compatible mode. The following is an example. Please fill in the correct connection information based on the connection string.
mysql -h127.0.0.1 -P2881 -uroot@test#obcluster -p
-- Query data
USE test;
SELECT * FROM t_sink;
Expected result:
+----+----------+-------+
| id | username | score |
+----+----------+-------+
| 1 | Tom | 89 |
| 2 | Jerry | 88 |
+----+----------+-------+
2 rows in set
Note
- Because of the primary key conflict, the record with id=1 is updated to the latest value (Tom, 89).
- The connector communicates with OceanBase through the JDBC protocol and uses UPSERT semantics: a row is inserted, or updated if its primary key already exists.
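The final table contents follow a last-write-wins rule per primary key. The following sketch uses awk to reproduce only that end state for the three test rows. It models the expected result, not how the connector issues SQL statements, and it assumes that rows for the same key arrive in order.

```shell
#!/bin/sh
# upsert_last_wins: read "id,username,score" rows in arrival order on stdin and
# print the last row seen for each primary key, sorted by id.
upsert_last_wins() {
  awk -F',' '{ row[$1] = $0 } END { for (k in row) print row[k] }' | sort -t',' -k1,1n
}

# Usage: the three rows from the INSERT statement above
#   printf '1,Tom,99\n2,Jerry,88\n1,Tom,89\n' | upsert_last_wins
```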
Oracle compatible mode
For the Oracle compatible mode of OceanBase Database Enterprise Edition, you need to specify the url and driver-class-name corresponding to the OceanBase JDBC driver.
Execute the following statement in the SQL Client:
CREATE TABLE t_sink
(
id INT,
username VARCHAR,
score INT,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase',
'url' = 'jdbc:oceanbase://127.0.0.1:2881/SYS', ------Example only. Please fill in the correct URL based on your actual environment.
'driver-class-name' = 'com.oceanbase.jdbc.Driver',
'schema-name' = 'SYS',
'table-name' = 'T_SINK',
'username' = 'SYS@test#obcluster', ------Example only. Please fill in the actual tenant name obtained from the connection string.
'password' = 'pswd', ------Please fill in the actual password obtained from the connection string.
'druid-properties' = 'druid.initialSize=10;druid.maxActive=100;',
'buffer-flush.interval' = '1s',
'buffer-flush.buffer-size' = '5000',
'max-retries' = '3'
);
Oracle mode configuration:
- url: Use the jdbc:oceanbase:// protocol.
- driver-class-name: Must be set to com.oceanbase.jdbc.Driver.
- schema-name and table-name: Typically uppercase in Oracle mode.
Insert test data:
INSERT INTO t_sink
VALUES (1, 'Tom', 99),
(2, 'Jerry', 88),
(1, 'Tom', 89);
Verify the data write
# Connect to the OceanBase Oracle compatible mode. The following is an example. Please fill in the correct connection information based on the connection string.
obclient -h127.0.0.1 -P2881 -uSYS@test#obcluster -p
-- Query data
SELECT * FROM T_SINK;
Note
Oracle mode also supports UPSERT semantics. When a primary key conflict occurs, the existing row is updated with the latest value.
Step 8: Stop and clean up (optional)
After the test is complete, exit the SQL Client:
-- Execute the following statement in the SQL Client
QUIT;
Then stop the Flink cluster:
# Stop the Flink cluster
cd $FLINK_HOME
./bin/stop-cluster.sh
JDBC connector configuration
The following table describes the main configuration parameters of the JDBC connector.
| Parameter | Required | Default value | Type | Description |
|---|---|---|---|---|
| url | Yes | - | String | The JDBC URL of the database. |
| username | Yes | - | String | The username for the database connection. |
| password | Yes | - | String | The password for the database connection. |
| schema-name | Yes | - | String | The name of the schema or database to connect to. |
| table-name | Yes | - | String | The name of the table. |
| driver-class-name | No | com.mysql.cj.jdbc.Driver | String | The name of the JDBC driver class. For MySQL compatible mode, you can use 'com.mysql.cj.jdbc.Driver' (default value) or 'com.oceanbase.jdbc.Driver'. For Oracle compatible mode, you must use 'com.oceanbase.jdbc.Driver'. |
| druid-properties | No | - | String | The properties of the Druid connection pool. Separate multiple values with semicolons (;). |
| sync-write | No | false | Boolean | Specifies whether to enable synchronous writes. If you set this parameter to true, data is written directly to the database without using a buffer. |
| buffer-flush.interval | No | 1s | Duration | The interval for flushing the buffer. If you set this parameter to '0', periodic flushing is disabled. |
| buffer-flush.buffer-size | No | 1000 | Integer | The size of the buffer, in records. |
| max-retries | No | 3 | Integer | The maximum number of retries. |
| memstore-check.enabled | No | true | Boolean | Specifies whether to enable memory (MemStore) usage checks. |
| memstore-check.threshold | No | 0.9 | Double | The ratio of the memory usage threshold to the maximum memory limit. |
| memstore-check.interval | No | 30s | Duration | The interval for checking memory usage. |
| partition.enabled | No | false | Boolean | Specifies whether to enable partitioning. If you enable this parameter, data is written to partitions. This parameter takes effect only when both 'sync-write' and 'direct-load.enabled' are set to false. |
Note
This connector is based on the JDBC protocol and uses the Druid connection pool to manage database connections.
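The `druid-properties` value is a single string of `key=value` pairs separated by semicolons, in the same shape as the Step 7 examples (including the trailing semicolon). The following hypothetical helper builds that string from separate arguments and rejects malformed pairs. It does not check whether a key is a valid Druid setting.

```shell
#!/bin/sh
# druid_props KEY=VALUE...: join Druid pool settings into the semicolon-separated
# string used by the 'druid-properties' option, e.g. druid.initialSize=10;druid.maxActive=100;
druid_props() {
  out=''
  for kv in "$@"; do
    case $kv in
      *=*) out="$out$kv;" ;;
      *)   echo "not a KEY=VALUE pair: $kv" >&2; return 2 ;;
    esac
  done
  printf '%s\n' "$out"
}

# Usage:
#   echo "'druid-properties' = '$(druid_props druid.initialSize=10 druid.maxActive=100)',"
```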
Scenarios
Real-time data synchronization
The JDBC connector writes data in real time. It uses an internal buffer to submit multiple records to OceanBase Database in asynchronous batches, which improves throughput and makes it suitable for continuous real-time data streams.
Typical scenarios:
- Database synchronization with CDC: The JDBC connector can be used with CDC data sources (such as MySQL CDC) to synchronize databases in real time. As the sink, the JDBC connector writes the change data captured by CDC to OceanBase Database using the standard JDBC protocol.
- Writing streaming computation results: Write the results of Flink streaming computations to OceanBase Database in real time, supporting complex data processing and transformation.
- Real-time business data synchronization: Continuously synchronize data changes from business systems to OceanBase Database for real-time updates in data warehouses and real-time reporting.
Best practices for using Flink Connector
Adjust parallelism to improve performance
Increasing the parallelism of a Flink job can significantly improve write performance. A higher parallelism uses more connections to write data to OceanBase Database concurrently. This generally increases throughput until the OceanBase cluster becomes the bottleneck.
To set the default parallelism in the Flink SQL Client, run:
SET 'parallelism.default' = '8';
Parallelism optimization recommendations:
- Set the parallelism based on the data volume and the performance of the OceanBase cluster.
- Monitor the load of OceanBase Database to avoid excessive write pressure.
Optimize batch write performance
Adjust the buffer-flush.buffer-size and buffer-flush.interval parameters to balance latency and throughput:
- buffer-flush.buffer-size: the size of the buffer, with a default value of 1,000 records.
- buffer-flush.interval: the buffer flush interval, with a default value of 1 second.
In high-throughput scenarios, you can increase buffer-flush.buffer-size and buffer-flush.interval. Larger batches raise throughput but also increase write latency.
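The sketch below helps you reason about these two settings together. For one sink subtask, it estimates how often the buffer is flushed and how large each batch is. It assumes a flush happens as soon as either buffer-flush.buffer-size records are buffered or buffer-flush.interval elapses, whichever comes first. An interval of 0 is treated as disabling the periodic flush. RATE is a hypothetical input rate for one subtask, in records per second.

```shell
#!/bin/sh
# flush_estimate RATE BUFFER_SIZE INTERVAL_S: rough per-subtask flush period and batch size.
flush_estimate() {
  awk -v r="$1" -v b="$2" -v t="$3" 'BEGIN {
    if (r <= 0) { print "RATE must be positive" > "/dev/stderr"; exit 1 }
    fill = b / r                        # seconds needed to fill the buffer
    if (t <= 0 || fill < t) { every = fill; batch = b }   # size limit triggers first
    else                    { every = t;    batch = r * t } # interval triggers first
    printf "flush every %.2fs, ~%d records per batch\n", every, batch
  }'
}

# Usage:
#   flush_estimate 2000 5000 1
```

For example, at 2,000 records/s per subtask with buffer-size 5000 and interval 1s, the interval triggers first and each batch holds about 2,000 records. At 10,000 records/s, the buffer fills every 0.5 s instead.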
Ensure reliability
- Set the max-retries parameter to control the number of retry attempts for failed writes (default value: 3).
- Enable memstore-check.enabled to avoid memory (MemStore) overflow in OceanBase Database (default value: enabled).
- Set sync-write=true to make data visible in real time. However, this reduces performance because each record is submitted directly through JDBC without buffering.
Monitoring recommendations
- Monitor the backpressure of the Flink task to identify performance bottlenecks.
- Monitor the write performance metrics of OceanBase Database, such as TPS, QPS, and response time.
- Monitor the data skew of each parallel task to ensure load balancing.
