Starter class
connector-kafka.jar
com.oceanbase.oms.connector.kafka.sink.KafkaSinkBooster
[Metadata] servers
Description: specifies information about Kafka servers.
Default value: None. This parameter is required.
Valid values: strings.
Supported versions: OceanBase Migration Service (OMS) Community Edition V3.3.1 and later.
[Metadata] enableAsync
Description: specifies whether to enable asynchronous writing.
Default value:
true.Valid values: Boolean values.
Supported versions: OMS Community Edition V3.3.1 and later.
[Distribution] partition
Description: specifies the name of the partition into which data is to be written when
partitionModeis set toONE.Default value:
0.Valid values: integer values.
Supported versions: OMS Community Edition V3.3.1 and later.
[Distribution] partitionMode
Description: specifies the distribution logic that determines the partition into which a data record is to be written.
Default value:
HASH.Valid values: HASH (hash value of the primary key), ONE (fixed to one partition), and TABLE (hash value of the table name)
Supported versions: OMS Community Edition V3.3.1 and later.
Note: In the ONE mode,
workerNummust be set to1for serial execution. Otherwise, data may be concurrently written.
[Writing threads] workerNum
Description: specifies the number of data writing threads.
Default value:
4.Valid values: integer values.
Supported versions: OMS Community Edition V3.3.1 and later.
[Kafka parameters] batchSize
Description: specifies the batch size of the Kafka instance, which is the same as the buffer size, in bytes.
Default value:
16M.Valid values: integer values.
Supported versions: OMS Community Edition V3.3.1 and later.
[Kafka parameters] lingerMs
Description: specifies the time to wait before sending a batch in Kafka.
Default value:
100ms.Valid values: integer values.
Supported versions: OMS Community Edition V3.3.1 and later.
[Kafka parameters] properties
Description: specifies the transparent transmission parameters of Kafka.
Default value:
{ "max.request.size": "1024 * 1024 * 1024", // 1 GB "buffer.memory": "128 * 1024 * 1024", // 128 MB "retries": "1000", "max.block.ms": "30 * 60 * 1000", // 30 minutes "request.timeout.ms": "30 * 60 * 1000", // 30 minutes "acks": "1", "max.in.flight.requests.per.connection": "1", "compression.type": "lz4", "receive.buffer.bytes": "16 * 1024 * 1024", "send.buffer.bytes": "16 * 1024 * 1024" }Valid values: producer config parameters of Kafka.
Supported versions: OMS Community Edition V3.3.1 and later.
[Authentication] securityProtocol
Description: specifies the authentication protocol.
Default value: Null.
Valid values:
SASL_PLAINTEXT,SASL_SSL,PLAINTEXTSASL, andSSLSASL.Supported versions: OMS Community Edition V3.3.1 and later.
[Authentication] sslTruststoreLocation
Description: specifies the address of the SSL authentication files.
Default value: Null.
Valid values: paths.
Supported versions: OMS Community Edition V3.3.1 and later.
[Authentication] saslConfig
Description: specifies the SASL parameters.
Default value: None.
Valid values:
{ "mechanism": "The authentication mechanism. Valid values: GSSAPI, PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512.", "username": "The username and password are required for the PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512 mechanisms.", "password": "The username and password are required for the PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512 mechanisms.", "kerberosPrincipal": "", "kerberosKeyTabFileLocation": "The authentication configuration file, which is required for the GSSAPI mechanism.", "jaasConfFileLocation": "The authentication configuration file, which is required for the GSSAPI mechanism.", "kerberosConfFileLocation": "The address of the Kerberos configuration file, which is required for the GSSAPI mechanism." }Supported versions: OMS Community Edition V3.3.1 and later.
[Serialization] serializerType
Description: specifies the serialization format.
Default value:
DEFAULT.Valid values:
DEFAULT,DEFAULT_WITH_SCHEMA,SHAREPLEX,CANAL,DATAWORKS_V2, andDATAWORKS_V1.Supported versions: OMS Community Edition V3.3.1 and later.
isTopicMandatory
Description: specifies whether to enable forcible existence check for topics.
Default value:
true.Valid values:
trueandfalse.Supported versions: OMS Community Edition V4.1.1 and later.
consumerGroup
Description: specifies to check whether a topic has a consumer group.
Default value:
test.Valid values: strings.
Supported versions: OMS Community Edition V4.1.1 and later.
[Time zone conversion] dateTimezone
Description: specifies the time zone for the time specified by
dateTime.If the parameter is set to
+08:00, the time specified bydateTimeis in the UTC+8 time zone.Default value:
+00:00.Valid values:
+18:00to-18:00.Supported versions: OMS Community Edition V4.2.7 and later.
[Time zone conversion] dateConvertTimezone
Description: specifies the time zone for time conversion.
If the parameter is set to
+08:00, the time specified bydateTimeis converted to the UTC+8 time zone.Default value:
+00:00.Valid values:
+18:00to-18:00.Supported versions: OMS Community Edition V4.2.7 and later.
[Time zone conversion] dateConvertFormat
Description: specifies the time format.
If the parameter is set to
yyyy-MM-dd HH:mm:ss, the time values ofdatetimeandtimestampare output in the specified format.Default value: Null.
Valid values: strings.
Supported versions: OMS Community Edition V4.2.7 and later.
Here are some configuration examples of the time zone conversion parameters:
dateTimezone: '+08:00'
dateConvertTimezone: '+08:00'
| Time type | Database time | Time after synchronization (with dateConvertFormat unspecified) | Time after synchronization (with dateConvertFormat set to yyyy-MM-dd HH:mm:ss) |
|---|---|---|---|
| datetime | 2024-12-01 08:00:00 | 1733011200000 | 2024-12-01 08:00:00 |
| timestamp | 2024-12-01 08:00:00 | 2024-12-01T08:00:00+08:00 | 2024-12-01 08:00:00 |