OceanBase logo

OceanBase

A unified distributed database ready for your transactional, analytical, and AI workloads.

DEPLOY YOUR WAY

OceanBase Cloud

The best way to deploy and scale OceanBase

OceanBase Enterprise

Run and manage OceanBase on your infra

TRY OPEN SOURCE

OceanBase Community Edition

The free, open-source distributed database

OceanBase seekdb

Open source AI native search database

Customer Stories

Real-world success stories from enterprises across diverse industries.

View All
BY USE CASES

Mission-Critical Transactions

Global & Multicloud Application

Elastic Scaling for Peak Traffic

Real-time Analytics

Active Geo-redundancy

Database Consolidation

Resources

Comprehensive knowledge hub for OceanBase.

Blog

Live Demos

Training & Certification

Documentation

Official technical guides, tutorials, API references, and manuals for all OceanBase products.

View All
PRODUCTS

OceanBase Cloud

OceanBase Database

Tools

Connectors and Middleware

QUICK START

OceanBase Cloud

OceanBase Database

BEST PRACTICES

Practical guides for utilizing OceanBase more effectively and conveniently

Company

Learn more about OceanBase – our company, partnerships, and trust and security initiatives.

About OceanBase

Partner

Trust Center

Contact Us

International - English
中国站 - 简体中文
日本 - 日本語
Sign In
Start on Cloud

A unified distributed database ready for your transactional, analytical, and AI workloads.

DEPLOY YOUR WAY

OceanBase Cloud

The best way to deploy and scale OceanBase

OceanBase Enterprise

Run and manage OceanBase on your infra

TRY OPEN SOURCE

OceanBase Community Edition

The free, open-source distributed database

OceanBase seekdb

Open source AI native search database

Customer Stories

Real-world success stories from enterprises across diverse industries.

View All
BY USE CASES

Mission-Critical Transactions

Global & Multicloud Application

Elastic Scaling for Peak Traffic

Real-time Analytics

Active Geo-redundancy

Database Consolidation

Comprehensive knowledge hub for OceanBase.

Blog

Live Demos

Training & Certification

Documentation

Official technical guides, tutorials, API references, and manuals for all OceanBase products.

View All
PRODUCTS
OceanBase CloudOceanBase Database
ToolsConnectors and Middleware
QUICK START
OceanBase CloudOceanBase Database
BEST PRACTICES

Practical guides for utilizing OceanBase more effectively and conveniently

Learn more about OceanBase – our company, partnerships, and trust and security initiatives.

About OceanBase

Partner

Trust Center

Contact Us

Start on Cloud
编组
All Products
    • Databases
    • iconOceanBase Database
    • iconOceanBase Cloud
    • iconOceanBase Tugraph
    • iconInteractive Tutorials
    • iconOceanBase Best Practices
    • Tools
    • iconOceanBase Cloud Platform
    • iconOceanBase Migration Service
    • iconOceanBase Developer Center
    • iconOceanBase Migration Assessment
    • iconOceanBase Admin Tool
    • iconOceanBase Loader and Dumper
    • iconOceanBase Deployer
    • iconKubernetes operator for OceanBase
    • iconOceanBase Diagnostic Tool
    • iconOceanBase Binlog Service
    • Connectors and Middleware
    • iconOceanBase Database Proxy
    • iconEmbedded SQL in C for OceanBase
    • iconOceanBase Call Interface
    • iconOceanBase Connector/C
    • iconOceanBase Connector/J
    • iconOceanBase Connector/ODBC
    • iconOceanBase Connector/NET
icon

OceanBase Migration Service

V4.2.3Community Edition

  • OMS Documentation
  • OMS Community Edition Introduction
    • What is OMS Community Edition?
    • Terms
    • OMS Community Edition HA
    • Architecture
      • Overview
      • Hierarchical functional system
      • Basic components
    • Limitations
  • Quick Start
    • Data migration process
    • Data synchronization process
  • Deploy OMS Community Edition
    • Deployment modes
    • System and network requirements
    • Memory and disk requirements
    • Prepare the environment
    • Deploy OMS Community Edition on a single node
    • Deploy OMS Community Edition on multiple nodes in a single region
    • Deploy OMS Community Edition on multiple nodes in multiple regions
    • Integrate the OIDC protocol into OMS Community Edition to implement SSO
    • Scale out OMS Community Edition
    • Check the deployment
    • Deploy a time-series database (Optional)
  • OMS Community Edition console
    • Log on to the console of OMS Community Edition
    • Overview
    • User center
      • Configure user information
      • Change your logon password
      • Log off
  • Data migration
    • Overview
    • Migrate data from a MySQL database to OceanBase Database Community Edition
    • Migrate data from OceanBase Database Community Edition to a MySQL database
    • Migrate data from HBase to OBKV
    • Migrate data between instances of OceanBase Database Community Edition
    • Migrate data in active-active disaster recovery scenarios
    • Migrate data from a TiDB database to OceanBase Database Community Edition
    • Migrate data from a PostgreSQL database to OceanBase Database Community Edition
    • Manage data migration projects
      • View the details of a data migration project
      • Change the name of a data migration project
      • View and modify migration objects
      • Manage computing platforms
      • Use tags to manage data migration projects
      • Perform batch operations on data migration projects
      • Download and import settings of migration objects
      • Start and pause a data migration project
      • Release and delete a data migration project
    • Features
      • DML filtering
      • DDL synchronization
      • Configure matching rules for migration objects
      • Wildcard rules
      • Rename a database table
      • Use SQL conditions to filter data
      • Create and update a heartbeat table
      • Schema migration mechanisms
      • Schema migration operations
      • Set an incremental synchronization timestamp
    • Supported DDL operations in incremental migration and limits
      • DDL synchronization from MySQL database to OceanBase Community Edition
        • Overview of DDL synchronization from a MySQL database to a MySQL tenant of OceanBase Database
        • CREATE TABLE
          • Create a table
          • Create a column
          • Create an index or a constraint
          • Create partitions
        • Data type conversion
        • ALTER TABLE
          • Modify a table
          • Operations on columns
          • Operations on constraints and indexes
          • Operations on partitions
        • TRUNCATE TABLE
        • RENAME TABLE
        • DROP TABLE
        • CREATE INDEX
        • DROP INDEX
        • DDL incompatibilities between MySQL database and OceanBase Community Edition
          • Overview
          • Incompatibilities of the CREATE TABLE statement
            • Incompatibilities of CREATE TABLE
            • Column types that are supported to create indexes or constraints
          • Incompatibilities of the ALTER TABLE statement
            • Incompatibilities of ALTER TABLE
            • Change the type of a constrained column
            • Change the type of an unconstrained column
            • Change the length of a constrained column
            • Change the length of an unconstrained column
            • Delete a constrained column
          • Incompatibilities of DROP INDEX operations
      • Supported DDL operations in incremental migration from OceanBase Community Edition to a MySQL database and limits
      • Supported DDL operations in incremental migration between MySQL tenants of OceanBase Database
  • Data synchronization
    • Data synchronization overview
    • Create a project to synchronize data from OceanBase Database Community Edition to a Kafka instance
    • Create a project to synchronize data from OceanBase Database Community Edition to a RocketMQ instance
    • Manage data synchronization projects
      • View details of a data synchronization project
      • Change the name of a data synchronization project
      • View and modify synchronization objects
      • Use tags to manage data synchronization projects
      • Perform batch operations on data synchronization projects
      • Download and import the settings of synchronization objects
      • Start and pause a data synchronization project
      • Release and delete a data synchronization project
    • Features
      • DML filtering
      • DDL synchronization
      • Rename a topic
      • Use SQL conditions to filter data
      • Column filtering
      • Data formats
  • Create and manage data sources
    • Create data sources
      • Create an OceanBase-CE data source
      • Create a MySQL data source
      • Create a TiDB data source
      • Create a Kafka data source
      • Create a RocketMQ data source
      • Create a PostgreSQL data source
      • Create an HBase data source
    • Manage data sources
      • View data source information
      • Copy a data source
      • Edit a data source
      • Delete a data source
    • Create a database user
    • User privileges
    • Enable binlogs for the MySQL database
  • OPS & Monitoring
    • O&M overview
    • Go to the overview page
    • Server
      • View server information
      • Update the quota
      • View server logs
    • Components
      • Store
        • Create a store
        • View details of a store
        • Update the configurations of a store
        • Start and pause a store
        • Delete a store
      • Incr-Sync
        • View details of an Incr-Sync component
        • Start and pause an Incr-Sync component
        • Migrate an Incr-Sync component
        • Update the configurations of an Incr-Sync component
        • Batch O&M
        • Delete an Incr-Sync component
      • Full-Import
        • View details of a Full-Import component
        • Pause a Full-Import component
        • Rerun and resume a Full-Import component
        • Update the configurations of a Full-Import component
        • Delete a Full-Import component
      • Full-Verification
        • View details of a Full-Verification component
        • Pause a Full-Verification component
        • Rerun and resume a Full-Verification component
        • Update the configurations of a Full-Verification component
        • Delete a Full-Verification component
    • O&M Task
      • View O&M tasks
      • Skip a task or subtask
      • Retry a task or subtask
  • System management
    • Permission Management
      • Overview
      • Manage users
      • Manage departments
    • Alert center
      • View project alerts
      • View system alerts
      • Manage alert settings
    • Associate with OCP
    • System parameters
      • Modify system parameters
      • Modify HA configurations
  • OMS Community Edition O&M
    • Manage OMS services
    • OMS logs
    • Component O&M
      • O&M operations for the Supervisor component
      • CLI-based O&M for the Connector component
      • O&M operations for the Store component
    • Component tuning
      • Incr-Sync or Full-Import tuning
    • Component parameters
      • Coordinator
      • Condition
      • Source Plugin
        • Overview
        • StoreSource
        • DataFlowSource
        • LogProxySource
        • KafkaSource (TiDB)
        • HBaseSource
      • Sink Plugin
        • Overview
        • JDBC-Sink
        • KafkaSink
        • DatahubSink
        • RocketMQSink
        • HBaseSink
      • Store parameters
        • Parameters of a MySQL store
        • Parameters of an OceanBase store
      • Parameters of the CM component
      • Parameters of the Supervisor component
      • Full-Verification parameters
    • Set throttling
  • Reference Guide
    • API Reference
      • Overview
      • CreateProject
      • StartProject
      • StopProject
      • ResumeProject
      • ReleaseProject
      • DeleteProject
      • ListProjects
      • DescribeProject
      • DescribeProjectSteps
      • DescribeProjectStepMetric
      • DescribeProjectProgress
      • DescribeProjectComponents
      • ListProjectFullVerifyResult
      • StartProjectsByLabel
      • StopProjectsByLabel
      • CreateMysqlDataSource
      • CreateOceanBaseDataSource
      • ListDataSource
      • CreateLabel
      • ListAllLabels
      • ListFullVerifyInconsistenciesResult
      • ListFullVerifyCorrectionsResult
      • UpdateStore
      • UpdateFullImport
      • UpdateIncrSync
      • UpdateFullVerification
    • OMS error codes
    • Alert Reference
      • oms_host_down
      • oms_host_down_migrate_resource
      • oms_host_threshold
      • oms_migration_failed
      • oms_migration_delay
      • oms_sync_failed
      • oms_sync_status_inconsistent
      • oms_sync_delay
    • Telemetry parameters
  • Upgrade Guide
    • Overview
    • Upgrade OMS Community Edition in single-node deployment mode
    • Upgrade OMS Community Edition in multi-node deployment mode
    • FAQ
  • FAQ
    • General O&M
      • How do I modify the resource quotas of an OMS container?
      • Clear files in the Store component
      • How do I troubleshoot the OMS server down issue?
      • Deploy InfluxDB for OMS
      • Increase the disk space of the OMS host
    • Project diagnostics
      • What do I do when a store does not have data of the timestamp requested by the downstream?
      • What do I do when OceanBase Store failed to access an OceanBase cluster through RPC?
    • OPS & monitoring
      • What are the alert rules?
    • Data synchronization
      • FAQ about synchronization to a message queue
        • What are the strategies for ensuring the message order in incremental data synchronization to Kafka
    • Data migration
      • Full migration
        • How do I query the ID of a checker?
        • How do I query log files of the Checker component of OMS?
        • How do I query the verification result files of the Checker component of OMS?
        • What do I do if the destination table does not exist?
        • What can I do when the full migration failed due to LOB fields?
        • What do I do if garbled characters cannot be written into OceanBase Database V3.1.2?
      • Incremental synchronization
        • How do I skip DDL statements?
        • How do I update whitelists and blacklists?
        • What are the application scope and limits of ETL?
    • Installation and deployment
      • How do I upgrade Store?
      • How do I upgrade CDC?
      • What do I do when the "Failed to fetch" error is reported?
      • Change port numbers for components
      • Switch to the standby database

Download PDF

OMS Documentation What is OMS Community Edition? Terms OMS Community Edition HA Overview Hierarchical functional system Basic components Limitations Data migration process Data synchronization process Deployment modes System and network requirements Memory and disk requirements Prepare the environment Deploy OMS Community Edition on a single node Deploy OMS Community Edition on multiple nodes in a single region Deploy OMS Community Edition on multiple nodes in multiple regions Integrate the OIDC protocol into OMS Community Edition to implement SSO Scale out OMS Community Edition Check the deployment Deploy a time-series database (Optional) Log on to the console of OMS Community Edition Overview Configure user information Change your logon password Log off Overview Migrate data from a MySQL database to OceanBase Database Community Edition Migrate data from OceanBase Database Community Edition to a MySQL database Migrate data from HBase to OBKV Migrate data between instances of OceanBase Database Community Edition Migrate data in active-active disaster recovery scenarios Migrate data from a TiDB database to OceanBase Database Community Edition Migrate data from a PostgreSQL database to OceanBase Database Community Edition View the details of a data migration project Change the name of a data migration project View and modify migration objects Manage computing platforms Use tags to manage data migration projects Perform batch operations on data migration projects Download and import settings of migration objects Start and pause a data migration project Release and delete a data migration project DML filtering DDL synchronization Configure matching rules for migration objects Wildcard rules Rename a database table Use SQL conditions to filter data Create and update a heartbeat table Schema migration mechanisms Schema migration operations Set an incremental synchronization timestamp Supported DDL operations in incremental migration from OceanBase Community Edition to a MySQL database and limits Supported DDL operations in incremental migration between MySQL tenants of OceanBase Database Data synchronization overview Create a project to synchronize data from OceanBase Database Community Edition to a Kafka instance Create a project to synchronize data from OceanBase Database Community Edition to a RocketMQ instance View details of a data synchronization project Change the name of a data synchronization project View and modify synchronization objects Use tags to manage data synchronization projects Perform batch operations on data synchronization projects Download and import the settings of synchronization objects Start and pause a data synchronization project Release and delete a data synchronization project DML filtering DDL synchronization Rename a topic Use SQL conditions to filter data Column filtering Data formats Create an OceanBase-CE data source Create a MySQL data source Create a TiDB data source Create a Kafka data source Create a RocketMQ data source Create a PostgreSQL data source Create an HBase data source View data source information Copy a data source Edit a data source Delete a data source Create a database user User privileges Enable binlogs for the MySQL database O&M overview Go to the overview page View server information Update the quota View server logs View O&M tasks Skip a task or subtask Retry a task or subtask Overview Manage users Manage departments View project alerts View system alerts Manage alert settings
OceanBase logo

The Unified Distributed Database for the AI Era.

Follow Us
Products
OceanBase CloudOceanBase EnterpriseOceanBase Community EditionOceanBase seekdb
Resources
DocsBlogLive DemosTraining & Certification
Company
About OceanBaseTrust CenterLegalPartnerContact Us
Follow Us

© OceanBase 2026. All rights reserved

Cloud Service AgreementPrivacy PolicySecurity
Contact Us
Document Feedback
  1. Documentation Center
  2. OceanBase Migration Service
  3. V4.2.3
iconOceanBase Migration Service
V 4.2.3Community Edition
Enterprise Edition
  • V 4.3.2
  • V 4.3.1
  • V 4.3.0
  • V 4.2.5
  • V 4.2.4
  • V 4.2.3
  • V 4.0.2
  • V 3.4.0
Community Edition
  • V 4.2.12
  • V 4.2.11
  • V 4.2.10
  • V 4.2.9
  • V 4.2.8
  • V 4.2.7
  • V 4.2.6
  • V 4.2.5
  • V 4.2.4
  • V 4.2.3
  • V 4.2.1
  • V 4.2.0
  • V 4.0.0
  • V 3.3.1

Data formats

Last Updated:2024-04-18 03:40:56  Updated
share
What is on this page
Data formats used in serialization methods
Default JSON message format
Canal JSON message format
Dataworks JSON message format
SharePlex JSON message format
DefaultExtendColumnType JSON message format
Debezium JSON message format
DebeziumFlatten JSON message format
DebeziumSmt JSON message format
Maxwell JSON message format
Data formats used when data is transmitted from OceanBase Database to a message queue system

folded

share

This topic describes the data formats used in serialization methods and when data is transmitted from OceanBase Database to a message queue system.

Data formats used in serialization methods

When you use OceanBase Migration Service (OMS) Community Edition to synchronize data from the source to a Kafka, DataHub (Blob type), or RocketMQ instance, you can use a serialization method to control the format of the data to be synchronized to the destination. The serialization methods include Default, Canal, Dataworks (V2.0 supported), SharePlex, DefaultExtendColumnType, Debezium, DebeziumFlatten, DebeziumSmt, and Maxwell (supported only when the destination is a Kafka instance).

Default JSON message format

When you synchronize data to a Kafka, DataHub (Blob type), or RocketMQ instance, the following JSON message format is used for the Default serialization method:

{
 "prevStruct": {  // The image before modification.
   "col1": "val1" // The key-value pair that contains the full key value.
  },
  "postStruct": {  // The image after modification.
   "col1": "val1" // The key-value pair that contains the full key value.
  },
  "allMetaData" {
  "checkpoint": "STRING", // The current synchronization checkpoint, or the end synchronization timestamp during incremental synchronization, in seconds. It is indicated by using a primary key-value pair during full synchronization.
    "record_primary_key": "STRING",  // The name of the primary key column. If the primary key has multiple columns. Separate the column names with \u0001.
    "record_primary_value" "STRING", // The primary key value. If the primary key has multiple columns. Separate the column names with \u0001.
    "source_identity": "STRING",     // The source ID. The source ID is a subtopic in the case of incremental synchronization. It is a meaningless sequence in the case of full synchronization.
    "dbType": "STRING", // The database type. Valid values: MYSQL, ORACLE, OCEANBASE, OB_IN_ORACLE_MODE, DB2, OB_MYSQL, OB_ORACLE, and DB2_LUW. Note that OCEANBASE, OB_IN_ORACLE_MODE, and DB2 are used by earlier versions of the data transmission service and are compatible with the current version.
    "storeDataSequence": LONG, // This field is available only when the value of sequenceEnabled is true in the source.json configuration file in incremental synchronization scenarios. Default value: true. This field is used for sorting. A sequence is generated during the synchronization process in the timestamp + sequence number format. The sequence number increments and does not exceed five digits.
    "table_name": "STRING", // The name of the table that is changed by using an SQL statement.
    "db": "STRING", // The name of the database that is changed by using an SQL statement. If the specified database is an OceanBase database, the database name includes the tenant name in the {tenant}.{database} format.
    "timestamp": "STRING", // The timestamp in seconds for data modification, which is available only for incremental synchronization.
    "uniqueId": "STRING", // The unique identifier of a transaction that is passed from the Store component as a sequence number during incremental migration.
    "transId": "STRING",  // In OceanBase Database Community Edition earlier than V4.x, transId indicates the ID of a transaction. In a transaction ID, inc is an auto-increment number, addr is the address of the coordinator, ts is the time when the transaction ID is generated, and hash is the HASH value of the preceding three items. For an incomplete transaction, the transaction ID is null. In OceanBase Database Community Edition V4.x and later versions, transId is specified in the format of "tenant ID_transaction ID", for example, "1002_336195". In MySQL or Oracle databases, transId indicates a checkpoint.
    "clusterId": "STRING", // clusterId indicates a cluster ID in OceanBase Database and a thread ID in MySQL databases. It does not exist in Oracle databases.
    "ddlType": "STRING", // The DDL operation type. This field is added in OMS Community Edition V4.1.1.
 },
 "recordType": "INSERT/UPDATE/DELETE/HEARTBEAT/DDL" // The change type.
}
  • In DDL records, the key is "ddl", which indicates the column name, and the values are DDL statements.

  • Beforeimage and afterimage:

    • prevStruct: The beforeimage of incremental data, which is the data before an SQL statement is executed.

    • postStruct: The afterimage of incremental data, which is the data after an SQL statement is executed.

    For the DELETE type, only prevStruct is available. For the INSERT and DDL types, only postStruct is available. For the UPDATE type, both prevStruct and postStruct are available. For the HEARTBEAT (periodic heartbeat message) type, neither postStruct nor postStruct is available.

  • clusterId is described as follows:

    • In OceanBase Database, ob_org_cluster_id specifies a session-level cluster ID, which will be persisted to the transaction log. If you specify ob_org_cluster_id when you write data, the specified value is applied. Otherwise, the default value is applied. This field is available only for incremental synchronization. For more information, see ob_org_cluster_id.

    • In a MySQL database, clusterId indicates a thread ID in a MySQL binlog event. The MySQL server assigns incremented thread IDs to threads on each connection.

Examples:

  • Example of data insertion (INSERT)

    {
      "allMetaData":{
        "checkpoint":null,
        "record_primary_key":"id1\u0001id2",
        "source_identity":null,
        "record_primary_value":"3\u0001129",
        "dbType":"OB_MYSQL",
        "table_name":"table_name",
        "db":"tenant.database",
        "timestamp":"1609344671"
      },
      "prevStruct":null,
      "recordType":"INSERT",
      "postStruct":{
        "col1":3,
        "col2":129,
        "col3":2147483646,
        "col4":9223372036854775806,
        "col5":10223372036854775806,
        "col6":1.2222,
        "col7":9.999999,
        "col8":"hello world",
        "col9":"aGVsbG8gd29ybGQ=",
        "col10":9.99999999999,
        "col11":"2020-11-25",
        "col12":"00:01:02",
        "col13":"2020-11-25 00:01:02",
        "col14":"1606233662.012345",
      }
    }
    
  • Example of data update (UPDATE)

    {
      "allMetaData": {
        "checkpoint": null,
        "record_primary_key": "id1\u0001id2",
        "source_identity": null,
        "record_primary_value": "3\u0001129",
        "dbType":"OB_MYSQL",
        "table_name": "table_name",
        "db": "tenant.database",
        "timestamp": "1609344671"
      },
      "prevStruct": {
        "col1": 3,
        "col2": 129,
        "col3": 2147483646,
        "col4": 9223372036854775806,
        "col5": 10223372036854775806,
        "col6": 1.2222,
        "col7": 9.999999999999,
        "col8": "hello world",
        "col9": "aGVsbG8gd29ybGQ=",
        "col10": 9.999999999999,
        "col11": "2020-11-25",
        "col12": "00:01:02",
        "col13": "2020-11-25 00:01:02",
        "col14": "1606233662.012345",
      },
      "recordType": "UPDATE",
      "postStruct": {
        "col1": 3,
        "col2": 129,
        "col3": 2147483646,
        "col4": 9223372036854775806,
        "col5": 10223372036854775806,
        "col6": 1.2222,
        "col7": 9.999999999999,
        "col8": "hello world 2020",
        "col9": "aGVsbG8gd29ybGQ=",
        "col10": 9.999999999999,
        "col11": "2020-11-25",
        "col12": "00:01:02",
        "col13": "2020-11-25 00:01:02",
        "col14": "1606233662.012345",
      }
    }
    
  • Example of data deletion (DELETE)

    {
      "allMetaData":{
        "checkpoint":null,
        "record_primary_key":"id1\u0001id2",
        "source_identity":null,
        "record_primary_value":"3\u0001129",
        "dbType":"OB_MYSQL",
        "table_name":"table_name",
        "db":"tenant.database",
        "timestamp":"1609344671"
      },
      "prevStruct":{
        "col1":3,
        "col2":129,
        "col3":2147483646,
        "col4":9223372036854775806,
        "col5":10223372036854775806,
        "col16":1.2222,
        "col7":9.99999999,
        "col8":"hello world",
        "col9":"aGVsbG8gd29ybGQ=",
        "col10":9.999999999,
        "col11":"2020-11-25",
        "col12":"00:01:02",
        "col13":"2020-11-25 00:01:02",
        "col14":"1606233662.012345"
      },
      "recordType":"DELETE",
      "postStruct":null
    }
    
  • Example of a DDL operation

    alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';
    
    {
    "prevStruct": null,
    "postStruct": {
      "ddl": "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'"
    },
    "allMetaData": {
      "checkpoint": "1671177057",
      "dbType": "OB_MYSQL",
      "storeDataSequence": null,
      "db": "connector_test",
      "timestamp": "1671177057",
      "uniqueId": null,
      "ddlType": "ALTER_TABLE",
      "record_primary_key": null,
      "source_identity": null,
      "record_primary_value": null,
      "table_name": "all_mysql_type_test"
    },
    "recordType": "DDL"
    }
    

Canal JSON message format

When you synchronize data to a Kafka, DataHub (Blob type), or RocketMQ instance, the following JSON message format is used for the Canal serialization method:

{
 "database": "STRING", // The name of the database that is changed by using an SQL statement. For an OceanBase database, you can specify only the database name and omit the tenant name.
  "sqlType": {
   "col1": INTEGER, // Indicates that the INTEGER column type is modified. For more information, see java.sql.Types.
  },
  "data": [ // The modified data key-value pair. This field exists in only one message.
    {
     "col1": "val1"
    }
  ],
  "pkNames": [ // The primary key column name.
   "col1"
  ],
  "old": [ // This field exists only in update messages and indicates the column modified by using an UPDATE statement. It indicates the column value before the modification.
    {
     "col1": "val1"
    }
  ],
  "mysqlType": { // The description of the column type.
   "col": "STRING"
  },
  "type": "STRING",  // The modification type.
  "table": "STRING", // The name of the table that is changed by using an SQL statement.
  "es": LONG, // The modification time, with a millisecond-level timestamp.
  "isDdl": BOOLEAN, // Indicates whether it is a DDL statement.
  "ts": LONG, // The timestamp written into the destination database.
  "sql": "STRING", // It is empty.
}

Examples:

  • Example of data insertion (INSERT)

    {
      "database":"database",
      "sqlType":{
        "col1":93,
        "col2":12,
        "col3":6,
        "col4":8,
        "col5":5,
        "col6":92,
        "col7":4,
        "col8":-5,
        "col9":2004,
        "col10":-6,
        "col11":91,
        "col12":3,
        "col13":-5,
        "col14":93
      },
      "data":[
        {
          "col1":"2020-11-25 00:01:02",
          "col2":"hello world",
          "col3":1.2222,
          "col4":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308,
          "col5":129,
          "col6":"00:01:02",
          "col7":2147483646,
          "col8":9223372036854775806,
          "col9":"aGVsbG8gd29ybGQ=",
          "col10":3,
          "col11":"2020-11-25",
          "col12":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308,
          "col13":10223372036854775806,
          "col14":"1606233662.012345"
        }
      ],
      "pkNames":[
        "col1",
        "col2"
      ],
      "old":null,
      "mysqlType":{
        "col1":"datetime",
        "col2":"varchar",
        "col3":"float",
        "col4":"double",
        "col5":"smallint",
        "col6":"time",
        "col7":"int",
        "col8":"int64",
        "col9":"blob",
        "col10":"tinyint",
        "col11":"date",
        "col12":"decimal",
        "col13":"bigint",
        "col14":"timestamp"
      },
      "type":"INSERT",
      "table":"table",
      "es":1609344671000,
      "isDdl":false,
      "ts":1618323429026,
      "sql":""
    }
    
  • Example of data update (UPDATE)

    {
      "database":"database",
      "sqlType":{
        "col1":93,
        "col2":12,
        "col3":6,
        "col4":8,
        "col5":5,
        "col6":92,
        "col7":4,
        "col8":-5,
        "col9":2004,
        "col10":-6,
        "col11":91,
        "col12":3,
        "col13":-5,
        "col14":93
      },
      "data":[
        {
          "col1":"2020-11-25 00:01:02",
          "col2":"hello world 2020",
          "col3":1.2222,
          "col4":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308,
          "col5":129,
          "col6":"00:01:02",
          "col7":2147483646,
          "col8":9223372036854775806,
          "col9":"aGVsbG8gd29ybGQ=",
          "col10":3,
          "col11":"2020-11-25",
          "col12":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308,
          "col13":10223372036854775806,
          "col14":"1606233662.012345"
        }
      ],
      "pkNames":[
        "col1",
        "col2"
      ],
      "old":[
        {
          "col":"hello world"
        }
      ],
      "mysqlType":{
        "col1":"datetime",
        "col2":"varchar",
        "col3":"float",
        "col4":"double",
        "col5":"smallint",
        "col6":"time",
        "col7":"int",
        "col8":"int64",
        "col9":"blob",
        "col10":"tinyint",
        "col11":"date",
        "col12":"decimal",
        "col13":"bigint",
        "col14":"timestamp"
      },
      "type":"UPDATE",
      "table":"table",
      "es":1609344671000,
      "isDdl":false,
      "ts":1618364572908,
      "sql":""
    }
    
  • Example of data deletion (DELETE)

    {
      "database":"database",
      "sqlType":{
        "col1":93,
        "col2":12,
        "col3":6,
        "col4":8,
        "col5":5,
        "col6":92,
        "col7":4,
        "col8":-5,
        "col9":2004,
        "col10":-6,
        "col11":91,
        "col12":3,
        "col13":-5,
        "col14":93
      },
      "data":[
        {
          "col1":"2020-11-25 00:01:02",
          "col2":"hello world",
          "col3":1.2222,
          "col4":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308,
          "col5":129,
          "col6":"00:01:02",
          "col7":2147483646,
          "col8":9223372036854775806,
          "col9":"aGVsbG8gd29ybGQ=",
          "col10":3,
          "col11":"2020-11-25",
          "col12":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308,
          "col13":10223372036854775806,
          "col14":"1606233662.012345"
        }
      ],
      "pkNames":[
        "int8",
        "int16"
      ],
      "old":null,
      "mysqlType":{
        "col1":"datetime",
        "col2":"varchar",
        "col3":"float",
        "col4":"double",
        "col5":"smallint",
        "col6":"time",
        "col7":"int",
        "col8":"int64",
        "col9":"blob",
        "col10":"tinyint",
        "col11":"date",
        "col12":"decimal",
        "col13":"bigint",
        "col14":"timestamp"
      },
      "type":"DELETE",
      "table":"table",
      "es":1609344671000,
      "isDdl":false,
      "ts":1618364660278,
      "sql":""
    }
    
  • Example of a DDL operation

    alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';
    
    {
    "database": "connector_test",
    "sqlType": null,
    "data": null,
    "pkNames": null,
    "old": null,
    "mysqlType": null,
    "type": "ALTER",
    "table": "all_mysql_type_test",
    "es": 1671177209000,
    "isDdl": true,
    "ts": 1671177291475,
    "sql": "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'"
    }
    

Dataworks JSON message format

When you synchronize data to a Kafka, DataHub (blob type), or RocketMQ instance, the following JSON message format is used for the Dataworks serialization method:

{
 "version":"2.0", // The protocol version. Only DataWorks 2.0 is supported.
 "schema": { // The modified metadata information, with only the column name and column type specified.
 "source": {// The source of the modification.
  "dbType": "mysql", // The source type.
  "dbVersion": "5.7.35", // The database version.
  "dbName": "myDatabase", // The database name.
  "schema": "mySchema", // The schema name. This field is required in a system with schemas.
  "table": "tableName" // The table name.
 }
 "column": [// The modified data column. This field indicates the updated record content in the target table.
  {
  "name": "id",
  "type": "bigint"
  },
  {
  "name": "name",
  "type": "varchar(20)"
  },
  {
  "name": "mydata",
  "type": "binary"
  },
  {
  "name": "ts",
  "type": "datetime"
  }
 ],
 "pk": [// This field is required if a primary key or unique key is available. Otherwise, it is optional.
  "pkName1",
  "pkName2"
 ]
},
"payload": {
 "before": {
  "data":{
   "id": 111,
   "name":"scooter",
   "mydata": "[base64 string]", // Base64 encoding is required for the binary type.
   "ts": 1590315269000.123456789 // The timestamp with an integer part of 13 digits and a fractional part of 9 digits.
  }
 },
 "after": {
  "data":{
   "id": 222,
   "name":"donald",
   "mydata": "[base64 string]",
   "ts": 1590315269000
  }
 },
 "op":"INSERT/UPDATE/DELETE/HEARTBEAT/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTE
R/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/...",// Case sensitive.
 "timestamp": {
  "eventTime": 1620457659000 // The modification time on the source database, which is a 13-digit timestamp in milliseconds.
 },
 "ddl": {
  "text": "ADD COLUMN ..."
 },
 "scn": "The system change number (SCN) that increments."
},
"extend": { // The extend field that can be used for meeting extension requirements in the future. This field can be left unspecified if no extension is available.
 "load_fm":"CIBS", // The source system, for example, "CIBS".
 }
}

Heartbeat message of a synchronization task:

{
 "version":"2.0", // The protocol version.
 "payload": {
 "timestamp": {
  "eventTime": 1620457659000 // The timestamp of the heartbeat packet.
 },
 "op": "HEARTBEAT" // Indicates a heartbeat packet.
 }
}

Examples:

  • Example of data insertion (INSERT)

    {
        "version":"2.0",
        "schema":{
            "source":{
                "dbType":"ob_mysql",
                "dbVersion":null,
                "dbName":"db",
                "schema":null,
                "table":"tab"
            },
            "column":[
                {
                    "name":"int8",
                    "type":"TINYINT"
                },
                {
                    "name":"int16",
                    "type":"SMALLINT"
                },
                {
                    "name":"int32",
                    "type":"INT"
                },
                {
                    "name":"int64",
                    "type":"INT64"
                },
                {
                    "name":"float32",
                    "type":"FLOAT"
                },
                {
                    "name":"float64",
                    "type":"DOUBLE"
                },
                {
                    "name":"bigInt",
                    "type":"BIGINT"
                },
                {
                    "name":"boolean",
                    "type":"BOOLEAN"
                },
                {
                    "name":"string",
                    "type":"VARCHAR"
                },
                {
                    "name":"bytes",
                    "type":"BLOB"
                },
                {
                    "name":"decimal",
                    "type":"DECIMAL"
                },
                {
                    "name":"localDate",
                    "type":"DATE"
                },
                {
                    "name":"localTime",
                    "type":"TIME"
                },
                {
                    "name":"localDateTime",
                    "type":"DATETIME"
                },
                {
                    "name":"timestamp",
                    "type":"TIMESTAMP"
                },
                {
                    "name":"zonedDateTime",
                    "type":"ZONED_DATETIME"
                },
                {
                    "name":"intervalDayToSecond",
                    "type":"INTERVAL_DAY_TO_SECOND"
                },
                {
                    "name":"intervalYearToMonth",
                    "type":"INTERVAL_YEAR_TO_MONTH"
                }
            ],
            "pk":[
                "pkName1",
                "pkName12"
            ]
        },
        "payload":{
            "before":null,
            "after":{
                "data":{
                    "col1":3,
                    "col2":129,
                    "col3":2147483646,
                    "col4":9223372036854775806,
                    "col5":1.2222,
                    "col6":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125,
                    "col7":10223372036854775806,
                    "col8":1,
                    "col9":"hello world",
                    "col10":"aGVsbG8gd29ybGQ=",
                    "col11":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125,
                    "col12":"2020-11-25",
                    "col13":"00:01:02",
                    "col14":"2020-11-25 00:01:02",
                    "col15":"1606233662.012345",
                    "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai",
                    "col17":"INTERVAL '3' DAY",
                    "col18":"INTERVAL '4' YEAR"
                }
            },
            "op":"INSERT",
            "timestamp":{
                "eventTime":1647581000000,
                "systemTime":1647581000795,
                "checkpointTime":1647581000
            },
            "ddl":null,
            "scn":"null"
        },
        "extend":{
            "load_fm": "test"
        }
    }
    
  • Example of data update (UPDATE)

    {
        "version":"2.0",
        "schema":{
            "source":{
                "dbType":"ob_mysql",
                "dbVersion":null,
                "dbName":"db",
                "schema":null,
                "table":"tab"
            },
            "column":[
                {
                    "name":"int8",
                    "type":"TINYINT"
                },
                {
                    "name":"int16",
                    "type":"SMALLINT"
                },
                {
                    "name":"int32",
                    "type":"INT"
                },
                {
                    "name":"int64",
                    "type":"INT64"
                },
                {
                    "name":"float32",
                    "type":"FLOAT"
                },
                {
                    "name":"float64",
                    "type":"DOUBLE"
                },
                {
                    "name":"bigInt",
                    "type":"BIGINT"
                },
                {
                    "name":"boolean",
                    "type":"BOOLEAN"
                },
                {
                    "name":"string",
                    "type":"VARCHAR"
                },
                {
                    "name":"bytes",
                    "type":"BLOB"
                },
                {
                    "name":"decimal",
                    "type":"DECIMAL"
                },
                {
                    "name":"localDate",
                    "type":"DATE"
                },
                {
                    "name":"localTime",
                    "type":"TIME"
                },
                {
                    "name":"localDateTime",
                    "type":"DATETIME"
                },
                {
                    "name":"timestamp",
                    "type":"TIMESTAMP"
                },
                {
                    "name":"zonedDateTime",
                    "type":"ZONED_DATETIME"
                },
                {
                    "name":"intervalDayToSecond",
                    "type":"INTERVAL_DAY_TO_SECOND"
                },
                {
                    "name":"intervalYearToMonth",
                    "type":"INTERVAL_YEAR_TO_MONTH"
                }
            ],
            "pk":[
                "pkName1",
                "pkName2"
            ]
        },
        "payload":{
            "before":{
                "data":{
                    "col1":3,
                    "col2":129,
                    "col3":2147483646,
                    "col4":9223372036854775806,
                    "col5":1.2222,
                    "col6":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125,
                    "col7":10223372036854775806,
                    "col8":1,
                    "col9":"hello world",
                    "col10":"aGVsbG8gd29ybGQ=",
                    "col11":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125,
                    "col12":"2020-11-25",
                    "col13":"00:01:02",
                    "col14":"2020-11-25 00:01:02",
                    "col15":"1606233662.012345",
                    "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai",
                    "col17":"INTERVAL '3' DAY",
                    "col18":"INTERVAL '4' YEAR"
                }
            },
            "after":{
                "data":{
                    "col1":3,
                    "col2":129,
                    "col3":2147483646,
                    "col4":9223372036854775806,
                    "col5":1.2222,
                    "col6":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125,
                    "col7":10223372036854775806,
                    "col8":1,
                    "col9":"hello world 2020",
                    "col10":"aGVsbG8gd29ybGQ=",
                    "col11":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125,
                    "col12":"2020-11-25",
                    "col13":"00:01:02",
                    "col14":"2020-11-25 00:01:02",
                    "col15":"1606233662.012345",
                    "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai",
                    "col17":"INTERVAL '3' DAY",
                    "col18":"INTERVAL '4' YEAR"
                }
            },
            "op":"UPDATE",
            "timestamp":{
                "eventTime":1647581038000,
                "systemTime":1647581038674,
                "checkpointTime":1647581038
            },
            "ddl":null,
            "scn":"null"
        },
        "extend":{
            "load_fm": "test"
        }
    }
    
  • Example of data deletion (DELETE)

    {
        "version":"2.0",
        "schema":{
            "source":{
                "dbType":"ob_mysql",
                "dbVersion":null,
                "dbName":"db",
                "schema":null,
                "table":"tab"
            },
            "column":[
                {
                    "name":"int8",
                    "type":"TINYINT"
                },
                {
                    "name":"int16",
                    "type":"SMALLINT"
                },
                {
                    "name":"int32",
                    "type":"INT"
                },
                {
                    "name":"int64",
                    "type":"INT64"
                },
                {
                    "name":"float32",
                    "type":"FLOAT"
                },
                {
                    "name":"float64",
                    "type":"DOUBLE"
                },
                {
                    "name":"bigInt",
                    "type":"BIGINT"
                },
                {
                    "name":"boolean",
                    "type":"BOOLEAN"
                },
                {
                    "name":"string",
                    "type":"VARCHAR"
                },
                {
                    "name":"bytes",
                    "type":"BLOB"
                },
                {
                    "name":"decimal",
                    "type":"DECIMAL"
                },
                {
                    "name":"localDate",
                    "type":"DATE"
                },
                {
                    "name":"localTime",
                    "type":"TIME"
                },
                {
                    "name":"localDateTime",
                    "type":"DATETIME"
                },
                {
                    "name":"timestamp",
                    "type":"TIMESTAMP"
                },
                {
                    "name":"zonedDateTime",
                    "type":"ZONED_DATETIME"
                },
                {
                    "name":"intervalDayToSecond",
                    "type":"INTERVAL_DAY_TO_SECOND"
                },
                {
                    "name":"intervalYearToMonth",
                    "type":"INTERVAL_YEAR_TO_MONTH"
                }
            ],
            "pk":[
                "pkName1",
                "pkName2"
            ]
        },
        "payload":{
            "before":{
                "data":{
                    "col1":3,
                    "col2":129,
                    "col3":2147483646,
                    "col4":9223372036854775806,
                    "col5":1.2222,
                    "col6":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125,
                    "col7":10223372036854775806,
                    "col8":1,
                    "col9":"hello world",
                    "col10":"aGVsbG8gd29ybGQ=",
                    "col11":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125,
                    "col12":"2020-11-25",
                    "col13":"00:01:02",
                    "col14":"2020-11-25 00:01:02",
                    "col15":"1606233662.012345",
                    "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai",
                    "col17":"INTERVAL '3' DAY",
                    "col18":"INTERVAL '4' YEAR"
                }
            },
            "after":null,
            "op":"DELETE",
            "timestamp":{
                "eventTime":1647581072000,
                "systemTime":1647581072976,
                "checkpointTime":1647581072
            },
            "ddl":null,
            "scn":"null"
        },
        "extend":{
            "load_fm": "test"
        }
    }
    
  • Example of a DDL operation

    alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';
    
    {
    "version": "2.0",
    "schema": {
      "source": {
        "dbType": "ob_mysql",
        "dbVersion": null,
        "dbName": "connector_test",
        "schema": null,
        "table": "all_mysql_type_test"
      },
      "column": null,
      "pk": null
    },
    "payload": {
      "before": null,
      "after": null,
      "op": "ALTER",
      "timestamp": {
        "eventTime": 1671177209000,
        "systemTime": 1671177291485,
        "checkpointTime": 1671177200
      },
      "ddl": {
        "text": "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'"
      },
      "scn": "null"
    },
    "extend": { }
    }
    

SharePlex JSON message format

When you synchronize data to a Kafka, DataHub (Blob type), or RocketMQ instance, the following JSON message format is used for the SharePlex serialization method:

{
 "data": { // The modified data key-value pair. If an INSERT or a DELETE statement is executed, the value is the full value. If an UPDATE statement is executed, the value is the modified value.
   "col1": "val1"
  },
  "meta": {
    "time": "YYYY-MM-DDTHH:mm:ss",  // The modification time.
    "op": "", // The change type. Valid values: ins, upd, del, and ddl.
    "posttime": "YYYY-MM-DDTHH:mm:ss",  // The time at which data is written to the destination.
    "idx":"STRING", // Indexes of messages in the transaction/Number of index messages. At present, OMS does not measure the total number of transactions. Therefore, this field is invalid.
    "size": NUMBER, // The number of messages in the transaction. This field is deprecated.
    "seq": "STRING",   // The sorting sequence. This field is available only when transactionEnabled is set to true on the source database.
    "table": "STRING", // The database and table name in the {database}.{table} format modified by using an SQL statement.
    "rowid": "STRING", // The row ID in the {database name.table name}-{primary key value} format. If the primary key has multiple columns, separate the column names with \u0001.  
    "trans": "STRING", // The transaction ID.
    "scn": "STRING",   // This field is available only when the value of sequenceEnabled is true in the source.json configuration file during incremental synchronization. Default value: true. This field is used for sorting. A sequence is generated during the synchronization process in the timestamp + sequence number format. The sequence number increments and does not exceed five digits.
  },
  "key": { // The key value before modification, which is available only for modifications by using an UPDATE statement.
  },
  "sql": {"ddl": ""} // This field is available only for DDL statements. If op is set to ddl, a DDL statement is written.
}

Examples:

  • Example of data insertion (INSERT)

    {
      "data":{
        "col1":"2020-11-25 00:01:02",
        "col2":"hello world",
        "col3":"INTERVAL '3' DAY",
        "col4":1.2222,
        "col5":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308,
        "col6":129,
        "col7":"00:01:02",
        "col8":1,
        "col9":"2020-11-25 00:01:02.012345 Asia/Shanghai",
        "col10":2147483646,
        "col11":9223372036854775806,
        "col12":"aGVsbG8gd29ybGQ=",
        "col13":"INTERVAL '4' YEAR",
        "col14":3,
        "col15":"2020-11-25",
        "col16":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308,
        "col17":10223372036854775806,
        "col18":"1606233662.012345"
      },
      "meta":{
        "posttime":"2020-12-07T13:22:00",
        "op":"ins",
        "size":10,
        "time":"2020-11-25T00:01:02",
        "idx":"1/10",
        "seq":1,
        "table":"mock_database.mock_table",
        "rowid":"mock_database.mock_table-3\u0001129",
        "trans":"shareplex_transaction_id",
        "scn":"123456789"
      }
    }
    
  • Example of data update (UPDATE)

    {
      "data":{
        "col":"hello world 2020"
      },
      "meta":{
        "posttime":"2020-12-07T13:59:09",
        "op":"upd",
        "size":10,
        "time":"2020-11-25T00:01:02",
        "idx":"1/10",
        "seq":1,
        "table":"mock_database.mock_table",
        "rowid":"mock_database.mock_table-3\u0001129",
        "trans":"shareplex_transaction_id",
        "scn":"123456789"
      },
      "key":{
        "col1":"2020-11-25 00:01:02",
        "col2":"hello world",
        "col3":"INTERVAL '3' DAY",
        "col4":1.2222,
        "col5":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308,
        "col6":129,
        "col7":"00:01:02",
        "col8":1,
        "col9":"2020-11-25 00:01:02.012345 Asia/Shanghai",
        "col10":2147483646,
        "col11":9223372036854775806,
        "col12":"aGVsbG8gd29ybGQ=",
        "col13":"INTERVAL '4' YEAR",
        "col14":3,
        "col15":"2020-11-25",
        "col16":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308,
        "col17":10223372036854775806,
        "col18":"1606233662.012345"
      }
    }
    
  • Example of data deletion (DELETE)

    {
      "data":{
        "col1":"2020-11-25 00:01:02",
        "col2":"hello world",
        "col3":"INTERVAL '3' DAY",
        "col4":1.2222,
        "col5":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308,
        "col6":129,
        "col7":"00:01:02",
        "col8":1,
        "col9":"2020-11-25 00:01:02.012345 Asia/Shanghai",
        "col10":2147483646,
        "col11":9223372036854775806,
        "col12":"aGVsbG8gd29ybGQ=",
        "col13":"INTERVAL '4' YEAR",
        "col14":3,
        "col15":"2020-11-25",
        "col16":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308,
        "col17":10223372036854775806,
        "col18":"1606233662.012345"
      },
      "meta":{
        "posttime":"2020-12-07T13:34:10",
        "op":"del",
        "size":10,
        "time":"2020-11-25T00:01:02",
        "idx":"1/10",
        "seq":1,
        "table":"mock_database.mock_table",
        "rowid":"mock_database.mock_table-3\u0001129",
        "trans":"shareplex_transaction_id",
        "scn":"123456789"
      }
    }
    
  • Example of a DDL operation

    alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';
    
    {
    "data": {},
    "meta": {
      "posttime": "2022-12-16T15:54:51",
      "op": "ddl",
      "size": 0,
      "time": "2022-12-16T15:53:29",
      "idx": "0/0",
      "seq": 0,
      "table": "connector_test.all_mysql_type_test",
      "rowid": "connector_test.all_mysql_type_test-",
      "trans": null,
      "scn": "null"
    },
    "sql": {
      "ddl": "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'"
    }
    }
    

DefaultExtendColumnType JSON message format

When you synchronize data to a Kafka, DataHub (Blob type), or RocketMQ instance, the following JSON message format is used for the DefaultExtendColumnType serialization method.

The DefaultExtendColumnType JSON messages are created in the Default format with a __light_type field. This field specifies the data type of fields.

{
 "prevStruct": {  // The image before modification.
  },
  "postStruct": {  // The image after modification.
      "__light_type": {
      "col":{ // The field name.
        "schemaType":"type" // The value type.
      }
    }
  },
  "allMetaData" {},

}

Examples:

  • Example of data insertion (INSERT)

    {
      "allMetaData":{
        "checkpoint":null,
        "record_primary_key":"id1\u0001id2",
        "source_identity":null,
        "record_primary_value":"3\u0001129",
        "dbType":"OB_MYSQL",
        "table_name":"table",
        "db":"database",
        "timestamp":"1609344671"
      },
      "prevStruct":null,
      "recordType":"INSERT",
      "postStruct":{
        "col1":3,
        "col2":129,
        "col3":2147483646,
        "col4":9223372036854775806,
        "col5":10223372036854775806,
        "col6":1.2222,
        "col7":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308,
        "col8":"hello world",
        "col9":"aGVsbG8gd29ybGQ=",
        "col10":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308,
        "col11":"2020-11-25",
        "col12":"00:01:02",
        "col13":"2020-11-25 00:01:02",
        "col14":"1606233662.012345",
        "__light_type":{
          "int8":{
            "schemaType":"TINYINT"
          },
          "int16":{
            "schemaType":"SMALLINT"
          },
          "int32":{
            "schemaType":"INT"
          },
          "int64":{
            "schemaType":"INT64"
          },
          "bigInt":{
            "schemaType":"BIGINT"
          },
          "float32":{
            "schemaType":"FLOAT"
          },
          "float64":{
            "schemaType":"DOUBLE"
          },
          "string":{
            "schemaType":"VARCHAR"
          },
          "bytes":{
            "schemaType":"BLOB"
          },
          "decimal":{
            "schemaType":"DECIMAL"
          },
          "localDate":{
            "schemaType":"DATE"
          },
          "localTime":{
            "schemaType":"TIME"
          },
          "localDateTime":{
            "schemaType":"DATETIME"
          },
          "timestamp_in_long":{
            "schemaType":"TIMESTAMP"
          }
        }
      }
    }
    
  • Example of data update (UPDATE)

    {
      "allMetaData": {
        "checkpoint": null,
        "record_primary_key": "id1\u0001id2",
        "source_identity": null,
        "record_primary_value": "3\u0001129",
        "dbType":"OB_MYSQL",
        "table_name": "table",
        "db": "database",
        "timestamp": "1609344671"
      },
      "prevStruct": {
         "col1":3,
        "col2":129,
        "col3":2147483646,
        "col4":9223372036854775806,
        "col5":10223372036854775806,
        "col6":1.2222,
        "col7":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308,
        "col8":"hello world",
        "col9":"aGVsbG8gd29ybGQ=",
        "col10":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308,
        "col11":"2020-11-25",
        "col12":"00:01:02",
        "col13":"2020-11-25 00:01:02",
        "col14":"1606233662.012345",
        "__light_type": {
          "int8": {
            "schemaType": "TINYINT"
          },
          "int16": {
            "schemaType": "SMALLINT"
          },
          "int32": {
            "schemaType": "INT"
          },
          "int64": {
            "schemaType": "INT64"
          },
          "bigInt": {
            "schemaType": "BIGINT"
          },
          "float32": {
            "schemaType": "FLOAT"
          },
          "float64": {
            "schemaType": "DOUBLE"
          },
          "string": {
            "schemaType": "VARCHAR"
          },
          "bytes": {
            "schemaType": "BLOB"
          },
          "decimal": {
            "schemaType": "DECIMAL"
          },
          "localDate": {
            "schemaType": "DATE"
          },
          "localTime": {
            "schemaType": "TIME"
          },
          "localDateTime": {
            "schemaType": "DATETIME"
          },
          "timestamp_in_long": {
            "schemaType": "TIMESTAMP"
          }
        }
      },
      "recordType": "UPDATE",
      "postStruct": {
         "col1":3,
        "col2":129,
        "col3":2147483646,
        "col4":9223372036854775806,
        "col5":10223372036854775806,
        "col6":1.2222,
        "col7":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308,
        "col8":"hello world 2020",
        "col9":"aGVsbG8gd29ybGQ=",
        "col10":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308,
        "col11":"2020-11-25",
        "col12":"00:01:02",
        "col13":"2020-11-25 00:01:02",
        "col14":"1606233662.012345",
        "__light_type": {
          "int8": {
            "schemaType": "TINYINT"
          },
          "int16": {
            "schemaType": "SMALLINT"
          },
          "int32": {
            "schemaType": "INT"
          },
          "int64": {
            "schemaType": "INT64"
          },
          "bigInt": {
            "schemaType": "BIGINT"
          },
          "float32": {
            "schemaType": "FLOAT"
          },
          "float64": {
            "schemaType": "DOUBLE"
          },
          "string": {
            "schemaType": "VARCHAR"
          },
          "bytes": {
            "schemaType": "BLOB"
          },
          "decimal": {
            "schemaType": "DECIMAL"
          },
          "localDate": {
            "schemaType": "DATE"
          },
          "localTime": {
            "schemaType": "TIME"
          },
          "localDateTime": {
            "schemaType": "DATETIME"
          },
          "timestamp_in_long": {
            "schemaType": "TIMESTAMP"
          }
        }
      }
    }
    
  • Example of data deletion (DELETE)

    {
      "allMetaData":{
        "checkpoint":null,
        "record_primary_key":"id1\u0001id2",
        "source_identity":null,
        "record_primary_value":"3\u0001129",
        "dbType":"OB_MYSQL",
        "table_name":"table",
        "db":"database",
        "timestamp":"1609344671"
      },
      "prevStruct":{
         "col1":3,
        "col2":129,
        "col3":2147483646,
        "col4":9223372036854775806,
        "col5":10223372036854775806,
        "col6":1.2222,
        "col7":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308,
        "col8":"hello world",
        "col9":"aGVsbG8gd29ybGQ=",
        "col10":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308,
        "col11":"2020-11-25",
        "col12":"00:01:02",
        "col13":"2020-11-25 00:01:02",
        "col14":"1606233662.012345",
        "__light_type":{
          "int8":{
            "schemaType":"TINYINT"
          },
          "int16":{
            "schemaType":"SMALLINT"
          },
          "int32":{
            "schemaType":"INT"
          },
          "int64":{
            "schemaType":"INT64"
          },
          "bigInt":{
            "schemaType":"BIGINT"
          },
          "float32":{
            "schemaType":"FLOAT"
          },
          "float64":{
            "schemaType":"DOUBLE"
          },
          "string":{
            "schemaType":"VARCHAR"
          },
          "bytes":{
            "schemaType":"BLOB"
          },
          "decimal":{
            "schemaType":"DECIMAL"
          },
          "localDate":{
            "schemaType":"DATE"
          },
          "localTime":{
            "schemaType":"TIME"
          },
          "localDateTime":{
            "schemaType":"DATETIME"
          },
          "timestamp_in_long": {
            "schemaType": "TIMESTAMP"
          }
        }
      },
      "recordType":"DELETE",
      "postStruct":null
    }
    
  • Example of a DDL operation

    alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';
    
    {
    "prevStruct": null,
    "postStruct": {
      "ddl": "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'",
      "__light_type": {
        "ddl": {
          "schemaType": "VAR_STRING"
        }
      }
    },
    "allMetaData": {
      "checkpoint": "1671177200",
      "dbType": "OB_MYSQL",
      "storeDataSequence": null,
      "db": "connector_test",
      "timestamp": "1671177209",
      "uniqueId": null,
      "ddlType": "ALTER_TABLE",
      "record_primary_key": null,
      "source_identity": null,
      "record_primary_value": null,
      "table_name": "all_mysql_type_test"
    },
    "recordType": "DDL"
    }
    

Debezium JSON message format

When you synchronize data from a MySQL tenant of OceanBase Database to a Kafka, DataHub (Blob type), or RocketMQ instance, two JSON message formats are used for the Debezium serialization method. By default, only the struct in the payload field is displayed.

  • Format applied when both schema and payload{#2f02ea300egch} are available

    {
     "schema":{// Describes information about the payload field. By default, this struct is not displayed.
      "type": "struct",// struct indicates that the field contains a struct.
      "optional": false,// Specifies whether the field is required.
      "fields": [
      {
        "type": "int64",// The type of the field.
        "optional": false,// Specifies whether the field is required.
        "field": "ts_ms"// The name of the field.
            }
         ...
       ]
        },
      "payload":{
        "op":"c", // The type of data modification. Valid values: c, u, d, and HEARTBEAT. c indicates full insert. u indicates update. d indicates delete. HEARTBEAT indicates a heartbeat message.
        "source":{
          "version":"" // The OMS version.
          "connector":"OB_MYSQL", // The type of the data source.
          "name":"OMS", // A fixed value, which is OMS.
          "ts_ms":0, // The timestamp in seconds for data modification, which is available only for incremental synchronization.
          "db": "test", // The name of the database that is changed by using an SQL statement. For an OceanBase database, you can specify only the database name and omit the tenant name.
          "table":"testTab" // The name of the table that is changed by using an SQL statement.
          "pos":"553132@1668496109" // The log position in the binlog file. The value is in the format of [binlog file name]@[binlog file name offset].
        },
        "before":{ // The image before modification.
          "column":"value" // The key-value pair that contains the full key value.
        }
        "after":{ // The image after modification.
          "column":"value" // The key-value pair that contains the full key value.
        },
        "ts_ms":1668497367188 // The timestamp of data processing.
      }
    }
    
  • Format applied when only payload is available

    {
      "payload":{
        "op":"c", // The type of data modification. Valid values: c, u, d, and HEARTBEAT. c indicates full insert. u indicates update. d indicates delete. HEARTBEAT indicates a heartbeat message.
        "source":{
          "version":"" // The OMS version.
          "connector":"OB_MYSQL", // The type of the data source.
          "name":"OMS", // A fixed value, which is OMS.
          "ts_ms":0, // The timestamp in seconds for data modification, which is available only for incremental synchronization.
          "db": "test", // The name of the database that is changed by using an SQL statement. For an OceanBase database, you can specify only the database name and omit the tenant name.
          "table":"testTab" // The name of the table that is changed by using an SQL statement.
          "pos":"553132@16684****" // The log position in the binlog file. The value is in the format of [binlog file name]@[binlog file name offset].
        },
        "before":{ // The image before modification.
          "column":"value" // The key-value pair that contains the full key value.
        }
        "after":{ // The image after modification.
          "column":"value" // The key-value pair that contains the full key value.
        },
        "ts_ms":1668497367188 // The timestamp of data processing.
      }
    }
    

Examples:

  • Example of data insertion (INSERT)

    {
      "schema":{
        "optional":false,
        "type":"STRUCT",
        "fields":[
          {
            "field":"before",
            "optional":false,
            "type":"struct",
            "fields":[
              {
                "field":"c01",
                "optional":false,
                "type":"int32"
              },
              {
                "field":"c02",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c03",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c04",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c05",
                "optional":false,
                "type":"int16"
              },
              {
                "field":"c06",
                "optional":false,
                "type":"int16"
              },
              {
                "field":"c07",
                "optional":false,
                "type":"int32"
              },
              {
                "field":"c08",
                "optional":false,
                "type":"int64"
              },
              {
                "field":"c09",
                "optional":false,
                "type":"float64"
              },
              {
                "field":"c10",
                "optional":false,
                "type":"float64"
              },
              {
                "field":"c11",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c12",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c13",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c14",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c15",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c16",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c17",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c18",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c19",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c20",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c21",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c22",
                "optional":false,
                "type":"int32"
              },
                {
                "field":"c23",
                "optional":false,
                "type":"int64"
              },
                {
                "field":"c24",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c25",
                "optional":false,
                "type":"int32"
              },
                {
                "field":"c26",
                "optional":false,
                "type":"bytes"
              }
                ]
              },
                {
                "field":"after",
                "optional":false,
                "type":"struct",
                "fields":[
                {
                "field":"c01",
                "optional":false,
                "type":"int32"
              },
                {
                "field":"c02",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c03",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c04",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c05",
                "optional":false,
                "type":"int16"
              },
                {
                "field":"c06",
                "optional":false,
                "type":"int16"
              },
                {
                "field":"c07",
                "optional":false,
                "type":"int32"
              },
                {
                "field":"c08",
                "optional":false,
                "type":"int64"
              },
                {
                "field":"c09",
                "optional":false,
                "type":"float64"
              },
                {
                "field":"c10",
                "optional":false,
                "type":"float64"
              },
                {
                "field":"c11",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c12",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c13",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c14",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c15",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c16",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c17",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c18",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c19",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c20",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c21",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c22",
                "optional":false,
                "type":"int32"
              },
                {
                "field":"c23",
                "optional":false,
                "type":"int64"
              },
                {
                "field":"c24",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c25",
                "optional":false,
                "type":"int32"
              },
                {
                "field":"c26",
                "optional":false,
                "type":"bytes"
              }
                ]
              },
                {
                "field":"source",
                "optional":false,
                "type":"struct",
                "fields":[
                {
                "field":"version",
                "optional":false,
                "type":"string"
              },
                {
                "field":"connector",
                "optional":false,
                "type":"string"
              },
                {
                "field":"name",
                "optional":false,
                "type":"string"
              },
                {
                "field":"ts_ms",
                "optional":false,
                "type":"int64"
              },
                {
                "field":"db",
                "optional":false,
                "type":"string"
              },
                {
                "field":"table",
                "optional":false,
                "type":"string"
              },
                {
                "field":"server_id",
                "optional":false,
                "type":"int64"
              },
                {
                "field":"pos",
                "optional":false,
                "type":"string"
              }
                ]
              },
                {
                "field":"op",
                "optional":false,
                "type":"string"
              },
                {
                "field":"ts_ms",
                "optional":false,
                "type":"int64"
              }
                ]
              },
                "payload":{
                "op":"c",
                "source":{
                "connector":"OB_MYSQL",
                "pos":"703223@166849****",
                "name":"OMS",
                "version":"",
                "ts_ms":1668491621000,
                "db":"test",
                "table":"table_name"
              },
                "after":{
                "c11":"a",
                "c10":2.4212412,
                "c13":"c",
                "c12":"b",
                "c15":"65",
                "c14":"d",
                "c17":"67",
                "c16":"f",
                "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
                "c18":"68",
                "c20":"6A",
                "c22":19311,
                "c21":"2022-11-15T05:12:11Z",
                "c02":"12312",
                "c24":1668489131000,
                "c01":2,
                "c23":36060000000,
                "c04":"61",
                "c26":"6B",
                "c03":"1241.41000",
                "c25":2022,
                "c06":141,
                "c05":11,
                "c08":412124124,
                "c07":4241,
                "c09":2.11111
              },
                "ts_ms":1668495423594
              }
              }
    
  • Example of data update (UPDATE)

    {
      "schema":{
        "optional":false,
        "type":"STRUCT",
        "fields":[
          {
            "field":"before",
            "optional":false,
            "type":"struct",
            "fields":[
              {
                "field":"c01",
                "optional":false,
                "type":"int32"
              },
              {
                "field":"c02",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c03",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c04",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c05",
                "optional":false,
                "type":"int16"
              },
              {
                "field":"c06",
                "optional":false,
                "type":"int16"
              },
              {
                "field":"c07",
                "optional":false,
                "type":"int32"
              },
              {
                "field":"c08",
                "optional":false,
                "type":"int64"
              },
              {
                "field":"c09",
                "optional":false,
                "type":"float64"
              },
              {
                "field":"c10",
                "optional":false,
                "type":"float64"
              },
              {
                "field":"c11",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c12",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c13",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c14",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c15",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c16",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c17",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c18",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c19",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c20",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c21",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c22",
                "optional":false,
                "type":"int32"
              },
                {
                "field":"c23",
                "optional":false,
                "type":"int64"
              },
                {
                "field":"c24",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c25",
                "optional":false,
                "type":"int32"
              },
                {
                "field":"c26",
                "optional":false,
                "type":"bytes"
              }
                ]
              },
                {
                "field":"after",
                "optional":false,
                "type":"struct",
                "fields":[
                {
                "field":"c01",
                "optional":false,
                "type":"int32"
              },
                {
                "field":"c02",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c03",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c04",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c05",
                "optional":false,
                "type":"int16"
              },
                {
                "field":"c06",
                "optional":false,
                "type":"int16"
              },
                {
                "field":"c07",
                "optional":false,
                "type":"int32"
              },
                {
                "field":"c08",
                "optional":false,
                "type":"int64"
              },
                {
                "field":"c09",
                "optional":false,
                "type":"float64"
              },
                {
                "field":"c10",
                "optional":false,
                "type":"float64"
              },
                {
                "field":"c11",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c12",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c13",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c14",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c15",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c16",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c17",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c18",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c19",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c20",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c21",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c22",
                "optional":false,
                "type":"int32"
              },
                {
                "field":"c23",
                "optional":false,
                "type":"int64"
              },
                {
                "field":"c24",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c25",
                "optional":false,
                "type":"int32"
              },
                {
                "field":"c26",
                "optional":false,
                "type":"bytes"
              }
                ]
              },
                {
                "field":"source",
                "optional":false,
                "type":"struct",
                "fields":[
                {
                "field":"version",
                "optional":false,
                "type":"string"
              },
                {
                "field":"connector",
                "optional":false,
                "type":"string"
              },
                {
                "field":"name",
                "optional":false,
                "type":"string"
              },
                {
                "field":"ts_ms",
                "optional":false,
                "type":"int64"
              },
                {
                "field":"db",
                "optional":false,
                "type":"string"
              },
                {
                "field":"table",
                "optional":false,
                "type":"string"
              },
                {
                "field":"server_id",
                "optional":false,
                "type":"int64"
              },
                {
                "field":"pos",
                "optional":false,
                "type":"string"
              }
                ]
              },
                {
                "field":"op",
                "optional":false,
                "type":"string"
              },
                {
                "field":"ts_ms",
                "optional":false,
                "type":"int64"
              }
                ]
              },
                "payload":{
                "op":"u",
                "before":{
                "c11":"a",
                "c10":2.4212412,
                "c13":"c",
                "c12":"b",
                "c15":"65",
                "c14":"d",
                "c17":"67",
                "c16":"f",
                "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
                "c18":"68",
                "c20":"6A",
                "c22":19311,
                "c21":"2022-11-15T05:12:11Z",
                "c02":"12312",
                "c24":1668489131000,
                "c01":1,
                "c23":36060000000,
                "c04":"61",
                "c26":"6B",
                "c03":"1241.41000",
                "c25":2022,
                "c06":141,
                "c05":11,
                "c08":412124124,
                "c07":4241,
                "c09":2.11111
              },
                "source":{
                "connector":"OB_MYSQL",
                "pos":"436999@166849****",
                "name":"OMS",
                "version":"",
                "ts_ms":1668495861000,
                "db":"test",
                "table":"table_name"
              },
                "after":{
                "c11":"aa",
                "c10":2.4212412,
                "c13":"c",
                "c12":"b",
                "c15":"65",
                "c14":"d",
                "c17":"67",
                "c16":"f",
                "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
                "c18":"68",
                "c20":"6A",
                "c22":19311,
                "c21":"2022-11-15T05:12:11Z",
                "c02":"12312",
                "c24":1668489131000,
                "c01":1,
                "c23":36060000000,
                "c04":"61",
                "c26":"6B",
                "c03":"1241.41000",
                "c25":2022,
                "c06":141,
                "c05":11,
                "c08":412124124,
                "c07":4241,
                "c09":2.11111
              },
                "ts_ms":1668495906356
              }
              }
    
  • Example of data deletion (DELETE)

    {
        "schema":{
            "optional":false,
            "type":"STRUCT",
            "fields":[
                {
                    "field":"before",
                    "optional":false,
                    "type":"struct",
                    "fields":[
                        {
                            "field":"c01",
                            "optional":false,
                            "type":"int32"
                        },
                        {
                            "field":"c02",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c03",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c04",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c05",
                            "optional":false,
                            "type":"int16"
                        },
                        {
                            "field":"c06",
                            "optional":false,
                            "type":"int16"
                        },
                        {
                            "field":"c07",
                            "optional":false,
                            "type":"int32"
                        },
                        {
                            "field":"c08",
                            "optional":false,
                            "type":"int64"
                        },
                        {
                            "field":"c09",
                            "optional":false,
                            "type":"float64"
                        },
                        {
                            "field":"c10",
                            "optional":false,
                            "type":"float64"
                        },
                        {
                            "field":"c11",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c12",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c13",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c14",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c15",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c16",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c17",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c18",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c19",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c20",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c21",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c22",
                            "optional":false,
                            "type":"int32"
                        },
                        {
                            "field":"c23",
                            "optional":false,
                            "type":"int64"
                        },
                        {
                            "field":"c24",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c25",
                            "optional":false,
                            "type":"int32"
                        },
                        {
                            "field":"c26",
                            "optional":false,
                            "type":"bytes"
                        }
                    ]
                },
                {
                    "field":"after",
                    "optional":false,
                    "type":"struct",
                    "fields":[
                        {
                            "field":"c01",
                            "optional":false,
                            "type":"int32"
                        },
                        {
                            "field":"c02",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c03",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c04",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c05",
                            "optional":false,
                            "type":"int16"
                        },
                        {
                            "field":"c06",
                            "optional":false,
                            "type":"int16"
                        },
                        {
                            "field":"c07",
                            "optional":false,
                            "type":"int32"
                        },
                        {
                            "field":"c08",
                            "optional":false,
                            "type":"int64"
                        },
                        {
                            "field":"c09",
                            "optional":false,
                            "type":"float64"
                        },
                        {
                            "field":"c10",
                            "optional":false,
                            "type":"float64"
                        },
                        {
                            "field":"c11",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c12",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c13",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c14",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c15",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c16",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c17",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c18",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c19",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c20",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c21",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c22",
                            "optional":false,
                            "type":"int32"
                        },
                        {
                            "field":"c23",
                            "optional":false,
                            "type":"int64"
                        },
                        {
                            "field":"c24",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c25",
                            "optional":false,
                            "type":"int32"
                        },
                        {
                            "field":"c26",
                            "optional":false,
                            "type":"bytes"
                        }
                    ]
                },
                {
                    "field":"source",
                    "optional":false,
                    "type":"struct",
                    "fields":[
                        {
                            "field":"version",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"connector",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"name",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"ts_ms",
                            "optional":false,
                            "type":"int64"
                        },
                        {
                            "field":"db",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"table",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"server_id",
                            "optional":false,
                            "type":"int64"
                        },
                        {
                            "field":"pos",
                            "optional":false,
                            "type":"string"
                        }
                    ]
                },
                {
                    "field":"op",
                    "optional":false,
                    "type":"string"
                },
                {
                    "field":"ts_ms",
                    "optional":false,
                    "type":"int64"
                }
            ]
        },
        "payload":{
            "op":"d",
            "before":{
                "c11":"aa",
                "c10":2.4212412,
                "c13":"c",
                "c12":"b",
                "c15":"65",
                "c14":"d",
                "c17":"67",
                "c16":"f",
                "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
                "c18":"68",
                "c20":"6A",
                "c22":19311,
                "c21":"2022-11-15T05:12:11Z",
                "c02":"12312",
                "c24":1668489131000,
                "c01":1,
                "c23":36060000000,
                "c04":"61",
                "c26":"6B",
                "c03":"1241.41000",
                "c25":2022,
                "c06":141,
                "c05":11,
                "c08":412124124,
                "c07":4241,
                "c09":2.11111
            },
            "source":{
                "connector":"OB_MYSQL",
                "pos":"553132@1668****",
                "name":"OMS",
                "version":"",
                "ts_ms":1668496109000,
                "db":"test",
                "table":"table_name"
            },
            "ts_ms":1668496119717
        }
    }
    

DebeziumFlatten JSON message format

When you synchronize data from a MySQL tenant of OceanBase Database to a Kafka, DataHub (blob type), or RocketMQ instance, the following JSON message format is used for the DebeziumFlatten serialization method. Different from the Debezium serialization method, the DebeziumFlatten serialization method does not have the schema or payload section.

{
   "op":"c", // The type of data modification. Valid values: c, u, d, and HEARTBEAT. c indicates full insert. u indicates update. d indicates delete. HEARTBEAT indicates a heartbeat message.
   "source":{
     "version":"" // The OMS version.
     "connector":"OB_MYSQL", // The type of the data source.
     "name":"OMS", // A fixed value, which is OMS.
     "ts_ms":0, // The timestamp in seconds for data modification, which is available only for incremental synchronization.
     "db": "test", // The name of the database that is changed by using an SQL statement. For an OceanBase database, you can specify only the database name and omit the tenant name.
     "table":"testTab" // The name of the table that is changed by using an SQL statement.
     "pos":"553132@16684****" // The log position in the binlog file. The value is in the format of [binlog file name]@[binlog file name offset].
   },
   "before":{ // The image before modification.
     "column":"value" // The key-value pair that contains the full key value.
   }
   "after":{ // The image after modification.
     "column":"value" // The key-value pair that contains the full key value.
   },
   "ts_ms":1668497367188 // The timestamp of data processing.
  }

Examples:

  • Example of data insertion (INSERT)

    {
       "op":"c",
       "source":{
           "connector":"OB_MYSQL",
           "pos":"703223@166849****",
           "name":"OMS",
           "version":"",
           "ts_ms":1668491621000,
           "db":"test",
           "table":"table_name"
           },
       "after":{
           "c11":"a",
           "c10":2.4212412,
           "c13":"c",
           "c12":"b",
           "c15":"65",
           "c14":"d",
           "c17":"67",
           "c16":"f",
           "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
           "c18":"68",
           "c20":"6A",
           "c22":19311,
           "c21":"2022-11-15T05:12:11Z",
           "c02":"12312",
           "c24":1668489131000,
           "c01":2,
           "c23":36060000000,
           "c04":"61",
           "c26":"6B",
           "c03":"1241.41000",
           "c25":2022,
           "c06":141,
           "c05":11,
           "c08":412124124,
           "c07":4241,
           "c09":2.11111
           },
           "ts_ms":1668495423594
           }
    
  • Example of data update (UPDATE)

    {
        "op":"u",
        "before":{
            "c11":"a",
            "c10":2.4212412,
            "c13":"c",
            "c12":"b",
            "c15":"65",
            "c14":"d",
            "c17":"67",
            "c16":"f",
            "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
            "c18":"68",
            "c20":"6A",
            "c22":19311,
            "c21":"2022-11-15T05:12:11Z",
            "c02":"12312",
            "c24":1668489131000,
            "c01":1,
            "c23":36060000000,
            "c04":"61",
            "c26":"6B",
            "c03":"1241.41000",
            "c25":2022,
            "c06":141,
            "c05":11,
            "c08":412124124,
            "c07":4241,
            "c09":2.11111
            },
        "source":{
            "connector":"OB_MYSQL",
            "pos":"436999@166849****",
            "name":"OMS",
            "version":"",
            "ts_ms":1668495861000,
            "db":"test",
            "table":"table_name"
            },
        "after":{
            "c11":"aa",
            "c10":2.4212412,
            "c13":"c",
            "c12":"b",
            "c15":"65",
            "c14":"d",
            "c17":"67",
            "c16":"f",
            "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
            "c18":"68",
            "c20":"6A",
            "c22":19311,
            "c21":"2022-11-15T05:12:11Z",
            "c02":"12312",
            "c24":1668489131000,
            "c01":1,
            "c23":36060000000,
            "c04":"61",
            "c26":"6B",
            "c03":"1241.41000",
            "c25":2022,
            "c06":141,
            "c05":11,
            "c08":412124124,
            "c07":4241,
            "c09":2.11111
            },
            "ts_ms":1668495906356
            }
    
  • Example of data deletion (DELETE)

    {
        "op":"d",
        "before":{
            "c11":"aa",
            "c10":2.4212412,
            "c13":"c",
            "c12":"b",
            "c15":"65",
            "c14":"d",
            "c17":"67",
            "c16":"f",
            "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000",
            "c18":"68",
            "c20":"6A",
            "c22":19311,
            "c21":"2022-11-15T05:12:11Z",
            "c02":"12312",
            "c24":1668489131000,
            "c01":1,
            "c23":36060000000,
            "c04":"61",
            "c26":"6B",
            "c03":"1241.41000",
            "c25":2022,
            "c06":141,
            "c05":11,
            "c08":412124124,
            "c07":4241,
            "c09":2.11111
            },
        "source":{
            "connector":"OB_MYSQL",
            "pos":"553132@1668****",
            "name":"OMS",
            "version":"",
            "ts_ms":1668496109000,
            "db":"test",
            "table":"table_name"
        },
        "ts_ms":1668496119717
    }
    

DebeziumSmt JSON message format

DebeziumSmt is a configuration method provided by Debezium for transforming and processing a single message through event flattening Single Message Transform (SMT). When you synchronize data from a MySQL tenant of OceanBase Database to a Kafka, DataHub (Blob type), or RocketMQ instance, only the key:value fields in after are displayed in the JSON message format for the DebeziumSmt serialization method.

The following example shows how to use the Debezium serialization method to update data:

{
    "op": "u",
    "source": {
    "connector":"OB_MYSQL",
    "name":"OMS",
    ...
    },
    "ts_ms": 1668496119717,
    "before": {
    "field1": "before_value1",
    "field2": "before_value2"
    },
    "after": {
    "field1": "after_value1",
    "field2": "after_value2"
    }
}

The format of the message in the preceding example is simplified through SMT. The following example shows the JSON message format for the DebeziumSmt serialization method:

{
    "field1": "after_value1",
    "field2": "after_value2"
}

Examples:

  • Example of data insertion (INSERT)

    {
     "field1": "after_value1",
     "field2": "after_value2",
     "__deleted": "false"
    }
    
    
  • Example of data update (UPDATE)

    {
     "field1": "after_value1",
     "field2": "after_value2",
     "__deleted": "false"
    }
    
  • Example of data deletion (DELETE)

    {
     "field1": "after_value1",
     "field2": "after_value2",
     "__deleted": "true"
    }
    

Maxwell JSON message format

When you synchronize data to a Kafka instance, the following JSON message format is used for the Maxwell serialization method:

{
    "database":"STRING", // The name of the database.
    "table":"STRING", // The name of the table.
    "type":"STRING", // The type of the operation. Valid values: insert, update, delete, table-create, table-alter, and table-drop.
    "ts":LONG, // The timestamp. The fixed value for full migration is 1600000000L.
    "xid":"STRING", // In OceanBase Database Community Edition earlier than V4.x, xid indicates the ID of a transaction. In a transaction ID, inc is an auto-increment number, addr is the address of the coordinator, ts is the time when the transaction ID is generated, and hash is the HASH value of the preceding three items. For an incomplete transaction, the transaction ID is null. In OceanBase Database Community Edition V4.x and later versions, xid is specified in the format of "tenant ID_transaction ID", for example, "1002_336195". In MySQL databases, xid indicates a checkpoint.
    "commit":BOOLEAN, // Specifies whether to commit the transaction. The fixed value is true.
    "position":"STRING", // The checkpoint, in the format of a timestamp, for example, "1684224949".
    "primary_key":OBJECT[], // The primary key value. In a DDL statement, the value of this parameter is null. In a DML statement, the value of this field is OBJECT[], for example, [1,"val1"], or [] if no primary key exists.
    "primary_key_columns":STRING[], // The primary key column. In a DDL statement, the value of this field is null. In a DML statement, the value of this field is OBJECT[], for example, ["key1","key2"], or [] if no primary key exists. Each listed item corresponds to one primary key value.
    "data":{ // The data after a DML change.
      "col1":"val1", // The key-value pair.
  },
    "old":{ // The data before the DML change.
        "col1":"val1", // The key-value pair.
    }
    "sql":"STRING", // The SQL statement for executing the DDL change.
    "def":{ // The information about the DDL change objects.
        "database":"STRING", // The name of the database.
        "table":"STRING", // The name of the table.
        "primary-key":OBJECT[], // The primary key value. The fixed value is [].
        "columns":STRING[] // The primary key column. The fixed value is [].
    },
    "uuid":"STRING" // The unique identifier.
    "mesh":"prd/gray" // The grayscale identifier. This field is available only after you specify a grayscale identifier.
}

Examples:

  • Example of data insertion (INSERT)

    {
      "database":"test",
      "table":"test",
      "type":"insert",
      "ts":1684225073,
      "xid":"1002_336195",
      "commit":true,
      "position":"1684225073",
      "primary_key":[
          "1"
      ],
      "primary_key_columns":[
          "id"
      ],
      "data":{
          "m":4.2341,
          "id":1,
          "c":"2016-10-21 05:33:37.523000"
      },
      "old":null,
      "sql":null,
      "def":null
    }
    
  • Example of data update (UPDATE)

    {
      "database":"test",
      "table":"test",
      "type":"update",
      "ts":1684225127,
      "xid":"1002_336403",
      "commit":true,
      "position":"1684225126",
      "primary_key":[
          "1"
      ],
      "primary_key_columns":[
          "id"
      ],
      "data":{
          "m":5.444,
          "id":1,
          "c":"2016-10-21 05:33:54.63100"
      },
      "old":{
          "m":4.2341,
          "id":1,
          "c":"2016-10-21 05:33:37.523000"
      },
      "sql":null,
      "def":null
    }
    
  • Example of data deletion (DELETE)

    {
      "database":"test",
      "table":"test",
      "type":"delete",
      "ts":1684225163,
      "xid":"1002_336593",
      "commit":true,
      "position":"1684225163",
      "primary_key":[
          "1"
      ],
      "primary_key_columns":[
          "id"
      ],
      "data":{
          "m":5.444,
          "id":1,
          "c":"2016-10-21 05:33:54.631000"
      },
      "old":null,
      "sql":null,
      "def":null
    }
    

Data formats used when data is transmitted from OceanBase Database to a message queue system

This section lists the possible data format mappings when you synchronize data from OceanBase Database Community Edition to a Kafka, DataHub (Blob type), or RocketMQ instance.

  • The following table describes the format mappings for OceanBase Database Community Edition when the serialization method is Default, Canal, DataWorks (V2.0 supported), SharePlex, or DefaultExtendColumnType.

    Data type Mapped-to type Description
    TINYINT
    SMALLINT
    MEDIUMINT
    INT/INTEGER
    YEAR
    BOOL/BOOLEAN
    Long Integers that are shorter than 64 digits.
    Scientific notation is not used for a normal value such as 1000.
    For a BOOL or BOOLEAN value, true is converted to 1 and false is converted to 0.
    DECIMAL
    NUMERIC
    BigDecimal Exact decimal numerals and integers longer than 64 digits.
    An integer is displayed without the decimal point or a decimal place.
    A decimal numeral is displayed based on the data passed into the database instance without removing zeros at the end. Scientific notation is used.
    FLOAT
    DOUBLE
    Double Floating-point numbers. The number of significant digits is determined based on the source data type.
    If the source data type is FLOAT, the converted value has 7 significant digits. If the source data type is DOUBLE, the converted value has 16 significant digits.
    CHAR
    VARCHAR
    TINYTEXT
    TEXT
    MEDIUMTEXT
    LONGTEXT
    ENUM
    SET
    String Strings.
    TINYBLOB
    BLOB
    MEDIUMBLOB
    LONGBLOB
    BINARY
    VARBINARY
    BIT
    Bytes Byte arrays. By default, a byte array is displayed in a Base64-encoded string.
    Note
    For a fixed-length data type, after an incremental byte array is received, the zeros in the most significant bits are removed. However, such zeros are not removed for a full-byte array. Therefore, the Base64-encoded strings may be inconsistent, However, the actual results obtained after decoding are consistent.
    DATE Date The DATE type in the format of YYYY-MM-DD. If the value is invalid, the original string is displayed.
    TIME Time The TIME type in the format of HH:mm:ss[.nnnnnnnnn].
    The fractional seconds part can contain a maximum of 9 digits. Zeros at the end of the fractional seconds part are not displayed. If the value is invalid, the original string is displayed.
    DATETIME DateTime The DATETIME type in the format of YYYY-MM-DD HH:mm:ss[.nnnnnnnnn] [zoneId], including the time zone.
    The fractional seconds part can contain a maximum of 9 digits. Zeros at the end of the fractional seconds part are not displayed. If the value is invalid, the original string is displayed.
    TIMESTAMP Timestamp The TIMESTAMP type in the format of [second-level timestamp][.nnnnnnnnn].
    The fractional seconds part can contain a maximum of 9 digits. Zeros at the end of the fractional seconds part are not displayed. If the time is invalid, 0000-00-00 00:00:00 is displayed.

    Compared with OMS Community Edition of earlier versions, a project in OMS Community Edition V3.3.0 and later has the following changes:

    Note

    After you upgrade OMS Community Edition to V3.3.0 or a later version, a project created in OMS Community Edition of an earlier version does not change.

    • The BIT data type of OceanBase Database Community Edition is converted into a Base64-encoded byte value rather than a value of the LONG data type. The Bytes value can be Base64-decoded to the source value.

    • The TIMESTAMP data type of OceanBase Database Community Edition is passed in the sec.mill format. In OMS Community Edition earlier than V3.3.0, the TIMESTAMP data type is passed as integers in the sec format.

    • The format of other time data types is YYYY-MM-DD HH:mm:ss[.nnnnnnnnn] in OMS Community Edition V3.3.0 and later versions, and YYYY-MM-DDTHH:mm[:ss] in OMS Community Edition earlier than V3.3.0.

  • The following table describes data format mappings for OceanBase Database Community Edition when the serialization method is Debezium.

    Data type Mapped-to type Description
    BOOLEAN
    BOOL
    BOOLEAN Valid values: true and false.
    TINYINT
    SMALLINT
    MEDIUMINT
    INT/INTEGER
    BIGINT
    YEAR
    LONG An integer that ranges from -2^63^ to 2^63^.
    BIGINT STRING Strings that display the data in full.
    FLOAT
    DOUBLE
    DOUBLE Floating-point numbers.
    DECIMAL
    NUMERIC
    STRING Strings that display the data in full. A decimal numeral is displayed based on the data passed to the database instance without removing zeros at the end. Scientific notation is used.
    BIT
    BINARY
    VARBINARY
    TINYBLOB
    BLOB
    MEDIUMBLOB
    LONGBLOB
    BYTES Base16-encoded byte arrays.
    CHAR
    VARCHAR
    TINYTEXT
    TEXT
    MEDIUMTEXT
    LONGTEXT
    ENUM
    SET
    STRING Strings.
    TIMESTAMP STRING The timestamp is in the YYYY-MM-DDTHH:mm:ss[.nnnnnnnnn]Z format. The time zone is UTC+0.
    DATE LONG The number of days since January 1st, 1970.
    TIME LONG The temporal value, in microseconds, since 00:00:00, excluding the time zone.
    DATETIME LONG The temporal value, in milliseconds, since 1970-01-01 00:00:00, excluding the time zone.
  • The following table describes data format mappings for OceanBase Database Community Edition when the serialization method is Maxwell.

    Data type Mapped-to type Description
    TINYINT
    SMALLINT
    MEDIUMINT
    INT/INTEGER
    YEAR
    BOOL/BOOLEAN
    Long Integers that are shorter than 64 digits.
    Scientific notation is not used for a normal value such as 1000.
    For a BOOL or BOOLEAN value, true is converted to 1 and false is converted to 0.
    DECIMAL
    NUMERIC
    BigDecimal Exact decimal numerals and integers longer than 64 digits.
    An integer is displayed without the decimal point or a decimal place.
    A decimal numeral is displayed based on the data passed into the database instance without removing zeros at the end. Scientific notation is used.
    FLOAT
    DOUBLE
    Double Floating-point numbers. The number of significant digits is determined based on the source data type.
    If the source data type is FLOAT, the converted value has 7 significant digits. If the source data type is DOUBLE, the converted value has 16 significant digits.
    CHAR
    VARCHAR
    TINYTEXT
    TEXT
    MEDIUMTEXT
    LONGTEXT
    ENUM
    String Strings.
    SET JSON arrays JSON arrays, for example, ["a","b"].
    TINYBLOB
    BLOB
    MEDIUMBLOB
    LONGBLOB
    BINARY
    VARBINARY
    BIT
    Bytes Byte arrays. By default, a byte array is displayed in a Base64-encoded string.
    Note
    For a fixed-length data type, after an incremental byte array is received, the zeros in the most significant bits are removed. However, such zeros are not removed for a full-byte array. Therefore, the Base64-encoded strings may be inconsistent, However, the actual results obtained after decoding are consistent.
    DATE Date The DATE type in the format of YYYY-MM-DD. If the value is invalid, the original string is displayed.
    TIME Time The TIME type in the format of HH:mm:ss[.nnnnnnnnn].
    The fractional seconds part can contain a maximum of 9 digits. Zeros at the end of the fractional seconds part are not displayed. If the value is invalid, the original string is displayed.
    DATETIME DateTime The DATETIME type, excluding the time zone. The time zone is specified when data is written into the business application. The value of this field is in the format of YYYY-MM-DD HH:mm:ss[.nnnnnn].
    By default, the fractional seconds part contains 6 digits. If the fractional seconds part contains more than 6 digits and the last 3 digits are not all zeros, 9 digits are displayed.
    If the value is invalid, the original string is displayed.
    TIMESTAMP Timestamp The DATETIME type, excluding the time zone. The time zone is UTC+0. The value of this field is in the format of YYYY-MM-DD HH:mm:ss[.nnnnnn].
    By default, the fractional seconds part contains 6 digits. If the fractional seconds part contains more than 6 digits and the last 3 digits are not all zeros, 9 digits are displayed.
    If the value is invalid, the original string is displayed.

Previous topic

Column filtering
Last

Next topic

Create an OceanBase-CE data source
Next
What is on this page
Data formats used in serialization methods
Default JSON message format
Canal JSON message format
Dataworks JSON message format
SharePlex JSON message format
DefaultExtendColumnType JSON message format
Debezium JSON message format
DebeziumFlatten JSON message format
DebeziumSmt JSON message format
Maxwell JSON message format
Data formats used when data is transmitted from OceanBase Database to a message queue system