KafkaSink

2025-01-10 09:41:08  Updated

Starter class

connector-kafka.jar

com.oceanbase.oms.connector.kafka.sink.KafkaSinkBooster

[Metadata] servers

  • Description: specifies information about Kafka servers.

  • Default value: None. This parameter is required.

  • Valid values: strings.

  • Supported versions: OceanBase Migration Service (OMS) Community Edition V3.3.1 and later.

[Metadata] enableAsync

  • Description: specifies whether to enable asynchronous writing.

  • Default value: true.

  • Valid values: Boolean values.

  • Supported versions: OMS Community Edition V3.3.1 and later.

[Distribution] partition

  • Description: specifies the number of the partition to which data is written when partitionMode is set to ONE.

  • Default value: 0.

  • Valid values: integer values.

  • Supported versions: OMS Community Edition V3.3.1 and later.

[Distribution] partitionMode

  • Description: specifies the distribution logic that determines the partition into which a data record is to be written.

  • Default value: HASH.

  • Valid values: HASH (hash value of the primary key), ONE (fixed to one partition), and TABLE (hash value of the table name).

  • Supported versions: OMS Community Edition V3.3.1 and later.

  • Note: In ONE mode, set workerNum to 1 so that records are written serially. Otherwise, multiple threads may write to the partition concurrently.
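
The three partitionMode values can be pictured with the minimal Python sketch below. It is an illustration only: the function name choose_partition, its arguments, and the use of zlib.crc32 as the hash are assumptions, because this page does not document the hash function that OMS actually applies.

```python
import zlib


def choose_partition(mode, partition_count, table, primary_key, fixed_partition=0):
    """Return a partition index for one record.

    zlib.crc32 is a stand-in hash chosen for this sketch; the hash that
    OMS uses internally is not documented on this page.
    """
    if mode == "ONE":
        # Every record goes to the partition given by the partition parameter.
        return fixed_partition
    if mode == "TABLE":
        key = table          # all rows of one table share a partition
    elif mode == "HASH":
        key = primary_key    # rows with the same primary key share a partition
    else:
        raise ValueError(f"unknown partitionMode: {mode}")
    return zlib.crc32(key.encode("utf-8")) % partition_count
```

In this sketch, TABLE mode sends all rows of a table to one partition, while HASH mode spreads rows across partitions and keeps rows with the same primary key together.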

[Writing threads] workerNum

  • Description: specifies the number of data writing threads.

  • Default value: 4.

  • Valid values: integer values.

  • Supported versions: OMS Community Edition V3.3.1 and later.

[Kafka parameters] batchSize

  • Description: specifies the Kafka batch size in bytes, which is the same as the buffer size.

  • Default value: 16M.

  • Valid values: integer values.

  • Supported versions: OMS Community Edition V3.3.1 and later.

[Kafka parameters] lingerMs

  • Description: specifies the time to wait before sending a batch in Kafka.

  • Default value: 100ms.

  • Valid values: integer values.

  • Supported versions: OMS Community Edition V3.3.1 and later.

[Kafka parameters] properties

  • Description: specifies Kafka parameters that are passed through to the Kafka producer unchanged.

  • Default value:

      {
        "max.request.size": "1024 * 1024 * 1024", // 1 GB
        "buffer.memory": "128 * 1024 * 1024", // 128 MB
        "retries": "1000",
        "max.block.ms": "30 * 60 * 1000", // 30 minutes
        "request.timeout.ms": "30 * 60 * 1000", // 30 minutes
        "acks": "1",
        "max.in.flight.requests.per.connection": "1",
        "compression.type": "lz4",
        "receive.buffer.bytes": "16 * 1024 * 1024",
        "send.buffer.bytes": "16 * 1024 * 1024"
      }
    
  • Valid values: producer config parameters of Kafka.

  • Supported versions: OMS Community Edition V3.3.1 and later.
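
Several of the default values above are written as products. The short Python sketch below works out the numbers they stand for. The helper name eval_product is hypothetical. Whether OMS expects the expression string or the plain number in a configuration file is not stated here, so the sketch shows only the arithmetic.

```python
from math import prod


def eval_product(expr):
    """Evaluate a string such as '30 * 60 * 1000' as a product of integers."""
    return prod(int(factor) for factor in expr.split("*"))


# Default values copied from the list above (only the ones written as products).
defaults = {
    "max.request.size": "1024 * 1024 * 1024",
    "buffer.memory": "128 * 1024 * 1024",
    "max.block.ms": "30 * 60 * 1000",
    "request.timeout.ms": "30 * 60 * 1000",
    "receive.buffer.bytes": "16 * 1024 * 1024",
    "send.buffer.bytes": "16 * 1024 * 1024",
}

resolved = {name: eval_product(expr) for name, expr in defaults.items()}
for name, value in resolved.items():
    print(f"{name} = {value}")
```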

[Authentication] securityProtocol

  • Description: specifies the authentication protocol.

  • Default value: Null.

  • Valid values: SASL_PLAINTEXT, SASL_SSL, PLAINTEXTSASL, and SSLSASL.

  • Supported versions: OMS Community Edition V3.3.1 and later.

[Authentication] sslTruststoreLocation

  • Description: specifies the path of the SSL truststore file used for authentication.

  • Default value: Null.

  • Valid values: paths.

  • Supported versions: OMS Community Edition V3.3.1 and later.

[Authentication] saslConfig

  • Description: specifies the SASL parameters.

  • Default value: None.

  • Valid values:

      {
        "mechanism": "The authentication mechanism. Valid values: GSSAPI, PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512.",
        "username": "The username and password are required for the PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512 mechanisms.",
        "password": "The username and password are required for the PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512 mechanisms.",
        "kerberosPrincipal": "",
        "kerberosKeyTabFileLocation": "The authentication configuration file, which is required for the GSSAPI mechanism.",
        "jaasConfFileLocation": "The authentication configuration file, which is required for the GSSAPI mechanism.",
        "kerberosConfFileLocation": "The address of the Kerberos configuration file, which is required for the GSSAPI mechanism."
      }
    
  • Supported versions: OMS Community Edition V3.3.1 and later.
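
The per-mechanism requirements listed above can be checked before a task is started. The following Python sketch is one way to do that. The names REQUIRED_FIELDS and missing_sasl_fields are hypothetical, and the check covers only the fields that this page marks as required. kerberosPrincipal is left out because its requirement is not stated here.

```python
# Required fields per mechanism, taken from the descriptions above.
REQUIRED_FIELDS = {
    "PLAIN": ("username", "password"),
    "SCRAM-SHA-256": ("username", "password"),
    "SCRAM-SHA-512": ("username", "password"),
    "GSSAPI": (
        "kerberosKeyTabFileLocation",
        "jaasConfFileLocation",
        "kerberosConfFileLocation",
    ),
}


def missing_sasl_fields(sasl_config):
    """Return the required saslConfig fields that are absent or empty."""
    mechanism = sasl_config.get("mechanism")
    if mechanism not in REQUIRED_FIELDS:
        raise ValueError(f"unsupported mechanism: {mechanism!r}")
    return [name for name in REQUIRED_FIELDS[mechanism] if not sasl_config.get(name)]
```

An empty list means that every field this page marks as required for the chosen mechanism is present.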

[Serialization] serializerType

  • Description: specifies the serialization format.

  • Default value: DEFAULT.

  • Valid values: DEFAULT, DEFAULT_WITH_SCHEMA, SHAREPLEX, CANAL, DATAWORKS_V2, and DATAWORKS_V1.

  • Supported versions: OMS Community Edition V3.3.1 and later.

isTopicMandatory

  • Description: specifies whether to enforce a check that the topic exists.

  • Default value: true.

  • Valid values: true and false.

  • Supported versions: OMS Community Edition V4.1.1 and later.

consumerGroup

  • Description: specifies the consumer group used when checking whether a topic has a consumer group.

  • Default value: test.

  • Valid values: strings.

  • Supported versions: OMS Community Edition V4.1.1 and later.

[Time zone conversion] dateTimezone

  • Description: specifies the time zone for the time specified by dateTime.

    If the parameter is set to +08:00, the time specified by dateTime is in the UTC+8 time zone.

  • Default value: +00:00.

  • Valid values: -18:00 to +18:00.

  • Supported versions: OMS Community Edition V4.2.7 and later.

[Time zone conversion] dateConvertTimezone

  • Description: specifies the time zone for time conversion.

    If the parameter is set to +08:00, the time specified by dateTime is converted to the UTC+8 time zone.

  • Default value: +00:00.

  • Valid values: -18:00 to +18:00.

  • Supported versions: OMS Community Edition V4.2.7 and later.

[Time zone conversion] dateConvertFormat

  • Description: specifies the time format.

    If the parameter is set to yyyy-MM-dd HH:mm:ss, the time values of datetime and timestamp are output in the specified format.

  • Default value: Null.

  • Valid values: strings.

  • Supported versions: OMS Community Edition V4.2.7 and later.

Here are some configuration examples of the time zone conversion parameters:

dateTimezone: '+08:00'

dateConvertTimezone: '+08:00'

| Time type | Database time | Time after synchronization (with dateConvertFormat unspecified) | Time after synchronization (with dateConvertFormat set to yyyy-MM-dd HH:mm:ss) |
| --- | --- | --- | --- |
| datetime | 2024-12-01 08:00:00 | 1733011200000 | 2024-12-01 08:00:00 |
| timestamp | 2024-12-01 08:00:00 | 2024-12-01T08:00:00+08:00 | 2024-12-01 08:00:00 |
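
The values in the example above can be reproduced with the Python sketch below. It illustrates only the arithmetic and is not the OMS implementation. The function names parse_offset, to_epoch_millis, and format_in_zone are hypothetical, and the Python pattern %Y-%m-%d %H:%M:%S is assumed to be the equivalent of yyyy-MM-dd HH:mm:ss.

```python
from datetime import datetime, timedelta, timezone


def parse_offset(offset):
    """Turn an offset string such as '+08:00' or '-05:30' into a timezone."""
    sign = -1 if offset.startswith("-") else 1
    hours, minutes = offset.lstrip("+-").split(":")
    return timezone(sign * timedelta(hours=int(hours), minutes=int(minutes)))


def to_epoch_millis(db_time, date_timezone):
    """Read db_time as wall-clock time in date_timezone; return epoch milliseconds."""
    local = datetime.strptime(db_time, "%Y-%m-%d %H:%M:%S")
    local = local.replace(tzinfo=parse_offset(date_timezone))
    return int(local.timestamp() * 1000)


def format_in_zone(epoch_millis, convert_timezone, pattern="%Y-%m-%d %H:%M:%S"):
    """Render epoch milliseconds as wall-clock time in convert_timezone."""
    moment = datetime.fromtimestamp(epoch_millis / 1000, tz=parse_offset(convert_timezone))
    return moment.strftime(pattern)


millis = to_epoch_millis("2024-12-01 08:00:00", "+08:00")
print(millis)                            # 1733011200000
print(format_in_zone(millis, "+08:00"))  # 2024-12-01 08:00:00
```

The epoch value 1733011200000 corresponds to 2024-12-01 00:00:00 UTC, which is 08:00:00 in the UTC+8 time zone. This is why the formatted output matches the database time when both parameters are set to +08:00.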
