The Flink Connector for OBKV-HBase lets you write data from Flink to OBKV-HBase through the OBKV-HBase Java client, obkv-hbase-client-java.
Get started
You can obtain an official release of Flink Connector from the release page or from the Maven Central repository.
<dependency>
    <groupId>com.oceanbase</groupId>
    <artifactId>flink-connector-obkv-hbase</artifactId>
    <version>${project.version}</version>
</dependency>
You can obtain the latest snapshot release of Flink Connector by configuring the Maven snapshot repository.
<dependency>
    <groupId>com.oceanbase</groupId>
    <artifactId>flink-connector-obkv-hbase</artifactId>
    <version>${project.version}</version>
</dependency>

<repositories>
    <repository>
        <id>sonatype-snapshots</id>
        <name>Sonatype Snapshot Repository</name>
        <url>https://s01.oss.sonatype.org/content/repositories/snapshots/</url>
        <snapshots>
            <enabled>true</enabled>
        </snapshots>
    </repository>
</repositories>
You can also download and compile the source code of Flink Connector to obtain its package.
git clone https://github.com/oceanbase/flink-connector-oceanbase.git
cd flink-connector-oceanbase
mvn clean package -DskipTests
Download the SQL JAR package
To use Flink Connector in the Flink SQL client, you need to download the flink-sql-connector-obkv-hbase-${project.version}.jar package that contains all dependencies.
Examples
Create a table named htable1$family1 in OceanBase Database. In the table name, htable1 is the table name in HBase, and family1 is the column family in HBase.
use test;
CREATE TABLE `htable1$family1`
(
    `K` varbinary(1024)    NOT NULL,
    `Q` varbinary(256)     NOT NULL,
    `T` bigint(20)         NOT NULL,
    `V` varbinary(1048576) NOT NULL,
    PRIMARY KEY (`K`, `Q`, `T`)
);
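The naming rule above (HBase table name, a `$` separator, then the column family) can be sketched as a small helper. The class and method names are hypothetical and are not part of the connector; the sketch only illustrates how `htable1` and `family1` combine into the table name used in the statement above.

```java
/** Sketch: derive the OceanBase table name for an HBase table and column family. */
final class ObkvTableNames {
    private ObkvTableNames() {}

    /** Joins the HBase table name and the column family with '$'. */
    static String physicalTableName(String hbaseTable, String family) {
        if (hbaseTable == null || hbaseTable.isEmpty()
                || family == null || family.isEmpty()) {
            throw new IllegalArgumentException("table and family must be non-empty");
        }
        return hbaseTable + "$" + family;
    }

    /** Wraps the name in backticks, as the CREATE TABLE statement above does. */
    static String quoted(String hbaseTable, String family) {
        return "`" + physicalTableName(hbaseTable, family) + "`";
    }

    public static void main(String[] args) {
        // prints htable1$family1
        System.out.println(physicalTableName("htable1", "family1"));
    }
}
```

Each column family of an HBase table therefore needs its own table in OceanBase Database, while the connector option `table-name` takes only the HBase part (`htable1`).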
Example for a Java application
Take a Maven project as an example. Add the required dependencies to the pom.xml file and run the following code:
package com.oceanbase;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class Main {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        StreamTableEnvironment tEnv =
                StreamTableEnvironment.create(
                        env, EnvironmentSettings.newInstance().inStreamingMode().build());

        // Register the sink table; family1 maps to the HBase column family.
        tEnv.executeSql(
                "CREATE TABLE t_sink ( "
                        + " rowkey STRING,"
                        + " family1 ROW<column1 STRING, column2 STRING>,"
                        + " PRIMARY KEY (rowkey) NOT ENFORCED"
                        + ") with ("
                        + " 'connector'='obkv-hbase',"
                        + " 'url'='http://127.0.0.1:8080/services?...',"
                        + " 'schema-name'='test',"
                        + " 'table-name'='htable1',"
                        + " 'username'='root@test',"
                        + " 'password'='',"
                        + " 'sys.username'='root',"
                        + " 'sys.password'=''"
                        + ");");

        // Write three rows and wait for the insert job to finish.
        tEnv.executeSql(
                        "INSERT INTO t_sink VALUES "
                                + "('1', ROW('r1f1c1', 'r1f1c2')),"
                                + "('2', ROW('r2f1c1', 'r2f1c2')),"
                                + "('3', ROW('r3f1c1', 'r3f1c2'));")
                .await();
    }
}
After the code is executed, you can query the data in OceanBase Database.
For more information, see OBKVHBaseConnectorITCase.java.
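Concatenating the DDL by hand is easy to get wrong when options change. As a rough sketch, the `WITH` clause can be rendered from a map of options instead. `SinkDdlBuilder` and its methods are hypothetical helpers, not part of Flink or the connector, and the quoting rule (doubling embedded single quotes) is an assumption based on standard SQL string literals. The elided URL is kept exactly as it appears in the example above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Sketch: build the CREATE TABLE statement for the sink from a map of options. */
final class SinkDdlBuilder {
    private SinkDdlBuilder() {}

    /** Quotes a value as a SQL string literal, doubling embedded single quotes. */
    static String quote(String s) {
        return "'" + s.replace("'", "''") + "'";
    }

    /** Renders the options as WITH ('k1'='v1', 'k2'='v2'). */
    static String withClause(Map<String, String> options) {
        StringJoiner joiner = new StringJoiner(", ", "WITH (", ")");
        for (Map.Entry<String, String> e : options.entrySet()) {
            joiner.add(quote(e.getKey()) + "=" + quote(e.getValue()));
        }
        return joiner.toString();
    }

    /** Builds the same table shape as the example above. */
    static String createSinkTable(String tableName, Map<String, String> options) {
        return "CREATE TABLE " + tableName + " ("
                + " rowkey STRING,"
                + " family1 ROW<column1 STRING, column2 STRING>,"
                + " PRIMARY KEY (rowkey) NOT ENFORCED"
                + ") " + withClause(options);
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "obkv-hbase");
        options.put("url", "http://127.0.0.1:8080/services?...");
        options.put("schema-name", "test");
        options.put("table-name", "htable1");
        options.put("username", "root@test");
        options.put("password", "");
        options.put("sys.username", "root");
        options.put("sys.password", "");
        System.out.println(createSinkTable("t_sink", options));
    }
}
```

The resulting string can be passed to `tEnv.executeSql(...)` in place of the hand-written statement.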
Example for the Flink SQL client
Place the JAR files of required dependencies in the lib directory of Flink, and then create the target table in Flink by using the SQL client.
CREATE TABLE t_sink
(
    rowkey  STRING,
    family1 ROW<column1 STRING, column2 STRING>,
    PRIMARY KEY (rowkey) NOT ENFORCED
) WITH (
    'connector'='obkv-hbase',
    'url'='http://127.0.0.1:8080/services?...',
    'schema-name'='test',
    'table-name'='htable1',
    'username'='root@test',
    'password'='',
    'sys.username'='root',
    'sys.password'=''
);
Insert the test data.
INSERT INTO t_sink
VALUES ('1', ROW ('r1f1c1', 'r1f1c2')),
       ('2', ROW ('r2f1c1', 'r2f1c2')),
       ('3', ROW ('r3f1c1', 'r3f1c2'));
After the insertion, you can query the data in OceanBase Database.
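When you query `htable1$family1`, expect one row per column rather than one row per Flink record. The sketch below illustrates that flattening under the assumption that the table follows the usual HBase data model: `K` holds the row key, `Q` the column qualifier, and `V` the value, with `T` (the version timestamp) omitted here. The class is hypothetical and does not reproduce the connector's actual write path.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch: flatten one Flink row into HBase-style cells (assumed K/Q/V layout). */
final class CellMappingSketch {
    private CellMappingSketch() {}

    /** One cell: row key (K), qualifier (Q), and value bytes (V). T is omitted. */
    static final class Cell {
        final String k;
        final String q;
        final byte[] v;

        Cell(String k, String q, byte[] v) {
            this.k = k;
            this.q = q;
            this.v = v;
        }
    }

    /** Emits one cell per column of the family, all sharing the same row key. */
    static List<Cell> toCells(String rowkey, Map<String, String> columns) {
        List<Cell> cells = new ArrayList<>();
        for (Map.Entry<String, String> e : columns.entrySet()) {
            byte[] value = e.getValue().getBytes(StandardCharsets.UTF_8);
            cells.add(new Cell(rowkey, e.getKey(), value));
        }
        return cells;
    }

    public static void main(String[] args) {
        // Corresponds to the first inserted record: ('1', ROW('r1f1c1', 'r1f1c2')).
        Map<String, String> family1 = new LinkedHashMap<>();
        family1.put("column1", "r1f1c1");
        family1.put("column2", "r1f1c2");
        for (Cell c : toCells("1", family1)) {
            System.out.println(
                    c.k + " " + c.q + " " + new String(c.v, StandardCharsets.UTF_8));
        }
    }
}
```

Under this assumption, the record with row key `1` would appear as two rows in `htable1$family1`, one for `column1` and one for `column2`.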
Parameters
| Parameter | Required | Default value | Type | Description |
|---|---|---|---|---|
| url | Yes | | String | The ConfigUrl of the cluster. You can execute the SHOW PARAMETERS LIKE 'obconfig_url' statement to query the ConfigUrl. |
| schema-name | Yes | | String | The name of the OceanBase database. |
| table-name | Yes | | String | The table name in HBase. In OceanBase Database, the table name is in the hbase_table_name$family_name format. |
| username | Yes | | String | The username of a user in a user tenant. |
| password | Yes | | String | The password of a user in a user tenant. |
| sys.username | Yes | | String | The username of a user in the sys tenant. |
| sys.password | Yes | | String | The password of a user in the sys tenant. |
| hbase.properties | No | | String | The properties of 'obkv-hbase-client-java', which are separated with semicolons (;). |
| sync-write | No | false | Boolean | Specifies whether to enable synchronous write. When the value is true, data is directly written to the database without using the buffer. |
| buffer-flush.interval | No | 1s | Duration | The buffer flush interval. When the value is 0, the buffer is not flushed periodically. |
| buffer-flush.buffer-size | No | 1000 | Integer | The buffer size. |
| max-retries | No | 3 | Integer | The number of retries when a failure occurs. |
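
The interaction between `sync-write`, `buffer-flush.buffer-size`, and `max-retries` can be pictured with a small model. This is only a sketch and not the connector's implementation. It assumes that the buffer size counts records, that `max-retries` means additional attempts after the first failure, and that a periodic task would call `flush()` at every `buffer-flush.interval` (the timer is left out). The class name and the `sink` callback are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

/** Sketch: a buffered writer with size-triggered flush and bounded retries. */
final class BufferedWriterSketch<T> {
    private final int bufferSize;
    private final int maxRetries;
    private final boolean syncWrite;
    private final Consumer<List<T>> sink; // hypothetical downstream write
    private final List<T> buffer = new ArrayList<>();

    BufferedWriterSketch(
            int bufferSize, int maxRetries, boolean syncWrite, Consumer<List<T>> sink) {
        if (bufferSize < 1 || maxRetries < 0) {
            throw new IllegalArgumentException("bufferSize >= 1 and maxRetries >= 0 required");
        }
        this.bufferSize = bufferSize;
        this.maxRetries = maxRetries;
        this.syncWrite = syncWrite;
        this.sink = sink;
    }

    /** Writes directly in sync mode; otherwise buffers and flushes when full. */
    void write(T record) {
        if (syncWrite) {
            writeWithRetry(Collections.singletonList(record));
            return;
        }
        buffer.add(record);
        if (buffer.size() >= bufferSize) {
            flush();
        }
    }

    /** Sends the buffered records as one batch; a timer could also call this. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        writeWithRetry(new ArrayList<>(buffer));
        buffer.clear(); // reached only if the batch was written successfully
    }

    /** Tries once, then retries up to maxRetries times before giving up. */
    private void writeWithRetry(List<T> batch) {
        RuntimeException last = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                sink.accept(batch);
                return;
            } catch (RuntimeException e) {
                last = e;
            }
        }
        throw last;
    }
}
```

With a buffer size of 2, for example, the first `write` call only buffers the record and the second call sends both records as one batch. In sync mode every `write` call reaches the sink immediately.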