Overview
Apache Flink is an open-source distributed stream processing framework widely used in real-time and batch data processing scenarios. The OceanBase Flink DirectLoad Connector (flink-connector-oceanbase-directload) is a high-performance data import tool designed for OceanBase Database. It leverages OceanBase Database's direct load technology to achieve high-throughput, rapid data ingestion into the database.
Direct load is a high-performance data import method provided by OceanBase Database. Unlike the traditional INSERT statement, direct load bypasses the SQL parsing and transaction processing steps, directly writing data to the storage layer. This results in significantly higher import performance. This method is particularly suitable for scenarios requiring the one-time import of large volumes of data.
DirectLoad connector vs. JDBC connector
| Feature | DirectLoad connector | JDBC connector |
|---|---|---|
| Write method | Direct import and write to the storage layer | Write by using the INSERT statement |
| Throughput | High, suitable for large-scale data | Relatively low |
| Use case | Batch data import and data migration | Real-time streaming write and CDC synchronization |
| Data stream type | Supports only bounded streams | Supports unbounded streams |
| Table status during import | The target table is locked and only supports queries | The table can be read and written |
| Supported operations | INSERT and UPDATE_AFTER | INSERT, UPDATE, and DELETE |
Core advantages
- High throughput: Throughput is several times to dozens of times higher than with the traditional JDBC-based approach.
- Parallel write: Supports parallel write in multiple nodes to fully utilize cluster resources.
- Easy to use: You can use it directly through Flink SQL without complex configuration.
- Flexible conflict handling: Supports multiple primary key conflict handling strategies (stop, replace, and ignore).
Considerations and limitations
Before you use the DirectLoad connector, make sure that you understand the following features and limitations.
Supported features
- Batch data writing: This connector is designed for large-scale data import and supports fast writing of tens of millions to hundreds of millions of data records.
- Bounded stream processing: This connector supports bounded data sources, such as files and database snapshots.
- Multiple import modes: This connector supports the `full`, `inc`, and `inc_replace` modes.
- Flexible conflict resolution strategies: This connector provides three strategies for handling primary key conflicts: `STOP_ON_DUP`, `REPLACE`, and `IGNORE`.
- Parallel writing: This connector supports parallel writing in Flink, which fully utilizes cluster resources.
- Compatibility: This connector is compatible with OceanBase Database in MySQL-compatible mode and Oracle-compatible mode.
Limitations
- Bounded stream only: This connector supports only bounded streams. It does not support continuous writing of unbounded streams.
- Table locking during import: During direct load, the target table is locked. Only `SELECT` queries are allowed on the table. `INSERT`, `UPDATE`, and `DELETE` operations are not allowed.
- Unsupported operations: This connector supports only the `INSERT` and `UPDATE_AFTER` change types. It does not support the `DELETE` and `UPDATE_BEFORE` change types.
- Batch processing only: This connector is suitable for batch processing. It is not suitable for real-time stream processing.
Notice
If you need to write or process unbounded data streams in real time, use the flink-connector-oceanbase connector provided in the project. It is based on JDBC and supports unbounded streams and real-time writing.
Scenarios
The DirectLoad connector is suitable for the following scenarios:
- Data migration: Migrate data from other databases or data warehouses to OceanBase Database in batches.
- Offline ETL: Periodically process and import data in batches, such as daily data aggregation.
- Import of historical data: Import a large amount of historical data in one go.
- Data initialization: Batch initialize data for a new system.
Scenarios not recommended
The DirectLoad connector is not recommended for the following scenarios:
- Real-time streaming writes: Real-time scenarios that require continuous data writes.
- CDC data synchronization: Scenarios that require real-time synchronization of database changes.
- Low-latency requirements: Scenarios with strict requirements on write latency.
- Frequent small-batch writes: Frequent writes of small batches of data.
Version requirements and dependencies
| Software | Version requirement | Description |
|---|---|---|
| Apache Flink | 1.15 or later | We recommend that you use 1.15+ for better performance and stability. |
| JDK | 8 or later | — |
| OceanBase Database | One of the following versions: • 4.3.0 BP1 and later • 4.2.4 to 4.3.0 • 4.2.1 BP7 to 4.2.2.0 | Supports MySQL and Oracle modes. |
| OceanBase Database (Incremental mode) | 4.3.2 or later | Required when you use the inc or inc_replace import mode. |
Quick Start
This section will guide you through the process of getting started with the DirectLoad connector, from obtaining the connector to completing your first data import.
Step 1: Deploy a local Flink cluster (single-node mode)
Download Apache Flink
Go to the Flink official download page and select Stable Release (recommended 1.15+ such as 1.18 or 1.19).
Example (Linux/macOS terminal commands):
# Download Flink (example: 1.19.3)
wget https://archive.apache.org/dist/flink/flink-1.19.3/flink-1.19.3-bin-scala_2.12.tgz
# Extract the package
tar -xzf flink-1.19.3-bin-scala_2.12.tgz
# Navigate to the directory
cd flink-1.19.3
# Set the environment variable (optional but recommended)
export FLINK_HOME=$(pwd)
Start the local Flink cluster
Run the following command in the Flink directory:
# Start the cluster (JobManager + TaskManager)
$FLINK_HOME/bin/start-cluster.sh
If the cluster starts successfully, you will see output similar to the following:
Starting cluster.
Starting standalonesession daemon on host your-hostname.
Starting taskexecutor daemon on host your-hostname.
If the cluster fails to start, check the logs:
# View JobManager logs
tail -f $FLINK_HOME/log/flink-*-standalonesession-*.log
# View TaskManager logs
tail -f $FLINK_HOME/log/flink-*-taskexecutor-*.log
Verify that Flink is running
Open a browser and go to: http://localhost:8081. You should see the Flink Dashboard, which displays 1 TaskManager with available slots ≥ 1.
Step 2: Deploy the OceanBase DirectLoad connector JAR
Download the connector JAR
Go to Maven Central: https://repo1.maven.org/maven2/com/oceanbase/flink-sql-connector-oceanbase-directload/
Select a version (e.g., 1.5.0) and download the corresponding JAR file:
# Example: Download version 1.5.0
wget https://repo1.maven.org/maven2/com/oceanbase/flink-sql-connector-oceanbase-directload/1.5.0/flink-sql-connector-oceanbase-directload-1.5.0.jar
Note: The filename must be flink-sql-connector-oceanbase-directload-<version>.jar, which is crucial for Flink SQL to recognize the connector.
Place the JAR in the lib/ directory of Flink
# Copy the JAR to the Flink lib directory
cp flink-sql-connector-oceanbase-directload-1.5.0.jar $FLINK_HOME/lib/
Important note: Flink loads all JAR files in the lib/ directory at startup, so the connector is registered automatically.
Restart the Flink cluster (to apply the JAR)
# Stop the cluster
$FLINK_HOME/bin/stop-cluster.sh
# Start the cluster
$FLINK_HOME/bin/start-cluster.sh
Step 3: Obtain the database connection information
Contact the OceanBase Database deployment personnel or administrator to obtain the database connection string, for example:
obclient -h$host -P$port -u$user_name -p$password -D$database_name
Parameter description:
- `$host`: The IP address for connecting to OceanBase Database. For connection through OceanBase Database Proxy (ODP), use the ODP address. For direct connection, use the IP address of an OBServer node.
- `$port`: The port for connecting to OceanBase Database. For ODP connection, the default port is `2883`, which can be customized during ODP deployment. For direct connection, the default port is `2881`, which can be customized during OceanBase Database deployment.
- `$database_name`: The name of the database to be accessed.

  Notice

  The user for connecting to the tenant must have the `CREATE`, `INSERT`, `DROP`, and `SELECT` privileges on the database. For more information about user privileges, see Privilege classification in MySQL-compatible mode.

- `$user_name`: The tenant connection account. For ODP connection, the common format is `username@tenant name#cluster name` or `cluster name:tenant name:username`. For direct connection, the format is `username@tenant name`.
- `$password`: The account password.
For more information about the connection string, see Connect to an OceanBase tenant by using OBClient.
Notice
When connecting to the OBServer service, you can query the system view DBA_OB_SERVERS under the sys tenant to obtain the RPC port number of the OBServer.
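For example, a query along these lines returns the port of each OBServer node (run as a sys-tenant user; the column names follow the OceanBase 4.x system views, where `SVR_PORT` is commonly the RPC port and `SQL_PORT` the MySQL protocol port — verify against your version's documentation):

```sql
-- Query the OBServer node list from the sys tenant.
SELECT SVR_IP, SVR_PORT, SQL_PORT, STATUS
FROM oceanbase.DBA_OB_SERVERS;
```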
Verify network connectivity
Ensure that the environment where the Flink job is running can access OceanBase Database:
Test command:
# Test RPC port connectivity
nc -zv <oceanbase-host> <rpc-port>
Step 4: Create a target table in OceanBase Database
Example:
-- Connect to OceanBase Database
USE test;
-- Create the target table
CREATE TABLE `t_user` (
`id` INT NOT NULL,
`username` VARCHAR(50) DEFAULT NULL,
`age` INT DEFAULT NULL,
`score` DECIMAL(10,2) DEFAULT NULL,
PRIMARY KEY (`id`)
) COMMENT 'User information table';
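Optionally, confirm the table definition before running the import (a standard MySQL-mode check):

```sql
-- Verify that the target table exists and check its structure.
DESCRIBE test.t_user;
```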
Step 5: Start the Flink SQL Client and test it
# Start the SQL Client
$FLINK_HOME/bin/sql-client.sh
After the SQL Client is started, you will see the interactive command-line interface of the Flink SQL Client:
▒▓██▓██▒
▓████▒▒█▓▒▓███▓▒
▓███▓░░ ▒▒▒▓██▒ ▒
░██▒ ▒▒▓▓█▓▓▒░ ▒████
██▒ ░▒▓███▒ ▒█▒█▒
░▓█ ███ ▓░▒██
▓█ ▒▒▒▒▒▓██▓░▒░▓▓█
█░ █ ▒▒░ ███▓▓█ ▒█▒▒▒
████░ ▒▓█▓ ██▒▒▒ ▓███▒
░▒█▓▓██ ▓█▒ ▓█▒▓██▓ ░█░
▓░▒▓████▒ ██ ▒█ █▓░▒█▒░▒█▒
███▓░██▓ ▓█ █ █▓ ▒▓█▓▓█▒
░██▓ ░█░ █ █▒ ▒█████▓▒ ██▓░▒
███░ ░ █░ ▓ ░█ █████▒░░ ░█░▓ ▓░
██▓█ ▒▒▓▒ ▓███████▓░ ▒█▒ ▒▓ ▓██▓
▒██▓ ▓█ █▓█ ░▒█████▓▓▒░ ██▒▒ █ ▒ ▓█▒
▓█▓ ▓█ ██▓ ░▓▓▓▓▓▓▓▒ ▒██▓ ░█▒
▓█ █ ▓███▓▒░ ░▓▓▓███▓ ░▒░ ▓█
██▓ ██▒ ░▒▓▓███▓▓▓▓▓██████▓▒ ▓███ █
▓███▒ ███ ░▓▓▒░░ ░▓████▓░ ░▒▓▒ █▓
█▓▒▒▓▓██ ░▒▒░░░▒▒▒▒▓██▓░ █▓
██ ▓░▒█ ▓▓▓▓▒░░ ▒█▓ ▒▓▓██▓ ▓▒ ▒▒▓
▓█▓ ▓▒█ █▓░ ░▒▓▓██▒ ░▓█▒ ▒▒▒░▒▒▓█████▒
██░ ▓█▒█▒ ▒▓▓▒ ▓█ █░ ░░░░ ░█▒
▓█ ▒█▓ ░ █░ ▒█ █▓
█▓ ██ █░ ▓▓ ▒█▓▓▓▒█░
█▓ ░▓██░ ▓▒ ▓█▓▒░░░▒▓█░ ▒█
██ ▓█▓░ ▒ ░▒█▒██▒ ▓▓
▓█▒ ▒█▓▒░ ▒▒ █▒█▓▒▒░░▒██
░██▒ ▒▓▓▒ ▓██▓▒█▒ ░▓▓▓▓▒█▓
░▓██▒ ▓░ ▒█▓█ ░░▒▒▒
▒▓▓▓▓▓▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒▒░░▓▓ ▓░▒█░
______ _ _ _ _____ ____ _ _____ _ _ _ BETA
| ____| (_) | | / ____|/ __ \| | / ____| (_) | |
| |__ | |_ _ __ | | __ | (___ | | | | | | | | |_ ___ _ __ | |_
| __| | | | '_ \| |/ / \___ \| | | | | | | | | |/ _ \ '_ \| __|
| | | | | | | | < ____) | |__| | |____ | |____| | | __/ | | | |_
|_| |_|_|_| |_|_|\_\ |_____/ \___\_\______| \_____|_|_|\___|_| |_|\__|
Welcome! Enter 'HELP;' to list all available commands. 'QUIT;' to exit.
Flink SQL>
Step 6: Run a sample Flink SQL statement
Start the Flink SQL Client
cd $FLINK_HOME
./bin/sql-client.sh
Set the running mode to BATCH
-- Optional: It is recommended to set the running mode to BATCH for better performance.
SET 'execution.runtime-mode' = 'BATCH';
-- Optional: Set the parallelism (adjust the parallelism based on the data volume and cluster resources)
SET 'parallelism.default' = '4';
Create a DirectLoad sink table
Create a target table in Flink SQL that maps to the t_user table in OceanBase Database:
CREATE TABLE t_user (
id INT,
username STRING,
age INT,
score DECIMAL(10,2),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-directload',
'host' = 'xxx.x.x.x', -- The IP address of the OceanBase Database server.
'port' = 'xxxx', -- The RPC port number.
'tenant-name' = 'your_tenant-name', -- The name of the tenant.
'username' = 'your_username', -- The username.
'password' = 'your_password', -- The password.
'schema-name' = 'test', -- The name of the database.
'table-name' = 't_user' -- The name of the table.
);
Parameter description:
- `connector`: The value is `oceanbase-directload`.
- `host` and `port`: The IP address and RPC port number of the OceanBase Database server. You can query the system view `DBA_OB_SERVERS` in the sys tenant to obtain the RPC port number of the OBServer node.
- `tenant-name`: The name of the tenant.
- `username` and `password`: The username and password of the database user. Note that the username does not contain the `@tenant` suffix.
- `schema-name` and `table-name`: The names of the target database and table.
Note
The preceding example uses the minimum set of parameters. For more parameters, see the Parameters section below.
Insert test data
-- Insert some test data.
INSERT INTO t_user
VALUES
(1, 'Alice', 25, 95.5),
(2, 'Bob', 30, 88.0),
(3, 'Charlie', 28, 92.3),
(4, 'David', 35, 87.8),
(5, 'Eve', 22, 96.0);
After the preceding statements are executed, Flink submits the job and starts importing data. You will see the following output in the console:
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
Job execution:
- The job runs in the background. You can view the job progress in the Flink Web UI (http://localhost:8081).
- The job status changes from `INITIALIZING` to `RUNNING` and then to `FINISHED`.
- The final commit of the direct load occurs at the end of the job. Make sure to wait until the job status changes to `FINISHED` before verifying the data.
Verify the data write
After the job is completed, query the t_user table in OceanBase Database to verify the data:
SELECT * FROM test.t_user ORDER BY id;
The query result is as follows:
+----+----------+------+-------+
| id | username | age | score |
+----+----------+------+-------+
| 1 | Alice | 25 | 95.50 |
| 2 | Bob | 30 | 88.00 |
| 3 | Charlie | 28 | 92.30 |
| 4 | David | 35 | 87.80 |
| 5 | Eve | 22 | 96.00 |
+----+----------+------+-------+
5 rows in set
Step 7: Stop and clean up (optional)
After the test, you can stop the Flink cluster:
# Stop the Flink cluster.
cd $FLINK_HOME
./bin/stop-cluster.sh
Exit the SQL Client:
-- Execute the following statement in the SQL Client.
QUIT;
You can also press Ctrl+D to exit the SQL Client.
Important notes
- Wait for the job to complete: The final commit of the direct load occurs at the end of the job. Make sure to wait until the job status changes to `FINISHED` before verifying the data.
- Table lock during import: During the execution of the `INSERT` statement, the target table `t_user` is locked. You can only query the table but cannot perform write operations.
- Use the BATCH mode: The DirectLoad connector only supports bounded streams. We recommend that you use the BATCH mode and ensure that the data source is bounded.
- Error handling: If the job fails, check the error information in the logs. Common issues include network connection failures, insufficient account permissions, and incorrect parameter configurations.
Parameters
This section describes all the parameters of the DirectLoad connector. Parameters marked with * are required.
Connection parameters
| Parameter | Required | Default value | Type | Description |
|---|---|---|---|---|
| `connector`* | Yes | N/A | String | The type of the connector. Valid value: `oceanbase-directload`. |
| `host`* | Yes | N/A | String | The IP address of the host where the OceanBase database is located. For example, `127.0.0.1` or a domain name. |
| `port`* | Yes | `2882` | Integer | The RPC port used for direct load. Default value: `2882`. |
| `tenant-name`* | Yes | N/A | String | The name of the tenant. For example, `test`. |
| `username`* | Yes | N/A | String | The username of the database user. For example, `root`. Notice: Enter only the username, not the connection string (such as `root@test`). |
| `password`* | Yes | N/A | String | The password of the database user. |
| `schema-name`* | Yes | N/A | String | The name of the database (schema). For example, `test`. |
| `table-name`* | Yes | N/A | String | The name of the target table. For example, `t_user`. |
Import behavior parameters
| Parameter | Required | Default value | Type | Description |
|---|---|---|---|---|
| `load-method` | No | `full` | String | The import mode. Valid values: `full`, `inc`, and `inc_replace`. For more information, see the load-method section below. |
| `dup-action` | No | `REPLACE` | String | The strategy for handling primary key conflicts. Valid values: `STOP_ON_DUP` (stop the import when a conflict occurs), `REPLACE` (replace existing records), and `IGNORE` (ignore conflicting records). For more information, see the dup-action section below. |
| `max-error-rows` | No | `0` | Long | The maximum number of error rows that can be tolerated. If this threshold is exceeded, the import fails. A value of `0` indicates that no errors are tolerated. |
Performance tuning parameters
| Parameter | Required | Default value | Type | Description |
|---|---|---|---|---|
| `parallel` | No | `8` | Integer | The server-side parallelism. This parameter specifies how many CPU resources the OceanBase server uses to process the import task. For more information, see the parallel section below. |
| `buffer-size` | No | `1024` | Integer | The size of the write buffer, in number of rows. When this number of rows is accumulated, a flush is triggered. For more information, see the tuning suggestions for buffer-size below. |
| `timeout` | No | `7d` | Duration | The timeout period for a single direct load task. Example values: `1d`, `12h`, `30m`, and `3600s`. |
| `heartbeat-timeout` | No | `60s` | Duration | The timeout period for client heartbeats. |
| `heartbeat-interval` | No | `10s` | Duration | The interval for client heartbeats. |
Core parameters
load-method
The load-method parameter specifies the mode of direct load. Different modes are suitable for different scenarios.
full (full load)
- Default value. This mode is suitable for empty tables or tables with only a small amount of data.
- The data is directly written to the major sstable.
- Advantages of columnar tables: After the data is imported, it is in columnar format, and the query performance is optimal. No additional compaction is required.
- Scenarios:
- Import data to an empty table for the first time.
- The target table contains a small amount of data that can be rebuilt.
- Optimize the query performance of columnar tables.
-- Example of full mode
CREATE TABLE t_sink (...) WITH (
'connector' = 'oceanbase-directload',
'load-method' = 'full',
...
);
inc (incremental load)
- This mode is suitable for appending data to a non-empty table.
- It checks for primary key conflicts and handles them according to the `dup-action` strategy.
- Version requirement: OceanBase Database 4.3.2 or later.
- Limitation: The `REPLACE` value is not supported for the `dup-action` parameter.
- Columnar tables: The data is written in rowstore format (incremental direct load does not write columnar format directly). After the data is imported, the query performance is that of a rowstore table. You need to wait for a major compaction to achieve the optimal query performance of columnar tables.
-- Example of incremental mode
CREATE TABLE t_sink (...) WITH (
'connector' = 'oceanbase-directload',
'load-method' = 'inc',
'dup-action' = 'STOP_ON_DUP', -- The REPLACE value is not supported.
...
);
inc_replace (incremental replacement load)
- This mode is suitable for importing data to a non-empty table and overwriting existing records.
- It does not check for primary key conflicts and directly overwrites data with the same primary key (equivalent to the `REPLACE` effect).
- Version requirement: OceanBase Database 4.3.2 or later.
- The `dup-action` parameter is ignored because no conflict check is performed.
- Columnar tables: The query performance is that of a rowstore table until a major compaction is performed.
-- Example of incremental replacement mode
CREATE TABLE t_sink (...) WITH (
'connector' = 'oceanbase-directload',
'load-method' = 'inc_replace', -- The dup-action parameter is invalid.
...
);
Recommendations
| Scenario | Recommended mode | Reason |
|---|---|---|
| Import data to an empty table for the first time | `full` | This mode provides the optimal import performance and columnar query performance. |
| Import data to a non-empty table where conflicts need to be overwritten | `inc_replace` | This mode automatically overwrites the old data. |
| Import data to a non-empty table where no conflicts are allowed | `inc` + `dup-action=STOP_ON_DUP` | This mode strictly controls data consistency. |
dup-action
The `dup-action` parameter specifies how to handle primary key conflicts (effective only when `load-method` is set to `full` or `inc`).
STOP_ON_DUP (stop import)
- The import is immediately stopped when a primary key conflict occurs, and the entire job fails.
- Scenarios:
- Data consistency is strictly required.
- Primary key conflicts are not allowed.
- Data quality issues need to be quickly identified.
'dup-action' = 'STOP_ON_DUP'
REPLACE (replace)
- The new record replaces the old record when a primary key conflict occurs.
- Scenarios:
- Existing data can be overwritten.
- The imported data is the latest version.
- Note: This mode is not supported when `load-method` is set to `inc`.
'dup-action' = 'REPLACE'
IGNORE (ignore)
- The old record is retained and the new record is ignored when a primary key conflict occurs.
- Scenarios:
- Old data is considered to be accurate.
- Existing records should not be overwritten during incremental import.
'dup-action' = 'IGNORE'
parallel
The parallel parameter specifies the server-side parallelism, which determines how many CPU resources are used by the OceanBase Database server to process the current import task.
Key characteristics
- Independent of client concurrency: The `parallel` parameter is a server-side parameter and is independent of the parallelism of Flink.
- Limited by tenant configuration: The server automatically limits the maximum parallelism based on the CPU resources of the tenant. If the client sets a value that exceeds the limit, the server caps it without returning an error.
- Affected by partition distribution: The actual parallelism is also affected by the distribution of partitions in the table.
Rules for calculating actual parallelism
- Maximum parallelism per node = `MIN(number of CPU cores of the tenant × 2, parallel value)`
- Total actual parallelism = `Maximum parallelism per node × Number of nodes with partitions`
Example 1: Single-node scenario
- Tenant configuration: 2 CPU cores
- `parallel` value: 10
- The table partitions are distributed on 1 node.
- Actual parallelism = `MIN(2 × 2, 10) × 1 = 4`
Example 2: Multi-node scenario
- Tenant configuration: 2 CPU cores
- `parallel` value: 10
- The table partitions are evenly distributed on 2 nodes.
- Actual parallelism = `MIN(2 × 2, 10) × 2 = 8`
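The rule above can be sketched as a small shell function (a back-of-the-envelope check of the documented formula, not part of the connector):

```shell
# Sketch of the parallelism rule:
#   per-node = MIN(tenant CPU cores x 2, parallel value)
#   total    = per-node x number of nodes with partitions
actual_parallelism() {
  cpu=$1; parallel=$2; nodes=$3
  per_node=$(( cpu * 2 < parallel ? cpu * 2 : parallel ))
  echo $(( per_node * nodes ))
}
actual_parallelism 2 10 1   # Example 1: prints 4
actual_parallelism 2 10 2   # Example 2: prints 8
```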
Optimization recommendations
- For large-scale data import, increasing the `parallel` value can significantly reduce the commit time.
- We recommend that you set a reasonable value based on the CPU resources of the tenant. A value that is too large is ineffective, and a value that is too small degrades performance.
- Generally, setting the value to 2–4 times the number of CPU cores of the tenant is a reasonable starting point.
-- Example: Recommended value for a 4-CPU tenant
'parallel' = '8' -- or '16'
buffer-size
The buffer-size parameter specifies the size of the client-side write buffer, in number of rows. The buffer is flushed and written to the server after it reaches the specified number of rows.
Optimization recommendations
- Large data volume and small row size: You can increase the value (such as 2048 or 4096) to reduce the number of flushes and improve throughput.
- Large row size: Avoid setting a large value to prevent memory pressure. We recommend that you keep the value as the default or reduce it slightly.
- Memory pressure: Reduce the `buffer-size` value to avoid out-of-memory (OOM) errors in the TaskManager.
Recommended values
| Scenario | Recommended value |
|---|---|
| Row size < 1 KB and memory is sufficient | 2048 – 4096 |
| Row size 1 KB – 10 KB | 1024 (default value) |
| Row size > 10 KB or memory is insufficient | 512 |
-- Example
'buffer-size' = '2048'
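When picking a value, it can help to estimate the buffered memory per writer task. The following is a rough back-of-the-envelope heuristic (an assumption for sizing, not a figure reported by the connector): buffered bytes ≈ buffer-size (rows) × average row size (bytes).

```shell
# Rough memory estimate per writer task (assumed heuristic).
buffer_memory_kb() {
  rows=$1; avg_row_bytes=$2
  echo $(( rows * avg_row_bytes / 1024 ))
}
buffer_memory_kb 2048 1024    # 2048 rows at ~1 KB each -> ~2048 KB buffered
```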
Examples
This section provides more complete and practical examples in different scenarios and configurations.
Example 1: Basic batch import
This is the most basic example, suitable for beginners to get started quickly.
-- 1. Set the execution mode to BATCH.
SET 'execution.runtime-mode' = 'BATCH';
-- 2. Create a sink table (using the minimum parameter set).
CREATE TABLE orders_sink (
order_id BIGINT,
user_id BIGINT,
product_name STRING,
amount DECIMAL(10, 2),
order_time TIMESTAMP(3),
PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-directload',
'host' = '127.0.0.1',
'port' = '2882',
'tenant-name' = 'test',
'username' = 'root',
'password' = 'your_password',
'schema-name' = 'test',
'table-name' = 'orders'
);
-- 3. Import data from the source (assuming it's another table).
INSERT INTO orders_sink
SELECT order_id, user_id, product_name, amount, order_time
FROM orders_source;
Example 2: Full import to an empty table (full mode)
This example is suitable for importing large amounts of data to an empty table for the first time.
SET 'execution.runtime-mode' = 'BATCH';
SET 'parallelism.default' = '8'; -- Adjust based on the data volume
CREATE TABLE user_profile_sink (
user_id BIGINT,
username STRING,
email STRING,
age INT,
city STRING,
register_time TIMESTAMP(3),
PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-directload',
'host' = '127.0.0.1',
'port' = '2882',
'tenant-name' = 'prod',
'username' = 'admin',
'password' = 'your_password',
'schema-name' = 'userdb',
'table-name' = 'user_profile',
-- Use the full mode (suitable for empty tables)
'load-method' = 'full',
-- Performance tuning
'parallel' = '16', -- Server-side parallelism
'buffer-size' = '2048'
);
-- Import data from a CSV file or other data source
INSERT INTO user_profile_sink
SELECT * FROM user_profile_csv_source;
Example 3: Incremental replacement import (inc_replace mode)
This example is suitable for scenarios where new data needs to overwrite existing data.
SET 'execution.runtime-mode' = 'BATCH';
CREATE TABLE product_info_sink (
product_id BIGINT,
product_name STRING,
price DECIMAL(10, 2),
stock INT,
update_time TIMESTAMP(3),
PRIMARY KEY (product_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-directload',
'host' = '127.0.0.1',
'port' = '2882',
'tenant-name' = 'ecommerce',
'username' = 'root',
'password' = 'your_password',
'schema-name' = 'productdb',
'table-name' = 'product_info',
-- Use the inc_replace mode (automatically overwrites existing data)
'load-method' = 'inc_replace', -- The dup-action parameter will be ignored
-- Performance configuration
'parallel' = '16',
'buffer-size' = '2048'
);
-- Import updated data (automatically overwrites old records)
INSERT INTO product_info_sink
SELECT * FROM product_info_latest;
Example 4: Read bounded data from Kafka and import
This example demonstrates how to read bounded data from Kafka (using the bounded mode) and import it to OceanBase.
SET 'execution.runtime-mode' = 'BATCH';
-- Create a Kafka source (bounded mode)
CREATE TABLE kafka_source (
event_id BIGINT,
event_type STRING,
user_id BIGINT,
event_data STRING,
event_time TIMESTAMP(3)
) WITH (
'connector' = 'kafka',
'topic' = 'user-events',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'flink-import-group',
'scan.startup.mode' = 'earliest-offset',
'scan.bounded.mode' = 'latest-offset', -- Bounded mode: stop after reading the latest offset
'format' = 'json'
);
-- Create an OceanBase sink
CREATE TABLE event_sink (
event_id BIGINT,
event_type STRING,
user_id BIGINT,
event_data STRING,
event_time TIMESTAMP(3),
PRIMARY KEY (event_id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-directload',
'host' = '127.0.0.1',
'port' = '2882',
'tenant-name' = 'analytics',
'username' = 'root',
'password' = 'your_password',
'schema-name' = 'eventdb',
'table-name' = 'events',
'load-method' = 'full',
'parallel' = '16'
);
-- Import data
INSERT INTO event_sink
SELECT * FROM kafka_source;
Example 5: Import data from a file
Import data from a CSV file to OceanBase Database in batch mode.
SET 'execution.runtime-mode' = 'BATCH';
-- Create a source table
CREATE TABLE csv_source (
id BIGINT,
name STRING,
age INT,
salary DECIMAL(10, 2)
) WITH (
'connector' = 'filesystem',
'path' = 'file:///data/employees.csv',
'format' = 'csv',
'csv.field-delimiter' = ',',
'csv.ignore-parse-errors' = 'false'
);
-- Create a sink table in OceanBase Database
CREATE TABLE employee_sink (
id BIGINT,
name STRING,
age INT,
salary DECIMAL(10, 2),
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'oceanbase-directload',
'host' = '192.168.1.100',
'port' = '2882',
'tenant-name' = 'hrms',
'username' = 'admin',
'password' = 'your_password',
'schema-name' = 'hr',
'table-name' = 'employees',
'load-method' = 'full',
'parallel' = '8',
'buffer-size' = '2048',
'max-error-rows' = '10' -- Allow up to 10 error rows
);
-- Execute the import
INSERT INTO employee_sink
SELECT * FROM csv_source;
Best practices for using the DirectLoad connector
Performance optimization
Set the Flink parallelism appropriately
The Flink parallelism determines the number of parallel Writer tasks. Setting it appropriately can fully utilize the cluster resources.
-- Set the parallelism based on the data volume and cluster resources.
SET 'parallelism.default' = '8';
Tune the parallel parameter (server-side parallelism)
The parallel parameter significantly affects the performance during the commit phase.
Tuning strategy:
- When importing a large amount of data, increasing the `parallel` parameter can significantly reduce the commit time.
- We recommend that you set the `parallel` parameter to 2 to 4 times the number of CPU cores of the tenant.
- You do not need to worry about setting an overly large value because the server automatically caps it.
-- Example: 8-CPU tenant
'parallel' = '16' -- or '32'
Performance comparison
| `parallel` | Commit time |
|---|---|
| 8 (default) | About 10 minutes |
| 16 | About 5 minutes |
| 32 | About 3 minutes |
Tune the buffer-size
Adjust the buffer size based on the data characteristics:
-- Small rows (each row < 1 KB)
'buffer-size' = '4096'
-- Medium rows (each row 1–10 KB)
'buffer-size' = '1024' -- Default value
-- Large rows (each row > 10 KB)
'buffer-size' = '512'
Recommendations for production environments
Choose an appropriate import window
Since table locks are applied during direct load, we recommend that you:
- Choose off-peak hours for business operations, such as midnight or weekends.
- Notify the relevant parties in advance to avoid affecting other business operations.
- Set a reasonable timeout to prevent the import task from occupying the table for a long time.
-- Set a 2-hour timeout (based on the data volume).
'timeout' = '2h'
Choose the load-method based on the scenario
| Scenario | Recommended load-method | Reason |
|---|---|---|
| Import data to an empty table for the first time | `full` | Provides the best import performance and optimal query performance for columnar storage. |
| Periodically append incremental data | `inc` | Suitable for incremental scenarios and supports conflict checks. |
| Perform a full update of a dimension table daily | `inc_replace` | Automatically overwrites old data without manual deletion. |
| Perform historical data rollback | `full` (import to a temporary table and then switch) | Avoids affecting online tables. |
Recommendations for columnar storage tables
If the target is a columnar storage table, note the impact of different `load-method` values:
- `full` mode (recommended):
  - Data is directly written in columnar storage format.
  - Query performance is optimal after the import.
  - Suitable only for empty tables or tables that can be rebuilt.
- `inc`/`inc_replace` mode:
  - Data is written in row-based storage format.
  - Query performance is equivalent to that of row-based storage after the import.
  - Columnar storage performance is achieved only after a major compaction.
  - To achieve high query performance, we recommend that you manually trigger a major compaction:
-- Manually trigger a major compaction in OceanBase Database.
ALTER SYSTEM MAJOR FREEZE;
FAQs
Q1: What do I do if other write operations fail during import?
Problem description: During a direct load, an INSERT, UPDATE, or DELETE operation on the target table fails.
Cause: This is a characteristic of direct load. During import, the target table is locked and only SELECT operations are allowed.
Solution:
Solution 1: Schedule the import during off-peak hours
- Schedule the import task during off-peak hours (e.g., late at night).
- Coordinate with the business team to pause write operations in advance.
Solution 2: Use a temporary table
- Import data into the temporary table first.
- After the import is complete, switch the table names or merge the data.

-- 1. Create a temporary table.
CREATE TABLE target_table_tmp LIKE target_table;
-- 2. Use DirectLoad to import data into target_table_tmp.
-- 3. Rename the tables.
RENAME TABLE target_table TO target_table_old, target_table_tmp TO target_table;
Solution 3: Use a regular JDBC connector
- If you cannot tolerate table locks, use the `flink-connector-oceanbase` connector.
- This sacrifices some performance for table availability.
Q2: What do I do if the job does not end or the data is not committed?
Problem description: The Flink job remains in the RUNNING state, and no data is found in OceanBase.
Cause: The final commit of the DirectLoad connector occurs at the end-of-input stage. If the input is an unbounded stream, the job will not end, and the data will not be committed.
Troubleshooting steps:
Check whether BATCH mode is enabled:

SET 'execution.runtime-mode' = 'BATCH';

Check whether the data source is a bounded stream:

- File data source: Naturally bounded
- Kafka: Set `scan.bounded.mode`
- JDBC: Naturally bounded
- Custom source: Ensure that the end-of-input signal is correctly sent
Check the Flink Web UI:
- Check the job status
- Look for Backpressure
- Check the progress of each operator
Q3: How do I choose the load-method?
Decision tree:
Is it the first import to an empty table?
├─ Yes → Use full mode (optimal performance, best columnar storage)
└─ No (importing to a non-empty table)
└─ Do you need to overwrite existing records?
├─ Yes → Use inc_replace mode
└─ No → Use inc mode
└─ Are primary key conflicts allowed?
├─ No → dup-action = STOP_ON_DUP
├─ Yes (keep new data) → dup-action = REPLACE
└─ Yes (keep old data) → dup-action = IGNORE
Special scenarios:
- For optimal query performance in a columnar table: Prefer `full` mode.
- For OceanBase versions earlier than 4.3.2: Only `full` mode is supported.
- For frequent incremental imports: Use `inc` or `inc_replace` mode.
Q4: What do I do if queries are slow after using inc/inc_replace mode on a columnar table?
Problem description: After using inc or inc_replace mode to import data into a columnar table, the query performance is lower than expected.
Cause: Data written in inc/inc_replace mode is stored in row-based format. It needs to undergo a major compaction to be converted to columnar format.
Solution:
Manually trigger a major compaction (recommended):
-- Execute in OceanBase Database.
ALTER SYSTEM MAJOR FREEZE;
-- Check the compaction progress.
SELECT * FROM oceanbase.DBA_OB_MAJOR_COMPACTION;

Wait for automatic compaction: OceanBase automatically triggers compactions periodically, but this may take a long time depending on the configuration.
Q5: How do I optimize slow import speed?
Possible causes and solutions:
Low Flink parallelism:

SET 'parallelism.default' = '16'; -- Increase the parallelism

Low server-side parallelism:

'parallel' = '16' -- Increase the server-side parallelism

Small buffer-size:

'buffer-size' = '2048' -- Increase the buffer size

Insufficient OceanBase resources:
- Check the load on the OceanBase cluster
- Consider scaling out or importing during off-peak hours