Overview
Apache Flink is an open-source distributed stream processing framework that is widely used in real-time and batch data processing scenarios. The OceanBase Flink DirectLoad connector (flink-connector-oceanbase-directload) is a high-performance data import tool designed for OceanBase Database. It uses OceanBase Database's direct load technology to achieve high-throughput data import.
Direct load is a high-performance data import method provided by OceanBase Database. Unlike the traditional INSERT statement, direct load bypasses the SQL parsing and transaction processing and directly writes data to the storage layer, significantly improving import performance. This method is particularly suitable for scenarios requiring the one-time import of large volumes of data.
DirectLoad connector vs. JDBC connector
| Feature | DirectLoad connector | JDBC connector |
|---|---|---|
| Write mode | Direct load | INSERT |
| Throughput | High | Moderate |
| Use case | Batch data import and migration | Real-time streaming and CDC synchronization |
| Data stream type | Bounded stream only | Unbounded stream |
| Table status during import | Locked, only queries are supported | Read and write are supported |
| Supported operations | INSERT, UPDATE_AFTER | INSERT, UPDATE, DELETE |
Core advantages
- High throughput: Compared with the traditional JDBC method, the performance is improved by several or even dozens of times.
- Parallel writing: Supports parallel writing on multiple nodes to fully utilize cluster resources.
- Easy to use: You can use Flink SQL without complex configurations.
- Flexible conflict handling: Supports multiple primary key conflict handling strategies (stop, replace, ignore).
Usage notes and limitations
Before you use the DirectLoad connector, make sure that you understand the following features and limitations.
Supported features
- Bulk data writing: The connector is designed for importing large volumes of data, supporting fast writing of millions or even billions of records.
- Bounded stream processing: It supports bounded data sources, such as files and database snapshots.
- Multiple import modes: It supports full (full import), inc (incremental import), and inc_replace (incremental replacement).
- Flexible conflict resolution strategies: It provides three strategies for handling primary key conflicts: STOP_ON_DUP, REPLACE, and IGNORE.
- Parallel writing: It supports parallel writing with Flink, fully utilizing cluster resources.
- Compatibility: It is compatible with OceanBase Database in MySQL mode and Oracle mode.
Limitations
- Bounded streams only: The connector only supports bounded streams. Continuous unbounded stream writing is not supported.
- Table locking during import: During direct load, the target table is locked. Only SELECT queries are allowed, and no INSERT, UPDATE, or DELETE operations can be performed on the table.
- Unsupported operations: Only INSERT and UPDATE_AFTER change types are supported on the Flink Table/SQL side. DELETE and UPDATE_BEFORE operations are not supported.
- Batch processing only: Flink Batch mode is recommended for batch processing scenarios. Real-time streaming writing is not supported.
Notice
If you need to write or process unbounded data streams in real time, use the flink-connector-oceanbase connector provided in the project. This connector is based on JDBC and supports unbounded streams and real-time writing.
Scenarios
The DirectLoad connector is suitable for the following scenarios:
- Data migration: Migrating data from other databases or data warehouses to OceanBase Database in batches.
- Offline ETL: Periodically processing and importing data in batches, such as daily data aggregation.
- Importing historical data: Importing a large amount of historical data at once.
- Data initialization: Batch initializing data for a new system.
Scenarios not recommended
The DirectLoad connector is not recommended for the following scenarios:
- Real-time streaming writes: Scenarios requiring continuous data writes.
- CDC data synchronization: Scenarios requiring real-time synchronization of database changes.
- Low-latency requirements: Scenarios with strict requirements for write latency.
- High-frequency small-batch writes: Frequent small-batch data writes.
Software requirements and dependencies
| Software | Version requirement | Description |
|---|---|---|
| Apache Flink | 1.15 or later | We recommend that you use 1.15 or later for better performance and stability. |
| JDK | 8 or later | N/A |
| OceanBase | One of the following versions: • 4.3.0 BP1 and later • 4.2.4 to 4.3.0 • 4.2.1 BP7 to 4.2.2.0 | OceanBase Database supports MySQL mode and Oracle mode. |
| OceanBase (Incremental mode) | 4.3.2 or later | Required when you use the inc or inc_replace import mode. |
Quick Start
This section will guide you through the process of getting started with the DirectLoad connector, from obtaining the connector to completing your first data import.
Step 1: Quickly deploy a local Flink cluster (standalone mode)
Download Apache Flink
Go to the Flink official download page, and select Stable Release (recommended 1.15+ such as 1.18 or 1.19).
For example (Linux / macOS terminal command):
# Download Flink (example: 1.19.3)
wget https://archive.apache.org/dist/flink/flink-1.19.3/flink-1.19.3-bin-scala_2.12.tgz
# Unpack
tar -xzf flink-1.19.3-bin-scala_2.12.tgz
# Enter the directory
cd flink-1.19.3
# Set the environment variable (optional but recommended)
export FLINK_HOME=$(pwd)
Start the local Flink cluster
In the Flink directory, run the following command:
# Start the cluster (including JobManager + TaskManager)
$FLINK_HOME/bin/start-cluster.sh
After the cluster starts, you will see output similar to the following:
Starting cluster.
Starting standalonesession daemon on host your-hostname.
Starting taskexecutor daemon on host your-hostname.
If the cluster fails to start, check the logs:
# View the JobManager logs
tail -f $FLINK_HOME/log/flink-*-standalonesession-*.log
# View the TaskManager logs
tail -f $FLINK_HOME/log/flink-*-taskexecutor-*.log
Verify that Flink is running
Open your browser and go to: http://localhost:8081. You will see the Flink Dashboard, which displays 1 TaskManager and at least 1 available slot.
Step 2: Deploy the OceanBase DirectLoad connector JAR
Download the connector JAR
Go to Maven Central: https://repo1.maven.org/maven2/com/oceanbase/flink-sql-connector-oceanbase-directload/
Select the latest version (e.g., 1.5.0) and download the corresponding JAR file. The version in the directory path must match the version in the file name:
# Example: Download version 1.5.0
wget https://repo1.maven.org/maven2/com/oceanbase/flink-sql-connector-oceanbase-directload/1.5.0/flink-sql-connector-oceanbase-directload-1.5.0.jar
Note: The file name must be flink-sql-connector-oceanbase-directload-<version>.jar, which is crucial for Flink SQL to recognize the connector.
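Because the version appears twice in the Maven Central URL (once in the directory and once in the file name), deriving both from a single value helps avoid mismatched downloads. The following is a minimal sketch; the function name connector_jar_url is hypothetical, and the repository path is the one given in this step.

```shell
# Hypothetical helper: build the Maven Central download URL for one connector version.
# The version is used for both the directory and the JAR file name.
connector_jar_url() {
  version=$1
  base="https://repo1.maven.org/maven2/com/oceanbase/flink-sql-connector-oceanbase-directload"
  echo "${base}/${version}/flink-sql-connector-oceanbase-directload-${version}.jar"
}

# Usage (1.5.0 is the example version used in this guide):
#   wget "$(connector_jar_url 1.5.0)"
```

Changing the version then requires editing only one argument.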
Place the JAR in the lib/ directory of Flink
# Copy the JAR to the Flink lib directory
cp flink-sql-connector-oceanbase-directload-1.5.0.jar $FLINK_HOME/lib/
Important Note: Flink automatically loads all JAR packages in the lib/ directory upon startup. Therefore, your connector will be automatically registered.
Restart the Flink cluster (to activate the JAR)
# Stop the cluster first
$FLINK_HOME/bin/stop-cluster.sh
# Then start it again
$FLINK_HOME/bin/start-cluster.sh
Step 3: Obtain database connection information
Contact the OceanBase Database deployment personnel or administrator to obtain the corresponding database connection string, for example:
obclient -h$host -P$port -u$user_name -p$password -D$database_name
Parameter description:
- $host: The IP address for connecting to OceanBase Database. For connection through OceanBase Database Proxy (ODP), use the ODP address. For direct connection, use the IP address of an OBServer node.
- $port: The port for connecting to OceanBase Database. For ODP connection, the default port is 2883, which can be customized during ODP deployment. For direct connection, the default port is 2881, which can be customized during OceanBase Database deployment.
- $database_name: The name of the database to be accessed.
- $user_name: The tenant connection account. For ODP connection, the common format is username@tenant name#cluster name or cluster name:tenant name:username. For direct connection, the format is username@tenant name.
- $password: The account password.
Notice
The user for connecting to the tenant must have the CREATE, INSERT, DROP, and SELECT privileges on the database. For more information about user privileges, see Privilege classification in MySQL mode.
For more information about the connection string, see Connect to an OceanBase tenant by using OBClient.
Notice
When connecting to the OBServer service, you can query the system view DBA_OB_SERVERS in the sys tenant to obtain the RPC port number of the OBServer.
Confirm network connectivity
Ensure that the environment running the Flink job can access OceanBase Database:
Test command:
# Test RPC port connectivity
nc -zv <oceanbase-host> <rpc-port>
Step 4: Create the target table in OceanBase Database
Example:
-- Connect to OceanBase Database
USE test;
-- Create the target table
CREATE TABLE `t_user` (
`id` INT NOT NULL,
`username` VARCHAR(50) DEFAULT NULL,
`age` INT DEFAULT NULL,
`score` DECIMAL(10,2) DEFAULT NULL,
PRIMARY KEY (`id`)
) COMMENT 'User information table';
Step 5: Start the Flink SQL Client and test
# Start the SQL Client
$FLINK_HOME/bin/sql-client.sh
After the SQL Client is started, you will see the interactive command-line interface of the Flink SQL Client:
(ASCII-art Flink logo and "Flink SQL Client" banner)
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
Flink SQL>
Step 6: Run a sample Flink SQL statement
Start the Flink SQL Client
cd $FLINK_HOME
./bin/sql-client.sh
Set the execution mode to BATCH
-- Optional: It is recommended to set the execution mode to BATCH for better performance.
SET 'execution.runtime-mode' = 'BATCH';
-- Optional: Set the parallelism (adjust based on data volume and cluster resources)
SET 'parallelism.default' = '4';
Create a DirectLoad sink table
Create a target table in Flink SQL that maps to the t_user table in OceanBase:
CREATE TABLE t_user (
id INT,
username STRING,
age INT,
score DECIMAL(10,2),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-directload',
'host' = 'xxx.x.x.x', -- The IP address of the OceanBase host.
'port' = 'xxxx', -- The RPC port.
'tenant-name' = 'your_tenant_name', -- The tenant name.
'username' = 'your_username', -- The username.
'password' = 'your_password', -- The password.
'schema-name' = 'test', -- The database name.
'table-name' = 't_user' -- The table name.
);
Parameter description:
- connector: The value is oceanbase-directload.
- host and port: The IP address and RPC port of the OceanBase host. To obtain the RPC port of the OBServer service endpoint, query the DBA_OB_SERVERS system view under the sys tenant.
- tenant-name: The tenant name.
- username and password: The database user credentials. Note that the username does not include the @tenant suffix.
- schema-name and table-name: The names of the target database and table.
Note
The above example uses the minimum set of parameters. For more parameters, see the Configuration parameters section below.
Insert test data
-- Insert some test data.
INSERT INTO t_user
VALUES
(1, 'Alice', 25, 95.5),
(2, 'Bob', 30, 88.0),
(3, 'Charlie', 28, 92.3),
(4, 'David', 35, 87.8),
(5, 'Eve', 22, 96.0);
After the statement is executed, Flink will submit the job and start importing data. You will see the following output in the console:
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
Job execution details:
- The job runs in the background. You can view the job progress in the Flink Web UI (http://localhost:8081).
- The job status changes from INITIALIZING to RUNNING and finally to FINISHED.
- The final commit of DirectLoad occurs during the job end phase. Make sure to wait for the job status to change to FINISHED before verifying the data.
Verify the data write
After the job is successfully completed, verify the data in OceanBase:
SELECT * FROM test.t_user ORDER BY id;
The query result is as follows:
+----+----------+------+-------+
| id | username | age | score |
+----+----------+------+-------+
| 1 | Alice | 25 | 95.50 |
| 2 | Bob | 30 | 88.00 |
| 3 | Charlie | 28 | 92.30 |
| 4 | David | 35 | 87.80 |
| 5 | Eve | 22 | 96.00 |
+----+----------+------+-------+
5 rows in set
Step 7: Stop and clean up (optional)
After the test is complete, you can stop the Flink cluster:
# Stop the Flink cluster
cd $FLINK_HOME
./bin/stop-cluster.sh
Exit the SQL Client:
-- Execute the following statement in the SQL Client:
QUIT;
Alternatively, you can press Ctrl+D to exit the SQL Client.
Important notes
- Wait for the job to complete: The final commit of DirectLoad occurs during the job end phase. Make sure to wait for the job status to change to FINISHED before verifying the data.
- Table lock during import: During the execution of the INSERT statement, the target table t_user is locked. At this time, you can only query the table and cannot perform write operations.
- Recommended execution mode: DirectLoad only supports bounded streams. We recommend that you use the BATCH execution mode and ensure that the data source is bounded.
- Error handling: If the job fails, check the error messages in the logs. Common issues include network connection failures, insufficient account permissions, and incorrect parameter configurations.
Configuration parameters
This section describes all the configuration parameters of the DirectLoad connector. Parameters marked with * are required.
Connection parameters
| Parameter | Required | Default value | Type | Description |
|---|---|---|---|---|
| connector* | Yes | N/A | String | The type of the connector. Valid value: oceanbase-directload |
| host* | Yes | N/A | String | The IP address of the host where the OceanBase database is located, such as 127.0.0.1 or a domain name. |
| port* | Yes | 2882 | Integer | The RPC port used for direct load. Default value: 2882. |
| tenant-name* | Yes | N/A | String | The name of the tenant, such as test. |
| username* | Yes | N/A | String | The username of the database, such as root. Notice: You must specify the username in the format of root, not the connection string format (such as root@test). |
| password* | Yes | N/A | String | The password of the database. |
| schema-name* | Yes | N/A | String | The name of the database schema, such as test. |
| table-name* | Yes | N/A | String | The name of the target table, such as t_user. |
Import behavior parameters
| Parameter | Required | Default value | Type | Description |
|---|---|---|---|---|
| load-method | No | full | String | The import mode. Valid values: full, inc, and inc_replace. For more information, see the section on load-method below. |
| dup-action | No | REPLACE | String | The strategy for handling primary key conflicts. Valid values: STOP_ON_DUP (stop the import when a conflict occurs), REPLACE (replace the existing records), and IGNORE (ignore the conflicting records). For more information, see the section on dup-action below. |
| max-error-rows | No | 0 | Long | The maximum number of error rows tolerated during the import. If the number of error rows exceeds this threshold, the import fails. The value 0 means that no error rows are tolerated. |
Performance optimization parameters
| Parameter | Required | Default value | Type | Description |
|---|---|---|---|---|
| parallel | No | 8 | Integer | The server-side parallelism. This parameter specifies how much CPU the OceanBase server uses to process the import task. For more information, see the section on parallel below. |
| buffer-size | No | 1024 | Integer | The size of the write buffer, in rows. When the number of buffered rows reaches this threshold, the buffer is flushed. For more information, see the optimization recommendations for buffer-size below. |
| timeout | No | 7d | Duration | The timeout period of a single direct load task. Example values: 1d, 12h, 30m, and 3600s. |
| heartbeat-timeout | No | 60s | Duration | The timeout period of the client heartbeat. |
| heartbeat-interval | No | 10s | Duration | The interval of the client heartbeat. |
Core parameters
load-method
The load-method parameter specifies the mode of direct load. Different modes are suitable for different scenarios.
full (full load)
- Default value. This mode is suitable for empty tables or tables with only a small amount of data.
- The imported data is directly written to the major sstable.
- Advantages of columnar tables: After the import is completed, the data is in columnar format, and the query performance is optimal. No additional compaction is required.
- Scenarios:
  - Importing data to an empty table for the first time.
  - The target table contains a small amount of data that can be rebuilt.
  - Optimizing columnar query performance.
-- Example of the full mode
CREATE TABLE t_sink (...) WITH (
'connector' = 'oceanbase-directload',
'load-method' = 'full',
...
);
inc (incremental load)
- This mode is suitable for appending data to a non-empty table.
- It checks for primary key conflicts and processes them according to the dup-action strategy.
- Version requirement: OceanBase Database 4.3.2 or later.
- Limitation: The REPLACE option is not supported for the dup-action parameter.
- Columnar tables: The data is written to dump files, which do not use the columnar format, so the query performance is that of a row-based table. After a major compaction, the query performance becomes optimal for columnar tables.
-- Example of the inc mode
CREATE TABLE t_sink (...) WITH (
'connector' = 'oceanbase-directload',
'load-method' = 'inc',
'dup-action' = 'STOP_ON_DUP', -- The REPLACE option is not supported.
...
);
inc_replace (incremental replace)
- This mode is suitable for importing data to a non-empty table and overwriting existing records.
- It does not check for primary key conflicts and directly overwrites the data with the same primary key (equivalent to the REPLACE option).
- Version requirement: OceanBase Database 4.3.2 or later.
- The dup-action parameter is ignored because no conflict checks are performed.
- Columnar tables: The query performance is that of a row-based table until a compaction is performed.
-- Example of the inc_replace mode
CREATE TABLE t_sink (...) WITH (
'connector' = 'oceanbase-directload',
'load-method' = 'inc_replace', -- The dup-action parameter is ignored.
...
);
Mode selection recommendation
| Scenario | Recommended mode | Reason |
|---|---|---|
| Importing data to an empty table for the first time | full | This mode provides the best performance and query results for columnar tables. |
| Importing data to a non-empty table, where conflicts may occur and existing records need to be overwritten | inc_replace | This mode automatically overwrites existing data. |
| Importing data to a non-empty table, where conflicts are not allowed | inc + dup-action=STOP_ON_DUP | This mode strictly controls data consistency. |
dup-action
The dup-action parameter specifies how to handle primary key conflicts (only applicable when load-method is set to full or inc).
STOP_ON_DUP (stop on duplicate)
- The import is immediately stopped when a primary key conflict occurs, and the entire job fails.
- Scenarios:
  - Data consistency is strictly required.
  - Primary key conflicts are not allowed.
  - Data quality issues need to be quickly identified.
'dup-action' = 'STOP_ON_DUP'
REPLACE (replace)
- When a primary key conflict occurs, the new record replaces the old one.
- Scenarios:
  - Existing data can be overwritten.
  - The imported data is the latest version.
- Note: This option is not supported when load-method is set to inc.
'dup-action' = 'REPLACE'
IGNORE (ignore)
- When a primary key conflict occurs, the old record is retained, and the new record is ignored.
- Scenarios:
  - The old data is considered the source of truth.
  - Existing records should not be overwritten during incremental imports.
'dup-action' = 'IGNORE'
parallel
The parallel parameter specifies the server-side parallelism, which determines how many CPU resources the OceanBase Database server uses to process the current import task.
Key characteristics
- Independent from client concurrency: The parallel parameter is a server-side parameter and is independent of the parallelism in Flink.
- Limited by tenant configuration: The server automatically limits the maximum parallelism based on the CPU resources allocated to the tenant. Client settings that exceed this limit will not result in an error.
- Affected by partition distribution: The actual parallelism is also influenced by the distribution of partitions in the table.
Rules for calculating actual parallelism
- Maximum parallelism per node = MIN(tenant CPU count × 2, parallel value)
- Total actual parallelism = Maximum parallelism per node × Number of nodes with partitions
Example 1: Single-node scenario
- Tenant configuration: 2 CPU cores
- parallel value: 10
- Partitions are distributed across 1 node
- Actual parallelism = MIN(2 × 2, 10) × 1 = 4
Example 2: Multi-node scenario
- Tenant configuration: 2 CPU cores
- parallel value: 10
- Partitions are evenly distributed across 2 nodes
- Actual parallelism = MIN(2 × 2, 10) × 2 = 8
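The calculation rules and the two examples above can be sketched as a small shell function. The function name and argument order are hypothetical; only the formula MIN(tenant CPU count × 2, parallel value) × number of nodes with partitions comes from this section, and the parallelism that the server actually applies may differ.

```shell
# Hypothetical helper: estimate the effective server-side parallelism.
# Usage: effective_parallel <tenant_cpu_count> <parallel_value> <nodes_with_partitions>
effective_parallel() {
  cpu=$1
  parallel=$2
  nodes=$3
  cap=$((cpu * 2))                 # per-node cap: tenant CPU count x 2
  if [ "$parallel" -lt "$cap" ]; then
    per_node=$parallel
  else
    per_node=$cap
  fi
  echo $((per_node * nodes))
}

effective_parallel 2 10 1   # Example 1: prints 4
effective_parallel 2 10 2   # Example 2: prints 8
```

In both examples, the per-node cap of 4 limits the result, so raising parallel beyond 4 would not change it for this tenant.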
Optimization recommendations
- For large-scale data imports, increasing the parallel value can significantly reduce the commit phase duration.
- We recommend setting the value based on the tenant's CPU configuration. A value that is too large is ineffective, while a value that is too small can impact performance.
- Generally, setting the value to 2–4 times the number of CPU cores is a reasonable starting point.
-- Example for a tenant with 4 CPU cores
'parallel' = '8' -- or '16'
buffer-size optimization recommendations
The buffer-size parameter specifies the size of the client-side write buffer in terms of number of rows. The buffer is flushed and written to the server when it reaches this number of rows.
Optimization recommendations
- Large data volume with small row size: Increase the value (e.g., 2048 or 4096) to reduce the number of flushes and improve throughput.
- Large row size: Avoid setting a large value to prevent memory pressure. We recommend using the default value or a smaller value.
- Memory pressure: Reduce the buffer-size value to prevent the TaskManager from running out of memory (OOM).
Recommended values
| Scenario | Recommended value |
|---|---|
| Row size < 1 KB and sufficient memory | 2048 – 4096 |
| Row size 1 KB – 10 KB | 1024 (default) |
| Row size > 10 KB or memory pressure | 512 |
-- Example
'buffer-size' = '2048'
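As a rough way to reason about the memory side of these recommendations, the sketch below multiplies the buffer size by an average row size and the Flink parallelism. It is a back-of-the-envelope lower bound, under the assumption that each parallel writer holds one full buffer. It ignores JVM object overhead, and the function name is hypothetical.

```shell
# Hypothetical helper: rough lower bound (in KB) of row data held in write buffers.
# Assumes every parallel writer holds one full buffer; ignores JVM object overhead.
# Usage: buffered_kb <buffer_size_rows> <avg_row_kb> <flink_parallelism>
buffered_kb() {
  echo $(( $1 * $2 * $3 ))
}

buffered_kb 1024 10 8   # 1024 rows x 10 KB x 8 writers: prints 81920 (80 MB)
buffered_kb 512 10 8    # halving buffer-size halves the estimate: prints 40960 (40 MB)
```

Comparing this estimate with the TaskManager memory available gives a first indication of whether a larger buffer-size is safe for wide rows.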
Examples
This section provides more complete and practical examples for different scenarios and configurations.
Example 1: Basic batch import
This is the most basic example, suitable for beginners to get started quickly.
-- 1. Set to Batch mode
SET 'execution.runtime-mode' = 'BATCH';
-- 2. Create a sink table (using the minimal parameter set)
CREATE TABLE orders_sink (
order_id BIGINT,
user_id BIGINT,
product_name STRING,
amount DECIMAL(10, 2),
order_time TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-directload',
'host' = '127.0.0.1',
'port' = '2882',
'tenant-name' = 'test',
'username' = 'root',
'password' = 'your_password',
'schema-name' = 'test',
'table-name' = 'orders'
);
-- 3. Import data from the source (assuming data is read from another table)
INSERT INTO orders_sink
SELECT order_id, user_id, product_name, amount, order_time
FROM orders_source;
Example 2: Full import to an empty table (full mode)
This example is suitable for importing large amounts of data to an empty table for the first time.
SET 'execution.runtime-mode' = 'BATCH';
SET 'parallelism.default' = '8'; -- Adjust based on the data volume
CREATE TABLE user_profile_sink (
user_id BIGINT,
username STRING,
email STRING,
age INT,
city STRING,
register_time TIMESTAMP(3),
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-directload',
'host' = '127.0.0.1',
'port' = '2882',
'tenant-name' = 'prod',
'username' = 'admin',
'password' = 'your_password',
'schema-name' = 'userdb',
'table-name' = 'user_profile',
-- Use full mode (suitable for empty tables)
'load-method' = 'full',
-- Performance tuning
'parallel' = '16', -- Server-side parallelism
'buffer-size' = '2048'
);
-- Import data from a CSV file or other data source
INSERT INTO user_profile_sink
SELECT * FROM user_profile_csv_source;
Example 3: Incremental replacement import (inc_replace mode)
This example is suitable for scenarios where new data needs to overwrite existing data.
SET 'execution.runtime-mode' = 'BATCH';
CREATE TABLE product_info_sink (
product_id BIGINT,
product_name STRING,
price DECIMAL(10, 2),
stock INT,
update_time TIMESTAMP(3),
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-directload',
'host' = '127.0.0.1',
'port' = '2882',
'tenant-name' = 'ecommerce',
'username' = 'root',
'password' = 'your_password',
'schema-name' = 'productdb',
'table-name' = 'product_info',
-- Use inc_replace mode (automatically overwrites existing data)
'load-method' = 'inc_replace', -- The dup-action parameter is ignored
-- Performance configuration
'parallel' = '16',
'buffer-size' = '2048'
);
-- Import updated data (automatically overwrites old records)
INSERT INTO product_info_sink
SELECT * FROM product_info_latest;
Example 4: Read bounded data from Kafka and import
This example demonstrates how to read bounded data from Kafka (using bounded mode) and import it to OceanBase.
SET 'execution.runtime-mode' = 'BATCH';
-- Create a Kafka source (bounded mode)
CREATE TABLE kafka_source (
event_id BIGINT,
event_type STRING,
user_id BIGINT,
event_data STRING,
event_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user-events',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'flink-import-group',
'scan.startup.mode' = 'earliest-offset',
'scan.bounded.mode' = 'latest-offset', -- Bounded mode: stops after reading the latest offset
'format' = 'json'
);
-- Create an OceanBase sink
CREATE TABLE event_sink (
event_id BIGINT,
event_type STRING,
user_id BIGINT,
event_data STRING,
event_time TIMESTAMP(3),
PRIMARY KEY (event_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-directload',
'host' = '127.0.0.1',
'port' = '2882',
'tenant-name' = 'analytics',
'username' = 'root',
'password' = 'your_password',
'schema-name' = 'eventdb',
'table-name' = 'events',
'load-method' = 'full',
'parallel' = '16'
);
-- Import data
INSERT INTO event_sink
SELECT * FROM kafka_source;
Example 5: Import data from a file
This example shows how to import data from a CSV file to OceanBase Database.
SET 'execution.runtime-mode' = 'BATCH';
-- Create the source table.
CREATE TABLE csv_source (
id BIGINT,
name STRING,
age INT,
salary DECIMAL(10, 2)
) WITH (
'connector' = 'filesystem',
'path' = 'file:///data/employees.csv',
'format' = 'csv',
'csv.field-delimiter' = ',',
'csv.ignore-parse-errors' = 'false'
);
-- Create the OceanBase sink table.
CREATE TABLE employee_sink (
id BIGINT,
name STRING,
age INT,
salary DECIMAL(10, 2),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-directload',
'host' = '192.168.1.100',
'port' = '2882',
'tenant-name' = 'hrms',
'username' = 'admin',
'password' = 'your_password',
'schema-name' = 'hr',
'table-name' = 'employees',
'load-method' = 'full',
'parallel' = '8',
'buffer-size' = '2048',
'max-error-rows' = '10' -- Maximum number of error rows allowed
);
-- Execute the import.
INSERT INTO employee_sink
SELECT * FROM csv_source;
Best practices for using the DirectLoad connector
Performance optimization
Set the Flink parallelism appropriately
The Flink parallelism determines the number of parallel Writer tasks. Setting it appropriately can fully utilize the cluster resources.
-- Set the parallelism based on the data volume and cluster resources.
SET 'parallelism.default' = '8';
Tune the parallel parameter (server-side parallelism)
The parallel parameter significantly affects the performance of the commit phase.
Tuning strategy:
- When importing a large amount of data, increasing the parallel parameter can significantly reduce the commit phase time.
- We recommend that you set the parallel parameter to 2–4 times the number of CPU cores of the tenant.
- You do not need to worry about setting a large value. The server automatically limits the value.
-- Example: 8-CPU-core tenant
'parallel' = '16' -- or '32'
Performance comparison
| parallel | Commit phase time |
|---|---|
| 8 (default) | About 10 minutes |
| 16 | About 5 minutes |
| 32 | About 3 minutes |
Tune the buffer-size
Adjust the buffer size based on the data characteristics:
-- Small rows (each row is less than 1 KB)
'buffer-size' = '4096'
-- Medium rows (each row is 1–10 KB)
'buffer-size' = '1024' -- Default value
-- Large rows (each row is more than 10 KB)
'buffer-size' = '512'
Recommendations for production environments
Select an appropriate import window
During a direct load, the table is locked. We recommend that you:
- Select a low-traffic period, such as midnight or weekends.
- Notify the relevant parties in advance to avoid affecting other business.
- Set a reasonable timeout to prevent the import task from occupying the table for a long time.
-- Set a 2-hour timeout (based on the data volume).
'timeout' = '2h'
Select an appropriate load-method based on the scenario
| Scenario | Recommended load-method | Reason |
|---|---|---|
| First-time import to an empty table | full | It provides the best performance and columnar query results. |
| Periodically append incremental data | inc | It is suitable for incremental scenarios and supports conflict checking. |
| Daily full update of dimension tables | inc_replace | It automatically overwrites old data without manual deletion. |
| Historical data rollback | full (import to a temporary table and switch) | It avoids affecting online tables. |
Recommendations for columnar tables
If the target is a columnar table, note the impact of different load-method values:
- full mode (recommended):
  - Data is directly written in the columnar format.
  - Query performance is optimal after the import.
  - It is suitable only for empty tables or tables that can be rebuilt.
- inc / inc_replace mode:
  - Data is written in the row-based format.
  - Query performance is the same as that of a row-based table.
  - Columnar performance is achieved only after a major compaction.
  - If you prioritize high query performance, we recommend that you manually trigger a major compaction:
-- Manually trigger a major compaction in OceanBase Database.
ALTER SYSTEM MAJOR FREEZE;
FAQ
Q1: What do I do if other write operations fail during import?
Problem description: During a direct load, INSERT, UPDATE, or DELETE operations on the target table fail.
Cause: This is a characteristic of direct load. During the import, the target table is locked and only SELECT operations are allowed.
Solution:
Solution 1: Schedule the import during off-peak hours
- Schedule the import task during off-peak hours, such as during the early morning.
- Communicate with the business team in advance and pause write operations.
Solution 2: Use a temporary table
- First, import the data to a temporary table.
- After the import is complete, switch the table names or merge the data.
-- 1. Import to a temporary table
CREATE TABLE target_table_tmp LIKE target_table;
-- Use DirectLoad to import to target_table_tmp
-- 2. Switch the table names
RENAME TABLE target_table TO target_table_old, target_table_tmp TO target_table;
Solution 3: Use a regular JDBC connector
- If you cannot tolerate table locking, use the flink-connector-oceanbase connector.
- This will sacrifice some performance for table availability.
Q2: What do I do if the job does not end or the data is not committed?
Problem description: The Flink job remains in the RUNNING state, and no data is found in OceanBase.
Cause: The final commit in the DirectLoad connector occurs at the end-of-input stage. If the input is an unbounded stream, the job will not end, and the data will not be committed.
Troubleshooting steps:
Check if Batch mode is enabled:
SET 'execution.runtime-mode' = 'BATCH';
Check if the data source is a bounded stream:
- File data sources are naturally bounded.
- Kafka requires the scan.bounded.mode setting.
- JDBC data sources are naturally bounded.
- Custom sources require checking if the end-of-input signal is correctly sent.
Check the Flink Web UI:
- Check the job status.
- Look for backpressure.
- Check the progress of each operator.
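The job status can also be checked from a terminal through the Flink REST API, which is served on the same port as the Web UI. The sketch below assumes the quick-start address http://localhost:8081 and that the /jobs/overview response lists each job with a "state" field. job_states is a hypothetical helper that extracts those values without requiring a JSON tool.

```shell
# Hypothetical helper: print the state of every job found in a
# /jobs/overview JSON response read from standard input.
job_states() {
  grep -o '"state" *: *"[A-Z_]*"' | cut -d'"' -f4
}

# Usage against a running cluster (address assumed from the quick start):
#   curl -s http://localhost:8081/jobs/overview | job_states
# A job that keeps reporting RUNNING long after the source should have been
# exhausted is a hint that the input may be unbounded.
```

For anything beyond a quick check, a dedicated JSON tool is a more robust way to parse the response than text matching.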
Q3: How do I choose the load-method?
Decision tree:
Is this the first import to an empty table?
├─ Yes → Use full mode (best performance and columnar storage)
└─ No (importing to a non-empty table)
   └─ Do you need to overwrite existing records?
      ├─ Yes → Use inc_replace mode
      └─ No → Use inc mode (dup-action = REPLACE is not supported in this mode)
         └─ Do you allow primary key conflicts?
            ├─ No → dup-action = STOP_ON_DUP
            └─ Yes (keep old data) → dup-action = IGNORE
Special scenarios:
- For optimal query performance in a columnar storage table: Use full mode.
- For OceanBase Database versions earlier than 4.3.2: Only full mode is supported.
- For frequent incremental imports: Use inc or inc_replace mode.
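The selection logic can be sketched as a small shell function. The function name and its yes/no arguments are hypothetical. The rules encoded are the ones stated in this document: full for a first import into an empty table, inc_replace when existing records should be overwritten, inc otherwise, and only full on OceanBase Database versions earlier than 4.3.2.

```shell
# Hypothetical helper: suggest a load-method value.
# Usage: pick_load_method <ob_version> <table_empty: yes|no> <overwrite: yes|no>
pick_load_method() {
  version=$1
  table_empty=$2
  overwrite=$3
  # Find the lower of the given version and 4.3.2 (numeric compare per component).
  lowest=$(printf '%s\n' "$version" "4.3.2" | sort -t. -k1,1n -k2,2n -k3,3n | head -n 1)
  if [ "$table_empty" = "yes" ] || [ "$lowest" != "4.3.2" ]; then
    echo "full"          # empty table, or incremental modes unavailable before 4.3.2
  elif [ "$overwrite" = "yes" ]; then
    echo "inc_replace"   # overwrite rows that share a primary key
  else
    echo "inc"           # combine with dup-action STOP_ON_DUP or IGNORE
  fi
}

pick_load_method 4.3.5 yes no   # prints full
pick_load_method 4.3.5 no yes   # prints inc_replace
pick_load_method 4.3.5 no no    # prints inc
pick_load_method 4.2.5 no yes   # prints full
```

The version check compares only the first three numeric components, so patch suffixes such as BP numbers are not taken into account.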
Q4: What do I do if queries are slow after using inc/inc_replace mode on a columnar storage table?
Problem description: After using inc or inc_replace mode to import data into a columnar storage table, the query performance is lower than expected.
Cause: Data written in inc/inc_replace mode is stored in row format. It needs to be compacted into columnar format through a major compaction.
Solution:
Manually trigger a major compaction (recommended):
-- Execute the following statement in OceanBase Database:
ALTER SYSTEM MAJOR FREEZE;
-- Check the compaction progress
SELECT * FROM oceanbase.DBA_OB_MAJOR_COMPACTION;
Wait for automatic compactions: OceanBase Database automatically triggers compactions periodically, but it may take a long time depending on the configuration.
Q5: How do I optimize the import speed?
Possible causes and solutions:
Low Flink parallelism:
SET 'parallelism.default' = '16'; -- Increase parallelism
Low server-side parallelism:
'parallel' = '16' -- Increase server-side parallelism
Small buffer size:
'buffer-size' = '2048' -- Increase the buffer size
Insufficient resources in OceanBase Database:
- Check the load on the OceanBase cluster.
- Consider scaling up or importing during off-peak hours.
