OceanBase Migration Service

V4.2.4 Enterprise Edition

Data formats

Last Updated: 2026-04-14 07:36:49

This topic describes the serialization methods and data formats for transmitting data from a database to a text protocol.

Format description of serialization methods

When you use OceanBase Migration Service (OMS) to synchronize data from a source to a destination, such as Kafka, DataHub (BLOB type), or RocketMQ, you can specify the serialization method to control the message format of the synchronized data. The supported serialization methods include Default, Canal, DataWorks (V2.0), SharePlex, DefaultExtendColumnType, Debezium, DebeziumFlatten, DebeziumSmt, and Avro.

Note

  • Currently, only OceanBase Database in MySQL-compatible mode supports the Debezium, DebeziumFlatten, and DebeziumSmt serialization methods.

  • Currently, the Avro serialization method can synchronize data to Kafka only from OceanBase Database in MySQL-compatible mode.

Default JSON message format

When data is synchronized to Kafka, DataHub (BLOB type), or RocketMQ with the Default serialization method, each message is serialized in the following JSON format.

{
    "prevStruct": { // The previous image of the data.
        "col1": "val1" // Key-value pairs of all columns.
    },
    "postStruct": { // The current image of the data.
        "col1": "val1" // Key-value pairs of all columns.
    },
    "allMetaData": {
        "checkpoint": "STRING", // The synchronization point. In incremental synchronization, it is the timestamp (in seconds) up to which data has been synchronized. In full synchronization, it is the primary key-value pairs.
        "record_primary_key": "STRING", // The name of the primary key column. Multiple column names are separated by \u0001.
        "record_primary_value": "STRING", // The value of the primary key. Multiple column values are separated by \u0001.
        "source_identity": "STRING", // The source identifier. In incremental synchronization, it is the subtopic. In full synchronization, it is an invalid sequence number.
        "dbType": "STRING", // The type of the database. Valid values: MYSQL, ORACLE, OCEANBASE (legacy value retained for compatibility), OB_IN_ORACLE_MODE (legacy value retained for compatibility), DB2 (legacy value retained for compatibility), OB_MYSQL, OB_ORACLE, and DB2_LUW.
        "storeDataSequence": "LONG", // Used for sorting. This field exists only in incremental synchronization and only when sequenceEnabled=true is set in the source.json file (the default value is true). It is generated as a timestamp followed by an incrementing number of no more than five digits, in the format {timestamp}{incrementing number}.
        "table_name": "STRING", // The name of the table changed by the SQL statement.
        "db": "STRING", // The name of the database changed by the SQL statement. For OceanBase Database, it includes the tenant name, in the format {tenant}.{database}.
        "timestamp": "STRING", // The timestamp of the data change, in seconds. This field exists only in incremental synchronization.
        "uniqueId": "STRING", // The transaction sequence number identifier passed down by Store in incremental synchronization.
        "transId": "STRING", // In OceanBase Database versions earlier than V4.x, this field represents the transaction ID: inc is an auto-incrementing number, addr is the address of the coordinator, t is the timestamp when the transaction ID is generated, and hash is the hash value of the preceding three values. If the transaction is incomplete, this field is null. For example, "transId":"{hash:614827533592190202, inc:1384316437, addr:\"xxx.xxx.xxx.xxx:2882\", t:1756190533588977}". In OceanBase Database V4.x and later, this field represents the tenant ID and transaction ID. For example, "transId":"1xxx_3152xxx".
        "clusterId": "STRING", // In OceanBase Database, this field represents the cluster ID. In MySQL, it represents the thread ID. In Oracle, this field does not exist.
        "ddlType": "STRING" // The type of the DDL operation. This field is supported in OMS V4.0.1 and later.
    },
    "recordType": "INSERT/UPDATE/DELETE/HEARTBEAT/DDL/ROW" // The type of the change.
}
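
The composite string fields in allMetaData can be unpacked on the consumer side. The following Python sketch is illustrative only and is not part of OMS: the function name parse_metadata and the keys of the returned dictionary are hypothetical, and it assumes that tenant names contain no period, so the db value can be split at the first period.

```python
from datetime import datetime, timezone

PK_SEPARATOR = "\u0001"  # documented separator for multi-column primary keys
# dbType values that denote OceanBase Database, for which db is {tenant}.{database}
OCEANBASE_TYPES = {"OCEANBASE", "OB_IN_ORACLE_MODE", "OB_MYSQL", "OB_ORACLE"}


def _split(value):
    """Split a \\u0001-separated string; None or an empty string yields []."""
    return value.split(PK_SEPARATOR) if value else []


def parse_metadata(meta):
    """Unpack composite allMetaData fields (hypothetical helper, not an OMS API)."""
    names = _split(meta.get("record_primary_key"))
    values = _split(meta.get("record_primary_value"))

    db = meta.get("db") or ""
    if meta.get("dbType") in OCEANBASE_TYPES and "." in db:
        # Assumption: the tenant name contains no period.
        tenant, database = db.split(".", 1)
    else:
        tenant, database = None, db or None

    ts = meta.get("timestamp")  # present only in incremental synchronization
    changed_at = datetime.fromtimestamp(int(ts), tz=timezone.utc) if ts else None

    return {
        "primary_key": dict(zip(names, values)),
        "tenant": tenant,
        "database": database,
        "changed_at": changed_at,
    }


sample_meta = {
    "record_primary_key": "int8\u0001int16",
    "record_primary_value": "3\u0001129",
    "dbType": "OB_MYSQL",
    "db": "tenant.database",
    "timestamp": "1609344671",
}
parsed = parse_metadata(sample_meta)
print(parsed)
```

Primary key values stay strings here because the message carries them as one separator-joined string; converting them to column types is left to the consumer.
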
  • A DDL record contains only one column key, "ddl", and its value is the DDL statement.

  • The previous image and the current image:

    • prevStruct: The previous image of the incremental data, that is, the data before the SQL statement is executed.

    • postStruct: The current image of the incremental data, that is, the data after the SQL statement is executed.

    A DELETE record has data only in prevStruct. INSERT and DDL records have data only in postStruct. An UPDATE record has data in both prevStruct and postStruct. A HEARTBEAT record (heartbeat message) has no data in postStruct.

  • clusterId is described as follows:

    • In OceanBase Database, ob_org_cluster_id is used to set the CLUSTER_ID at the session level. The value is persisted to the transaction log. If you set ob_org_cluster_id when you write data, the value is used. Otherwise, the default value is used. This field exists only in incremental synchronization. For more information, see ob_org_cluster_id.

    • In MySQL, clusterId represents the thread ID in the MySQL Binlog Event. MySQL Server assigns an incrementing thread ID to each thread on a connection.
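
The rules above for which image each record type carries can be sketched as a small consumer-side dispatcher. This Python example is an illustration under stated assumptions, not OMS code: images_for and ddl_statement are hypothetical names, ROW is handled like INSERT, and a HEARTBEAT record is treated as carrying no row image at all, which goes slightly beyond what this topic states.

```python
def images_for(record):
    """Return (before, after) row images according to recordType.

    Hypothetical helper; the mapping follows the rules described above.
    """
    record_type = record["recordType"]
    prev = record.get("prevStruct")
    post = record.get("postStruct")
    if record_type == "DELETE":
        return prev, None          # only the previous image exists
    if record_type in ("INSERT", "ROW", "DDL"):
        return None, post          # only the current image exists
    if record_type == "UPDATE":
        return prev, post          # both images exist
    if record_type == "HEARTBEAT":
        return None, None          # assumption: no row data to apply
    raise ValueError(f"unexpected recordType: {record_type!r}")


def ddl_statement(record):
    """Return the DDL text of a DDL record, or None for other record types."""
    if record["recordType"] != "DDL":
        return None
    # A DDL record carries a single "ddl" key in postStruct.
    return record["postStruct"]["ddl"]


delete_record = {"recordType": "DELETE", "prevStruct": {"col1": 3}, "postStruct": None}
ddl_record = {
    "recordType": "DDL",
    "prevStruct": None,
    "postStruct": {"ddl": "ALTER TABLE t ADD COLUMN c INT"},  # made-up statement
}
print(images_for(delete_record))
print(ddl_statement(ddl_record))
```

Raising on an unknown recordType, rather than skipping it, is a design choice that makes format changes visible early.
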

Here are some examples:

  • Example of INSERT data

    {
      "allMetaData":{
        "checkpoint": null,
        "record_primary_key": "int8\u0001int16",
        "uniqueId": "{tid:11039xxxx127, partition_id:0, part_cnt:0},5917,391,0",
        "transId": "1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For versions earlier than V4.x, an example is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
        "clusterId": "123456",
        "source_identity": null,
        "record_primary_value": "3\u0001129",
        "dbType": "OB_MYSQL",
        "table_name": "table",
        "db": "tenant.database",
        "timestamp": "1609344671"
      },
      "prevStruct": null,
      "recordType": "INSERT",
      "postStruct":{
        "col1": 3,
        "col2": 129,
        "col3": 2147483646,
        "col4": 9223372036854775806,
        "col5": 10223372036854775806,
        "col6": 1.2222,
        "col7": 9.999999,
        "col8": "hello world",
        "col9": "aGVsbG8gd29ybGQ=",
        "col10": 9.99999999999,
        "col11": "2020-11-25",
        "col12": "00:01:02",
        "col13": "2020-11-25 00:01:02",
        "col14": "1606233662.012345"
      }
    }
    

    If recordType is ROW, the data is full data, and the format is the same as that of INSERT.

    {
      "allMetaData":{
        "checkpoint": null,
        "record_primary_key": "int8\u0001int16",
        "uniqueId": "{tid:11039xxxx127, partition_id:0, part_cnt:0},5917,391,0",
        "transId":"1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For versions earlier than V4.x, the example is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
        "clusterId": "123456",
        "source_identity": null,
        "record_primary_value": "3\u0001129",
        "dbType": "OB_MYSQL",
        "table_name": "table",
        "db": "tenant.database",
        "timestamp": "1609344671"
      },
      "prevStruct": null,
      "recordType": "ROW",
      "postStruct":{
        "col1": 3,
        "col2": 129,
        "col3": 2147483646,
        "col4": 9223372036854775806,
        "col5": 10223372036854775806,
        "col6": 1.2222,
        "col7": 9.999999,
        "col8": "hello world",
        "col9": "aGVsbG8gd29ybGQ=",
        "col10": 9.99999999999,
        "col11": "2020-11-25",
        "col12": "00:01:02",
        "col13": "2020-11-25 00:01:02",
        "col14": "1606233662.012345"
      }
    }
    
  • Example of UPDATE data

    {
      "allMetaData": {
        "checkpoint": null,
        "record_primary_key": "int8\u0001int16",
        "source_identity": null,
        "uniqueId": "{tid:11039xxxx27, partition_id:0, part_cnt:0},5917,391,0",
        "transId":"1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For versions earlier than V4.x, the example is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
        "clusterId": "123456",
        "record_primary_value": "3\u0001129",
        "dbType": "OB_MYSQL",
        "table_name": "table",
        "db": "tenant.database",
        "timestamp": "1609344671"
      },
      "prevStruct": {
        "col1": 3,
        "col2": 129,
        "col3": 2147483646,
        "col4": 9223372036854775806,
        "col5": 10223372036854775806,
        "col6": 1.2222,
        "col7": 9.999999999999,
        "col8": "hello world",
        "col9": "aGVsbG8gd29ybGQ=",
        "col10": 9.999999999999,
        "col11": "2020-11-25",
        "col12": "00:01:02",
        "col13": "2020-11-25 00:01:02",
        "col14": "1606233662.012345"
      },
      "recordType": "UPDATE",
      "postStruct": {
        "col1": 3,
        "col2": 129,
        "col3": 2147483646,
        "col4": 9223372036854775806,
        "col5": 10223372036854775806,
        "col6": 1.2222,
        "col7": 9.999999999999,
        "col8": "hello world 2020",
        "col9": "aGVsbG8gd29ybGQ=",
        "col10": 9.999999999999,
        "col11": "2020-11-25",
        "col12": "00:01:02",
        "col13": "2020-11-25 00:01:02",
        "col14": "1606233662.012345"
      }
    }
    
  • Example of DELETE data

    {
      "allMetaData":{
        "checkpoint": null,
        "record_primary_key": "int8\u0001int16",
        "source_identity": null,
        "uniqueId": "{tid:11039xxxx27, partition_id:0, part_cnt:0},5917,391,0",
        "transId":"1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For versions earlier than V4.x, the example is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
        "clusterId": "123456",
        "record_primary_value": "3\u0001129",
        "dbType": "OB_MYSQL",
        "table_name": "table",
        "db": "tenant.database",
        "timestamp": "1609344671"
      },
      "prevStruct":{
        "col1": 3,
        "col2": 129,
        "col3": 2147483646,
        "col4": 9223372036854775806,
        "col5": 10223372036854775806,
        "col6": 1.2222,
        "col7": 9.99999999,
        "col8": "hello world",
        "col9": "aGVsbG8gd29ybGQ=",
        "col10": 9.999999999,
        "col11": "2020-11-25",
        "col12": "00:01:02",
        "col13": "2020-11-25 00:01:02",
        "col14": "1606233662.012345"
      },
      "recordType": "DELETE",
      "postStruct": null
    }
    
  • Example of DDL data

    ALTER TABLE connector_test.all_mysql_type_test ADD column c90 VARCHAR(30) DEFAULT "test" COMMENT 'test';
    
    {
    "prevStruct": null,
    "postStruct": {
      "ddl": "ALTER TABLE connector_test.all_mysql_type_test ADD column c90 VARCHAR(30) DEFAULT \"test\" COMMENT 'test'"
    },
    "allMetaData": {
      "checkpoint": "1671177057",
      "dbType": "OB_MYSQL",
      "storeDataSequence": null,
      "db": "connector_test",
      "timestamp": "1671177057",
      "uniqueId": null,
      "ddlType": "ALTER_TABLE",
      "record_primary_key": null,
      "source_identity": null,
      "record_primary_value": null,
      "table_name": "all_mysql_type_test"
    },
    "recordType": "DDL"
    }
    

Canal JSON message format

When data is synchronized to Kafka, DataHub (BLOB type), or RocketMQ, the Canal serialization method uses the following JSON message format.

{
    "database": "STRING", // The name of the database to which the SQL statement applies. For OceanBase Database, only the database name is provided, and the tenant name is not required.
    "sqlType": {
        "col1": "INTEGER" // The type of the changed column, as a numeric value defined in java.sql.Types.
    },
    "data": [ // The key-value pairs of the data after the change. Currently, the array contains only one element.
        {
            "col1": "val1"
        }
    ],
    "pkNames": [ // The names of the primary key columns.
        "col1"
    ],
    "old": [ // This field exists only in UPDATE messages. It represents the column values before the change in the UPDATE statement.
        {
            "col1": "val1"
        }
    ],
    "mysqlType": { // The column type description.
        "col": "STRING"
    },
    "type": "STRING", // The type of the change.
    "table": "STRING", // The name of the table to which the SQL statement applies.
    "es": "LONG", // The change time, in milliseconds.
    "isDdl": "BOOLEAN", // Indicates whether the change is a DDL operation.
    "ts": "LONG", // The timestamp when the data is written to the destination.
    "sql": "STRING" // The DDL statement for DDL changes. This field is empty for other changes.
}
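
A consumer of the Canal format often needs the row as it was before an UPDATE and a readable name for each sqlType code. The following Python sketch shows one way to derive both. The helper names are hypothetical, the type table covers only the java.sql.Types constants that appear in the examples in this section, and the merge assumes that old carries only the columns whose values changed, as the UPDATE example suggests.

```python
from datetime import datetime, timezone

# Subset of java.sql.Types constants used by the examples in this section.
JDBC_TYPE_NAMES = {
    -6: "TINYINT", -5: "BIGINT", 3: "DECIMAL", 4: "INTEGER",
    5: "SMALLINT", 6: "FLOAT", 8: "DOUBLE", 12: "VARCHAR",
    91: "DATE", 92: "TIME", 93: "TIMESTAMP", 2004: "BLOB",
}


def before_images(message):
    """Rebuild the rows before an UPDATE by overlaying old values on data."""
    if message["type"] != "UPDATE" or not message.get("old"):
        return []
    return [{**after, **old} for after, old in zip(message["data"], message["old"])]


def column_type_names(message):
    """Map each column to a java.sql.Types name, or None for unknown codes."""
    sql_types = message.get("sqlType") or {}
    return {col: JDBC_TYPE_NAMES.get(code) for col, code in sql_types.items()}


def change_time(message):
    """Convert es (milliseconds since the epoch) to an aware UTC datetime."""
    return datetime.fromtimestamp(message["es"] / 1000, tz=timezone.utc)
```

For DDL messages, `data` and `sqlType` are null, so `before_images` returns an empty list and `column_type_names` returns an empty dict.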

Here are some examples:

  • Example of INSERT data

    {
      "database":"database",
      "sqlType":{
        "col1":93,
        "col2":12,
        "col3":6,
        "col4":8,
        "col5":5,
        "col6":92,
        "col7":4,
        "col8":-5,
        "col9":2004,
        "col10":-6,
        "col11":91,
        "col12":3,
        "col13":-5,
        "col14":93
      },
      "data":[
        {
          "col1":"2020-11-25 00:01:02",
          "col2":"hello world",
          "col3":1.2222,
          "col4":9.999999999999999093266253,
          "col5":129,
          "col6":"00:01:02",
          "col7":2147483646,
          "col8":9223372036854775806,
          "col9":"aGVsbG8gd29ybGQ=",
          "col10":3,
          "col11":"2020-11-25",
          "col12":9.9999999999999990,
          "col13":10223372036854775806,
          "col14":"1606233662.012345"
        }
      ],
      "pkNames":[
        "col1",
        "col2"
      ],
      "old":null,
      "mysqlType":{
        "col1":"datetime",
        "col2":"varchar",
        "col3":"float",
        "col4":"double",
        "col5":"smallint",
        "col6":"time",
        "col7":"int",
        "col8":"int64",
        "col9":"blob",
        "col10":"tinyint",
        "col11":"date",
        "col12":"decimal",
        "col13":"bigint",
        "col14":"timestamp"
      },
      "type":"INSERT",
      "table":"table",
      "es":1609344671000,
      "isDdl":false,
      "ts":1618323429026,
      "sql":""
    }
    
  • Example of UPDATE data

    {
      "database":"database",
      "sqlType":{
        "col1":93,
        "col2":12,
        "col3":6,
        "col4":8,
        "col5":5,
        "col6":92,
        "col7":4,
        "col8":-5,
        "col9":2004,
        "col10":-6,
        "col11":91,
        "col12":3,
        "col13":-5,
        "col14":93
      },
      "data":[
        {
          "col1":"2020-11-25 00:01:02",
          "col2":"hello world 2020",
          "col3":1.2222,
          "col4":9.999999999999999093266253372484,
          "col5":129,
          "col6":"00:01:02",
          "col7":2147483646,
          "col8":9223372036854775806,
          "col9":"aGVsbG8gd29ybGQ=",
          "col10":3,
          "col11":"2020-11-25",
          "col12":9.9999999999999990932662,
          "col13":10223372036854775806,
          "col14":"1606233662.012345"
        }
      ],
      "pkNames":[
        "col1",
        "col2"
      ],
      "old":[
        {
          "col2":"hello world"
        }
      ],
      "mysqlType":{
        "col1":"datetime",
        "col2":"varchar",
        "col3":"float",
        "col4":"double",
        "col5":"smallint",
        "col6":"time",
        "col7":"int",
        "col8":"int64",
        "col9":"blob",
        "col10":"tinyint",
        "col11":"date",
        "col12":"decimal",
        "col13":"bigint",
        "col14":"timestamp"
      },
      "type":"UPDATE",
      "table":"table",
      "es":1609344671000,
      "isDdl":false,
      "ts":1618364572908,
      "sql":""
    }
    
  • Example of DELETE data

    {
      "database":"database",
      "sqlType":{
        "col1":93,
        "col2":12,
        "col3":6,
        "col4":8,
        "col5":5,
        "col6":92,
        "col7":4,
        "col8":-5,
        "col9":2004,
        "col10":-6,
        "col11":91,
        "col12":3,
        "col13":-5,
        "col14":93
      },
      "data":[
        {
          "col1":"2020-11-25 00:01:02",
          "col2":"hello world",
          "col3":1.2222,
          "col4":9.99999999999999909326625,
          "col5":129,
          "col6":"00:01:02",
          "col7":2147483646,
          "col8":9223372036854775806,
          "col9":"aGVsbG8gd29ybGQ=",
          "col10":3,
          "col11":"2020-11-25",
          "col12":9.99999999999999909326625,
          "col13":10223372036854775806,
          "col14":"1606233662.012345"
        }
      ],
      "pkNames":[
        "col1",
        "col2"
      ],
      "old":null,
      "mysqlType":{
        "col1":"datetime",
        "col2":"varchar",
        "col3":"float",
        "col4":"double",
        "col5":"smallint",
        "col6":"time",
        "col7":"int",
        "col8":"int64",
        "col9":"blob",
        "col10":"tinyint",
        "col11":"date",
        "col12":"decimal",
        "col13":"bigint",
        "col14":"timestamp"
      },
      "type":"DELETE",
      "table":"table",
      "es":1609344671000,
      "isDdl":false,
      "ts":1618364660278,
      "sql":""
    }
    
  • Example of DDL data

    ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT "test" COMMENT 'test';
    
    {
        "database": "connector_test",
        "sqlType": null,
        "data": null,
        "pkNames": null,
        "old": null,
        "mysqlType": null,
        "type": "ALTER",
        "table": "all_mysql_type_test",
        "es": 1671177209000,
        "isDdl": true,
        "ts": 1671177291475,
        "sql": "ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT \"test\" COMMENT 'test'"
    }
    

DataWorks JSON message format

When data is synchronized to Kafka, DataHub (BLOB type), or RocketMQ, the DataWorks serialization method uses the following JSON message format.

{
    "version": "2.0", // The protocol version. Only DataWorks 2.0 is supported.
    "schema": { // The metadata of the changed columns. Only column names and column types are specified.
        "source": { // The information about the change source.
            "dbType": "mysql", // The type of the data source.
            "dbVersion": "5.7.35", // The version of the database.
            "dbName": "myDatabase", // The name of the database.
            "schema": "mySchema", // The schema name. This parameter is required if a schema exists.
            "table": "tableName" // The name of the table.
        },
        "column": [ // The information about the columns of the changed data.
            {
                "name": "id",
                "type": "bigint"
            },
            {
                "name": "name",
                "type": "varchar(20)"
            },
            {
                "name": "mydata",
                "type": "binary"
            },
            {
                "name": "ts",
                "type": "datetime"
            }
        ],
        "pk": [ // This field is required if the object has a primary key or unique key. Otherwise, it can be omitted.
            "pkName1",
            "pkName2"
        ]
    },
    "payload": {
        "before": {
            "data": {
                "id": 111,
                "name": "scooter",
                "mydata": "[base64 string]", // Binary data is Base64-encoded.
                "ts": 1590315269000.123456789 // The timestamp consists of 13 integer digits and 9 decimal digits.
            }
        },
        "after": {
            "data": {
                "id": 222,
                "name": "donald",
                "mydata": "[base64 string]",
                "ts": 1590315269000
            }
        },
        "op": "INSERT/UPDATE/DELETE/HEARTBEAT/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/...", // The operation type. The value is case-sensitive.
        "timestamp": {
            "eventTime": 1620457659000 // The time when the change occurred in the source database, as a 13-digit timestamp with millisecond precision.
        },
        "ddl": {
            "text": "ADD COLUMN ..."
        },
        "scn": "Auto-incremented ID"
    },
    "extend": { // Fields reserved for future extension. You can leave this field empty if you have no extension requirements.
        "load_fm": "CIBS" // The name of the source system, for example, "CIBS".
    }
}

Synchronization task heartbeat message:

{
    "version": "2.0", // The protocol version.
    "payload": {
        "timestamp": {
            "eventTime": 1620457659000 // The heartbeat timestamp.
        },
        "op": "HEARTBEAT" // Indicates a heartbeat packet.
    }
}
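
Because heartbeat messages share the payload envelope with data messages, a consumer usually branches on op before touching before or after. The following Python sketch illustrates this. The `summarize` helper is hypothetical, and treating any non-DML message with a non-empty ddl field as DDL is an assumption made for illustration, not a rule stated by the format.

```python
from datetime import datetime, timezone

# op values are case-sensitive, so they are compared verbatim.
DML_OPS = {"INSERT", "UPDATE", "DELETE"}


def summarize(message):
    """Return (kind, event_time) for a parsed DataWorks-format message."""
    payload = message["payload"]
    op = payload["op"]
    # eventTime is a 13-digit timestamp with millisecond precision.
    event_time = datetime.fromtimestamp(
        payload["timestamp"]["eventTime"] / 1000, tz=timezone.utc
    )
    if op == "HEARTBEAT":
        kind = "heartbeat"
    elif op in DML_OPS:
        kind = "dml"
    elif payload.get("ddl"):
        kind = "ddl"  # Assumption: DDL messages carry a non-empty ddl field.
    else:
        kind = "other"
    return kind, event_time
```

A heartbeat message has no schema key at all, which is why the sketch reads only from payload.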

Here are some examples:

  • Example of INSERT data

    {
        "version":"2.0",
        "schema":{
            "source":{
                "dbType":"ob_mysql",
                "dbVersion":null,
                "dbName":"db",
                "schema":null,
                "table":"tab"
            },
            "column":[
                {
                    "name":"int8",
                    "type":"TINYINT"
                },
                {
                    "name":"int16",
                    "type":"SMALLINT"
                },
                {
                    "name":"int32",
                    "type":"INT"
                },
                {
                    "name":"int64",
                    "type":"INT64"
                },
                {
                    "name":"float32",
                    "type":"FLOAT"
                },
                {
                    "name":"float64",
                    "type":"DOUBLE"
                },
                {
                    "name":"bigInt",
                    "type":"BIGINT"
                },
                {
                    "name":"boolean",
                    "type":"BOOLEAN"
                },
                {
                    "name":"string",
                    "type":"VARCHAR"
                },
                {
                    "name":"bytes",
                    "type":"BLOB"
                },
                {
                    "name":"decimal",
                    "type":"DECIMAL"
                },
                {
                    "name":"localDate",
                    "type":"DATE"
                },
                {
                    "name":"localTime",
                    "type":"TIME"
                },
                {
                    "name":"localDateTime",
                    "type":"DATETIME"
                },
                {
                    "name":"timestamp",
                    "type":"TIMESTAMP"
                },
                {
                    "name":"zonedDateTime",
                    "type":"ZONED_DATETIME"
                },
                {
                    "name":"intervalDayToSecond",
                    "type":"INTERVAL_DAY_TO_SECOND"
                },
                {
                    "name":"intervalYearToMonth",
                    "type":"INTERVAL_YEAR_TO_MONTH"
                }
            ],
            "pk":[
                "pkName1",
                "pkName2"
            ]
        },
        "payload":{
            "before":null,
            "after":{
                "data":{
                    "col1":3,
                    "col2":129,
                    "col3":2147483646,
                    "col4":9223372036854775806,
                    "col5":1.2222,
                    "col6":0.0000000000000008125,
                    "col7":10223372036854775806,
                    "col8":1,
                    "col9":"hello world",
                    "col10":"aGVsbG8gd29ybGQ=",
                    "col11":0.000000000000125,
                    "col12":"2020-11-25",
                    "col13":"00:01:02",
                    "col14":"2020-11-25 00:01:02",
                    "col15":"1606233662.012345",
                    "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai",
                    "col17":"INTERVAL '3' DAY",
                    "col18":"INTERVAL '4' YEAR"
                }
            },
            "op":"INSERT",
            "timestamp":{
                "eventTime":1647581000000,
                "systemTime":1647581000795,
                "checkpointTime":1647581000
            },
            "ddl":null,
            "scn":"null"
        },
        "extend":{
            "load_fm": "test"
        }
    }
    
  • Example of UPDATE data

    {
        "version":"2.0",
        "schema":{
            "source":{
                "dbType":"ob_mysql",
                "dbVersion":null,
                "dbName":"db",
                "schema":null,
                "table":"tab"
            },
            "column":[
                {
                    "name":"int8",
                    "type":"TINYINT"
                },
                {
                    "name":"int16",
                    "type":"SMALLINT"
                },
                {
                    "name":"int32",
                    "type":"INT"
                },
                {
                    "name":"int64",
                    "type":"INT64"
                },
                {
                    "name":"float32",
                    "type":"FLOAT"
                },
                {
                    "name":"float64",
                    "type":"DOUBLE"
                },
                {
                    "name":"bigInt",
                    "type":"BIGINT"
                },
                {
                    "name":"boolean",
                    "type":"BOOLEAN"
                },
                {
                    "name":"string",
                    "type":"VARCHAR"
                },
                {
                    "name":"bytes",
                    "type":"BLOB"
                },
                {
                    "name":"decimal",
                    "type":"DECIMAL"
                },
                {
                    "name":"localDate",
                    "type":"DATE"
                },
                {
                    "name":"localTime",
                    "type":"TIME"
                },
                {
                    "name":"localDateTime",
                    "type":"DATETIME"
                },
                {
                    "name":"timestamp",
                    "type":"TIMESTAMP"
                },
                {
                    "name":"zonedDateTime",
                    "type":"ZONED_DATETIME"
                },
                {
                    "name":"intervalDayToSecond",
                    "type":"INTERVAL_DAY_TO_SECOND"
                },
                {
                    "name":"intervalYearToMonth",
                    "type":"INTERVAL_YEAR_TO_MONTH"
                }
            ],
            "pk":[
                "pkName1",
                "pkName2"
            ]
        },
        "payload":{
            "before":{
                "data":{
                    "col1":3,
                    "col2":129,
                    "col3":2147483646,
                    "col4":9223372036854775806,
                    "col5":1.2222,
                    "col6":0.000000000000000125,
                    "col7":10223372036854775806,
                    "col8":1,
                    "col9":"hello world",
                    "col10":"aGVsbG8gd29ybGQ=",
                    "col11":0.0000000000125,
                    "col12":"2020-11-25",
                    "col13":"00:01:02",
                    "col14":"2020-11-25 00:01:02",
                    "col15":"1606233662.012345",
                    "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai",
                    "col17":"INTERVAL '3' DAY",
                    "col18":"INTERVAL '4' YEAR"
                }
            },
            "after":{
                "data":{
                    "col1":3,
                    "col2":129,
                    "col3":2147483646,
                    "col4":9223372036854775806,
                    "col5":1.2222,
                    "col6":0.00000000008125,
                    "col7":10223372036854775806,
                    "col8":1,
                    "col9":"hello world 2020",
                    "col10":"aGVsbG8gd29ybGQ=",
                    "col11":0.000000000125,
                    "col12":"2020-11-25",
                    "col13":"00:01:02",
                    "col14":"2020-11-25 00:01:02",
                    "col15":"1606233662.012345",
                    "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai",
                    "col17":"INTERVAL '3' DAY",
                    "col18":"INTERVAL '4' YEAR"
                }
            },
            "op":"UPDATE",
            "timestamp":{
                "eventTime":1647581038000,
                "systemTime":1647581038674,
                "checkpointTime":1647581038
            },
            "ddl":null,
            "scn":"null"
        },
        "extend":{
            "load_fm": "test"
        }
    }
    
  • Example of DELETE data

    {
        "version":"2.0",
        "schema":{
            "source":{
                "dbType":"ob_mysql",
                "dbVersion":null,
                "dbName":"db",
                "schema":null,
                "table":"tab"
            },
            "column":[
                {
                    "name":"int8",
                    "type":"TINYINT"
                },
                {
                    "name":"int16",
                    "type":"SMALLINT"
                },
                {
                    "name":"int32",
                    "type":"INT"
                },
                {
                    "name":"int64",
                    "type":"INT64"
                },
                {
                    "name":"float32",
                    "type":"FLOAT"
                },
                {
                    "name":"float64",
                    "type":"DOUBLE"
                },
                {
                    "name":"bigInt",
                    "type":"BIGINT"
                },
                {
                    "name":"boolean",
                    "type":"BOOLEAN"
                },
                {
                    "name":"string",
                    "type":"VARCHAR"
                },
                {
                    "name":"bytes",
                    "type":"BLOB"
                },
                {
                    "name":"decimal",
                    "type":"DECIMAL"
                },
                {
                    "name":"localDate",
                    "type":"DATE"
                },
                {
                    "name":"localTime",
                    "type":"TIME"
                },
                {
                    "name":"localDateTime",
                    "type":"DATETIME"
                },
                {
                    "name":"timestamp",
                    "type":"TIMESTAMP"
                },
                {
                    "name":"zonedDateTime",
                    "type":"ZONED_DATETIME"
                },
                {
                    "name":"intervalDayToSecond",
                    "type":"INTERVAL_DAY_TO_SECOND"
                },
                {
                    "name":"intervalYearToMonth",
                    "type":"INTERVAL_YEAR_TO_MONTH"
                }
            ],
            "pk":[
                "pkName1",
                "pkName2"
            ]
        },
        "payload":{
            "before":{
                "data":{
                    "col1":3,
                    "col2":129,
                    "col3":2147483646,
                    "col4":9223372036854775806,
                    "col5":1.2222,
                    "col6":0.0000000000125,
                    "col7":10223372036854775806,
                    "col8":1,
                    "col9":"hello world",
                    "col10":"aGVsbG8gd29ybGQ=",
                    "col11":0.0000000000125,
                    "col12":"2020-11-25",
                    "col13":"00:01:02",
                    "col14":"2020-11-25 00:01:02",
                    "col15":"1606233662.012345",
                    "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai",
                    "col17":"INTERVAL '3' DAY",
                    "col18":"INTERVAL '4' YEAR"
                }
            },
            "after":null,
            "op":"DELETE",
            "timestamp":{
                "eventTime":1647581072000,
                "systemTime":1647581072976,
                "checkpointTime":1647581072
            },
            "ddl":null,
            "scn":"null"
        },
        "extend":{
            "load_fm": "test"
        }
    }
    
  • Example of DDL data

    ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT "test" COMMENT 'test';
    
    {
        "version": "2.0",
        "schema": {
            "source": {
                "dbType": "ob_mysql",
                "dbVersion": null,
                "dbName": "connector_test",
                "schema": null,
                "table": "all_mysql_type_test"
            },
            "column": null,
            "pk": null
        },
        "payload": {
            "before": null,
            "after": null,
            "op": "ALTER",
            "timestamp": {
                "eventTime": 1671177209000,
                "systemTime": 1671177291485,
                "checkpointTime": 1671177200
            },
            "ddl": {
                "text": "ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT \"test\" COMMENT 'test'"
            },
            "scn": "null"
        },
        "extend": {}
    }
    

SharePlex JSON message format

When you synchronize data to Kafka, DataHub (BLOB type), and RocketMQ, SharePlex uses the following JSON message format.

{
    "data": { // The key-value pairs of the changed data. For INSERT and DELETE operations, the values are the full values. For UPDATE operations, the values are the changed values.
        "col1": "val1"
    },
    "meta": {
        "time": "YYYY-MM-DDTHH:mm:ss", // The time when the change occurred.
        "op": "", // The type of the change. Valid values: ins, upd, del, and ddl.
        "posttime": "YYYY-MM-DDTHH:mm:ss", // The time when the data was written to the destination.
        "idx": "STRING", // The index of the message in the transaction or the number of indexed messages. This parameter is deprecated.
        "size": "NUMBER", // The number of messages in the transaction. This parameter is deprecated.
        "seq": "STRING", // The sequence number. This parameter is valid only when transactionEnabled is enabled on the source.
        "table": "STRING", // The name of the table, in the format {database}.{table}.
        "rowid": "STRING", // The name of the changed table followed by the primary key values, which are separated by \u0001.
        "trans": "STRING", // The transaction ID.
        "scn": "STRING" // This field exists only in incremental scenarios when the source.json configuration file contains sequenceEnabled=true. By default, sequenceEnabled is true. This field is used for sorting. The generation rule is a timestamp plus an incrementing sequence number of up to five digits in a synchronization process.
    },
    "key": { // This field exists only for UPDATE operations. It represents the values before the change.
    },
    "sql": { // This field exists only for DDL operations. If op is ddl, the DDL statement is written to this field.
        "ddl": ""
    }
}
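
In the SharePlex format, data holds only the changed values for an UPDATE and key holds the values before the change, so the full row after the change has to be assembled by the consumer. The following Python sketch shows one way to do that and to split rowid. The helper names are hypothetical, and the rowid layout (table name, a hyphen, then the primary key values) is inferred from the examples in this section.

```python
def images(message):
    """Return (before, after) row images for a parsed SharePlex-format message."""
    op = message["meta"]["op"]
    data = message.get("data") or {}
    if op == "ins":
        return None, data
    if op == "del":
        return data, None
    if op == "upd":
        before = message.get("key") or {}
        # Overlay the changed values on the values before the change.
        return before, {**before, **data}
    return None, None  # ddl and any other operation types


def primary_key_values(meta):
    """Split rowid into primary key values, using meta.table as the prefix.

    Assumption: rowid is "<table>-<values joined by \\u0001>".
    """
    prefix = meta["table"] + "-"
    rowid = meta["rowid"]
    if not rowid.startswith(prefix):
        raise ValueError("rowid does not start with the table name")
    tail = rowid[len(prefix):]
    return tail.split("\u0001") if tail else []
```

Using `meta["table"]` as the prefix avoids splitting on the first hyphen, which would break for table names that themselves contain a hyphen.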

Here are some examples of data:

  • Example of INSERT data

    {
      "data":{
        "col1":"2020-11-25 00:01:02",
        "col2":"hello world",
        "col3":"INTERVAL '3' DAY",
        "col4":1.2222,
        "col5":9.999999999999999308,
        "col6":129,
        "col7":"00:01:02",
        "col8":1,
        "col9":"2020-11-25 00:01:02.012345 Asia/Shanghai",
        "col10":2147483646,
        "col11":9223372036854775806,
        "col12":"aGVsbG8gd29ybGQ=",
        "col13":"INTERVAL '4' YEAR",
        "col14":3,
        "col15":"2020-11-25",
        "col16":9.9999999999308,
        "col17":10223372036854775806,
        "col18":"1606233662.012345"
      },
      "meta":{
        "posttime":"2020-12-07T13:22:00",
        "op":"ins",
        "size":10,
        "time":"2020-11-25T00:01:02",
        "idx":"1/10",
        "seq":1,
        "table":"mock_database.mock_table",
        "rowid":"mock_database.mock_table-3\u0001129",
        "trans":"shareplex_transaction_id",
        "scn":"123456789"
      }
    }
    
  • Example of UPDATE data

    {
      "data":{
        "col2":"hello world 2020"
      },
      "meta":{
        "posttime":"2020-12-07T13:59:09",
        "op":"upd",
        "size":10,
        "time":"2020-11-25T00:01:02",
        "idx":"1/10",
        "seq":1,
        "table":"mock_database.mock_table",
        "rowid":"mock_database.mock_table-3\u0001129",
        "trans":"shareplex_transaction_id",
        "scn":"123456789"
      },
      "key":{
        "col1":"2020-11-25 00:01:02",
        "col2":"hello world",
        "col3":"INTERVAL '3' DAY",
        "col4":1.2222,
        "col5":9.9999999999999308,
        "col6":129,
        "col7":"00:01:02",
        "col8":1,
        "col9":"2020-11-25 00:01:02.012345 Asia/Shanghai",
        "col10":2147483646,
        "col11":9223372036854775806,
        "col12":"aGVsbG8gd29ybGQ=",
        "col13":"INTERVAL '4' YEAR",
        "col14":3,
        "col15":"2020-11-25",
        "col16":9.9999999999308,
        "col17":10223372036854775806,
        "col18":"1606233662.012345"
      }
    }
    
  • Example of DELETE data

    {
      "data":{
        "col1":"2020-11-25 00:01:02",
        "col2":"hello world",
        "col3":"INTERVAL '3' DAY",
        "col4":1.2222,
        "col5":9.9999999999308,
        "col6":129,
        "col7":"00:01:02",
        "col8":1,
        "col9":"2020-11-25 00:01:02.012345 Asia/Shanghai",
        "col10":2147483646,
        "col11":9223372036854775806,
        "col12":"aGVsbG8gd29ybGQ=",
        "col13":"INTERVAL '4' YEAR",
        "col14":3,
        "col15":"2020-11-25",
        "col16":9.9999999308,
        "col17":10223372036854775806,
        "col18":"1606233662.012345"
      },
      "meta":{
        "posttime":"2020-12-07T13:34:10",
        "op":"del",
        "size":10,
        "time":"2020-11-25T00:01:02",
        "idx":"1/10",
        "seq":1,
        "table":"mock_database.mock_table",
        "rowid":"mock_database.mock_table-3\u0001129",
        "trans":"shareplex_transaction_id",
        "scn":"123456789"
      }
    }
    
  • Example of DDL data

    ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT "test" COMMENT 'test';
    
    {
        "data": {},
        "meta": {
            "posttime": "2022-12-16T15:54:51",
            "op": "ddl",
            "size": 0,
            "time": "2022-12-16T15:53:29",
            "idx": "0/0",
            "seq": 0,
            "table": "connector_test.all_mysql_type_test",
            "rowid": "connector_test.all_mysql_type_test-",
            "trans": null,
            "scn": "null"
        },
        "sql": {
            "ddl": "ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT \"test\" comment 'test'"
        }
    }
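
The examples above share one envelope: a meta object whose op field is ins, upd, del, or ddl, plus a data, key, or sql object depending on the operation. The following Python sketch shows one way a consumer might route such messages on meta.op. The function name describe_change and the layout of the returned dictionary are illustrative assumptions and are not part of OMS. Reading key as the values that identify the row being updated is inferred from the UPDATE example.

```python
import json


def describe_change(raw: str) -> dict:
    """Summarize one message. The returned layout is illustrative only."""
    msg = json.loads(raw)
    meta = msg["meta"]
    op = meta["op"]
    summary = {"op": op, "table": meta["table"]}
    if op == "ins":
        summary["new_values"] = msg["data"]
    elif op == "upd":
        # Assumption: "data" holds the changed values and "key" identifies the row.
        summary["new_values"] = msg["data"]
        summary["row_key"] = msg.get("key", {})
    elif op == "del":
        summary["old_values"] = msg["data"]
    elif op == "ddl":
        summary["ddl"] = msg["sql"]["ddl"]
    else:
        raise ValueError(f"unexpected op: {op!r}")
    return summary


# A trimmed-down UPDATE message modeled on the example above.
sample = json.dumps({
    "data": {"string": "hello world 2020"},
    "meta": {"op": "upd", "table": "mock_database.mock_table"},
    "key": {"col2": "hello world"},
})
result = describe_change(sample)
```

Raising an error on an unknown op value is a design choice for the sketch; a production consumer might prefer to log and skip such messages.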
    

DefaultExtendColumnType JSON message format

When data is synchronized to Kafka, DataHub (BLOB type), or RocketMQ, the DefaultExtendColumnType serialization method uses the following JSON message format.

The DefaultExtendColumnType JSON message format adds the __light_type field to the DEFAULT format. This field indicates the data type of each column in the record.

{
    "prevStruct": { // The image before the change.
    },
    "postStruct": { // The image after the change.
        "__light_type": {
            "col": { // The name of the field.
                "schemaType": "type" // The type of the value.
            }
        }
    },
    "allMetaData": {

    }
}
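
As a minimal sketch of how a consumer might use __light_type, the following Python code pairs each column value with its declared schemaType. The column names id and amount and the helper typed_columns are hypothetical and chosen only for illustration. Passing parse_float=Decimal to json.loads is a suggestion for keeping long DECIMAL values exact. It is not something the format requires.

```python
import json
from decimal import Decimal


def typed_columns(struct):
    """Return {column: (schemaType, value)} for a prevStruct or postStruct."""
    if struct is None:  # for example, prevStruct of an INSERT record
        return {}
    types = struct.get("__light_type", {})
    return {
        name: (types.get(name, {}).get("schemaType"), value)
        for name, value in struct.items()
        if name != "__light_type"
    }


# Hypothetical message that follows the skeleton above.
raw = """
{
  "prevStruct": null,
  "postStruct": {
    "id": 3,
    "amount": 9.99999999999999909326625337248,
    "__light_type": {
      "id": {"schemaType": "TINYINT"},
      "amount": {"schemaType": "DECIMAL"}
    }
  },
  "allMetaData": {}
}
"""
# parse_float=Decimal avoids rounding long decimal literals to a binary float.
message = json.loads(raw, parse_float=Decimal)
columns = typed_columns(message["postStruct"])
```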

Here are some examples of data:

  • INSERT (insert) data

    {
      "allMetaData":{
        "checkpoint": null,
        "record_primary_key": "int8\u0001int16",
        "source_identity": null,
        "uniqueId": "{tid:11039xxxx127, partition_id:0, part_cnt:0},5917,391,0",
        "transId":"1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database earlier than V4.x, the value is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
        "clusterId": "123456",
        "record_primary_value": "3\u0001129",
        "dbType": "OB_MYSQL",
        "table_name": "table",
        "db": "tenant.database",
        "timestamp": "1609344671"
      },
      "prevStruct": null,
      "recordType": "INSERT",
      "postStruct":{
        "col1": 3,
        "col2": 129,
        "col3": 2147483646,
        "col4": 9223372036854775806,
        "col5": 10223372036854775806,
        "col6": 1.2222,
        "col7": 9.99999999999999909326625337248,
        "col8": "hello world",
        "col9": "aGVsbG8gd29ybGQ=",
        "col10": 9.99999999999999909326625337248461995470488734032045693707225049338,
        "col11": "2020-11-25",
        "col12": "00:01:02",
        "col13": "2020-11-25 00:01:02",
        "col14": "1606233662.012345",
        "__light_type":{
          "int8":{
            "schemaType":"TINYINT"
          },
          "int16":{
            "schemaType":"SMALLINT"
          },
          "int32":{
            "schemaType":"INT"
          },
          "int64":{
            "schemaType":"INT64"
          },
          "bigInt":{
            "schemaType":"BIGINT"
          },
          "float32":{
            "schemaType":"FLOAT"
          },
          "float64":{
            "schemaType":"DOUBLE"
          },
          "string":{
            "schemaType":"VARCHAR"
          },
          "bytes":{
            "schemaType":"BLOB"
          },
          "decimal":{
            "schemaType":"DECIMAL"
          },
          "localDate":{
            "schemaType":"DATE"
          },
          "localTime":{
            "schemaType":"TIME"
          },
          "localDateTime":{
            "schemaType":"DATETIME"
          },
          "timestamp_in_long":{
            "schemaType":"TIMESTAMP"
          }
        }
      }
    }
    

    When recordType is ROW, the record carries full data rather than an incremental change. Its format is the same as that of an INSERT record.

    {
      "allMetaData":{
        "checkpoint": null,
        "record_primary_key": "int8\u0001int16",
        "source_identity": null,
        "uniqueId": null,
        "transId":"1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database earlier than V4.x, the value is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
        "clusterId": null,
        "record_primary_value": "3\u0001129",
        "dbType": "OB_MYSQL",
        "table_name": "table",
        "db": "tenant.database",
        "timestamp": null
      },
      "prevStruct": null,
      "recordType": "ROW",
      "postStruct":{
        "col1": 3,
        "col2": 129,
        "col3": 2147483646,
        "col4": 9223372036854775806,
        "col5": 10223372036854775806,
        "col6": 1.2222,
        "col7": 9.999999,
        "col8": "hello world",
        "col9": "aGVsbG8gd29ybGQ=",
        "col10": 9.99999999999,
        "col11": "2020-11-25",
        "col12": "00:01:02",
        "col13": "2020-11-25 00:01:02",
        "col14": "1606233662.012345",
        "__light_type":{
          "int8":{
            "schemaType":"TINYINT"
          },
          "int16":{
            "schemaType":"SMALLINT"
          },
          "int32":{
            "schemaType":"INT"
          },
          "int64":{
            "schemaType":"INT64"
          },
          "bigInt":{
            "schemaType":"BIGINT"
          },
          "float32":{
            "schemaType":"FLOAT"
          },
          "float64":{
            "schemaType":"DOUBLE"
          },
          "string":{
            "schemaType":"VARCHAR"
          },
          "bytes":{
            "schemaType":"BLOB"
          },
          "decimal":{
            "schemaType":"DECIMAL"
          },
          "localDate":{
            "schemaType":"DATE"
          },
          "localTime":{
            "schemaType":"TIME"
          },
          "localDateTime":{
            "schemaType":"DATETIME"
          },
          "timestamp_in_long":{
            "schemaType":"TIMESTAMP"
          }
        }
      }
    }
    
  • UPDATE (update) data

    {
      "allMetaData": {
        "checkpoint": null,
        "record_primary_key": "int8\u0001int16",
        "source_identity": null,
        "uniqueId": "{tid:1103909342127, partition_id:0, part_cnt:0},5917,391,0",
        "transId":"1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database earlier than V4.x, the value is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
        "clusterId": "123456",
        "record_primary_value": "3\u0001129",
        "dbType":"OB_MYSQL",
        "table_name": "table",
        "db": "tenant.database",
        "timestamp": "1609344671"
      },
      "prevStruct": {
        "col1": 3,
        "col2": 129,
        "col3": 2147483646,
        "col4": 9223372036854775806,
        "col5": 10223372036854775806,
        "col6": 1.2222,
        "col7": 9.999999999999,
        "col8": "hello world 2020",
        "col9": "aGVsbG8gd29ybGQ=",
        "col10": 9.999999999999,
        "col11": "2020-11-25",
        "col12": "00:01:02",
        "col13": "2020-11-25 00:01:02",
        "col14": "1606233662.012345",
        "__light_type": {
          "int8": {
            "schemaType": "TINYINT"
          },
          "int16": {
            "schemaType": "SMALLINT"
          },
          "int32": {
            "schemaType": "INT"
          },
          "int64": {
            "schemaType": "INT64"
          },
          "bigInt": {
            "schemaType": "BIGINT"
          },
          "float32": {
            "schemaType": "FLOAT"
          },
          "float64": {
            "schemaType": "DOUBLE"
          },
          "string": {
            "schemaType": "VARCHAR"
          },
          "bytes": {
            "schemaType": "BLOB"
          },
          "decimal": {
            "schemaType": "DECIMAL"
          },
          "localDate": {
            "schemaType": "DATE"
          },
          "localTime": {
            "schemaType": "TIME"
          },
          "localDateTime": {
            "schemaType": "DATETIME"
          },
          "timestamp_in_long": {
            "schemaType": "TIMESTAMP"
          }
        }
      },
      "recordType": "UPDATE",
      "postStruct": {
        "col1": 3,
        "col2": 129,
        "col3": 2147483646,
        "col4": 9223372036854775806,
        "col5": 10223372036854775806,
        "col6": 1.2222,
        "col7": 9.999999999999,
        "col8": "hello world 2020",
        "col9": "aGVsbG8gd29ybGQ=",
        "col10": 9.999999999999,
        "col11": "2020-11-25",
        "col12": "00:01:02",
        "col13": "2020-11-25 00:01:02",
        "col14": "1606233662.012345",
        "__light_type": {
          "int8": {
            "schemaType": "TINYINT"
          },
          "int16": {
            "schemaType": "SMALLINT"
          },
          "int32": {
            "schemaType": "INT"
          },
          "int64": {
            "schemaType": "INT64"
          },
          "bigInt": {
            "schemaType": "BIGINT"
          },
          "float32": {
            "schemaType": "FLOAT"
          },
          "float64": {
            "schemaType": "DOUBLE"
          },
          "string": {
            "schemaType": "VARCHAR"
          },
          "bytes": {
            "schemaType": "BLOB"
          },
          "decimal": {
            "schemaType": "DECIMAL"
          },
          "localDate": {
            "schemaType": "DATE"
          },
          "localTime": {
            "schemaType": "TIME"
          },
          "localDateTime": {
            "schemaType": "DATETIME"
          },
          "timestamp_in_long": {
            "schemaType": "TIMESTAMP"
          }
        }
      }
    }
    
  • DELETE (delete) data

    {
      "allMetaData":{
        "checkpoint": null,
        "record_primary_key": "int8\u0001int16",
        "source_identity": null,
        "uniqueId": "{tid:1103xxxx127, partition_id:0, part_cnt:0},5917,391,0",
        "transId":"1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database earlier than V4.x, the value is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
        "clusterId": "123456",
        "record_primary_value": "3\u0001129",
        "dbType": "OB_MYSQL",
        "table_name": "table",
        "db": "tenant.database",
        "timestamp": "1609344671"
      },
      "prevStruct":{
        "col1": 3,
        "col2": 129,
        "col3": 2147483646,
        "col4": 9223372036854775806,
        "col5": 10223372036854775806,
        "col6": 1.2222,
        "col7": 9.999999999999,
        "col8": "hello world 2020",
        "col9": "aGVsbG8gd29ybGQ=",
        "col10": 9.999999999999,
        "col11": "2020-11-25",
        "col12": "00:01:02",
        "col13": "2020-11-25 00:01:02",
        "col14": "1606233662.012345",
        "__light_type":{
          "int8":{
            "schemaType":"TINYINT"
          },
          "int16":{
            "schemaType":"SMALLINT"
          },
          "int32":{
            "schemaType":"INT"
          },
          "int64":{
            "schemaType":"INT64"
          },
          "bigInt":{
            "schemaType":"BIGINT"
          },
          "float32":{
            "schemaType":"FLOAT"
          },
          "float64":{
            "schemaType":"DOUBLE"
          },
          "string":{
            "schemaType":"VARCHAR"
          },
          "bytes":{
            "schemaType":"BLOB"
          },
          "decimal":{
            "schemaType":"DECIMAL"
          },
          "localDate":{
            "schemaType":"DATE"
          },
          "localTime":{
            "schemaType":"TIME"
          },
          "localDateTime":{
            "schemaType":"DATETIME"
          },
          "timestamp_in_long": {
            "schemaType": "TIMESTAMP"
          }
        }
      },
      "recordType":"DELETE",
      "postStruct":null
    }
    
  • DDL example

    ALTER TABLE connector_test.all_mysql_type_test ADD column c90 VARCHAR(30) DEFAULT "test" COMMENT 'test';
    
    {
        "prevStruct": null,
        "postStruct": {
            "ddl": "ALTER TABLE connector_test.all_mysql_type_test ADD column c90 VARCHAR(30) DEFAULT \"test\" COMMENT 'test'",
            "__light_type": {
                "ddl": {
                    "schemaType": "VAR_STRING"
                }
            }
        },
        "allMetaData": {
            "checkpoint": "1671177200",
            "dbType": "OB_MYSQL",
            "storeDataSequence": null,
            "db": "connector_test",
            "timestamp": "1671177209",
            "uniqueId": null,
            "ddlType": "ALTER_TABLE",
            "record_primary_key": null,
            "source_identity": null,
            "record_primary_value": null,
            "table_name": "all_mysql_type_test"
        },
        "recordType": "DDL"
    }
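
In the examples above, record_primary_key and record_primary_value in allMetaData use the \u0001 character between the parts of a composite primary key. The following Python sketch splits the two fields and pairs them up. That \u0001 is the separator is inferred from the sample values, and the helper name primary_key is hypothetical. The sketch does not handle a key value that itself contains \u0001.

```python
PK_SEPARATOR = "\u0001"  # assumption: inferred from the sample messages


def primary_key(all_meta: dict) -> dict:
    """Pair primary key column names with their values (kept as strings)."""
    names = all_meta.get("record_primary_key")
    values = all_meta.get("record_primary_value")
    if names is None or values is None:  # DDL records carry null in both fields
        return {}
    return dict(zip(names.split(PK_SEPARATOR), values.split(PK_SEPARATOR)))


dml_meta = {
    "record_primary_key": "int8\u0001int16",
    "record_primary_value": "3\u0001129",
}
ddl_meta = {"record_primary_key": None, "record_primary_value": None}

pk = primary_key(dml_meta)
ddl_pk = primary_key(ddl_meta)
```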
    

Debezium JSON message format

When you synchronize data from OceanBase Database in MySQL compatible mode to Kafka, DataHub (BLOB type), or RocketMQ, the Debezium serialization method uses the following JSON message format. The format has two variants: one that contains both the schema and payload fields, and one that contains only the payload field. Usually, only the payload structure is displayed.

  • The schema and payload fields exist.

    {
        "schema": { //The structure that describes the payload fields. This structure is not displayed by default.
            "type": "struct", //The value struct indicates that the field contains a structure.
            "optional": false, //Indicates whether the field is optional. The value false means that the field is required.
            "fields": [
                {
                    "type": "int64", //The type of the field.
                    "optional": false, //Indicates whether the field is optional.
                    "field": "ts_ms" //The name of the field.
                }
            ]
        },
        "payload": {
            "op": "c", //The data modification type. Valid values: c (full or insert), u (update), d (delete), and HEARTBEAT (heartbeat message).
            "source": {
                "version": "", //The version of OMS.
                "connector": "OB_MYSQL", //The data source type.
                "name": "OMS", //The fixed value OMS.
                "ts_ms": 0, //The timestamp of the data modification, accurate to the second. This field exists only in incremental data.
                "db": "test", //The name of the database in which the data was modified. For OceanBase Database, only the database name is displayed, without the tenant name.
                "table": "testTab", //The name of the table in which the data was modified.
                "pos": "553132@1668496109" //The position in the binlog file, in the [binlog file name]@[offset in the binlog file] format.
            },
            "before": { //The before image.
                "column": "value" //Key-value pairs for all columns.
            },
            "after": { //The after image.
                "column": "value" //Key-value pairs for all columns.
            },
            "ts_ms": 1668497367188 //The timestamp of data processing.
        }
    }
    
  • Only the payload field exists.

    {
        "payload": {
            "op": "c", //The data modification type. Valid values: c (full or insert), u (update), d (delete), and HEARTBEAT (heartbeat message).
            "source": {
                "version": "", //The version of OMS.
                "connector": "OB_MYSQL", //The data source type.
                "name": "OMS", //The fixed value OMS.
                "ts_ms": 0, //The timestamp of the data modification, accurate to the second. This field exists only in incremental data.
                "db": "test", //The name of the database in which the data was modified. For OceanBase Database, only the database name is displayed, without the tenant name.
                "table": "testTab", //The name of the table in which the data was modified.
                "pos": "553132@16684****" //The position in the binlog file, in the [binlog file name]@[offset in the binlog file] format.
            },
            "before": { //The before image.
                "column": "value" //Key-value pairs for all columns.
            },
            "after": { //The after image.
                "column": "value" //Key-value pairs for all columns.
            },
            "ts_ms": 1668497367188 //The timestamp of data processing.
        }
    }
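
Because a message may or may not carry the schema field, a consumer can read payload in both cases and treat schema as optional. The following Python sketch illustrates this under stated assumptions: the helper read_envelope, the OP_NAMES table, and the layout of the returned dictionary are hypothetical, and the sample message omits the // annotations shown above because they are not valid JSON.

```python
import json

# Labels follow the valid values listed for "op" above.
OP_NAMES = {
    "c": "full or insert",
    "u": "update",
    "d": "delete",
    "HEARTBEAT": "heartbeat",
}


def read_envelope(raw: str) -> dict:
    """Extract the commonly used parts of a message in either variant."""
    envelope = json.loads(raw)
    payload = envelope["payload"]
    source = payload.get("source", {})
    return {
        "has_schema": "schema" in envelope,
        "operation": OP_NAMES.get(payload["op"], "unknown"),
        "db": source.get("db"),
        "table": source.get("table"),
        "before": payload.get("before"),
        "after": payload.get("after"),
    }


# Payload-only variant, modeled on the structure above.
sample = json.dumps({
    "payload": {
        "op": "c",
        "source": {"connector": "OB_MYSQL", "name": "OMS", "db": "test", "table": "testTab"},
        "after": {"column": "value"},
        "ts_ms": 1668497367188,
    }
})
info = read_envelope(sample)
```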
    

Here are some examples:

  • Example of INSERT (insert) data

    {
      "schema":{
        "optional":false,
        "type":"STRUCT",
        "fields":[
          {
            "field":"before",
            "optional":false,
            "type":"struct",
            "fields":[
              {
                "field":"c01",
                "optional":false,
                "type":"int32"
              },
              {
                "field":"c02",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c03",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c04",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c05",
                "optional":false,
                "type":"int16"
              },
              {
                "field":"c06",
                "optional":false,
                "type":"int16"
              },
              {
                "field":"c07",
                "optional":false,
                "type":"int32"
              },
              {
                "field":"c08",
                "optional":false,
                "type":"int64"
              },
              {
                "field":"c09",
                "optional":false,
                "type":"float64"
              },
              {
                "field":"c10",
                "optional":false,
                "type":"float64"
              },
              {
                "field":"c11",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c12",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c13",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c14",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c15",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c16",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c17",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c18",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c19",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c20",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c21",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c22",
                "optional":false,
                "type":"int32"
              },
              {
                "field":"c23",
                "optional":false,
                "type":"int64"
              },
              {
                "field":"c24",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c25",
                "optional":false,
                "type":"int32"
              },
              {
                "field":"c26",
                "optional":false,
                "type":"bytes"
              }
            ]
          },
          {
            "field":"after",
            "optional":false,
            "type":"struct",
            "fields":[
              {
                "field":"c01",
                "optional":false,
                "type":"int32"
              },
              {
                "field":"c02",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c03",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c04",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c05",
                "optional":false,
                "type":"int16"
              },
              {
                "field":"c06",
                "optional":false,
                "type":"int16"
              },
              {
                "field":"c07",
                "optional":false,
                "type":"int32"
              },
              {
                "field":"c08",
                "optional":false,
                "type":"int64"
              },
              {
                "field":"c09",
                "optional":false,
                "type":"float64"
              },
              {
                "field":"c10",
                "optional":false,
                "type":"float64"
              },
              {
                "field":"c11",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c12",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c13",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c14",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c15",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c16",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c17",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c18",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c19",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c20",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c21",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c22",
                "optional":false,
                "type":"int32"
              },
              {
                "field":"c23",
                "optional":false,
                "type":"int64"
              },
              {
                "field":"c24",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c25",
                "optional":false,
                "type":"int32"
              },
              {
                "field":"c26",
                "optional":false,
                "type":"bytes"
              }
            ]
          },
          {
            "field":"source",
            "optional":false,
            "type":"struct",
            "fields":[
              {
                "field":"version",
                "optional":false,
                "type":"string"
              },
              {
                "field":"connector",
                "optional":false,
                "type":"string"
              },
              {
                "field":"name",
                "optional":false,
                "type":"string"
              },
              {
                "field":"ts_ms",
                "optional":false,
                "type":"int64"
              },
              {
                "field":"db",
                "optional":false,
                "type":"string"
              },
              {
                "field":"table",
                "optional":false,
                "type":"string"
              },
              {
                "field":"server_id",
                "optional":false,
                "type":"int64"
              },
              {
                "field":"pos",
                "optional":false,
                "type":"string"
              }
            ]
          },
          {
            "field":"op",
            "optional":false,
            "type":"string"
          },
          {
            "field":"ts_ms",
            "optional":false,
            "type":"int64"
          }
        ]
      },
      "payload":{
        "op":"c",
        "source":{
          "connector":"OB_MYSQL",
          "pos":"703223@166849****",
          "name":"OMS",
          "version":"",
          "ts_ms":1668491621000,
          "db":"test",
          "table":"table_name"
        },
        "after":{
          "c11":"a",
          "c10":2.4212412,
          "c13":"c",
          "c12":"b",
          "c15":"65",
          "c14":"d",
          "c17":"67",
          "c16":"f",
          "c19":"69000000000000",
          "c18":"68",
          "c20":"6A",
          "c22":19311,
          "c21":"2022-11-15T05:12:11Z",
          "c02":"12312",
          "c24":1668489131000,
          "c01":2,
          "c23":36060000000,
          "c04":"61",
          "c26":"6B",
          "c03":"1241.41000",
          "c25":2022,
          "c06":141,
          "c05":11,
          "c08":412124124,
          "c07":4241,
          "c09":2.11111
        },
        "ts_ms":1668495423594
      }
    }
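
Several values in the payload of the INSERT example above appear to be encoded. Judging by the values themselves, c22 (19311) looks like a count of days since 1970-01-01, c24 (1668489131000) looks like a timestamp in milliseconds, c23 (36060000000) looks like a time of day in microseconds, and bytes columns such as c04 ("61") look like hexadecimal strings. These readings are assumptions inferred from the sample and are not documented guarantees. The helper names in the following Python sketch are hypothetical.

```python
from datetime import date, datetime, timedelta, timezone

EPOCH_DATE = date(1970, 1, 1)
EPOCH_UTC = datetime(1970, 1, 1, tzinfo=timezone.utc)


def days_to_date(days: int) -> date:
    """Assumption: the value counts days since 1970-01-01."""
    return EPOCH_DATE + timedelta(days=days)


def millis_to_datetime(millis: int) -> datetime:
    """Assumption: the value counts milliseconds since the epoch, in UTC."""
    return EPOCH_UTC + timedelta(milliseconds=millis)


def micros_to_time_of_day(micros: int) -> timedelta:
    """Assumption: the value counts microseconds since midnight."""
    return timedelta(microseconds=micros)


def hex_to_bytes(text: str) -> bytes:
    """Assumption: bytes columns are rendered as hexadecimal strings."""
    return bytes.fromhex(text)


# Values taken from the "after" image of the INSERT example.
c22 = days_to_date(19311)
c24 = millis_to_datetime(1668489131000)
c23 = micros_to_time_of_day(36060000000)
c04 = hex_to_bytes("61")
```

Under these assumptions, c22 and c24 decode to the same date and time as the c21 string "2022-11-15T05:12:11Z", which is what suggests the reading in the first place.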
    
  • Example of UPDATE (update) data

    {
      "schema":{
        "optional":false,
        "type":"STRUCT",
        "fields":[
          {
            "field":"before",
            "optional":false,
            "type":"struct",
            "fields":[
              {
                "field":"c01",
                "optional":false,
                "type":"int32"
              },
              {
                "field":"c02",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c03",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c04",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c05",
                "optional":false,
                "type":"int16"
              },
              {
                "field":"c06",
                "optional":false,
                "type":"int16"
              },
              {
                "field":"c07",
                "optional":false,
                "type":"int32"
              },
              {
                "field":"c08",
                "optional":false,
                "type":"int64"
              },
              {
                "field":"c09",
                "optional":false,
                "type":"float64"
              },
              {
                "field":"c10",
                "optional":false,
                "type":"float64"
              },
              {
                "field":"c11",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c12",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c13",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c14",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c15",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c16",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c17",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c18",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c19",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c20",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c21",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c22",
                "optional":false,
                "type":"int32"
              },
              {
                "field":"c23",
                "optional":false,
                "type":"int64"
              },
              {
                "field":"c24",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c25",
                "optional":false,
                "type":"int32"
              },
              {
                "field":"c26",
                "optional":false,
                "type":"bytes"
              }
            ]
          },
          {
            "field":"after",
            "optional":false,
            "type":"struct",
            "fields":[
              {
                "field":"c01",
                "optional":false,
                "type":"int32"
              },
              {
                "field":"c02",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c03",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c04",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c05",
                "optional":false,
                "type":"int16"
              },
              {
                "field":"c06",
                "optional":false,
                "type":"int16"
              },
              {
                "field":"c07",
                "optional":false,
                "type":"int32"
              },
              {
                "field":"c08",
                "optional":false,
                "type":"int64"
              },
              {
                "field":"c09",
                "optional":false,
                "type":"float64"
              },
              {
                "field":"c10",
                "optional":false,
                "type":"float64"
              },
              {
                "field":"c11",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c12",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c13",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c14",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c15",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c16",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c17",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c18",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c19",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c20",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c21",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c22",
                "optional":false,
                "type":"int32"
              },
              {
                "field":"c23",
                "optional":false,
                "type":"int64"
              },
              {
                "field":"c24",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c25",
                "optional":false,
                "type":"int32"
              },
                {
                "field":"c26",
                "optional":false,
                "type":"bytes"
              }
                ]
              },
                {
                "field":"source",
                "optional":false,
                "type":"struct",
                "fields":[
                {
                "field":"version",
                "optional":false,
                "type":"string"
              },
                {
                "field":"connector",
                "optional":false,
                "type":"string"
              },
                {
                "field":"name",
                "optional":false,
                "type":"string"
              },
                {
                "field":"ts_ms",
                "optional":false,
                "type":"int64"
              },
                {
                "field":"db",
                "optional":false,
                "type":"string"
              },
                {
                "field":"table",
                "optional":false,
                "type":"string"
              },
                {
                "field":"server_id",
                "optional":false,
                "type":"int64"
              },
                {
                "field":"pos",
                "optional":false,
                "type":"string"
              }
                ]
              },
                {
                "field":"op",
                "optional":false,
                "type":"string"
              },
                {
                "field":"ts_ms",
                "optional":false,
                "type":"int64"
              }
                ]
              },
                "payload":{
                "op":"u",
                "before":{
                "c11":"a",
                "c10":2.4212412,
                "c13":"c",
                "c12":"b",
                "c15":"65",
                "c14":"d",
                "c17":"67",
                "c16":"f",
                "c19":"6900000000",
                "c18":"68",
                "c20":"6A",
                "c22":19311,
                "c21":"2022-11-15T05:12:11Z",
                "c02":"12312",
                "c24":1668489131000,
                "c01":1,
                "c23":36060000000,
                "c04":"61",
                "c26":"6B",
                "c03":"1241.41000",
                "c25":2022,
                "c06":141,
                "c05":11,
                "c08":412124124,
                "c07":4241,
                "c09":2.11111
              },
                "source":{
                "connector":"OB_MYSQL",
                "pos":"436999@166849****",
                "name":"OMS",
                "version":"",
                "ts_ms":1668495861000,
                "db":"test",
                "table":"table_name"
              },
                "after":{
                "c11":"aa",
                "c10":2.4212412,
                "c13":"c",
                "c12":"b",
                "c15":"65",
                "c14":"d",
                "c17":"67",
                "c16":"f",
                "c19":"69000000000",
                "c18":"68",
                "c20":"6A",
                "c22":19311,
                "c21":"2022-11-15T05:12:11Z",
                "c02":"12312",
                "c24":1668489131000,
                "c01":1,
                "c23":36060000000,
                "c04":"61",
                "c26":"6B",
                "c03":"1241.41000",
                "c25":2022,
                "c06":141,
                "c05":11,
                "c08":412124124,
                "c07":4241,
                "c09":2.11111
              },
                "ts_ms":1668495906356
              }
              }
    
  • Example of DELETE (delete) data

    {
        "schema":{
            "optional":false,
            "type":"STRUCT",
            "fields":[
                {
                    "field":"before",
                    "optional":false,
                    "type":"struct",
                    "fields":[
                        {
                            "field":"c01",
                            "optional":false,
                            "type":"int32"
                        },
                        {
                            "field":"c02",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c03",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c04",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c05",
                            "optional":false,
                            "type":"int16"
                        },
                        {
                            "field":"c06",
                            "optional":false,
                            "type":"int16"
                        },
                        {
                            "field":"c07",
                            "optional":false,
                            "type":"int32"
                        },
                        {
                            "field":"c08",
                            "optional":false,
                            "type":"int64"
                        },
                        {
                            "field":"c09",
                            "optional":false,
                            "type":"float64"
                        },
                        {
                            "field":"c10",
                            "optional":false,
                            "type":"float64"
                        },
                        {
                            "field":"c11",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c12",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c13",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c14",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c15",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c16",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c17",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c18",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c19",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c20",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c21",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c22",
                            "optional":false,
                            "type":"int32"
                        },
                        {
                            "field":"c23",
                            "optional":false,
                            "type":"int64"
                        },
                        {
                            "field":"c24",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c25",
                            "optional":false,
                            "type":"int32"
                        },
                        {
                            "field":"c26",
                            "optional":false,
                            "type":"bytes"
                        }
                    ]
                },
                {
                    "field":"after",
                    "optional":false,
                    "type":"struct",
                    "fields":[
                        {
                            "field":"c01",
                            "optional":false,
                            "type":"int32"
                        },
                        {
                            "field":"c02",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c03",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c04",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c05",
                            "optional":false,
                            "type":"int16"
                        },
                        {
                            "field":"c06",
                            "optional":false,
                            "type":"int16"
                        },
                        {
                            "field":"c07",
                            "optional":false,
                            "type":"int32"
                        },
                        {
                            "field":"c08",
                            "optional":false,
                            "type":"int64"
                        },
                        {
                            "field":"c09",
                            "optional":false,
                            "type":"float64"
                        },
                        {
                            "field":"c10",
                            "optional":false,
                            "type":"float64"
                        },
                        {
                            "field":"c11",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c12",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c13",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c14",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c15",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c16",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c17",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c18",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c19",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c20",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c21",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c22",
                            "optional":false,
                            "type":"int32"
                        },
                        {
                            "field":"c23",
                            "optional":false,
                            "type":"int64"
                        },
                        {
                            "field":"c24",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c25",
                            "optional":false,
                            "type":"int32"
                        },
                        {
                            "field":"c26",
                            "optional":false,
                            "type":"bytes"
                        }
                    ]
                },
                {
                    "field":"source",
                    "optional":false,
                    "type":"struct",
                    "fields":[
                        {
                            "field":"version",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"connector",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"name",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"ts_ms",
                            "optional":false,
                            "type":"int64"
                        },
                        {
                            "field":"db",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"table",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"server_id",
                            "optional":false,
                            "type":"int64"
                        },
                        {
                            "field":"pos",
                            "optional":false,
                            "type":"string"
                        }
                    ]
                },
                {
                    "field":"op",
                    "optional":false,
                    "type":"string"
                },
                {
                    "field":"ts_ms",
                    "optional":false,
                    "type":"int64"
                }
            ]
        },
        "payload":{
            "op":"d",
            "before":{
                "c11":"aa",
                "c10":2.4212412,
                "c13":"c",
                "c12":"b",
                "c15":"65",
                "c14":"d",
                "c17":"67",
                "c16":"f",
                "c19":"69000000000",
                "c18":"68",
                "c20":"6A",
                "c22":19311,
                "c21":"2022-11-15T05:12:11Z",
                "c02":"12312",
                "c24":1668489131000,
                "c01":1,
                "c23":36060000000,
                "c04":"61",
                "c26":"6B",
                "c03":"1241.41000",
                "c25":2022,
                "c06":141,
                "c05":11,
                "c08":412124124,
                "c07":4241,
                "c09":2.11111
            },
            "source":{
                "connector":"OB_MYSQL",
                "pos":"553132@1668****",
                "name":"OMS",
                "version":"",
                "ts_ms":1668496109000,
                "db":"test",
                "table":"table_name"
            },
            "ts_ms":1668496119717
        }
    }
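
The Debezium examples above pair a schema section (column names and types) with a payload section (column values). As a rough illustration of how a consumer might combine the two, the following Python sketch looks up the declared type of each column in an image. The helper names `column_types` and `typed_image` are hypothetical and are not part of OMS or Debezium. The sketch assumes the message has already been parsed into a dictionary, for example with `json.loads`.

```python
def column_types(envelope: dict, image: str = "before") -> dict:
    """Map each column name in the given image struct to its declared schema type."""
    for field in envelope["schema"]["fields"]:
        if field["field"] == image:
            return {col["field"]: col["type"] for col in field["fields"]}
    raise KeyError(f"no {image!r} struct in schema")


def typed_image(envelope: dict, image: str = "before") -> dict:
    """Pair every value in payload[image] with its declared type."""
    types = column_types(envelope, image)
    # A DELETE payload has no "after" image, so fall back to an empty mapping.
    values = envelope["payload"].get(image) or {}
    return {name: (types.get(name), value) for name, value in values.items()}


# A trimmed-down envelope that follows the shape of the DELETE example.
envelope = {
    "schema": {
        "optional": False,
        "type": "STRUCT",
        "fields": [
            {
                "field": "before",
                "optional": False,
                "type": "struct",
                "fields": [
                    {"field": "c01", "optional": False, "type": "int32"},
                    {"field": "c04", "optional": False, "type": "bytes"},
                ],
            }
        ],
    },
    "payload": {"op": "d", "before": {"c04": "61", "c01": 1}},
}

print(typed_image(envelope))
```

Looking types up by column name rather than by position matters here because the payload keys in the examples are not in the same order as the schema fields.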
    

DebeziumFlatten JSON message format

When you synchronize data from OceanBase Database in MySQL compatible mode to Kafka, DataHub (BLOB type), or RocketMQ, the JSON message format for the DebeziumFlatten serialization method is as follows. Unlike the Debezium serialization method, this format omits the schema section and the payload wrapper, so the op, source, before, after, and ts_ms fields appear at the top level of the message.

{
    "op": "c", // The type of data change. Valid values: c (full data or insert), u (update), d (delete), and HEARTBEAT (heartbeat message).
    "source": {
        "version": "", // The version of OMS.
        "connector": "OB_MYSQL", // The type of the data source.
        "name": "OMS", // The fixed value OMS.
        "ts_ms": 0, // The timestamp of the data change, with second-level precision. This field is present only in incremental data.
        "db": "test", // The name of the database that the SQL statement modifies. For OceanBase Database, only the database name is included, without the tenant name.
        "table": "testTab", // The name of the table that the SQL statement modifies.
        "pos": "553132@16684****" // The binlog position, in the format [binlog file name]@[offset in the binlog file].
    },
    "before": { // The image before the change.
        "column": "value" // Key-value pairs for all columns.
    },
    "after": { // The image after the change.
        "column": "value" // Key-value pairs for all columns.
    },
    "ts_ms": 1668497367188 // The timestamp of the data processing.
}
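
The op field determines which images a consumer can expect in a message. As a minimal sketch, the following Python function dispatches on op and summarizes one message. The function name `describe_change` is hypothetical. The sketch assumes that real messages are plain JSON without the `//` annotations shown in the format description, that inserts carry only an after image, and that deletes carry only a before image, as in the examples in this section. The layout of heartbeat messages is not described here, so the sketch returns before reading any other field.

```python
import json


def describe_change(raw: str) -> str:
    """Summarize one DebeziumFlatten message as a short string."""
    msg = json.loads(raw)
    op = msg["op"]
    if op == "HEARTBEAT":
        # Assumption: heartbeat messages may lack source and image fields.
        return "heartbeat"
    table = f'{msg["source"]["db"]}.{msg["source"]["table"]}'
    before = msg.get("before") or {}  # assumed absent for inserts
    after = msg.get("after") or {}    # assumed absent for deletes
    if op == "c":
        return f"insert into {table}: {sorted(after)}"
    if op == "u":
        changed = sorted(k for k in after if before.get(k) != after[k])
        return f"update {table}: changed {changed}"
    if op == "d":
        return f"delete from {table}: {sorted(before)}"
    raise ValueError(f"unexpected op: {op!r}")


sample = json.dumps({
    "op": "u",
    "source": {"connector": "OB_MYSQL", "name": "OMS", "db": "test", "table": "table_name"},
    "before": {"c01": 1, "c11": "a"},
    "after": {"c01": 1, "c11": "aa"},
    "ts_ms": 1668495906356,
})
print(describe_change(sample))
```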

Here are some data examples:

  • Example of INSERT (insert) data

    {
        "op":"c",
        "source":{
            "connector":"OB_MYSQL",
            "pos":"703223@166849****",
            "name":"OMS",
            "version":"",
            "ts_ms":1668491621000,
            "db":"test",
            "table":"table_name"
        },
        "after":{
            "c11":"a",
            "c10":2.4212412,
            "c13":"c",
            "c12":"b",
            "c15":"65",
            "c14":"d",
            "c17":"67",
            "c16":"f",
            "c19":"69000000000000000000",
            "c18":"68",
            "c20":"6A",
            "c22":19311,
            "c21":"2022-11-15T05:12:11Z",
            "c02":"12312",
            "c24":1668489131000,
            "c01":2,
            "c23":36060000000,
            "c04":"61",
            "c26":"6B",
            "c03":"1241.41000",
            "c25":2022,
            "c06":141,
            "c05":11,
            "c08":412124124,
            "c07":4241,
            "c09":2.11111
        },
        "ts_ms":1668495423594
    }
    
  • Example of UPDATE (update) data

    {
        "op":"u",
        "before":{
            "c11":"a",
            "c10":2.4212412,
            "c13":"c",
            "c12":"b",
            "c15":"65",
            "c14":"d",
            "c17":"67",
            "c16":"f",
            "c19":"690000000000000000000000000000000",
            "c18":"68",
            "c20":"6A",
            "c22":19311,
            "c21":"2022-11-15T05:12:11Z",
            "c02":"12312",
            "c24":1668489131000,
            "c01":1,
            "c23":36060000000,
            "c04":"61",
            "c26":"6B",
            "c03":"1241.41000",
            "c25":2022,
            "c06":141,
            "c05":11,
            "c08":412124124,
            "c07":4241,
            "c09":2.11111
        },
        "source":{
            "connector":"OB_MYSQL",
            "pos":"436999@166849****",
            "name":"OMS",
            "version":"",
            "ts_ms":1668495861000,
            "db":"test",
            "table":"table_name"
        },
        "after":{
            "c11":"aa",
            "c10":2.4212412,
            "c13":"c",
            "c12":"b",
            "c15":"65",
            "c14":"d",
            "c17":"67",
            "c16":"f",
            "c19":"69000000000000000000000000",
            "c18":"68",
            "c20":"6A",
            "c22":19311,
            "c21":"2022-11-15T05:12:11Z",
            "c02":"12312",
            "c24":1668489131000,
            "c01":1,
            "c23":36060000000,
            "c04":"61",
            "c26":"6B",
            "c03":"1241.41000",
            "c25":2022,
            "c06":141,
            "c05":11,
            "c08":412124124,
            "c07":4241,
            "c09":2.11111
        },
        "ts_ms":1668495906356
    }
    
  • Example of DELETE (delete) data

    {
        "op":"d",
        "before":{
            "c11":"aa",
            "c10":2.4212412,
            "c13":"c",
            "c12":"b",
            "c15":"65",
            "c14":"d",
            "c17":"67",
            "c16":"f",
            "c19":"69000000000000000000000000000",
            "c18":"68",
            "c20":"6A",
            "c22":19311,
            "c21":"2022-11-15T05:12:11Z",
            "c02":"12312",
            "c24":1668489131000,
            "c01":1,
            "c23":36060000000,
            "c04":"61",
            "c26":"6B",
            "c03":"1241.41000",
            "c25":2022,
            "c06":141,
            "c05":11,
            "c08":412124124,
            "c07":4241,
            "c09":2.11111
        },
        "source":{
            "connector":"OB_MYSQL",
            "pos":"553132@1668****",
            "name":"OMS",
            "version":"",
            "ts_ms":1668496109000,
            "db":"test",
            "table":"table_name"
        },
        "ts_ms":1668496119717
    }
    

DebeziumSmt JSON message format

DebeziumSmt is a configuration method provided by Debezium. It uses a Single Message Transform (SMT) to convert and process messages one at a time. When you synchronize data from OceanBase Database in MySQL compatible mode to Kafka, DataHub (BLOB type), or RocketMQ, the JSON message for the DebeziumSmt serialization method contains only the key-value pairs from the after field.

For example, when you use the Debezium serialization method to update data:

{
    "op": "u",
    "source": {
        "connector": "OB_MYSQL",
        "name": "OMS"
    },
    "ts_ms": 1668496119717,
    "before": {
        "field1": "before_value1",
        "field2": "before_value2"
    },
    "after": {
        "field1": "after_value1",
        "field2": "after_value2"
    }
}

The SMT simplifies the preceding message, so the JSON message format for the DebeziumSmt serialization method is as follows:

{
    "field1": "after_value1",
    "field2": "after_value2"
}

Here are some data examples:

  • Example of INSERT (insert) data

    {
        "field1": "after_value1",
        "field2": "after_value2",
        "__deleted": "false"
    }
    
  • Example of UPDATE (update) data

    {
        "field1": "after_value1",
        "field2": "after_value2",
        "__deleted": "false"
    }
    
  • Example of DELETE (delete) data

    {
        "field1": "after_value1",
        "field2": "after_value2",
        "__deleted": "true"
    }
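
To illustrate the reduction described above, the following Python sketch turns a flattened change message into a single row with a `__deleted` marker. The function name `to_smt` is hypothetical, and the sketch does not show how OMS implements the transform. The examples do not state which image supplies the column values for a DELETE operation. This sketch assumes the before image, because the DebeziumFlatten DELETE example carries no after image.

```python
def to_smt(message: dict) -> dict:
    """Reduce a flattened change message to one row plus a __deleted marker."""
    deleted = message["op"] == "d"
    # Assumption: a delete has no after image, so its row comes from "before".
    image = message.get("before") if deleted else message.get("after")
    row = dict(image or {})
    # The examples show __deleted as the strings "true" and "false".
    row["__deleted"] = "true" if deleted else "false"
    return row


update = {
    "op": "u",
    "before": {"field1": "before_value1", "field2": "before_value2"},
    "after": {"field1": "after_value1", "field2": "after_value2"},
}
delete = {
    "op": "d",
    "before": {"field1": "before_value1", "field2": "before_value2"},
}

print(to_smt(update))
print(to_smt(delete))
```

Because an insert and an update produce the same shape, a consumer of this format can only treat each message as an upsert or a delete.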
    

Avro JSON message format

When you synchronize data from OceanBase Database in MySQL compatible mode to Kafka, the Avro serialization method uses the following JSON message format.

  • Full migration

    {
        "version": 1,
        "id": 0,
        "sourceTimestamp": 1702371565, // The safe point timestamp.
        "sourcePosition": "", // Full migration does not contain position information.
        "safeSourcePosition": "",
        "sourceTxid": "",
        "source": {
            "sourceType": "MySQL", // Fixed value.
            "version": "OBMySQL" // Fixed value.
        },
        "operation": "INIT", // Full migration is of the INIT type.
        "objectName": "test***",
        "processTimestamps": [
            1702371565238
        ], // Only the delivery time.
        "tags": {
            "pk_uk_info": "{\"PRIMARY\":[\"id\"]}" // Only the primary key.
        },
        "fields": [
            {
                "name": "id",
                "dataTypeNumber": 246
            }, // The type of each column.
            {
                "name": "bid",
                "dataTypeNumber": 3
            },
            {
                "name": "name",
                "dataTypeNumber": 15
            },
            {
                "name": "address",
                "dataTypeNumber": 254
            }
        ],
        "beforeImages": null, // The before image of full migration is empty.
        "afterImages": [ // The after image. For an INTEGER type, the precision is 8. For a FLOAT type, the precision is 8 and the scale is 64.
            {
                "value": "1",
                "precision": 1,
                "scale": 0
            },
            {
                "precision": 8,
                "value": "11"
            },
            {
                "charset": "utf8mb4",
                "value": {
                    "bytes": "yyy"
                }
            },
            null
        ]
    }
    
  • Incremental synchronization of DML operations

    • INSERT (insert) operation

      {
        "version": 1,
        "id": 170236922143600000,
        "sourceTimestamp": 1702369092,
        "sourcePosition": "1702369080",  // The checkpoint of OceanBase Database in MySQL compatible mode.
        "safeSourcePosition": "1702369080", // The checkpoint of OceanBase Database in MySQL compatible mode.
        "sourceTxid": "",
        "source": {
          "sourceType": "MySQL",
          "version": "OBMySQL"
        },
        "operation": "INSERT",
        "objectName": "test***",
        "processTimestamps": [1702369221480],
        "tags": {
          "pk_uk_info": "{\"PRIMARY\":[\"id\"]}"
        },
        "fields": [
          {"name": "id", "dataTypeNumber": 8},
          {"name": "bid", "dataTypeNumber": 3},
          {"name": "name", "dataTypeNumber": 15}
        ],
        "beforeImages": null,                // The before image of an INSERT operation is empty.
        "afterImages": [
          {"precision": 8, "value": "2"},
          {"precision": 8, "value": "12"},
          {"charset": "utf8mb4", "value": {"bytes": "xxx"}}
        ]
      }
      
    • UPDATE (update) operation

      {
        "version": 1,
        "id": 170236975822100001,
        "sourceTimestamp": 1702369757,
        "sourcePosition": "1702369756",
        "safeSourcePosition": "1702369756",
        "sourceTxid": "",
        "source": {
          "sourceType": "MySQL",
          "version": "OBMySQL"
        },
        "operation": "UPDATE",
        "objectName": "test***",
        "processTimestamps": [1702369758237],
        "tags": {
          "pk_uk_info": "{\"PRIMARY\":[\"id\"]}"
        },
        "fields": [
          {"name": "id", "dataTypeNumber": 8},
          {"name": "bid", "dataTypeNumber": 3},
          {"name": "name", "dataTypeNumber": 15}
        ],
        "beforeImages": [   // An UPDATE operation contains both a before image and an after image.
          {"precision": 8, "value": "3"},
          {"precision": 8, "value": "22"},
          {"charset": "utf8mb4", "value": {"bytes": "xxx"}}
        ],
        "afterImages": [
          {"precision": 8, "value": "3"},
          {"precision": 8, "value": "44"},
          {"charset": "utf8mb4", "value": {"bytes": "xxx"}}
        ]
      }
      
    • DELETE (delete) operation

      {
        "version": 1,
        "id": 170236976527500000,
        "sourceTimestamp": 1702369764,
        "sourcePosition": "1702369763",
        "safeSourcePosition": "1702369763",
        "sourceTxid": "",
        "source": {
          "sourceType": "MySQL",
          "version": "OBMySQL"
        },
        "operation": "DELETE",
        "objectName": "test***",
        "processTimestamps": [1702369765287],
        "tags": {
          "pk_uk_info": "{\"PRIMARY\":[\"id\"]}"
        },
        "fields": [
          {"name": "id", "dataTypeNumber": 8},
          {"name": "bid", "dataTypeNumber": 3},
          {"name": "name", "dataTypeNumber": 15}
        ],
        "beforeImages": [
          {"precision": 8, "value": "3"},
          {"precision": 8, "value": "44"},
          {"charset": "utf8mb4", "value": {"bytes": "xxx"}}
        ],
        "afterImages": null // The after image of a DELETE operation is empty.
      }
      
  • Incremental synchronization of DDL operations

    {
      "version": 1,
      "id": 170236979372400000,
      "sourceTimestamp": 1702369793,
      "sourcePosition": "1702369792",
      "safeSourcePosition": "1702369792",
      "sourceTxid": "",
      "source": {
        "sourceType": "MySQL",
        "version": "OBMySQL"
      },
      "operation": "DDL",
      "objectName": "test***",
      "processTimestamps": [1702369794543],
      "tags": {},
      "fields": null,    // DDL messages do not contain fields or beforeImages.
      "beforeImages": null,
      "afterImages": "alter table multi_db_multi_tbl add column address char(20) default null" // For DDL operations, afterImages is a string that contains the DDL statement.
    }
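
As a rough sketch of how a consumer might read the messages above, the following Python snippet pairs each entry in fields with the values in beforeImages and afterImages, and treats the afterImages of a DDL message as a plain string. The function name summarize and the trimmed sample messages are illustrative assumptions and are not part of the product. The // comments in the samples above are annotations only and must not appear in JSON that is passed to a parser.

```python
import json


def summarize(message: dict) -> dict:
    """Pair column names with before/after values for one message (illustrative helper)."""
    op = message["operation"]
    if op == "DDL":
        # For DDL, fields and beforeImages are null; afterImages is the statement text.
        return {"operation": op, "statement": message["afterImages"]}

    names = [field["name"] for field in message["fields"]]

    def to_row(images):
        # An image is null (None) when it does not exist,
        # for example the afterImages of a DELETE operation.
        if images is None:
            return None
        return {name: image["value"] for name, image in zip(names, images)}

    return {
        "operation": op,
        "before": to_row(message["beforeImages"]),
        "after": to_row(message["afterImages"]),
    }


# Trimmed versions of the DELETE and DDL samples above (assumed subset of keys).
delete_msg = json.loads("""
{
  "operation": "DELETE",
  "fields": [{"name": "id", "dataTypeNumber": 8}, {"name": "bid", "dataTypeNumber": 3}],
  "beforeImages": [{"precision": 8, "value": "3"}, {"precision": 8, "value": "44"}],
  "afterImages": null
}
""")
ddl_msg = {
    "operation": "DDL",
    "fields": None,
    "beforeImages": None,
    "afterImages": "alter table multi_db_multi_tbl add column address char(20) default null",
}

print(summarize(delete_msg))
print(summarize(ddl_msg)["statement"])
```

Dispatching on operation first keeps the DDL case, where afterImages is a string rather than a list, away from the column-pairing logic.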
    

Format description of data transmission to text protocol

When you synchronize data from OceanBase Database to Kafka, DataHub (BLOB type), or RocketMQ, the data types are mapped as follows:

  • If the serialization method is Default, Canal, DataWorks (V2.0 supported), SharePlex, or DefaultExtendColumnType, the mapping of OceanBase Database data is as follows.

    • OceanBase Database in MySQL compatible mode

      Data type | Mapping type | Description
      TINYINT, SMALLINT, MEDIUMINT, INT, INTEGER, YEAR, BOOL, BOOLEAN | Long | An integer with a bit length of less than 64. Normal values, such as 1000, are not in scientific notation. For BOOL/BOOLEAN, true = 1 and false = 0.
      DECIMAL, NUMERIC | BigDecimal | An exact decimal value or an integer with a bit length of more than 64. For integers, decimal points and decimals are not displayed. For values with decimals, the number of digits is displayed based on the data received from the database. Trailing zeros are not removed, and scientific notation is used.
      FLOAT, DOUBLE | Double | A floating-point number. The number of significant digits is determined based on whether the source data is of the FLOAT or DOUBLE type. FLOAT has 7 significant digits, and DOUBLE has 16 significant digits.
      CHAR, VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, ENUM, SET | String | A string.
      TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY, BIT | Bytes | A byte array, which is displayed in BASE64 by default. Note: For BIT types, the leading zeros in the byte array are removed after incremental data is received, but they are not removed for full data. Therefore, the BASE64 encoding may vary. However, the actual results are consistent, and the decoding results are consistent.
      DATE | Date | A date in the YYYY-MM-DD format. If the time is invalid, the original string is displayed.
      TIME | Time | A time in the HH:mm:ss[.nnnnnnnnn] format. Times with a precision of less than seconds are displayed with a maximum of 9 digits. For times with a precision of less than seconds, all non-zero digits are displayed. If the time is invalid, the original string is displayed.
      DATETIME | DateTime | A date and time, including the time zone. The format is YYYY-MM-DD HH:mm:ss[.nnnnnnnnn] [zoneId]. Times with a precision of less than seconds are displayed with a maximum of 9 digits. For times with a precision of less than seconds, all non-zero digits are displayed. If the time is invalid, the original string is displayed.
      TIMESTAMP | Timestamp | A timestamp in the [second-level timestamp][.nnnnnnnnn] format. Times with a precision of less than seconds are displayed with a maximum of 9 digits. For times with a precision of less than seconds, all non-zero digits are displayed. If the time is invalid, it is displayed in the 0000-00-00 00:00:00 format.
    • OceanBase Database in Oracle compatible mode

      Data type | Mapping type | Description
      INTEGER | Long | An integer with a bit length of less than 64. Normal values, such as 1000, are not in scientific notation.
      NUMBER, FLOAT | BigDecimal | An exact decimal value or an integer with a bit length of more than 64.
      BINARY_FLOAT, BINARY_DOUBLE | Double | A floating-point number. The number of significant digits is determined based on whether the source data is of the BINARY_FLOAT or BINARY_DOUBLE type. BINARY_FLOAT has 7 significant digits, and BINARY_DOUBLE has 16 significant digits.
      VARCHAR2, NVARCHAR2, INTERVAL YEAR TO MONTH, INTERVAL DAY TO SECOND, CLOB, NCLOB, ROWID, UROWID | String | A string.
      BLOB, BFILE, RAW | Bytes | A byte array. Displayed in BASE64 by default.
      DATE, TIMESTAMP, TIMESTAMP WITH TIME ZONE, TIMESTAMP WITH LOCAL TIME ZONE | DateTime | A date and time, including the time zone. The format is YYYY-MM-DD HH:mm:ss[.nnnnnnnnn] [zoneId]. Times with a precision of less than seconds are displayed with a maximum of 9 digits. For times with a precision of less than seconds, all non-zero digits are displayed. If the time is invalid, the original string is displayed.
  • If the serialization method is Debezium, the mapping of OceanBase Database in MySQL compatible mode is as follows.

    Notice

    When you synchronize data from OceanBase Database in Oracle compatible mode to Kafka, DataHub (BLOB type), or RocketMQ, you cannot select the Debezium serialization method.

    Data type | Mapping type | Description
    BOOLEAN, BOOL | BOOLEAN | The value can be true or false.
    TINYINT, SMALLINT, MEDIUMINT, INT/INTEGER, BIGINT, YEAR | LONG | An integer in the range of -2^63 to 2^63.
    BIGINT | STRING | The data is displayed in the string format.
    FLOAT, DOUBLE | DOUBLE | A floating-point number.
    DECIMAL, NUMERIC | STRING | The data is displayed in the string format. For values with decimals, the number of digits is displayed based on the data received from the database. Trailing zeros are not removed, and scientific notation is used.
    BIT, BINARY, VARBINARY, TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB | BYTES | A byte array encoded in base16.
    CHAR, VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, ENUM, SET | STRING | A string.
    TIMESTAMP | STRING | The format is YYYY-MM-DDTHH:mm:ss[.nnnnnnnnn]Z, and the time zone is UTC.
    DATE | LONG | The number of days elapsed since January 1, 1970.
    TIME | LONG | The number of microseconds elapsed since 00:00:00, without time zone information.
    DATETIME | LONG | The number of milliseconds elapsed since January 1, 1970, 00:00:00, without time zone information.
  • If the serialization method is Avro, the mapping of OceanBase Database in MySQL compatible mode is as follows.

    Notice

    You can select the Avro serialization method only when you synchronize data from OceanBase Database in MySQL compatible mode to Kafka.

    Type name | Mapping type
    TINYINT, BOOLEAN, SMALLINT, MEDIUMINT, INT, BIGINT, BIT | INTEGER
    FLOAT, DOUBLE | FLOAT
    DECIMAL, NUMERIC | DECIMAL
    VARCHAR, CHAR, TINYTEXT, MEDIUMTEXT, LONGTEXT, TEXT | CHARACTER
    BINARY, VARBINARY, TINYBLOB, MEDIUMBLOB, LONGBLOB, BLOB | BinaryObject
    TIMESTAMP | TimestampObject. Note: For TIMESTAMP data, both full and incremental data are converted to timestamps. Invalid timestamps are converted to -9223372022400L. Except for invalid timestamps, you can use the Java Instant.ofEpochSecond(ts, nanos) method to obtain the correct wall time.
    DATE, TIME, DATETIME, YEAR | DATETIME
    JSON, ENUM, SET | TextObject
    GEOMETRY | TextGeometry. Note: The data transmission uses the EWKT format, so the data is mapped to the TextGeometry type.
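
Several of the mappings above need a decoding step on the consumer side. The following Python sketch shows one possible way to handle the BIT note for BASE64-encoded bytes, the Debezium DATE, TIME, DATETIME, and TIMESTAMP encodings, and the Avro TIMESTAMP sentinel. All function names are hypothetical helpers. The sketch assumes that BIT bytes are big-endian, that the Avro ts and nanos values have the same meaning as the arguments of Java's Instant.ofEpochSecond, and that the sentinel is compared against the seconds value. Python datetime values keep microseconds only, so digits beyond the sixth fractional digit are dropped.

```python
import base64
from datetime import date, datetime, time, timedelta, timezone
from typing import Optional

EPOCH = datetime(1970, 1, 1)
AVRO_INVALID_TS = -9223372022400  # sentinel quoted in the Avro TIMESTAMP note


def bit_to_int(b64_text: str) -> int:
    """Decode a BASE64 BIT value; leading zero bytes do not change the result."""
    # Assumption: big-endian byte order, so leading zeros are insignificant.
    return int.from_bytes(base64.b64decode(b64_text), byteorder="big")


def debezium_date(days: int) -> date:
    """DATE: days elapsed since January 1, 1970."""
    return (EPOCH + timedelta(days=days)).date()


def debezium_time(micros: int) -> time:
    """TIME: microseconds since 00:00:00 (this sketch handles 0 <= value < 24 hours only)."""
    return (datetime.min + timedelta(microseconds=micros)).time()


def debezium_datetime(millis: int) -> datetime:
    """DATETIME: milliseconds since the epoch, returned without time zone information."""
    return EPOCH + timedelta(milliseconds=millis)


def debezium_timestamp(text: str) -> datetime:
    """TIMESTAMP: YYYY-MM-DDTHH:mm:ss[.nnnnnnnnn]Z in UTC."""
    whole, _, frac = text.rstrip("Z").partition(".")
    parsed = datetime.strptime(whole, "%Y-%m-%dT%H:%M:%S").replace(tzinfo=timezone.utc)
    micros = int(frac[:6].ljust(6, "0")) if frac else 0
    return parsed + timedelta(microseconds=micros)


def avro_timestamp(ts: int, nanos: int = 0) -> Optional[datetime]:
    """Avro TIMESTAMP: return None for the invalid-timestamp sentinel."""
    if ts == AVRO_INVALID_TS:
        return None
    return EPOCH.replace(tzinfo=timezone.utc) + timedelta(seconds=ts, microseconds=nanos // 1000)


# The same BIT value with and without leading zero bytes ("AAAF" versus "BQ==").
print(bit_to_int("AAAF") == bit_to_int("BQ=="))
print(debezium_date(19703), debezium_time(3_723_000_000))
print(debezium_datetime(1702369764000))
print(debezium_timestamp("2023-12-12T08:29:24.123456789Z"))
print(avro_timestamp(1702369764, 500_000_000), avro_timestamp(AVRO_INVALID_TS))
```

The sentinel check comes first because a value that far from the epoch is outside the range that Python's datetime can represent.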
