This guide is for developers looking to build efficient data pipelines using Apache Flink and OceanBase Database. Whether you are new to Flink or an experienced user, this guide will help you:
- Quickly get up to speed with Flink’s core features.
- Gain a comprehensive understanding of the Flink connectors available for OceanBase Database.
- Select the best integration solution for your specific business needs.
Supported versions
- Flink V1.15 and above.
- The JDBC connector is compatible with OceanBase Database V3.x and V4.x.
- The Flink Connector OceanBase Direct Load, OBKV-HBase Connector, and OBKV-HBase2 Connector are compatible with OceanBase Database V4.2.x and above.
Introduction to Flink
What is Apache Flink
Apache Flink is an open-source computing engine that efficiently processes both real-time data streams and batch data. In data integration scenarios, Flink offers powerful capabilities for synchronizing, transforming, and processing data across systems.
Why choose Flink SQL
Flink SQL is the simplest and most efficient way to use Flink. With SQL-like syntax, you can:
- Connect to a variety of data sources (such as Kafka, MySQL, and OceanBase Database).
- Perform filtering, aggregation, and transformation operations.
- Write results to your target system.
You can accomplish complex tasks with just a few lines of SQL, without needing to write Java or Scala code.
Real-time synchronization from Kafka to OceanBase Database
The following Flink SQL script can be executed directly in the Flink SQL Client or a Flink job to consume order data from Kafka and synchronize it in real time to OceanBase Database.
Note
You can use the CREATE TABLE statement to define the source and target, and then use a single INSERT INTO ... SELECT statement to synchronize data in real time with transformation.
Define the Kafka source table.
```sql
CREATE TABLE kafka_orders (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10, 2),
    order_time TIMESTAMP(3)
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'properties.group.id' = 'flink-consumer',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
);
```

Define the OceanBase Database target table.

```sql
CREATE TABLE oceanbase_orders (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10, 2),
    order_time TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase',
    'url' = 'jdbc:mysql://127.0.0.1:2881/test',
    'username' = 'root@test',
    'password' = 'password',
    'table-name' = 'orders'
);
```

Execute data transformation and filtering.

```sql
INSERT INTO oceanbase_orders
SELECT
    order_id,
    user_id,
    amount * 1.1 AS amount,  -- You can perform data transformation.
    order_time
FROM kafka_orders
WHERE amount > 100;  -- You can perform data filtering.
```
Core concepts
| Concept | Description |
|---|---|
| Source | The data source, such as Kafka or OceanBase CDC. |
| Sink | The data destination, such as OceanBase Database or Kafka. |
| Connector | The bridge that connects Flink to an external system. |
| SQL DDL | Use the CREATE TABLE statement to declare the data source or target. |
| SQL DML | Use the INSERT INTO ... SELECT statement to drive data flow. |
How to run Flink SQL
Method 1: SQL Client (interactive)
```shell
# Start the Flink SQL Client
./bin/sql-client.sh

# Execute commands at the Flink SQL> prompt
Flink SQL> CREATE TABLE ...
Flink SQL> INSERT INTO ...
```

Method 2: Submit an SQL file

```shell
# Save the SQL to a file (such as job.sql) and submit it
./bin/sql-client.sh -f job.sql
```

Method 3: Web UI or programming API
Submit an SQL task through the Flink Web UI, or write a Java/Scala program using the Table API or DataStream API.
Overview and selection guide of OceanBase Flink Connector
The previous example uses 'connector' = 'oceanbase'. In practice, OceanBase provides several specialized connectors for Flink, covering all scenarios, including read, write, and CDC.
Connector overview
Notice
- Connectors cannot be interchanged. Choose the appropriate one based on your specific scenario.
- OceanBase Database MySQL-compatible tenants are compatible with Flink MySQL Connector. For Oracle-compatible tenants, you must use Flink OceanBase Connector.
| Scenario | Recommended Connector | Core Features | Documentation Link |
|---|---|---|---|
| Real-time streaming write with moderate data volume | Flink Connector OceanBase | Based on JDBC, highly versatile | Flink Connector OceanBase Documentation |
| Lookup dimension table association | Flink Connector JDBC (Lookup Mode) | Standard JDBC | Flink Connector JDBC Documentation |
| Batch read of the entire table | Flink Connector JDBC (Batch Read, Single Parallelism) | Standard JDBC | Flink Connector JDBC Documentation |
| CDC data synchronization (MySQL-compatible mode tenants only; Oracle-compatible mode tenants are not supported) | Flink CDC (OceanBase CDC) | Parallel full + incremental read | OceanBase CDC Documentation |
| Large-scale data import, TB-level batch data migration | Flink Connector OceanBase Direct Load | Based on direct load, high throughput | Flink Connector OceanBase Direct Load |
| High-performance write of fixed columns in KV format (simple) | Flink Connector OBKV HBase | Based on OBKV API, nested structure | Flink Connector OBKV HBase |
| High-performance KV write (advanced feature) | Flink Connector OBKV HBase2 | Flat structure, supports dynamic columns and partial updates | Flink Connector OBKV HBase2 |
Scenario details
Scenario 1: Real-time streaming write to OceanBase Database
Requirement:
Write real-time data from Kafka or Pulsar to OceanBase Database.
Solution:
Use the Flink Connector OceanBase. For more information, see Flink Connector OceanBase.
Advantages:
- Supports unbounded streams.
- Compatible with MySQL and Oracle modes.
- Supports batch writes and buffer optimization.
Example:
Create an OceanBase sink table.
```sql
CREATE TABLE orders_sink (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10, 2),
    order_time TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase',
    'url' = 'jdbc:mysql://127.0.0.1:2881/test',
    'username' = 'root@test',
    'password' = 'password',
    'table-name' = 'orders',
    'buffer-flush.interval' = '1s',
    'buffer-flush.buffer-size' = '1000'
);
```

Import data.

```sql
INSERT INTO orders_sink SELECT * FROM kafka_source;
```
Scenario 2: Large-scale data migration
Requirement:
Migrate historical data of TB scale to OceanBase Database.
Solution:
Use the Flink Connector OceanBase Direct Load. For more information, see Flink Connector OceanBase Direct Load.
Advantages:
- Based on direct load, it has a high throughput.
- Supports parallel writes from multiple nodes.
- Suitable for batch mode.
Considerations:
- Supports bounded streams but not real-time streams.
- During import, the target table is locked (read-only).
- We recommend that you use the Flink Batch mode to achieve better performance.
Example:
Create a Direct Load sink table.
```sql
CREATE TABLE large_table_sink (
    id BIGINT,
    name STRING,
    data STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-directload',
    'url' = 'jdbc:mysql://127.0.0.1:2881/test',
    'username' = 'root@test',
    'password' = 'password',
    'schema-name' = 'test',
    'table-name' = 'large_table',
    'parallel' = '8'  -- The parallelism.
);
```

Write data from the source table to the result table in batch mode.

```sql
INSERT INTO large_table_sink SELECT * FROM source_table;
```
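As recommended above, Direct Load performs best in Flink's batch execution mode. A minimal sketch of switching the SQL Client to batch mode before submitting the job (execution.runtime-mode is a standard Flink configuration option):

```sql
-- Switch the job to batch execution mode (recommended for Direct Load)
SET 'execution.runtime-mode' = 'batch';

-- Then run the bounded import as usual
INSERT INTO large_table_sink SELECT * FROM source_table;
```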
Scenario 3: High-performance KV write (simple scenario)
Requirement:
Write KV data with high performance, where the column structure is simple and fixed.
Solution:
Use the Flink Connector OBKV HBase. For more information, see Flink Connector OBKV HBase.
Advantages:
- Based on the OBKV HBase API, it has excellent performance.
- Suitable for scenarios with a fixed column structure.
Limitations:
- The table definition must use a nested ROW structure.
- Does not support dynamic columns.
- Does not support partial column updates.
Example:
Create an HBase sink table.
```sql
CREATE TABLE hbase_sink (
    rowkey STRING,
    family1 ROW<column1 STRING, column2 STRING>,  -- Nested ROW structure
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'obkv-hbase',
    'url' = 'http://127.0.0.1:8080/services?Action=ObRootServiceInfo&ObCluster=obcluster',
    'username' = 'root@test#obcluster',
    'password' = 'password',
    'sys.username' = 'root',
    'sys.password' = 'password',
    'schema-name' = 'test',
    'table-name' = 'htable1'
);
```
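When writing to this table, the values for the column family are wrapped in a ROW constructor that matches the nested definition. A minimal sketch:

```sql
-- Values for family1 are wrapped in a ROW matching the nested definition
INSERT INTO hbase_sink VALUES ('row1', ROW('val1', 'val2'));
```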
Scenario 4: High-performance KV write (advanced features)
Requirement:
High-performance write and advanced features such as dynamic columns, partial column updates, and flexible timestamp control are required.
Solution:
Use the Flink Connector OBKV HBase2. For more information, see Flink Connector OBKV HBase2.
Advantages:
- Flat table structure with concise definition.
- Supports dynamic column mode: column names can be dynamically specified at runtime.
- Supports partial column updates: only the columns to be updated need to be defined. Columns not defined will not be updated, which is very flexible.
- Supports timestamp control: different timestamps can be set for different columns (tsColumn, tsMap).
- Performance is comparable to OBKV HBase.
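The timestamp control feature above might be wired in roughly as follows. The option name tsColumn comes from the feature list; the exact option syntax and value format are assumptions here, so verify against the Flink Connector OBKV HBase2 documentation before use.

```sql
-- Hypothetical sketch of per-column timestamp control; option syntax is an
-- assumption, check the connector documentation for the authoritative form.
CREATE TABLE hbase2_ts_sink (
    rowkey STRING,
    column1 STRING,
    ts BIGINT,  -- timestamp value carried in the stream
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'obkv-hbase2',
    'schema-name' = 'test',
    'table-name' = 'htable1',
    'columnFamily' = 'f',
    'tsColumn' = 'ts'  -- use the ts field as the cell timestamp
);
```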
Example:
Create an HBase2 sink table.
Basic usage: flat structure.
```sql
CREATE TABLE hbase2_sink (
    rowkey STRING,
    column1 STRING,  -- Flat structure, no need for ROW nesting
    column2 STRING,
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'obkv-hbase2',
    'url' = 'http://127.0.0.1:8080/services?Action=ObRootServiceInfo&ObCluster=obcluster',
    'username' = 'root@test#obcluster',
    'password' = 'password',
    'sys.username' = 'root',
    'sys.password' = 'password',
    'schema-name' = 'test',
    'table-name' = 'htable1',
    'columnFamily' = 'f'
);
```

Example of partial column updates.

Assume the HBase table in OceanBase Database has multiple columns, such as column1, column2, column3, and column4. You only need to define the columns to be updated in the Flink table.

```sql
CREATE TABLE partial_update_sink (
    rowkey STRING,
    column1 STRING,  -- Define only the columns to be updated
    column2 STRING,  -- Columns not defined here (such as column3 and column4) will not be updated
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'obkv-hbase2',
    'url' = 'http://127.0.0.1:8080/services?Action=ObRootServiceInfo&ObCluster=obcluster',
    'username' = 'root@test#obcluster',
    'password' = 'password',
    'sys.username' = 'root',
    'sys.password' = 'password',
    'schema-name' = 'test',
    'table-name' = 'htable1',
    'columnFamily' = 'f'
);
```
Write data and update only column1 and column2, while keeping other columns (column3, column4, etc.) unchanged.
```sql
INSERT INTO partial_update_sink VALUES ('1', 'new_value1', 'new_value2');
```
Scenario 5: CDC data synchronization (MySQL-compatible tenant)
Requirement:
Capture data changes in OceanBase Database in real time and achieve full and incremental synchronization.
Solution:
Use Flink CDC (OceanBase CDC). For more information, see the OceanBase CDC official documentation.
Advantages:
- Parallel full read (performance far exceeds JDBC).
- Incremental synchronization based on binlog.
- Unified full and incremental process.
Limitations:
Only supports MySQL-compatible tenants.
Example:
Create an OceanBase CDC source table.
```sql
CREATE TABLE orders_cdc (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10, 2),
    order_time TIMESTAMP(3),
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase-cdc',
    'hostname' = '127.0.0.1',
    'port' = '2881',
    'username' = 'root@test',
    'password' = 'password',
    'database-name' = 'mydb',
    'table-name' = 'orders',
    'scan.startup.mode' = 'initial'  -- Full + incremental
);
```

Read and process CDC data.

```sql
SELECT * FROM orders_cdc;
```
Scenario 6: Lookup join with a dimension table
Requirement:
Enrich the stream with user dimension information during stream processing.
Solution:
Use the Flink JDBC Connector (as a lookup source). For more information, see the Flink JDBC Connector official documentation.
Features:
- Supports lookup join
- Supports cache optimization
Notice
Full table scans are single-parallelism operations, but lookup operations typically involve point queries and are not subject to this limitation.
Example:
Create a JDBC lookup table.
```sql
CREATE TABLE dim_user (
    user_id BIGINT,
    user_name STRING,
    city STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://127.0.0.1:2881/test',
    'username' = 'root@test',
    'password' = 'password',
    'table-name' = 'dim_user',
    'lookup.cache.max-rows' = '10000',  -- Cache configuration
    'lookup.cache.ttl' = '1 hour'
);
```

Join the stream table with the dimension table.

```sql
SELECT
    o.order_id,
    o.user_id,
    u.user_name,  -- Complete the information from the dimension table
    u.city,
    o.amount
FROM orders_stream o
LEFT JOIN dim_user FOR SYSTEM_TIME AS OF o.proc_time AS u
    ON o.user_id = u.user_id;
```
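The join references o.proc_time, so the stream table must declare a processing-time attribute. A minimal sketch of that source definition, assuming a Kafka stream named orders_stream (PROCTIME() is standard Flink SQL):

```sql
CREATE TABLE orders_stream (
    order_id BIGINT,
    user_id BIGINT,
    amount DECIMAL(10, 2),
    proc_time AS PROCTIME()  -- processing-time attribute used by FOR SYSTEM_TIME AS OF
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'localhost:9092',
    'format' = 'json'
);
```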
Deep comparison
The following sections compare several connectors that are easy to confuse.
OBKV HBase and OBKV HBase2
| Feature | OBKV HBase | OBKV HBase2 |
|---|---|---|
| Data model | Nested ROW structure | Flat model |
| Table definition complexity | Complex (requires ROW<...> nesting) | Simple (direct column definition) |
| Column family support | A table can have multiple column families | A table supports only one column family (multiple column families require separate tables, which is cumbersome) |
| Dynamic column support | Not supported | Supported |
| Partial column update | Not supported | Supported (only define the columns to be updated) |
| Timestamp control | Not supported | Supported (using tsColumn + tsMap) |
| Flexibility | Low | High |
| Performance | High | High |
| Learning curve | Low (if familiar with HBase) | Moderate (requires understanding of new features) |
| Use cases | Scenarios requiring multiple column families, simple fixed columns | Single-column-family scenarios requiring dynamic columns, partial updates, and timestamp control |
Example of table definition comparison:
OBKV HBase: Nested ROW structure, supports multiple column families in a single table.
```sql
CREATE TABLE hbase_sink (
    rowkey STRING,
    family1 ROW<column1 STRING, column2 STRING>,  -- Column family 1
    family2 ROW<column3 STRING, column4 STRING>,  -- Column family 2 (supports multiple column families)
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'obkv-hbase',
    ...
);
```

You can write to multiple column families simultaneously during write operations.

```sql
INSERT INTO hbase_sink VALUES ('row1', ROW('val1', 'val2'), ROW('val3', 'val4'));
```

OBKV HBase2: Flat structure, supports only one column family per table.

If you need to write to multiple column families, you must create separate tables for each column family (which is cumbersome).

Table for column family family1:

```sql
CREATE TABLE hbase2_family1_sink (
    rowkey STRING,
    column1 STRING,
    column2 STRING,
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'obkv-hbase2',
    'columnFamily' = 'family1',  -- Only one column family can be specified
    ...
);
```

Table for column family family2 (needs to be created separately):

```sql
CREATE TABLE hbase2_family2_sink (
    rowkey STRING,
    column3 STRING,
    column4 STRING,
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector' = 'obkv-hbase2',
    'columnFamily' = 'family2',  -- Different column families
    ...
);
```

During write operations, you need to write to both tables separately.

Write to the table for column family family1:

```sql
INSERT INTO hbase2_family1_sink VALUES ('row1', 'val1', 'val2');
```

Write to the table for column family family2:

```sql
INSERT INTO hbase2_family2_sink VALUES ('row1', 'val3', 'val4');
```
Note
Here are some recommendations for selection:
- If you need multiple column families: strongly recommend OBKV HBase.
- HBase: A single table can handle multiple column families, which is simple and convenient.
- HBase2: You need to create separate tables for each column family and write to them separately, which is cumbersome.
- If you need advanced features such as dynamic columns, partial updates, and timestamp control: you must choose OBKV HBase2.
- If you have a simple scenario (fixed columns, single column family): both options are suitable, but HBase2 has a simpler table definition, so it is recommended to use it first.
- For new projects: recommend OBKV HBase2 for single-column-family scenarios and OBKV HBase for multi-column-family scenarios.
OceanBase CDC vs. JDBC
| Feature | OceanBase CDC | JDBC Connector |
|---|---|---|
| Full-table scan parallelism | Supports parallel reads (multiple parallelism levels) | Single parallelism level (slow) |
| Incremental read | Supports binlog-based incremental reads | Not supported |
| Lookup join | Not supported | Supported (main use case) |
| Batch full-table scan | Supported (parallel, fast) | Supported (single-threaded, slow) |
| Tenant support | Only MySQL-compatible tenants | MySQL and Oracle-compatible tenants |
| Main use case | CDC full + incremental synchronization | Lookup join for dimension tables |
| Data freshness | Near real-time (subject to binlog delay) | Real-time at query time |
Note
Here are some recommendations for selection:
- CDC scenarios (full + incremental synchronization): recommend OceanBase CDC (only for MySQL-compatible tenants)
- Lookup join scenarios (dimension table join): recommend JDBC Connector.
- Batch full-table scan: recommend OceanBase CDC (MySQL-compatible tenant) for performance; for Oracle-compatible tenants, JDBC Connector is the only option, but it is slower.
Comparison of Sink writing
| Feature | JDBC | Direct Load | OBKV HBase | OBKV HBase2 |
|---|---|---|---|---|
| Data stream type | Unbounded/bounded | Only bounded | Unbounded/bounded | Unbounded/bounded |
| Throughput | Moderate | Extremely high | High | High |
| Latency | Low | High (batch) | Low | Low |
| Table locking | None | Table locked during import | None | None |
| Compatibility mode | MySQL/Oracle | MySQL/Oracle | MySQL | MySQL |
| Complexity | Simple | Simple | Moderate | Moderate |
| Typical scenarios | Real-time writing | Batch import | High-performance KV writing | High-performance KV writing + advanced features |
FAQ
Question 1: What are the differences between the Direct Load Sink and the JDBC Sink? How do I choose between them?
Main differences:
- JDBC Sink: Based on the standard JDBC protocol, it is suitable for real-time streaming writes and supports unbounded streams without requiring table locks.
- Direct Load Sink: Based on the direct load API, it is suitable for bulk data imports with extremely high throughput, but it only supports bounded streams and locks the target table during the import.
Recommendation:
- For real-time streaming writes (e.g., Kafka to OceanBase Database): Use the JDBC Sink.
- For bulk historical data imports (e.g., data migration): Use the Direct Load Sink.
Question 2: What are the differences between OBKV HBase and OBKV HBase2? How do I choose between them?
Main differences:
Table definition: HBase uses a nested ROW structure, while HBase2 uses a flat structure (which is more concise).
Column family support:
- HBase: A Flink table can write to multiple column families, which is very convenient.
- HBase2: A Flink table can only specify one column family. If you need to write to multiple column families, you need to create separate tables for each column family and write to them individually, which can be cumbersome.
Advanced features: HBase2 supports dynamic columns, partial column updates, and timestamp control, while HBase does not.
Use cases: HBase is suitable for scenarios requiring multiple column families, while HBase2 is suitable for scenarios requiring advanced features with a single column family.
Recommendation:
For scenarios requiring multiple column families: Strongly recommend OBKV HBase (HBase2 requires creating a table for each column family, which is inconvenient).
For scenarios requiring dynamic columns, partial column updates, and timestamp control: Recommend OBKV HBase2.
For simple, fixed-column, single-column-family scenarios: Both are applicable, but OBKV HBase2 is recommended (it has a simpler table definition).
For new projects:
- For single-column-family use cases: Use OBKV HBase2.
- For multi-column-family use cases: Use OBKV HBase.
Question 3: What are the typical use cases for the JDBC Source? How does it differ from CDC?
The JDBC Source has two main use cases:
Lookup Join (dimension table association) - Recommended use case
- In stream computing, query dimension tables based on primary keys in real time.
- Supports cache optimization, ensuring high performance.
- This is the primary use case for the JDBC Source.
Batch reading of full tables
- Can read full table data from OceanBase Database.
- Single parallelism, which results in slower performance and is not recommended for large tables.
- Suitable for batch reading of small amounts of data.
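A minimal sketch of such a bounded batch read, reusing the connection options shown in the lookup example earlier in this guide:

```sql
-- JDBC source table; a plain SELECT performs a bounded, single-parallelism scan
CREATE TABLE users_batch (
    user_id BIGINT,
    user_name STRING,
    city STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://127.0.0.1:2881/test',
    'username' = 'root@test',
    'password' = 'password',
    'table-name' = 'dim_user'
);

-- Reads the whole table once, then the job finishes
SELECT * FROM users_batch;
```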
Comparison between JDBC Source and CDC:
- OceanBase CDC: Supports parallel full-table reads and binlog incremental reads, offering high performance and is ideal for CDC scenarios (only applicable to MySQL-compatible tenants).
- JDBC Source: Primarily used for Lookup Joins, but can also perform batch reads with lower performance (single parallelism).
Recommendation:
- For Lookup dimension table association: Use the JDBC Source (mandatory).
- For CDC full-table + incremental reads (MySQL-compatible tenant): Use OceanBase CDC (recommended).
- For batch reading of full tables: Prefer OceanBase CDC. If not supported (e.g., Oracle-compatible tenant), use the JDBC Source.
Question 4: Why does OceanBase CDC only support MySQL-compatible tenants? What about Oracle-compatible tenants?
OceanBase CDC relies on the MySQL-compatible binlog service, which is only available for MySQL-compatible tenants. For Oracle-compatible tenants, use the OMS tool to implement CDC incremental synchronization.
Question 5: How do I choose the appropriate batch size and parallelism?
Batch size (Buffer Size):
- Small batches (100-500): Suitable for low-latency scenarios, where data is written as soon as possible.
- Medium batches (1000-5000): Balances latency and throughput. Recommended default value.
- Large batches (5000+): Suitable for high-throughput scenarios, but latency increases.
Parallelism:
- Generally set to the number of CPU cores or a multiple of it.
- In direct load scenarios, you can set a higher parallelism (e.g., 8-16) based on tenant resources to fully utilize the direct load capability.
- Consider the load capacity of the OceanBase cluster.
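These knobs map onto sink options and job configuration already shown in this guide. A sketch combining a medium batch size with a parallelism of 8 (parallelism.default is a standard Flink configuration key; the sink options match the streaming-write example above):

```sql
-- Job-level parallelism
SET 'parallelism.default' = '8';

-- Sink-level buffering, as in the streaming-write example
CREATE TABLE tuned_sink (
    id BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'oceanbase',
    'url' = 'jdbc:mysql://127.0.0.1:2881/test',
    'username' = 'root@test',
    'password' = 'password',
    'table-name' = 'orders',
    'buffer-flush.interval' = '1s',       -- balances latency and throughput
    'buffer-flush.buffer-size' = '1000'   -- medium batch size
);
```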
Question 6: Can I use different connectors together?
Yes. You can use multiple different connectors in a single Flink task.
Common combinations:
- Flink CDC (MySQL) Source + OceanBase JDBC Sink: Real-time synchronization from MySQL to OceanBase Database.
- OceanBase CDC Source + Kafka Sink: Data synchronization from OceanBase Database to a Kafka message queue.
- Kafka Source + JDBC Lookup + OceanBase Sink: Data from Kafka is associated with dimension tables and then written to OceanBase Database.
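The third combination can be sketched by stitching together tables defined earlier in this guide: a Kafka source with a processing-time attribute, the dim_user lookup table, and an OceanBase sink. The table name oceanbase_orders_enriched is a placeholder for such a sink.

```sql
-- Assumes kafka_orders (with a proc_time attribute), dim_user, and an
-- OceanBase sink table have been defined as in the earlier sections.
INSERT INTO oceanbase_orders_enriched
SELECT
    o.order_id,
    o.user_id,
    u.user_name,  -- enriched from the dimension table
    o.amount
FROM kafka_orders o
LEFT JOIN dim_user FOR SYSTEM_TIME AS OF o.proc_time AS u
    ON o.user_id = u.user_id;
```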