OceanBase logo

OceanBase

A unified distributed database ready for your transactional, analytical, and AI workloads.

DEPLOY YOUR WAY

OceanBase Cloud

The best way to deploy and scale OceanBase

OceanBase Enterprise

Run and manage OceanBase on your infra

TRY OPEN SOURCE

OceanBase Community Edition

The free, open-source distributed database

OceanBase seekdb

Open source AI native search database

Customer Stories

Real-world success stories from enterprises across diverse industries.

View All
BY USE CASES

Mission-Critical Transactions

Global & Multicloud Application

Elastic Scaling for Peak Traffic

Real-time Analytics

Active Geo-redundancy

Database Consolidation

Resources

Comprehensive knowledge hub for OceanBase.

Blog

Live Demos

Training & Certification

Documentation

Official technical guides, tutorials, API references, and manuals for all OceanBase products.

View All
PRODUCTS

OceanBase Cloud

OceanBase Database

Tools

Connectors and Middleware

QUICK START

OceanBase Cloud

OceanBase Database

BEST PRACTICES

Practical guides for utilizing OceanBase more effectively and conveniently

Company

Learn more about OceanBase – our company, partnerships, and trust and security initiatives.

About OceanBase

Partner

Trust Center

Contact Us

International - English
中国站 - 简体中文
日本 - 日本語
Sign In
Start on Cloud

A unified distributed database ready for your transactional, analytical, and AI workloads.

DEPLOY YOUR WAY

OceanBase Cloud

The best way to deploy and scale OceanBase

OceanBase Enterprise

Run and manage OceanBase on your infra

TRY OPEN SOURCE

OceanBase Community Edition

The free, open-source distributed database

OceanBase seekdb

Open source AI native search database

Customer Stories

Real-world success stories from enterprises across diverse industries.

View All
BY USE CASES

Mission-Critical Transactions

Global & Multicloud Application

Elastic Scaling for Peak Traffic

Real-time Analytics

Active Geo-redundancy

Database Consolidation

Comprehensive knowledge hub for OceanBase.

Blog

Live Demos

Training & Certification

Documentation

Official technical guides, tutorials, API references, and manuals for all OceanBase products.

View All
PRODUCTS
OceanBase CloudOceanBase Database
ToolsConnectors and Middleware
QUICK START
OceanBase CloudOceanBase Database
BEST PRACTICES

Practical guides for utilizing OceanBase more effectively and conveniently

Learn more about OceanBase – our company, partnerships, and trust and security initiatives.

About OceanBase

Partner

Trust Center

Contact Us

Start on Cloud
编组
All Products
    • Databases
    • iconOceanBase Database
    • iconOceanBase Cloud
    • iconOceanBase Tugraph
    • iconInteractive Tutorials
    • iconOceanBase Best Practices
    • Tools
    • iconOceanBase Cloud Platform
    • iconOceanBase Migration Service
    • iconOceanBase Developer Center
    • iconOceanBase Migration Assessment
    • iconOceanBase Admin Tool
    • iconOceanBase Loader and Dumper
    • iconOceanBase Deployer
    • iconKubernetes operator for OceanBase
    • iconOceanBase Diagnostic Tool
    • iconOceanBase Binlog Service
    • Connectors and Middleware
    • iconOceanBase Database Proxy
    • iconEmbedded SQL in C for OceanBase
    • iconOceanBase Call Interface
    • iconOceanBase Connector/C
    • iconOceanBase Connector/J
    • iconOceanBase Connector/ODBC
    • iconOceanBase Connector/NET
icon

OceanBase Migration Service

V4.3.1Enterprise Edition

  • OMS Documentation
  • OMS Introduction
    • Overview of OMS
    • Terms
    • OMS HA
    • Principles of Store
    • Principles of Full-Import and Incr-Sync
    • Data verification principles
    • Architecture
      • Overview
      • Hierarchical functional system
      • Basic components
    • OMS Oracle full migration design and impact
    • Limitations
  • Quick Start
    • Data migration process
    • Data synchronization process
  • Deploy OMS
    • Deployment types
    • System and network requirements
    • Memory and disk requirements
    • Environment preparations
    • Deploy OMS on a single node
    • Deploy OMS on multiple nodes in a single region
    • Deploy OMS on multiple nodes in multiple regions
    • Scale out
    • Scale down deployment
    • Check the deployment
    • Deploy a time-series database (Optional)
  • OMS console
    • Log in to the OMS console
    • Overview
    • User center
      • Configure user information
      • Change your login password
      • Log out
  • Data migration
    • Overview
    • Migrate data from a MySQL database to a MySQL-compatible tenant of OceanBase Database
    • Migrate data from a MySQL-compatible tenant of OceanBase Database to a MySQL database
    • Migrate data from an Oracle database to the MySQL compatible mode of OceanBase Database
    • Migrate data from the Oracle compatible mode of OceanBase Database to an Oracle database
    • Migrate data from an Oracle database to the Oracle compatible mode of OceanBase Database
    • Migrate data from a DB2 LUW database to an Oracle-compatible tenant of OceanBase Database
    • Migrate data from an Oracle-compatible tenant of OceanBase Database to a DB2 LUW database
    • Migrate data from a DB2 LUW database to a MySQL-compatible tenant of OceanBase Database
    • Migrate data from a MySQL-compatible tenant of OceanBase Database to a DB2 LUW database
    • Migrate data between OceanBase databases of the same tenant type
    • Configure a bidirectional synchronization task
    • Migrate data from a TiDB database to a MySQL-compatible tenant of OceanBase Database
    • Migrate data from a PostgreSQL database to the Oracle compatible mode of OceanBase Database
    • Migrate data from a PostgreSQL database to the MySQL compatible mode of OceanBase Database
    • Migrate data from a PolarDB-X 1.0 database to a MySQL-compatible tenant of OceanBase Database
    • Migrate incremental data from an Oracle-compatible tenant of OceanBase Database to a MySQL database
    • Manage data migration tasks
      • View details of a data migration task
      • Rename a data migration task
      • View and modify migration objects
      • Use tags to Manage data migration tasks
      • Perform batch operations on data migration tasks
      • Download and import settings of migration objects
      • View and modify migration parameters
      • Download a conflict log file
      • Start and pause a data migration task
      • End and delete a data migration task
    • Supported DDL operations and limits for synchronization
      • Synchronize DDL operations from a MySQL database to a MySQL-compatible tenant of OceanBase Database
        • Overview of DDL synchronization from MySQL to OceanBase Database's MySQL compatible mode
        • CREATE TABLE
          • Create a table
          • Create a column
          • Create indexes or constraints
          • Create partitions
        • Data type conversion
        • ALTER TABLE
          • Modify tables
          • Operations on columns
          • Operations on constraints and indexes
          • Partition operations
        • TRUNCATE TABLE
        • RENAME TABLE
        • DROP TABLE
        • CREATE INDEX
        • DROP INDEX
        • DDL incompatibilities between a MySQL database and a MySQL-compatible tenant of OceanBase Database
          • Overview
          • Incompatibilities of the CREATE TABLE statement
            • Incompatibilities of CREATE TABLE
            • Column types that are supported to create indexes or constraints
          • Incompatibilities of the ALTER TABLE statement
            • Incompatibilities of ALTER TABLE
            • Change the type of a constrained column
            • Change the type of an unconstrained column
            • Change the length of a constrained column
            • Change the length of an unconstrained column
            • Delete a constrained column
          • Incompatibilities of DROP INDEX operations
      • Synchronize DDL operations from the MySQL compatible mode of OceanBase Database to a MySQL database
      • DDL operations for synchronizing data from an Oracle database to an Oracle-compatible tenant of OceanBase Database
        • Overview of DDL synchronization from Oracle to OceanBase Database Oracle compatible mode
        • CREATE TABLE
          • Overview for CREATE TABLE
          • Create a relational table
            • Create a relational table
            • Define columns of a relational table
          • Virtual columns
          • Regular columns
          • Create partitions
            • Overview for creating partitions
            • Partitioning
            • Subpartitioning
            • Composite partitioning
            • User-defined partitioning
            • Subpartition templates
          • Constraints
            • Overview
            • Inline constraints
            • Out-of-line constraints
        • CREATE INDEX
          • Overview
          • Oracle compatible mode
        • ALTER TABLE
          • Overview
          • Modify, drop, and add table attributes
          • Column attribute management
            • Modify, drop, and add column attributes
            • Rename a column
            • Add columns and column attributes
            • Modify column attributes
            • Drop columns
          • Modify, drop, and add constraints
          • Partition management
            • Modify, drop, and add partitions
            • Drop a partition
            • Drop a subpartition
            • Add partitions and subpartitions
            • Modify partitions
            • Drop partition data
        • DROP TABLE
        • RENAME OBJECT
        • TRUNCATE TABLE
        • DROP INDEX
        • DDL incompatibilities between an Oracle database and an Oracle-compatible tenant of OceanBase Database
          • Overview
          • Incompatibilities of CREATE TABLE
          • Incompatibilities in table modification operations
            • Incompatibilities of ALTER TABLE
            • Change the type of a constrained column
            • Change the type of an unconstrained column
            • Change the length of a constrained column
            • Change the length of an unconstrained column
      • Synchronize DDL operations from the Oracle compatible mode of OceanBase Database to an Oracle database
      • Synchronize DDL operations from a DB2 LUW database to an Oracle-compatible tenant of OceanBase Database
      • Synchronize DDL operations from the Oracle compatible mode of OceanBase Database to a DB2 LUW database
      • Synchronize DDL operations from a DB2 LUW database to a MySQL-compatible tenant of OceanBase Database
      • Synchronize DDL operations from the MySQL compatible mode of OceanBase Database to a DB2 LUW database
      • Synchronize DDL operations between MySQL-compatible tenants of OceanBase Database
      • DDL synchronization between Oracle-compatible tenants of OceanBase Database
      • DDL operations for synchronizing data from a PostgreSQL database to the MySQL compatible mode of OceanBase Database
      • DDL synchronization from PostgreSQL to OceanBase Database in Oracle compatible mode
  • Data synchronization
    • Overview
    • Synchronize data from OceanBase Database to a Kafka instance
    • Synchronize data from OceanBase Database to a RocketMQ instance
    • Synchronize data from OceanBase Database to a DataHub instance
    • Synchronize data from an ODP logical table to a physical table in a MySQL-compatible tenant of OceanBase Database
    • Synchronize data from an ODP logical table to a DataHub instance
    • Synchronize data from an IDB logical table to a MySQL-compatible tenant of OceanBase Database
    • Synchronize data from an IDB logical table to a DataHub instance
    • Synchronize data from a MySQL database to a DataHub instance
    • Synchronize data from an Oracle database to a DataHub instance
    • Manage data synchronization tasks
      • View details of a data synchronization task
      • Change the name of a data synchronization task
      • View and modify synchronization objects
      • Use tags to Manage data synchronization tasks
      • Perform batch operations on data synchronization tasks
      • Download and import the settings of synchronization objects
      • View and modify the parameter configurations of a data synchronization task
      • Start and pause a data synchronization task
      • End and delete a data synchronization task
  • Data validation
    • Overview
    • Create a data validation task
    • Manage data validation tasks
      • View details of a data validation task
      • Change the name of a data validation task
      • View and modify validation objects
      • View and modify validation parameters
      • Manage data validation tasks by using tags
      • Import validation objects
      • Start, stop, and resume a data validation task
      • Clone a data validation task
      • Delete a data validation task
  • Create and manage data sources
    • Create data sources
      • Create an OceanBase data source
        • Create a physical OceanBase data source
        • Create an ODP data source
        • Create an IDB data source
        • Create a public cloud OceanBase data source
        • Create a standalone OceanBase data source
      • Create a MySQL data source
      • Create an Oracle data source
      • Create a TiDB data source
      • Create a Kafka data source
      • Create a RocketMQ data source
      • Create a DataHub data source
      • Create a DB2 LUW data source
      • Create a PostgreSQL data source
      • Create a PolarDB-X 1.0 data source
    • Manage data sources
      • View data source information
      • Copy a data source
      • Edit a data source
      • Delete a data source
    • Create a database user
    • User privileges
    • Enable binlogs for the MySQL database
    • Minimum privileges required when an Oracle database serves as the source
  • OPS & Monitoring
    • O&M overview
    • Go to the overview page
    • Server
      • View server information
      • Update the quota
      • View server logs
      • Manage resource groups
    • Components
      • Store
        • Add a Store component
        • View details of a Store component
        • Update the configurations of a Store component
        • Start and pause a Store component
        • Delete a Store component
      • Incr-Sync
        • View details of an Incr-Sync component
        • Start and pause an Incr-Sync component
        • Migrate an Incr-Sync component
        • Update the configurations of an Incr-Sync component
        • Batch O&M
        • Delete an Incr-Sync component
      • Full-Import
        • View details of a Full-Import component
        • Pause a Full-Import component
        • Rerun and resume a Full-Import component
        • Update the configurations of a Full-Import component
        • Delete a Full-Import component
      • Full-Verification
        • View details of a Full-Verification component
        • Pause a Full-Verification component
        • Rerun and resume a Full-Verification component
        • Update the configurations of a Full-Verification component
      • Incr-Verification
        • View details of the Incr-Verification component
        • Pause an Incr-Verification component
        • Rerun and resume an Incr-Verification component
        • Update an Incr-Verification component
      • Row-Verification
        • View details of a Row-Verification component
    • O&M Task
      • View O&M tasks
      • Skip a task or subtask
      • Retry a task or subtask
    • Parameter Template
      • Overview
      • Task Template
        • Create a task template
        • View and edit task templates
        • Copy and export a task template
        • Delete a task template
      • Component Template
        • Create a component template
        • View and edit component templates
        • Copy and export a component template
        • Delete a component template
      • Component parameters
        • Store component parameters
        • Incr-Sync component parameters
        • Full-Import component parameters
        • Full-Verification component parameters
        • Incr-Verification component parameters
        • Parameters of the Row-Verification component
        • CM component parameters
        • Supervisor component parameters
  • System management
    • Permission Management
      • Overview
      • Manage users
      • Manage departments
    • Alert center
      • View task alerts
      • View system alerts
      • Manage alert settings
    • Associate with OCP
    • System parameters
      • Modify system parameters
      • Modify HA configurations
      • oblogproxy parameters
    • Manage access keys
    • Operation audit
  • Troubleshooting Guide
    • Manage OMS services
    • OMS logs
    • Component O&M
      • O&M operations for the Supervisor component
      • CLI-based O&M for the Connector component
      • O&M operations for the Store component
    • Component tuning
      • Incr-Sync/Full-Import tuning
      • Oracle store tuning
    • Set throttling
    • Store performance diagnostics
  • Reference Guide
    • Features
      • Configure DDL/DML synchronization
      • DDL synchronization scope
      • Rename databases and tables
      • Use SQL conditions to filter data
      • Set the incremental synchronization start timestamp
      • Configure matching rules for migration or synchronization objects
      • Configure matching rules for validation objects
      • Wildcard patterns supported for matching rules
      • Hidden column mechanisms
      • Instructions on schema migration
      • Create and update a heartbeat table
      • Change a topic
      • Column filtering
      • Data formats
    • API Reference
      • Overview
      • CreateProject
      • StartProject
      • StopProject
      • ResumeProject
      • ReleaseProject
      • DeleteProject
      • ListProjects
      • DescribeProject
      • DescribeProjectSteps
      • DescribeProjectStepMetric
      • DescribeProjectProgress
      • DescribeProjectComponents
      • ListProjectFullVerifyResult
      • StartProjectsByLabel
      • StopProjectsByLabel
      • CreateMysqlDataSource
      • CreateOceanBaseDataSource
      • CreateOceanBaseODPDataSource
      • CreatePolarDBDataSource
      • ListDataSource
      • CreateLabel
      • ListAllLabels
      • DeleteDataSource
      • CreateProjectModifyRecords
      • ListProjectModifyRecords
      • StopProjectModifyRecords
      • RetryProjectModifyRecords
      • CancelProjectModifyRecord
      • SubmitPreCheck
      • GetPreCheckResult
      • UpdateProjectConfig
      • Download schema conversion information
      • DownloadConflictData
      • ListConflictData
      • ResetIncrStartTimestamp
      • AdvanceProject
      • DescribeRegions
    • Alert Reference
      • oms_host_down
      • oms_host_down_migrate_resource
      • oms_host_threshold
      • oms_migration_failed
      • oms_migration_delay
      • oms_sync_failed
      • oms_sync_status_inconsistent
      • oms_sync_delay
    • SSO
      • Integrate the OIDC protocol to OMS to implement SSO
      • Integrate the SAML 2.0 protocol to OMS to implement SSO
      • Access Microsoft Entra ID using OMS SAML 2.0
    • OMS error codes
    • SQL statements for querying table objects
    • Create a trigger
    • Change the log level for a PostgreSQL database instance
    • Online DDL tools
    • Supplemental logging in Oracle databases
  • Upgrade Guide
    • Overview
    • Upgrade OMS in single-node deployment mode
    • Upgrade OMS in multi-node deployment mode
    • FAQ
  • FAQ
    • General O&M
      • How do I modify the resource quotas of an OMS container?
      • How do I troubleshoot the OMS server down issue?
      • Deploy InfluxDB for OMS
      • Increase the disk space of the OMS host
    • Task diagnostics
      • How do I troubleshoot common problems with Oracle Store?
      • How do I perform performance tuning for Oracle Store?
      • What do I do when Oracle Store reports an error at the isUpdatePK stack?
      • What do I do when a store does not have data of the timestamp requested by the downstream?
      • What do I do when OceanBase Store failed to access an OceanBase cluster through RPC?
      • How do I use LogMiner to pull data from an Oracle database?
    • OPS & monitoring
      • What are the alert rules?
    • Data synchronization
      • FAQ about synchronization to a message queue
        • What are the strategies for ensuring the message order in incremental data synchronization to Kafka
    • Data migration
      • User privileges
        • What privileges do I need to grant to a user during data migration to or from an Oracle database?
      • Full migration
        • How do I query the ID of a checker?
        • How do I query log files of the Checker component of OMS?
        • How do I query the verification result files of the Checker component of OMS?
        • What do I do if the target table does not exist?
        • What can I do when the full migration failed due to LOB fields?
        • What do I do if garbled characters cannot be written into OceanBase Database V3.1.2?
      • Incremental synchronization
        • How do I skip DDL statements?
        • How do I migrate an Oracle database object whose name exceeds 30 bytes in length?
        • How do I update whitelists and blacklists?
        • What are the application scope and limits of ETL?
    • Installation and deployment
      • How do I upgrade Store?
  • Release Note
    • Release Note
      • Version number rules
      • V4.3
        • OMS V4.3.1
        • OMS V4.3.0
      • V4.2
        • OMS V4.2.5
        • OMS V4.2.4
        • OMS V4.2.3
        • OMS V4.2.2
        • OMS V4.2.1
        • OMS V4.2.0
      • V4.1
        • OMS V4.1.0
      • V4.0
        • OMS V4.0.2
        • OMS V4.0.1
      • V3.4
        • OMS V3.4.0
      • V3.3
        • OMS V3.3.1
        • OMS V3.3.0
      • V3.2
        • OMS V3.2.2
        • OMS V3.2.1
      • V3.1
        • OMS V3.1.0
      • V2.1
        • OMS V2.1.2
        • OMS V2.1.0

Download PDF

OMS Documentation Overview of OMS Terms OMS HA Principles of Store Principles of Full-Import and Incr-Sync Data verification principles Overview Hierarchical functional system Basic components OMS Oracle full migration design and impact Limitations Data migration process Data synchronization process Deployment types System and network requirements Memory and disk requirements Environment preparations Deploy OMS on a single node Deploy OMS on multiple nodes in a single region Deploy OMS on multiple nodes in multiple regions Scale out Scale down deployment Check the deployment Deploy a time-series database (Optional) Log in to the OMS console Overview Configure user information Change your login password Log out Overview Migrate data from a MySQL database to a MySQL-compatible tenant of OceanBase Database Migrate data from a MySQL-compatible tenant of OceanBase Database to a MySQL database Migrate data from an Oracle database to the MySQL compatible mode of OceanBase Database Migrate data from the Oracle compatible mode of OceanBase Database to an Oracle database Migrate data from an Oracle database to the Oracle compatible mode of OceanBase Database Migrate data from a DB2 LUW database to an Oracle-compatible tenant of OceanBase Database Migrate data from an Oracle-compatible tenant of OceanBase Database to a DB2 LUW database Migrate data from a DB2 LUW database to a MySQL-compatible tenant of OceanBase Database Migrate data from a MySQL-compatible tenant of OceanBase Database to a DB2 LUW database Migrate data between OceanBase databases of the same tenant type Configure a bidirectional synchronization task Migrate data from a TiDB database to a MySQL-compatible tenant of OceanBase Database Migrate data from a PostgreSQL database to the Oracle compatible mode of OceanBase Database Migrate data from a PostgreSQL database to the MySQL compatible mode of OceanBase Database Migrate data from a PolarDB-X 1.0 database to a MySQL-compatible tenant of OceanBase Database Migrate incremental data from an Oracle-compatible tenant of OceanBase Database to a MySQL database View details of a data migration task Rename a data migration task View and modify migration objects Use tags to Manage data migration tasks Perform batch operations on data migration tasks Download and import settings of migration objects View and modify migration parameters Download a conflict log file Start and pause a data migration task End and delete a data migration task Synchronize DDL operations from the MySQL compatible mode of OceanBase Database to a MySQL database Synchronize DDL operations from the Oracle compatible mode of OceanBase Database to an Oracle database Synchronize DDL operations from a DB2 LUW database to an Oracle-compatible tenant of OceanBase Database Synchronize DDL operations from the Oracle compatible mode of OceanBase Database to a DB2 LUW database Synchronize DDL operations from a DB2 LUW database to a MySQL-compatible tenant of OceanBase Database Synchronize DDL operations from the MySQL compatible mode of OceanBase Database to a DB2 LUW database Synchronize DDL operations between MySQL-compatible tenants of OceanBase Database DDL synchronization between Oracle-compatible tenants of OceanBase Database DDL operations for synchronizing data from a PostgreSQL database to the MySQL compatible mode of OceanBase Database DDL synchronization from PostgreSQL to OceanBase Database in Oracle compatible mode Overview Synchronize data from OceanBase Database to a Kafka instance Synchronize data from OceanBase Database to a RocketMQ instance Synchronize data from OceanBase Database to a DataHub instance Synchronize data from an ODP logical table to a physical table in a MySQL-compatible tenant of OceanBase Database Synchronize data from an ODP logical table to a DataHub instance Synchronize data from an IDB logical table to a MySQL-compatible tenant of OceanBase Database Synchronize data from an IDB logical table to a DataHub instance Synchronize data from a MySQL database to a DataHub instance Synchronize data from an Oracle database to a DataHub instance View details of a data synchronization task Change the name of a data synchronization task View and modify synchronization objects Use tags to Manage data synchronization tasks Perform batch operations on data synchronization tasks Download and import the settings of synchronization objects View and modify the parameter configurations of a data synchronization task Start and pause a data synchronization task End and delete a data synchronization task Overview Create a data validation task View details of a data validation task Change the name of a data validation task View and modify validation objects View and modify validation parameters Manage data validation tasks by using tags Import validation objects Start, stop, and resume a data validation task Clone a data validation task Delete a data validation task Create a MySQL data source Create an Oracle data source Create a TiDB data source
OceanBase logo

The Unified Distributed Database for the AI Era.

Follow Us
Products
OceanBase CloudOceanBase EnterpriseOceanBase Community EditionOceanBase seekdb
Resources
DocsBlogLive DemosTraining & Certification
Company
About OceanBaseTrust CenterLegalPartnerContact Us
Follow Us

© OceanBase 2026. All rights reserved

Cloud Service AgreementPrivacy PolicySecurity
Contact Us
Document Feedback
  1. Documentation Center
  2. OceanBase Migration Service
  3. V4.3.1
iconOceanBase Migration Service
V 4.3.1Enterprise Edition
Enterprise Edition
  • V 4.3.2
  • V 4.3.1
  • V 4.3.0
  • V 4.2.5
  • V 4.2.4
  • V 4.2.3
  • V 4.0.2
  • V 3.4.0
Community Edition
  • V 4.2.12
  • V 4.2.11
  • V 4.2.10
  • V 4.2.9
  • V 4.2.8
  • V 4.2.7
  • V 4.2.6
  • V 4.2.5
  • V 4.2.4
  • V 4.2.3
  • V 4.2.1
  • V 4.2.0
  • V 4.0.0
  • V 3.3.1

Data formats

Last Updated:2025-10-09 03:34:24  Updated
share
What is on this page
Serialization formats
Default JSON message format
JSON format of Canal messages
JSON message format of DataWorks
JSON format of SharePlex messages
DefaultExtendColumnType JSON message format
Debezium JSON message format
DebeziumFlatten JSON message format
DebeziumSmt JSON format
Avro JSON message format
Format of data transmitted from OceanBase Database to text protocols

folded

share

This topic describes the data formats of serialized data and data transmitted from a database to the text protocol.

Serialization formats

When you use OceanBase Migration Service (OMS) to synchronize data from a source database to a Kafka, DataHub (BLOB type), or RocketMQ topic, you can control the serialization format of the synchronized data by specifying the serialization format. The available serialization formats are Default, Canal, DataWorks (V2.0 supported), SharePlex, DefaultExtendColumnType, Debezium, DebeziumFlatten, DebeziumSmt, and Avro.

Note

  • Currently, only MySQL compatible mode of OceanBase Database support the Debezium, DebeziumFlatten, and DebeziumSmt serialization formats.

  • Currently, the Avro serialization format is supported only for data synchronization from a MySQL compatible mode of OceanBase Database to a Kafka topic.

Default JSON message format

When data is synchronized to Kafka, DataHub (BLOB type), or RocketMQ, the default serialization format is JSON and the JSON message contains the following fields.

{
    "prevStruct": { // The pre-image of the changed data.
        "col1": "val1" // Key-value pairs, including all keys and values.
    },
    "postStruct": { // The post-image of the changed data.
        "col1": "val1" // Key-value pairs, including all keys and values.
    },
    "allMetaData"{
        "checkpoint": "STRING", // The current synchronization point. In incremental synchronization, it represents the timestamp (in seconds) to which the data is synchronized. In full data synchronization, it represents the primary key value pairs.
        "record_primary_key": "STRING", // The name of the primary key column. If there are multiple columns, they are separated with \u0001.
        "record_primary_value": "STRING", // The primary key value. If there are multiple columns, they are separated with \u0001.
        "source_identity": "STRING", // The identifier of the source. It is the subtopic in incremental synchronization and an irrelevant sequence number in full data synchronization.
        "dbType": "STRING", // The type of the database. It can be MYSQL, Oracle, OCEANBASE (legacy mode, compatible use), OB_IN_ORACLE_MODE (legacy mode, compatible use), DB2 (legacy mode, compatible use), OB_MYSQL, OB_ORACLE, or DB2 LUW.
        "storeDataSequence": "LONG", // The sequence number for sorting. By default, it is set to true in the source.json configuration file for incremental synchronization. It is not applicable for full data synchronization. The sequence number is generated based on the following rule: timestamp + incremental number not exceeding five digits. It is in the format of {timestamp}{incremental number}.
        "table_name": "STRING", // The name of the table on which the SQL change is performed.
        "db": "STRING", // The name of the database on which the SQL change is performed. For OceanBase Database, the tenant name and database name are included, in the format of {tenant}.{database}.
        "timestamp": "STRING", // The timestamp (in seconds) of data change. It is applicable only to incremental synchronization.
        "uniqueId": "STRING", // The transaction sequence number transmitted by the store, which is used to identify incremental synchronization.
        "transId": "STRING",  // In OceanBase Database of a version earlier than V4.x, it indicates the transaction ID (inc is an auto-increment number, addr is the address of the coordinator, and ts is the time when the transaction ID is generated. hash is the hash value of the preceding three). If the transaction is incomplete, it is null. For example, "transId":"{hash:614827533592190202, inc:1384316437, addr:\"xxx.xxx.xxx.xxx:2882\", t:1756190533588977}". In OceanBase Database V4.x and later, it indicates ${tenant ID}_${transaction ID}. For example, "transId":"1xxx_3152xxx".
        "clusterId": "STRING", // In OceanBase Database, it represents the cluster ID. In MySQL Database, it represents the thread ID. It is not applicable in Oracle Database.
        "ddlType": "STRING", // The DDL specific type. It is supported from OMS V4.0.1.
    },
    "recordType": "INSERT/UPDATE/DELETE/HEARTBEAT/DDL/ROW" // The change type.
}
  • In a DDL message, the value of the "ddl" field, which is the only field in the message, is the DDL statement.

  • The following describes the prevStruct and postStruct fields:

    • prevStruct: the pre-image of the changed data, namely, the data before SQL execution.

    • postStruct: the post-image of the changed data, namely, the data after SQL execution.

    A DELETE message contains only the prevStruct field. An INSERT or DDL message contains only the postStruct field. An UPDATE message contains both the prevStruct and postStruct fields. A HEARTBEAT (periodic heartbeat) message does not contain the postStruct or prevStruct field.

  • The clusterId field is described as follows:

    • In OceanBase Database, the ob_org_cluster_id field is used to set the session-level CLUSTER_ID of OceanBase Database. The value is persisted in the transaction log. If you set the ob_org_cluster_id field when you write data, the set value takes precedence. Otherwise, the default value is used. The clusterId field is applicable only to incremental synchronization. For more information, see ob_org_cluster_id.

    • In MySQL Database, the clusterId field indicates the thread ID in the MySQL Binlog Event. The MySQL server assigns an increasing thread ID to each thread on the connection.

Here are some data examples:

  • Example of data inserted

    {
      "allMetaData":{
        "checkpoint": null,
        "record_primary_key": "int8\u0001int16",
        "uniqueId": "{tid:11039xxxx127, partition_id:0, part_cnt:0},5917,391,0",
        "transId":"1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database of a version earlier than V4.x, the example is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
        "clusterId": "123456",
        "source_identity": null,
        "record_primary_value": "3\u0001129",
        "dbType": "OB_MYSQL",
        "table_name": "table",
        "db": "tenant.database",
        "timestamp": "1609344671"
      },
      "prevStruct": null,
      "recordType": "INSERT",
      "postStruct":{
        "col1": 3,
        "col2": 129,
        "col3": 2147483646,
        "col4": 9223372036854775806,
        "col5": 10223372036854775806,
        "col6": 1.2222,
        "col7": 9.999999,
        "col8": "hello world",
        "col9": "aGVsbG8gd29ybGQ=",
        "col10": 9.99999999999,
        "col11": "2020-11-25",
        "col12": "00:01:02",
        "col13": "2020-11-25 00:01:02",
        "col14": "1606233662.012345",
      }
    }
    

    When recordType is ROW, the data is full data and the format is the same as for INSERT data.

    {
      "allMetaData":{
        "checkpoint": null,
        "record_primary_key": "int8\u0001int16",
        "uniqueId": "{tid:11039xxxx127, partition_id:0, part_cnt:0},5917,391,0",
        "transId":"1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database of a version earlier than V4.x, the example is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
        "clusterId": "123456",
        "source_identity": null,
        "record_primary_value": "3\u0001129",
        "dbType": "OB_MYSQL",
        "table_name": "table",
        "db": "tenant.database",
        "timestamp": "1609344671"
      },
      "prevStruct": null,
      "recordType": "ROW",
      "postStruct":{
        "col1": 3,
        "col2": 129,
        "col3": 2147483646,
        "col4": 9223372036854775806,
        "col5": 10223372036854775806,
        "col6": 1.2222,
        "col7": 9.999999,
        "col8": "hello world",
        "col9": "aGVsbG8gd29ybGQ=",
        "col10": 9.99999999999,
        "col11": "2020-11-25",
        "col12": "00:01:02",
        "col13": "2020-11-25 00:01:02",
        "col14": "1606233662.012345",
      }
    }
    
  • Example of data updated

    {
      "allMetaData": {
        "checkpoint": null,
        "record_primary_key": "int8\u0001int16",
        "source_identity": null,
        "uniqueId": "{tid:11039xxxx27, partition_id:0, part_cnt:0},5917,391,0",
        "transId":"1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database of a version earlier than V4.x, the example is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
        "clusterId": "123456",
        "record_primary_value": "3\u0001129",
        "dbType": "OB_MYSQL",
        "table_name": "table",
        "db": "tenant.database",
        "timestamp": "1609344671"
      },
      "prevStruct": {
        "col1": 3,
        "col2": 129,
        "col3": 2147483646,
        "col4": 9223372036854775806,
        "col5": 10223372036854775806,
        "col6": 1.2222,
        "col7": 9.999999999999,
        "col8": "hello world",
        "col9": "aGVsbG8gd29ybGQ=",
        "col10": 9.999999999999,
        "col11": "2020-11-25",
        "col12": "00:01:02",
        "col13": "2020-11-25 00:01:02",
        "col14": "1606233662.012345",
      },
      "recordType": "UPDATE",
      "postStruct": {
        "col1": 3,
        "col2": 129,
        "col3": 2147483646,
        "col4": 9223372036854775806,
        "col5": 10223372036854775806,
        "col6": 1.2222,
        "col7": 9.999999999999,
        "col8": "hello world 2020",
        "col9": "aGVsbG8gd29ybGQ=",
        "col10": 9.999999999999,
        "col11": "2020-11-25",
        "col12": "00:01:02",
        "col13": "2020-11-25 00:01:02",
        "col14": "1606233662.012345",
      }
    }
    
  • Example of data deleted

    {
    "allMetaData":{
      "checkpoint": null,
      "record_primary_key": "int8\u0001int16",
      "source_identity": null,
      "uniqueId": "{tid:11039xxxx27, partition_id:0, part_cnt:0},5917,391,0",
      "transId":"1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database of a version earlier than V4.x, the example is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
      "clusterId": "123456",
      "record_primary_value": "3\u0001129",
      "dbType": "OB_MYSQL",
      "table_name": "table",
      "db": "tenant.database",
      "timestamp": "1609344671"
    },
    "prevStruct":{
      "col1": 3,
      "col2": 129,
      "col3": 2147483646,
      "col4": 9223372036854775806,
      "col5": 10223372036854775806,
      "col16": 1.2222,
      "col7": 9.99999999,
      "col8": "hello world",
      "col9": "aGVsbG8gd29ybGQ=",
      "col10": 9.999999999,
      "col11": "2020-11-25",
      "col12": "00:01:02",
      "col13": "2020-11-25 00:01:02",
      "col14": "1606233662.012345"
    },
    "recordType": "DELETE",
    "postStruct": null
    }
    
  • DDL example

    ALTER TABLE connector_test.all_mysql_type_test ADD column c90 VARCHAR(30) DEFAULT "test" COMMENT 'test';
    
    {
    "prevStruct": null,
    "postStruct": {
      "ddl": "ALTER TABLE connector_test.all_mysql_type_test ADD column c90 VARCHAR(30) DEFAULT \"test\" COMMENT 'test'"
    },
    "allMetaData": {
      "checkpoint": "1671177057",
      "dbType": "OB_MYSQL",
      "storeDataSequence": null,
      "db": "connector_test",
      "timestamp": "1671177057",
      "uniqueId": null,
      "ddlType": "ALTER_TABLE",
      "record_primary_key": null,
      "source_identity": null,
      "record_primary_value": null,
      "table_name": "all_mysql_type_test"
    },
    "recordType": "DDL"
    }
    

JSON format of Canal messages

When data is synchronized to Kafka, DataHub (BLOB type), or RocketMQ, Canal uses the following JSON message format for serialization.

{
    "database": "STRING", // The name of the database where the change occurred. In OceanBase Database, only the database name is provided without the tenant name.
    "sqlType": {
        "col1": "INTEGER", // The types of changed columns. The reference standard is java.sql.Types.
    },
    "data": [ // The key-value pairs of the changed data. Currently, only one message is generated.
        {
            "col1": "val1"
        }
    ],
    "pkNames": [ // The names of primary key columns
        "col1"
    ],
    "old": [ // Present only in UPDATE messages. The names of columns changed by the UPDATE statement, namely, the values of columns before the change.
        {
            "col1": "val1"
        }
    ],
    "mysqlType": { // The descriptions of column types.
        "col": "STRING"
    },
    "type": "STRING", // The change type.
    "table": "STRING", // The name of the table where the change occurred.
    "es": "LONG", // The timestamp of the change in milliseconds.
    "isDdl": "BOOLEAN", // Whether the message is generated from a DDL event.
    "ts": "LONG", // The timestamp when the message is written to the target.
    "sql": "STRING" // Currently empty.
}

Here are some examples:

  • An example of an INSERT message

    {
      "database":"database",
      "sqlType":{
        "col1":93,
        "col2":12,
        "col3":6,
        "col4":8,
        "col5":5,
        "col6":92,
        "col7":4,
        "col8":-5,
        "col9":2004,
        "col10":-6,
        "col11":91,
        "col12":3,
        "col13":-5,
        "col14":93
      },
      "data":[
        {
          "col1":"2020-11-25 00:01:02",
          "col2":"hello world",
          "col3":1.2222,
          "col4":9.999999999999999093266253,
          "col5":129,
          "col6":"00:01:02",
          "col7":2147483646,
          "col8":9223372036854775806,
          "col9":"aGVsbG8gd29ybGQ=",
          "col10":3,
          "col11":"2020-11-25",
          "col12":9.9999999999999990,
          "col13":10223372036854775806,
          "col14":"1606233662.012345"
        }
      ],
      "pkNames":[
        "col1",
        "col2"
      ],
      "old":null,
      "mysqlType":{
        "col1":"datetime",
        "col2":"varchar",
        "col3":"float",
        "col4":"double",
        "col5":"smallint",
        "col6":"time",
        "col7":"int",
        "col8":"int64",
        "col9":"blob",
        "col10":"tinyint",
        "col11":"date",
        "col12":"decimal",
        "col13":"bigint",
        "col14":"timestamp"
      },
      "type":"INSERT",
      "table":"table",
      "es":1609344671000,
      "isDdl":false,
      "ts":1618323429026,
      "sql":""
    }
    
  • An example of an UPDATE message

    {
      "database":"database",
      "sqlType":{
        "col1":93,
        "col2":12,
        "col3":6,
        "col4":8,
        "col5":5,
        "col6":92,
        "col7":4,
        "col8":-5,
        "col9":2004,
        "col10":-6,
        "col11":91,
        "col12":3,
        "col13":-5,
        "col14":93
      },
      "data":[
        {
          "col1":"2020-11-25 00:01:02",
          "col2":"hello world 2020",
          "col3":1.2222,
          "col4":9.999999999999999093266253372484,
          "col5":129,
          "col6":"00:01:02",
          "col7":2147483646,
          "col8":9223372036854775806,
          "col9":"aGVsbG8gd29ybGQ=",
          "col10":3,
          "col11":"2020-11-25",
          "col12":9.9999999999999990932662,
          "col13":10223372036854775806,
          "col14":"1606233662.012345"
        }
      ],
      "pkNames":[
        "col1",
        "col2"
      ],
      "old":[
        {
          "string":"hello world"
        }
      ],
      "mysqlType":{
        "col1":"datetime",
        "col2":"varchar",
        "col3":"float",
        "col4":"double",
        "col5":"smallint",
        "col6":"time",
        "col7":"int",
        "col8":"int64",
        "col9":"blob",
        "col10":"tinyint",
        "col11":"date",
        "col12":"decimal",
        "col13":"bigint",
        "col14":"timestamp"
      },
      "type":"UPDATE",
      "table":"table",
      "es":1609344671000,
      "isDdl":false,
      "ts":1618364572908,
      "sql":""
    }
    
  • An example of a DELETE message

    {
      "database":"database",
      "sqlType":{
        "col1":93,
        "col2":12,
        "col3":6,
        "col4":8,
        "col5":5,
        "col6":92,
        "col7":4,
        "col8":-5,
        "col9":2004,
        "col10":-6,
        "col11":91,
        "col12":3,
        "col13":-5,
        "col14":93
      },
      "data":[
        {
          "col1":"2020-11-25 00:01:02",
          "col2":"hello world",
          "col3":1.2222,
          "col4":9.99999999999999909326625,
          "col5":129,
          "col6":"00:01:02",
          "col7":2147483646,
          "col8":9223372036854775806,
          "col9":"aGVsbG8gd29ybGQ=",
          "col10":3,
          "col11":"2020-11-25",
          "col12":9.99999999999999909326625,
          "col13":10223372036854775806,
          "col14":"1606233662.012345"
        }
      ],
      "pkNames":[
        "int8",
        "int16"
      ],
      "old":null,
      "mysqlType":{
        "col1":"datetime",
        "col2":"varchar",
        "col3":"float",
        "col4":"double",
        "col5":"smallint",
        "col6":"time",
        "col7":"int",
        "col8":"int64",
        "col9":"blob",
        "col10":"tinyint",
        "col11":"date",
        "col12":"decimal",
        "col13":"bigint",
        "col14":"timestamp"
      },
      "type":"DELETE",
      "table":"table",
      "es":1609344671000,
      "isDdl":false,
      "ts":1618364660278,
      "sql":""
    }
    
  • A DDL example

    ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT "test" COMMENT 'test';
    
    {
        "database": "connector_test",
        "sqlType": null,
        "data": null,
        "pkNames": null,
        "old": null,
        "mysqlType": null,
        "type": "ALTER",
        "table": "all_mysql_type_test",
        "es": 1671177209000,
        "isDdl": true,
        "ts": 1671177291475,
        "sql": "ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT \"test\" COMMENT 'test'"
    }
    

JSON message format of DataWorks

When data is synchronized to Kafka, DataHub (BLOB type), or RocketMQ, DataWorks serializes the data using the following JSON message format.

{
 "version":"2.0", //The protocol version. Currently, only DataWorks 2.0 is supported.
 "schema": { //The metadata of changes. Only the column names and types are specified.
 "source": {//The source of changes.
  "dbType": "mysql", //The data source type.
  "dbVersion": "5.7.35", //The database version.
  "dbName": "myDatabase", //The database name.
  "schema": "mySchema", //The schema name. It is required in a database system that supports schemas.
  "table": "tableName" //The table name.
 },
 "column": [//The changed data columns. The records of the target table are updated based on the changed data.
  {
  "name": "id",
  "type": "bigint"
  },
  {
  "name": "name",
  "type": "varchar(20)"
  },
  {
  "name": "mydata",
  "type": "binary"
  },
  {
  "name": "ts",
  "type": "datetime"
  }
 ],
 "pk": [//The primary keys or unique keys are required if any. Otherwise, this field can be left empty.
  "pkName1",
  "pkName2"
 ]
},
"payload": {
 "before": {
  "data":{
   "id": 111,
   "name":"scooter",
   "mydata": "[base64 string]", //If the data type is binary, the data is Base64-encoded.
   "ts": 1590315269000.123456789 //The timestamp. The integer part is 13 digits long and the fractional part is 9 digits long.
  }
 },
 "after": {
  "data":{
   "id": 222,
   "name":"donald",
   "mydata": "[base64 string]",
   "ts": 1590315269000
  }
 },
 "op":"INSERT/UPDATE/DELETE/HEARTBEAT/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/...",//Case-sensitive
 "timestamp": {
  "eventTime": 1620457659000 // Timestamp (13-digit, millisecond precision) when change occurred in source database.
 },
 "ddl": {
  "text": "ADD COLUMN ..."
 },
 "scn": "An auto-increment ID"
},
"extend": { //The extend field is reserved for future use. If there is no future use, this field can be ignored.
 "load_fm":"CIBS" //The source system that records originate from. For example, "CIBS"
 }
}

A heartbeat message is sent as follows:

{
    "version": "2.0", //The protocol version.
    "payload": {
        "timestamp": {
            "eventTime": 1620457659000 //The timestamp when the heartbeat message is sent.
        },
        "op": "HEARTBEAT" //Identifies the heartbeat message.
    }
}

Here are some examples:

  • An INSERT operation

    {
        "version":"2.0",
        "schema":{
            "source":{
                "dbType":"ob_mysql",
                "dbVersion":null,
                "dbName":"db",
                "schema":null,
                "table":"tab"
            },
            "column":[
                {
                    "name":"int8",
                    "type":"TINYINT"
                },
                {
                    "name":"int16",
                    "type":"SMALLINT"
                },
                {
                    "name":"int32",
                    "type":"INT"
                },
                {
                    "name":"int64",
                    "type":"INT64"
                },
                {
                    "name":"float32",
                    "type":"FLOAT"
                },
                {
                    "name":"float64",
                    "type":"DOUBLE"
                },
                {
                    "name":"bigInt",
                    "type":"BIGINT"
                },
                {
                    "name":"boolean",
                    "type":"BOOLEAN"
                },
                {
                    "name":"string",
                    "type":"VARCHAR"
                },
                {
                    "name":"bytes",
                    "type":"BLOB"
                },
                {
                    "name":"decimal",
                    "type":"DECIMAL"
                },
                {
                    "name":"localDate",
                    "type":"DATE"
                },
                {
                    "name":"localTime",
                    "type":"TIME"
                },
                {
                    "name":"localDateTime",
                    "type":"DATETIME"
                },
                {
                    "name":"timestamp",
                    "type":"TIMESTAMP"
                },
                {
                    "name":"zonedDateTime",
                    "type":"ZONED_DATETIME"
                },
                {
                    "name":"intervalDayToSecond",
                    "type":"INTERVAL_DAY_TO_SECOND"
                },
                {
                    "name":"intervalYearToMonth",
                    "type":"INTERVAL_YEAR_TO_MONTH"
                }
            ],
            "pk":[
                "pkName1",
                "pkName12"
            ]
        },
        "payload":{
            "before":null,
            "after":{
                "data":{
                    "col1":3,
                    "col2":129,
                    "col3":2147483646,
                    "col4":9223372036854775806,
                    "col5":1.2222,
                    "col6":0.0000000000000008125,
                    "col7":10223372036854775806,
                    "col8":1,
                    "col9":"hello world",
                    "col10":"aGVsbG8gd29ybGQ=",
                    "col11":0.000000000000125,
                    "col12":"2020-11-25",
                    "col13":"00:01:02",
                    "col14":"2020-11-25 00:01:02",
                    "col15":"1606233662.012345",
                    "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai",
                    "col17":"INTERVAL '3' DAY",
                    "col18":"INTERVAL '4' YEAR"
                }
            },
            "op":"INSERT",
            "timestamp":{
                "eventTime":1647581000000,
                "systemTime":1647581000795,
                "checkpointTime":1647581000
            },
            "ddl":null,
            "scn":"null"
        },
        "extend":{
            "load_fm": "test"
        }
    }
    
  • An UPDATE operation

    {
        "version":"2.0",
        "schema":{
            "source":{
                "dbType":"ob_mysql",
                "dbVersion":null,
                "dbName":"db",
                "schema":null,
                "table":"tab"
            },
            "column":[
                {
                    "name":"int8",
                    "type":"TINYINT"
                },
                {
                    "name":"int16",
                    "type":"SMALLINT"
                },
                {
                    "name":"int32",
                    "type":"INT"
                },
                {
                    "name":"int64",
                    "type":"INT64"
                },
                {
                    "name":"float32",
                    "type":"FLOAT"
                },
                {
                    "name":"float64",
                    "type":"DOUBLE"
                },
                {
                    "name":"bigInt",
                    "type":"BIGINT"
                },
                {
                    "name":"boolean",
                    "type":"BOOLEAN"
                },
                {
                    "name":"string",
                    "type":"VARCHAR"
                },
                {
                    "name":"bytes",
                    "type":"BLOB"
                },
                {
                    "name":"decimal",
                    "type":"DECIMAL"
                },
                {
                    "name":"localDate",
                    "type":"DATE"
                },
                {
                    "name":"localTime",
                    "type":"TIME"
                },
                {
                    "name":"localDateTime",
                    "type":"DATETIME"
                },
                {
                    "name":"timestamp",
                    "type":"TIMESTAMP"
                },
                {
                    "name":"zonedDateTime",
                    "type":"ZONED_DATETIME"
                },
                {
                    "name":"intervalDayToSecond",
                    "type":"INTERVAL_DAY_TO_SECOND"
                },
                {
                    "name":"intervalYearToMonth",
                    "type":"INTERVAL_YEAR_TO_MONTH"
                }
            ],
            "pk":[
                "pkName1",
                "pkName2"
            ]
        },
        "payload":{
            "before":{
                "data":{
                    "col1":3,
                    "col2":129,
                    "col3":2147483646,
                    "col4":9223372036854775806,
                    "col5":1.2222,
                    "col6":0.000000000000000125,
                    "col7":10223372036854775806,
                    "col8":1,
                    "col9":"hello world",
                    "col10":"aGVsbG8gd29ybGQ=",
                    "col11":0.0000000000125,
                    "col12":"2020-11-25",
                    "col13":"00:01:02",
                    "col14":"2020-11-25 00:01:02",
                    "col15":"1606233662.012345",
                    "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai",
                    "col17":"INTERVAL '3' DAY",
                    "col18":"INTERVAL '4' YEAR"
                }
            },
            "after":{
                "data":{
                    "col1":3,
                    "col2":129,
                    "col3":2147483646,
                    "col4":9223372036854775806,
                    "col5":1.2222,
                    "col6":0.00000000008125,
                    "col7":10223372036854775806,
                    "col8":1,
                    "col9":"hello world 2020",
                    "col10":"aGVsbG8gd29ybGQ=",
                    "col11":0.000000000125,
                    "col12":"2020-11-25",
                    "col13":"00:01:02",
                    "col14":"2020-11-25 00:01:02",
                    "col15":"1606233662.012345",
                    "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai",
                    "col17":"INTERVAL '3' DAY",
                    "col18":"INTERVAL '4' YEAR"
                }
            },
            "op":"UPDATE",
            "timestamp":{
                "eventTime":1647581038000,
                "systemTime":1647581038674,
                "checkpointTime":1647581038
            },
            "ddl":null,
            "scn":"null"
        },
        "extend":{
            "load_fm": "test"
        }
    }
    
  • A DELETE operation

    {
        "version":"2.0",
        "schema":{
            "source":{
                "dbType":"ob_mysql",
                "dbVersion":null,
                "dbName":"db",
                "schema":null,
                "table":"tab"
            },
            "column":[
                {
                    "name":"int8",
                    "type":"TINYINT"
                },
                {
                    "name":"int16",
                    "type":"SMALLINT"
                },
                {
                    "name":"int32",
                    "type":"INT"
                },
                {
                    "name":"int64",
                    "type":"INT64"
                },
                {
                    "name":"float32",
                    "type":"FLOAT"
                },
                {
                    "name":"float64",
                    "type":"DOUBLE"
                },
                {
                    "name":"bigInt",
                    "type":"BIGINT"
                },
                {
                    "name":"boolean",
                    "type":"BOOLEAN"
                },
                {
                    "name":"string",
                    "type":"VARCHAR"
                },
                {
                    "name":"bytes",
                    "type":"BLOB"
                },
                {
                    "name":"decimal",
                    "type":"DECIMAL"
                },
                {
                    "name":"localDate",
                    "type":"DATE"
                },
                {
                    "name":"localTime",
                    "type":"TIME"
                },
                {
                    "name":"localDateTime",
                    "type":"DATETIME"
                },
                {
                    "name":"timestamp",
                    "type":"TIMESTAMP"
                },
                {
                    "name":"zonedDateTime",
                    "type":"ZONED_DATETIME"
                },
                {
                    "name":"intervalDayToSecond",
                    "type":"INTERVAL_DAY_TO_SECOND"
                },
                {
                    "name":"intervalYearToMonth",
                    "type":"INTERVAL_YEAR_TO_MONTH"
                }
            ],
            "pk":[
                "pkName1",
                "pkName2"
            ]
        },
        "payload":{
            "before":{
                "data":{
                    "col1":3,
                    "col2":129,
                    "col3":2147483646,
                    "col4":9223372036854775806,
                    "col5":1.2222,
                    "col6":0.0000000000125,
                    "col7":10223372036854775806,
                    "col8":1,
                    "col9":"hello world",
                    "col10":"aGVsbG8gd29ybGQ=",
                    "col11":0.0000000000125,
                    "col12":"2020-11-25",
                    "col13":"00:01:02",
                    "col14":"2020-11-25 00:01:02",
                    "col15":"1606233662.012345",
                    "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai",
                    "col17":"INTERVAL '3' DAY",
                    "col18":"INTERVAL '4' YEAR"
                }
            },
            "after":null,
            "op":"DELETE",
            "timestamp":{
                "eventTime":1647581072000,
                "systemTime":1647581072976,
                "checkpointTime":1647581072
            },
            "ddl":null,
            "scn":"null"
        },
        "extend":{
            "load_fm": "test"
        }
    }
    
  • A DDL operation

    ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT "test" COMMENT 'test';
    
    {
        "version": "2.0",
        "schema": {
            "source": {
                "dbType": "ob_mysql",
                "dbVersion": null,
                "dbName": "connector_test",
                "schema": null,
                "table": "all_mysql_type_test"
            },
            "column": null,
            "pk": null
        },
        "payload": {
            "before": null,
            "after": null,
            "op": "ALTER",
            "timestamp": {
                "eventTime": 1671177209000,
                "systemTime": 1671177291485,
                "checkpointTime": 1671177200
            },
            "ddl": {
                "text": "ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT \"test\" COMMENT 'test'"
            },
            "scn": "null"
        },
        "extend": {}
    }
    

JSON format of SharePlex messages

When data is synchronized to Kafka, DataHub (for BLOB data), or RocketMQ, SharePlex uses the following JSON message format for serialization.

{
    "data": { // The key-value pairs of changed data. For INSERT / DELETE operations, all values are included. For UPDATE operations, only changed values are included.
        "col1": "val1"
    },
    "meta": {
        "time": "YYYY-MM-DDTHH:mm:ss", // The time when the change occurred.
        "op": "", // The change type. Valid values: ins, upd, del, and ddl.
        "posttime": "YYYY-MM-DDTHH:mm:ss", // The time when the data was written to the target.
        "idx": "STRING", //The index of the message in the transaction or the total number of messages in the transaction. This parameter is deprecated.
        "size": "NUMBER", //The number of messages in the transaction. This parameter is deprecated.
        "seq": "STRING", // The sequence number. This parameter is returned only when transactionEnabled is set to true in the source.
        "table": "STRING", // The name of the changed table in the SQL database. The format is {database}.{table}.
        "rowid": "STRING", // {source_table_name}-{primary_key_values_separated_by_\u0001}
        "trans": "STRING", // The transaction ID.
        "scn": "STRING" // This parameter is returned only in incremental synchronization. It is included in the source.json configuration file with sequenceEnabled set to true. The default value is true. It is used for sorting. The generation rule is that the timestamp + an incremental number not exceeding five digits are returned.
    },
    "key": { // This parameter is returned only for UPDATE operations. It indicates the values of the data before the change.
    },
    "sql": {
      "ddl": ""
      } // This parameter is returned only for DDL operations. If op = ddl, the DDL statement is written.
}

Here are some data examples:

  • Example of data inserted (INSERT operation)

    {
      "data":{
        "col1":"2020-11-25 00:01:02",
        "col2":"hello world",
        "col3":"INTERVAL '3' DAY",
        "col4":1.2222,
        "col5":9.999999999999999308,
        "col6":129,
        "col7":"00:01:02",
        "col8":1,
        "col9":"2020-11-25 00:01:02.012345 Asia/Shanghai",
        "col10":2147483646,
        "col11":9223372036854775806,
        "col12":"aGVsbG8gd29ybGQ=",
        "col13":"INTERVAL '4' YEAR",
        "col14":3,
        "col15":"2020-11-25",
        "col16":9.9999999999308,
        "col17":10223372036854775806,
        "col18":"1606233662.012345"
      },
      "meta":{
        "posttime":"2020-12-07T13:22:00",
        "op":"ins",
        "size":10,
        "time":"2020-11-25T00:01:02",
        "idx":"1/10",
        "seq":1,
        "table":"mock_database.mock_table",
        "rowid":"mock_database.mock_table-3129",
        "trans":"shareplex_transaction_id",
        "scn":"123456789"
      }
    }
    
  • Example of data updated (UPDATE operation)

    {
      "data":{
        "string":"hello world 2020"
      },
      "meta":{
        "posttime":"2020-12-07T13:59:09",
        "op":"upd",
        "size":10,
        "time":"2020-11-25T00:01:02",
        "idx":"1/10",
        "seq":1,
        "table":"mock_database.mock_table",
        "rowid":"mock_database.mock_table-3\u0001129",
        "trans":"shareplex_transaction_id",
        "scn":"123456789"
      },
      "key":{
        "col1":"2020-11-25 00:01:02",
        "col2":"hello world",
        "col3":"INTERVAL '3' DAY",
        "col4":1.2222,
        "col5":9.9999999999999308,
        "col6":129,
        "col7":"00:01:02",
        "col8":1,
        "col9":"2020-11-25 00:01:02.012345 Asia/Shanghai",
        "col10":2147483646,
        "col11":9223372036854775806,
        "col12":"aGVsbG8gd29ybGQ=",
        "col13":"INTERVAL '4' YEAR",
        "col14":3,
        "col15":"2020-11-25",
        "col16":9.9999999999308,
        "col17":10223372036854775806,
        "col18":"1606233662.012345"
      }
    }
    
  • Example of data deleted (DELETE operation)

    {
      "data":{
        "col1":"2020-11-25 00:01:02",
        "col2":"hello world",
        "col3":"INTERVAL '3' DAY",
        "col4":1.2222,
        "col5":9.9999999999308,
        "col6":129,
        "col7":"00:01:02",
        "col8":1,
        "col9":"2020-11-25 00:01:02.012345 Asia/Shanghai",
        "col10":2147483646,
        "col11":9223372036854775806,
        "col12":"aGVsbG8gd29ybGQ=",
        "col13":"INTERVAL '4' YEAR",
        "col14":3,
        "col15":"2020-11-25",
        "col16":9.9999999308,
        "col17":10223372036854775806,
        "col18":"1606233662.012345"
      },
      "meta":{
        "posttime":"2020-12-07T13:34:10",
        "op":"del",
        "size":10,
        "time":"2020-11-25T00:01:02",
        "idx":"1/10",
        "seq":1,
        "table":"mock_database.mock_table",
        "rowid":"mock_database.mock_table-3\u0001129",
        "trans":"shareplex_transaction_id",
        "scn":"123456789"
      }
    }
    
  • DDL example

    ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT "test" COMMENT 'test';
    
    {
        "data": {},
        "meta": {
            "posttime": "2022-12-16T15:54:51",
            "op": "ddl",
            "size": 0,
            "time": "2022-12-16T15:53:29",
            "idx": "0/0",
            "seq": 0,
            "table": "connector_test.all_mysql_type_test",
            "rowid": "connector_test.all_mysql_type_test-",
            "trans": null,
            "scn": "null"
        },
        "sql": {
            "ddl": "ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT \"test\" comment 'test'"
        }
    }
    

DefaultExtendColumnType JSON message format

When data is synchronized to Kafka, DataHub (BLOB type), or RocketMQ, the DefaultExtendColumnType serialization format uses the following JSON message format.

The DefaultExtendColumnType JSON message format adds a field __light_type in the image to indicate the data type of the field, based on the DEFAULT format.

{
    "prevStruct": { // The image before the change
    },
    "postStruct": { // The image after the change
        "__light_type": {
            "col": { // The name of the field
                "schemaType": "type" // The type of the value
            }
        }
    },
    "allMetaData": {

    }
}

Here is an example:

  • Example of INSERT (insert) data

    {
    "allMetaData":{
      "checkpoint": null,
      "record_primary_key": "int8\u0001int16",
      "source_identity": null,
      "uniqueId": "{tid:11039xxxx127, partition_id:0, part_cnt:0},5917,391,0",
      "transId":"1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database of a version earlier than V4.x, the example is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
      "clusterId": "123456",
      "record_primary_value": "3\u0001129",
      "dbType": "OB_MYSQL",
      "table_name": "table",
      "db": "tenant.database",
      "timestamp": "1609344671"
    },
      "prevStruct": null,
      "recordType": "INSERT",
      "postStruct":{
        "col1": 3,
        "col2": 129,
        "col3": 2147483646,
        "col4": 9223372036854775806,
        "col5": 10223372036854775806,
        "col6": 1.2222,
        "col7": 9.99999999999999909326625337248,
        "col8": "hello world",
        "col9": "aGVsbG8gd29ybGQ=",
        "col10": 9.99999999999999909326625337248461995470488734032045693707225049338,
        "col11": "2020-11-25",
        "col12": "00:01:02",
        "col13": "2020-11-25 00:01:02",
        "col14": "1606233662.012345",
        "__light_type":{
          "int8":{
            "schemaType":"TINYINT"
          },
          "int16":{
            "schemaType":"SMALLINT"
          },
          "int32":{
            "schemaType":"INT"
          },
          "int64":{
            "schemaType":"INT64"
          },
          "bigInt":{
            "schemaType":"BIGINT"
          },
          "float32":{
            "schemaType":"FLOAT"
          },
          "float64":{
            "schemaType":"DOUBLE"
          },
          "string":{
            "schemaType":"VARCHAR"
          },
          "bytes":{
            "schemaType":"BLOB"
          },
          "decimal":{
            "schemaType":"DECIMAL"
          },
          "localDate":{
            "schemaType":"DATE"
          },
          "localTime":{
            "schemaType":"TIME"
          },
          "localDateTime":{
            "schemaType":"DATETIME"
          },
          "timestamp_in_long":{
            "schemaType":"TIMESTAMP"
          }
        }
      }
    }
    

    When recordType is ROW, it indicates that the data is fully transmitted. The data format is the same as that of INSERT.

    {
      "allMetaData":{
        "checkpoint": null,
        "record_primary_key": "int8\u0001int16",
        "source_identity": null,
        "uniqueId": null,
        "transId":"1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database of a version earlier than V4.x, the example is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
        "clusterId": null,
        "record_primary_value": "3\u0001129",
        "dbType": "OB_MYSQL",
        "table_name": "table",
        "db": "tenant.database",
        "timestamp": null
      },
      "prevStruct": null,
      "recordType": "ROW",
      "postStruct":{
        "col1": 3,
        "col2": 129,
        "col3": 2147483646,
        "col4": 9223372036854775806,
        "col5": 10223372036854775806,
        "col6": 1.2222,
        "col7": 9.999999,
        "col8": "hello world",
        "col9": "aGVsbG8gd29ybGQ=",
        "col10": 9.99999999999,
        "col11": "2020-11-25",
        "col12": "00:01:02",
        "col13": "2020-11-25 00:01:02",
        "col14": "1606233662.012345",
        "__light_type":{
          "int8":{
            "schemaType":"TINYINT"
          },
          "int16":{
            "schemaType":"SMALLINT"
          },
          "int32":{
            "schemaType":"INT"
          },
          "int64":{
            "schemaType":"INT64"
          },
          "bigInt":{
            "schemaType":"BIGINT"
          },
          "float32":{
            "schemaType":"FLOAT"
          },
          "float64":{
            "schemaType":"DOUBLE"
          },
          "string":{
            "schemaType":"VARCHAR"
          },
          "bytes":{
            "schemaType":"BLOB"
          },
          "decimal":{
            "schemaType":"DECIMAL"
          },
          "localDate":{
            "schemaType":"DATE"
          },
          "localTime":{
            "schemaType":"TIME"
          },
          "localDateTime":{
            "schemaType":"DATETIME"
          },
            "timestamp_in_long":{
            "schemaType":"TIMESTAMP"
          }
          }
          }
          }
    
  • Example of UPDATE (update) data

    {
      "allMetaData": {
        "checkpoint": null,
        "record_primary_key": "int8\u0001int16",
        "source_identity": null,
        "uniqueId": "{tid:1103909342127, partition_id:0, part_cnt:0},5917,391,0",
        "transId":"1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database of a version earlier than V4.x, the example is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
        "clusterId": "123456",
        "record_primary_value": "3\u0001129",
        "dbType":"OB_MYSQL",
        "table_name": "table",
        "db": "tenant.database",
        "timestamp": "1609344671"
      },
      "prevStruct": {
        "col1": 3,
        "col2": 129,
        "col3": 2147483646,
        "col4": 9223372036854775806,
        "col5": 10223372036854775806,
        "col6": 1.2222,
        "col7": 9.999999999999,
        "col8": "hello world 2020",
        "col9": "aGVsbG8gd29ybGQ=",
        "col10": 9.999999999999,
        "col11": "2020-11-25",
        "col12": "00:01:02",
        "col13": "2020-11-25 00:01:02",
        "col14": "1606233662.012345",
        "__light_type": {
          "int8": {
            "schemaType": "TINYINT"
          },
          "int16": {
            "schemaType": "SMALLINT"
          },
          "int32": {
            "schemaType": "INT"
          },
          "int64": {
            "schemaType": "INT64"
          },
          "bigInt": {
            "schemaType": "BIGINT"
          },
          "float32": {
            "schemaType": "FLOAT"
          },
          "float64": {
            "schemaType": "DOUBLE"
          },
          "string": {
            "schemaType": "VARCHAR"
          },
          "bytes": {
            "schemaType": "BLOB"
          },
          "decimal": {
            "schemaType": "DECIMAL"
          },
          "localDate": {
            "schemaType": "DATE"
          },
          "localTime": {
            "schemaType": "TIME"
          },
          "localDateTime": {
            "schemaType": "DATETIME"
          },
          "timestamp_in_long": {
            "schemaType": "TIMESTAMP"
          }
        }
      },
      "recordType": "UPDATE",
      "postStruct": {
        "col1": 3,
        "col2": 129,
        "col3": 2147483646,
        "col4": 9223372036854775806,
        "col5": 10223372036854775806,
        "col6": 1.2222,
        "col7": 9.999999999999,
        "col8": "hello world 2020",
        "col9": "aGVsbG8gd29ybGQ=",
        "col10": 9.999999999999,
        "col11": "2020-11-25",
        "col12": "00:01:02",
        "col13": "2020-11-25 00:01:02",
        "col14": "1606233662.012345",
        "__light_type": {
          "int8": {
            "schemaType": "TINYINT"
          },
          "int16": {
            "schemaType": "SMALLINT"
          },
          "int32": {
            "schemaType": "INT"
          },
          "int64": {
            "schemaType": "INT64"
          },
          "bigInt": {
            "schemaType": "BIGINT"
          },
          "float32": {
            "schemaType": "FLOAT"
          },
          "float64": {
            "schemaType": "DOUBLE"
          },
          "string": {
            "schemaType": "VARCHAR"
          },
          "bytes": {
            "schemaType": "BLOB"
          },
          "decimal": {
            "schemaType": "DECIMAL"
          },
          "localDate": {
            "schemaType": "DATE"
          },
          "localTime": {
            "schemaType": "TIME"
          },
          "localDateTime": {
            "schemaType": "DATETIME"
          },
          "timestamp_in_long": {
            "schemaType": "TIMESTAMP"
          }
        }
      }
    }
    
  • Example of DELETE (delete) data

    {
      "allMetaData":{
        "checkpoint": null,
        "record_primary_key": "int8\u0001int16",
        "source_identity": null,
        "uniqueId": "{tid:1103xxxx127, partition_id:0, part_cnt:0},5917,391,0",
        "transId":"1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database of a version earlier than V4.x, the example is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
        "clusterId": "123456",
        "record_primary_value": "3\u0001129",
        "dbType": "OB_MYSQL",
        "table_name": "table",
        "db": "tenant.database",
        "timestamp": "1609344671"
      },
      "prevStruct":{
        "col1": 3,
        "col2": 129,
        "col3": 2147483646,
        "col4": 9223372036854775806,
        "col5": 10223372036854775806,
        "col6": 1.2222,
        "col7": 9.999999999999,
        "col8": "hello world 2020",
        "col9": "aGVsbG8gd29ybGQ=",
        "col10": 9.999999999999,
        "col11": "2020-11-25",
        "col12": "00:01:02",
        "col13": "2020-11-25 00:01:02",
        "col14": "1606233662.012345",
        "__light_type":{
          "int8":{
            "schemaType":"TINYINT"
          },
          "int16":{
            "schemaType":"SMALLINT"
          },
          "int32":{
            "schemaType":"INT"
          },
          "int64":{
            "schemaType":"INT64"
          },
          "bigInt":{
            "schemaType":"BIGINT"
          },
          "float32":{
            "schemaType":"FLOAT"
          },
          "float64":{
            "schemaType":"DOUBLE"
          },
          "string":{
            "schemaType":"VARCHAR"
          },
          "bytes":{
            "schemaType":"BLOB"
          },
          "decimal":{
            "schemaType":"DECIMAL"
          },
          "localDate":{
            "schemaType":"DATE"
          },
          "localTime":{
            "schemaType":"TIME"
          },
          "localDateTime":{
            "schemaType":"DATETIME"
          },
          "timestamp_in_long": {
            "schemaType": "TIMESTAMP"
          }
        }
      },
      "recordType":"DELETE",
      "postStruct":null
    }
    
  • DDL example

    ALTER TABLE connector_test.all_mysql_type_test ADD column c90 VARCHAR(30) DEFAULT "test" COMMENT 'test';
    
    {
        "prevStruct": null,
        "postStruct": {
            "ddl": "ALTER TABLE connector_test.all_mysql_type_test ADD column c90 VARCHAR(30) DEFAULT \"test\" COMMENT 'test'",
            "__light_type": {
                "ddl": {
                    "schemaType": "VAR_STRING"
                }
            }
        },
        "allMetaData": {
            "checkpoint": "1671177200",
            "dbType": "OB_MYSQL",
            "storeDataSequence": null,
            "db": "connector_test",
            "timestamp": "1671177209",
            "uniqueId": null,
            "ddlType": "ALTER_TABLE",
            "record_primary_key": null,
            "source_identity": null,
            "record_primary_value": null,
            "table_name": "all_mysql_type_test"
        },
        "recordType": "DDL"
    }
    

Debezium JSON message format

When you synchronize data from a MySQL-compatible tenant of OceanBase Database to a Kafka, DataHub (blob type), or RocketMQ instance, the following JSON message formats can be used for the Debezium serialization method. By default, only the structure of payload is displayed.

  • The schema and payload parts are both present.

    {
        "schema": { // describes the information of the `payload` field. The default value is an empty object.
            "type": "struct", // struct indicates that the field contains a nested structure.
            "optional": false, // specifies whether the field is mandatory.
            "fields": [
                {
                    "type": "int64", // specifies the type of the field.
                    "optional": false, // specifies whether the field is mandatory.
                    "field": "ts_ms" // specifies the name of the field.
                }
            ]
        },
        "payload": {
            "op": "c", // indicates the type of data modification. Valid values: c (insert), u (update), d (delete), and HEARTBEAT (heartbeat message).
            "source": {
                "version": "", // the version of OMS.
                "connector": "OB_MYSQL", // the type of the data source.
                "name": "OMS", // the fixed value OMS.
                "ts_ms": 0, // the timestamp of data change in seconds. This field is present only in incremental synchronization.
                "db": "test", // the name of the database to which the changed data belongs. For OceanBase Database, only the database name is provided without the tenant name in the SQL statement.
                "table": "testTab", // the name of the table to which the changed data belongs.
                "pos": "553132@1668496109" // the position of the binlog file in the format of [binlog file name]@[binlog file offset]
            },
            "before": { // the image of the data before the change
                "column": "value" // the key-value pairs, which contain complete key-value data
            },
            "after": { // the image of the data after the change
                "column": "value" // the key-value pairs, which contain complete key-value data
            },
            "ts_ms": 1668497367188 // the timestamp when the data is processed
        }
    }
    
  • Only the payload part is present.

    {
        "payload": {
            "op": "c", // indicates the type of data modification. Valid values: c (insert), u (update), d (delete), and HEARTBEAT (heartbeat message).
            "source": {
                "version": "", // the version of OMS.
                "connector": "OB_MYSQL", // the type of the data source.
                "name": "OMS", // the fixed value OMS.
                "ts_ms": 0, // the timestamp of data change in seconds. This field is present only in incremental synchronization.
                "db": "test", // the name of the database to which the changed data belongs. For OceanBase Database, only the database name is provided without the tenant name in the SQL statement.
                "table": "testTab", // the name of the table to which the changed data belongs.
                "pos": "553132@16684****" // the position of the binlog file in the format of [binlog file name]@[binlog file offset]
            },
            "before": { // the image of the data before the change
                "column": "value" // the key-value pairs, which contain complete key-value data
            },
            "after": { // the image of the data after the change
                "column": "value" // the key-value pairs, which contain complete key-value data
            },
            "ts_ms": 1668497367188 // the timestamp when the data is processed
        }
    }
    

Here are some examples:

  • An example of data inserted (INSERT operation)

    {
      "schema":{
        "optional":false,
        "type":"STRUCT",
        "fields":[
          {
            "field":"before",
            "optional":false,
            "type":"struct",
            "fields":[
              {
                "field":"c01",
                "optional":false,
                "type":"int32"
              },
              {
                "field":"c02",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c03",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c04",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c05",
                "optional":false,
                "type":"int16"
              },
              {
                "field":"c06",
                "optional":false,
                "type":"int16"
              },
              {
                "field":"c07",
                "optional":false,
                "type":"int32"
              },
              {
                "field":"c08",
                "optional":false,
                "type":"int64"
              },
              {
                "field":"c09",
                "optional":false,
                "type":"float64"
              },
              {
                "field":"c10",
                "optional":false,
                "type":"float64"
              },
              {
                "field":"c11",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c12",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c13",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c14",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c15",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c16",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c17",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c18",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c19",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c20",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c21",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c22",
                "optional":false,
                "type":"int32"
              },
                {
                "field":"c23",
                "optional":false,
                "type":"int64"
              },
                {
                "field":"c24",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c25",
                "optional":false,
                "type":"int32"
              },
                {
                "field":"c26",
                "optional":false,
                "type":"bytes"
              }
                ]
              },
                {
                "field":"after",
                "optional":false,
                "type":"struct",
                "fields":[
                {
                "field":"c01",
                "optional":false,
                "type":"int32"
              },
                {
                "field":"c02",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c03",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c04",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c05",
                "optional":false,
                "type":"int16"
              },
                {
                "field":"c06",
                "optional":false,
                "type":"int16"
              },
                {
                "field":"c07",
                "optional":false,
                "type":"int32"
              },
                {
                "field":"c08",
                "optional":false,
                "type":"int64"
              },
                {
                "field":"c09",
                "optional":false,
                "type":"float64"
              },
                {
                "field":"c10",
                "optional":false,
                "type":"float64"
              },
                {
                "field":"c11",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c12",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c13",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c14",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c15",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c16",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c17",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c18",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c19",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c20",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c21",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c22",
                "optional":false,
                "type":"int32"
              },
                {
                "field":"c23",
                "optional":false,
                "type":"int64"
              },
                {
                "field":"c24",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c25",
                "optional":false,
                "type":"int32"
              },
                {
                "field":"c26",
                "optional":false,
                "type":"bytes"
              }
                ]
              },
                {
                "field":"source",
                "optional":false,
                "type":"struct",
                "fields":[
                {
                "field":"version",
                "optional":false,
                "type":"string"
              },
                {
                "field":"connector",
                "optional":false,
                "type":"string"
              },
                {
                "field":"name",
                "optional":false,
                "type":"string"
              },
                {
                "field":"ts_ms",
                "optional":false,
                "type":"int64"
              },
                {
                "field":"db",
                "optional":false,
                "type":"string"
              },
                {
                "field":"table",
                "optional":false,
                "type":"string"
              },
                {
                "field":"server_id",
                "optional":false,
                "type":"int64"
              },
                {
                "field":"pos",
                "optional":false,
                "type":"string"
              }
                ]
              },
                {
                "field":"op",
                "optional":false,
                "type":"string"
              },
                {
                "field":"ts_ms",
                "optional":false,
                "type":"int64"
              }
                ]
              },
                "payload":{
                "op":"c",
                "source":{
                "connector":"OB_MYSQL",
                "pos":"703223@166849****",
                "name":"OMS",
                "version":"",
                "ts_ms":1668491621000,
                "db":"test",
                "table":"table_name"
              },
                "after":{
                "c11":"a",
                "c10":2.4212412,
                "c13":"c",
                "c12":"b",
                "c15":"65",
                "c14":"d",
                "c17":"67",
                "c16":"f",
                "c19":"69000000000000",
                "c18":"68",
                "c20":"6A",
                "c22":19311,
                "c21":"2022-11-15T05:12:11Z",
                "c02":"12312",
                "c24":1668489131000,
                "c01":2,
                "c23":36060000000,
                "c04":"61",
                "c26":"6B",
                "c03":"1241.41000",
                "c25":2022,
                "c06":141,
                "c05":11,
                "c08":412124124,
                "c07":4241,
                "c09":2.11111
              },
                "ts_ms":1668495423594
              }
              }
    
  • An example of data updated (UPDATE operation)

    {
      "schema":{
        "optional":false,
        "type":"STRUCT",
        "fields":[
          {
            "field":"before",
            "optional":false,
            "type":"struct",
            "fields":[
              {
                "field":"c01",
                "optional":false,
                "type":"int32"
              },
              {
                "field":"c02",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c03",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c04",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c05",
                "optional":false,
                "type":"int16"
              },
              {
                "field":"c06",
                "optional":false,
                "type":"int16"
              },
              {
                "field":"c07",
                "optional":false,
                "type":"int32"
              },
              {
                "field":"c08",
                "optional":false,
                "type":"int64"
              },
              {
                "field":"c09",
                "optional":false,
                "type":"float64"
              },
              {
                "field":"c10",
                "optional":false,
                "type":"float64"
              },
              {
                "field":"c11",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c12",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c13",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c14",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c15",
                "optional":false,
                "type":"bytes"
              },
              {
                "field":"c16",
                "optional":false,
                "type":"string"
              },
              {
                "field":"c17",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c18",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c19",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c20",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c21",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c22",
                "optional":false,
                "type":"int32"
              },
                {
                "field":"c23",
                "optional":false,
                "type":"int64"
              },
                {
                "field":"c24",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c25",
                "optional":false,
                "type":"int32"
              },
                {
                "field":"c26",
                "optional":false,
                "type":"bytes"
              }
                ]
              },
                {
                "field":"after",
                "optional":false,
                "type":"struct",
                "fields":[
                {
                "field":"c01",
                "optional":false,
                "type":"int32"
              },
                {
                "field":"c02",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c03",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c04",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c05",
                "optional":false,
                "type":"int16"
              },
                {
                "field":"c06",
                "optional":false,
                "type":"int16"
              },
                {
                "field":"c07",
                "optional":false,
                "type":"int32"
              },
                {
                "field":"c08",
                "optional":false,
                "type":"int64"
              },
                {
                "field":"c09",
                "optional":false,
                "type":"float64"
              },
                {
                "field":"c10",
                "optional":false,
                "type":"float64"
              },
                {
                "field":"c11",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c12",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c13",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c14",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c15",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c16",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c17",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c18",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c19",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c20",
                "optional":false,
                "type":"bytes"
              },
                {
                "field":"c21",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c22",
                "optional":false,
                "type":"int32"
              },
                {
                "field":"c23",
                "optional":false,
                "type":"int64"
              },
                {
                "field":"c24",
                "optional":false,
                "type":"string"
              },
                {
                "field":"c25",
                "optional":false,
                "type":"int32"
              },
                {
                "field":"c26",
                "optional":false,
                "type":"bytes"
              }
                ]
              },
                {
                "field":"source",
                "optional":false,
                "type":"struct",
                "fields":[
                {
                "field":"version",
                "optional":false,
                "type":"string"
              },
                {
                "field":"connector",
                "optional":false,
                "type":"string"
              },
                {
                "field":"name",
                "optional":false,
                "type":"string"
              },
                {
                "field":"ts_ms",
                "optional":false,
                "type":"int64"
              },
                {
                "field":"db",
                "optional":false,
                "type":"string"
              },
                {
                "field":"table",
                "optional":false,
                "type":"string"
              },
                {
                "field":"server_id",
                "optional":false,
                "type":"int64"
              },
                {
                "field":"pos",
                "optional":false,
                "type":"string"
              }
                ]
              },
                {
                "field":"op",
                "optional":false,
                "type":"string"
              },
                {
                "field":"ts_ms",
                "optional":false,
                "type":"int64"
              }
                ]
              },
                "payload":{
                "op":"u",
                "before":{
                "c11":"a",
                "c10":2.4212412,
                "c13":"c",
                "c12":"b",
                "c15":"65",
                "c14":"d",
                "c17":"67",
                "c16":"f",
                "c19":"6900000000",
                "c18":"68",
                "c20":"6A",
                "c22":19311,
                "c21":"2022-11-15T05:12:11Z",
                "c02":"12312",
                "c24":1668489131000,
                "c01":1,
                "c23":36060000000,
                "c04":"61",
                "c26":"6B",
                "c03":"1241.41000",
                "c25":2022,
                "c06":141,
                "c05":11,
                "c08":412124124,
                "c07":4241,
                "c09":2.11111
              },
                "source":{
                "connector":"OB_MYSQL",
                "pos":"436999@166849****",
                "name":"OMS",
                "version":"",
                "ts_ms":1668495861000,
                "db":"test",
                "table":"table_name"
              },
                "after":{
                "c11":"aa",
                "c10":2.4212412,
                "c13":"c",
                "c12":"b",
                "c15":"65",
                "c14":"d",
                "c17":"67",
                "c16":"f",
                "c19":"69000000000",
                "c18":"68",
                "c20":"6A",
                "c22":19311,
                "c21":"2022-11-15T05:12:11Z",
                "c02":"12312",
                "c24":1668489131000,
                "c01":1,
                "c23":36060000000,
                "c04":"61",
                "c26":"6B",
                "c03":"1241.41000",
                "c25":2022,
                "c06":141,
                "c05":11,
                "c08":412124124,
                "c07":4241,
                "c09":2.11111
              },
                "ts_ms":1668495906356
              }
              }
    
  • An example of data deleted (DELETE operation)

    {
        "schema":{
            "optional":false,
            "type":"STRUCT",
            "fields":[
                {
                    "field":"before",
                    "optional":false,
                    "type":"struct",
                    "fields":[
                        {
                            "field":"c01",
                            "optional":false,
                            "type":"int32"
                        },
                        {
                            "field":"c02",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c03",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c04",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c05",
                            "optional":false,
                            "type":"int16"
                        },
                        {
                            "field":"c06",
                            "optional":false,
                            "type":"int16"
                        },
                        {
                            "field":"c07",
                            "optional":false,
                            "type":"int32"
                        },
                        {
                            "field":"c08",
                            "optional":false,
                            "type":"int64"
                        },
                        {
                            "field":"c09",
                            "optional":false,
                            "type":"float64"
                        },
                        {
                            "field":"c10",
                            "optional":false,
                            "type":"float64"
                        },
                        {
                            "field":"c11",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c12",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c13",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c14",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c15",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c16",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c17",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c18",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c19",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c20",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c21",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c22",
                            "optional":false,
                            "type":"int32"
                        },
                        {
                            "field":"c23",
                            "optional":false,
                            "type":"int64"
                        },
                        {
                            "field":"c24",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c25",
                            "optional":false,
                            "type":"int32"
                        },
                        {
                            "field":"c26",
                            "optional":false,
                            "type":"bytes"
                        }
                    ]
                },
                {
                    "field":"after",
                    "optional":false,
                    "type":"struct",
                    "fields":[
                        {
                            "field":"c01",
                            "optional":false,
                            "type":"int32"
                        },
                        {
                            "field":"c02",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c03",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c04",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c05",
                            "optional":false,
                            "type":"int16"
                        },
                        {
                            "field":"c06",
                            "optional":false,
                            "type":"int16"
                        },
                        {
                            "field":"c07",
                            "optional":false,
                            "type":"int32"
                        },
                        {
                            "field":"c08",
                            "optional":false,
                            "type":"int64"
                        },
                        {
                            "field":"c09",
                            "optional":false,
                            "type":"float64"
                        },
                        {
                            "field":"c10",
                            "optional":false,
                            "type":"float64"
                        },
                        {
                            "field":"c11",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c12",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c13",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c14",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c15",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c16",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c17",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c18",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c19",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c20",
                            "optional":false,
                            "type":"bytes"
                        },
                        {
                            "field":"c21",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c22",
                            "optional":false,
                            "type":"int32"
                        },
                        {
                            "field":"c23",
                            "optional":false,
                            "type":"int64"
                        },
                        {
                            "field":"c24",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"c25",
                            "optional":false,
                            "type":"int32"
                        },
                        {
                            "field":"c26",
                            "optional":false,
                            "type":"bytes"
                        }
                    ]
                },
                {
                    "field":"source",
                    "optional":false,
                    "type":"struct",
                    "fields":[
                        {
                            "field":"version",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"connector",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"name",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"ts_ms",
                            "optional":false,
                            "type":"int64"
                        },
                        {
                            "field":"db",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"table",
                            "optional":false,
                            "type":"string"
                        },
                        {
                            "field":"server_id",
                            "optional":false,
                            "type":"int64"
                        },
                        {
                            "field":"pos",
                            "optional":false,
                            "type":"string"
                        }
                    ]
                },
                {
                    "field":"op",
                    "optional":false,
                    "type":"string"
                },
                {
                    "field":"ts_ms",
                    "optional":false,
                    "type":"int64"
                }
            ]
        },
        "payload":{
            "op":"d",
            "before":{
                "c11":"aa",
                "c10":2.4212412,
                "c13":"c",
                "c12":"b",
                "c15":"65",
                "c14":"d",
                "c17":"67",
                "c16":"f",
                "c19":"69000000000",
                "c18":"68",
                "c20":"6A",
                "c22":19311,
                "c21":"2022-11-15T05:12:11Z",
                "c02":"12312",
                "c24":1668489131000,
                "c01":1,
                "c23":36060000000,
                "c04":"61",
                "c26":"6B",
                "c03":"1241.41000",
                "c25":2022,
                "c06":141,
                "c05":11,
                "c08":412124124,
                "c07":4241,
                "c09":2.11111
            },
            "source":{
                "connector":"OB_MYSQL",
                "pos":"553132@1668****",
                "name":"OMS",
                "version":"",
                "ts_ms":1668496109000,
                "db":"test",
                "table":"table_name"
            },
            "ts_ms":1668496119717
        }
    }
    

DebeziumFlatten JSON message format

When you synchronize data from a MySQL tenant of OceanBase Database to Kafka, DataHub (BLOB type), and RocketMQ, the JSON message is serialized in the DebeziumFlatten format. Compared with the Debezium format, this format does not include the schema and payload fields.

{
    "op": "c", // The data modification type. Valid values include c (full insert), u (update), d (delete), and HEARTBEAT (heartbeat message).
    "source": {
        "version": "", // The version of OMS.
        "connector": "OB_MYSQL", // The type of the data source.
        "name": "OMS", // Fixed value OMS.
        "ts_ms": 0, // The timestamp of data change in seconds. This field is present only in incremental synchronization.
        "db": "test", // The name of the database to which the change is applied. For OceanBase Database, only the database name is provided without the tenant name.
        "table": "testTab", // The name of the table to which the change is applied.
        "pos": "553132@16684****" // The position of the binlog file in the format of [binlog filename]@[binlog file offset]
    },
    "before": { // The image of the data before the change.
        "column": "value" // The key-value pairs, including full key-value data.
    },
    "after": { // The image of the data after the change.
        "column": "value" // The key-value pairs, including full key-value data.
    },
    "ts_ms": 1668497367188 // The timestamp when the data is processed.
}

Here is an example:

  • An example of data inserted (INSERT operation)

    {
       "op":"c",
       "source":{
           "connector":"OB_MYSQL",
           "pos":"703223@166849****",
           "name":"OMS",
           "version":"",
           "ts_ms":1668491621000,
           "db":"test",
           "table":"table_name"
           },
       "after":{
           "c11":"a",
           "c10":2.4212412,
           "c13":"c",
           "c12":"b",
           "c15":"65",
           "c14":"d",
           "c17":"67",
           "c16":"f",
           "c19":"69000000000000000000",
           "c18":"68",
           "c20":"6A",
           "c22":19311,
           "c21":"2022-11-15T05:12:11Z",
           "c02":"12312",
           "c24":1668489131000,
           "c01":2,
           "c23":36060000000,
           "c04":"61",
           "c26":"6B",
           "c03":"1241.41000",
           "c25":2022,
           "c06":141,
           "c05":11,
           "c08":412124124,
           "c07":4241,
           "c09":2.11111
           },
           "ts_ms":1668495423594
           }
    
  • An example of data updated (UPDATE operation)

    {
        "op":"u",
        "before":{
            "c11":"a",
            "c10":2.4212412,
            "c13":"c",
            "c12":"b",
            "c15":"65",
            "c14":"d",
            "c17":"67",
            "c16":"f",
            "c19":"690000000000000000000000000000000",
            "c18":"68",
            "c20":"6A",
            "c22":19311,
            "c21":"2022-11-15T05:12:11Z",
            "c02":"12312",
            "c24":1668489131000,
            "c01":1,
            "c23":36060000000,
            "c04":"61",
            "c26":"6B",
            "c03":"1241.41000",
            "c25":2022,
            "c06":141,
            "c05":11,
            "c08":412124124,
            "c07":4241,
            "c09":2.11111
            },
        "source":{
            "connector":"OB_MYSQL",
            "pos":"436999@166849****",
            "name":"OMS",
            "version":"",
            "ts_ms":1668495861000,
            "db":"test",
            "table":"table_name"
            },
        "after":{
            "c11":"aa",
            "c10":2.4212412,
            "c13":"c",
            "c12":"b",
            "c15":"65",
            "c14":"d",
            "c17":"67",
            "c16":"f",
            "c19":"69000000000000000000000000",
            "c18":"68",
            "c20":"6A",
            "c22":19311,
            "c21":"2022-11-15T05:12:11Z",
            "c02":"12312",
            "c24":1668489131000,
            "c01":1,
            "c23":36060000000,
            "c04":"61",
            "c26":"6B",
            "c03":"1241.41000",
            "c25":2022,
            "c06":141,
            "c05":11,
            "c08":412124124,
            "c07":4241,
            "c09":2.11111
            },
            "ts_ms":1668495906356
            }
    
  • An example of data deleted (DELETE operation)

    {
        "op":"d",
        "before":{
            "c11":"aa",
            "c10":2.4212412,
            "c13":"c",
            "c12":"b",
            "c15":"65",
            "c14":"d",
            "c17":"67",
            "c16":"f",
            "c19":"69000000000000000000000000000",
            "c18":"68",
            "c20":"6A",
            "c22":19311,
            "c21":"2022-11-15T05:12:11Z",
            "c02":"12312",
            "c24":1668489131000,
            "c01":1,
            "c23":36060000000,
            "c04":"61",
            "c26":"6B",
            "c03":"1241.41000",
            "c25":2022,
            "c06":141,
            "c05":11,
            "c08":412124124,
            "c07":4241,
            "c09":2.11111
            },
        "source":{
            "connector":"OB_MYSQL",
            "pos":"553132@1668****",
            "name":"OMS",
            "version":"",
            "ts_ms":1668496109000,
            "db":"test",
            "table":"table_name"
        },
        "ts_ms":1668496119717
    }
    

DebeziumSmt JSON format

DebeziumSmt is a configuration method provided by Debezium. Single Message Transform (SMT) converts and processes a single message in an event flattened way. When you synchronize data from a MySQL-compatible tenant of OceanBase Database to Kafka, DataHub (blob type), or RocketMQ, the following JSON message format is used for the DebeziumSmt serialization method, displaying only key:value fields in the after.

The following example shows how to use the Debezium serialization method to update data:

{
    "op": "u",
    "source": {
        "connector": "OB_MYSQL",
        "name": "OMS"
    },
    "ts_ms": 1668496119717,
    "before": {
        "field1": "before_value1",
        "field2": "before_value2"
    },
    "after": {
        "field1": "after_value1",
        "field2": "after_value2"
    }
}

After SMT processes the preceding example, the message format is simplified. That is, when the serialization mode is set to DebeziumSmt, the JSON message format is as follows.

{
    "field1": "after_value1",
    "field2": "after_value2"
}

Here are some examples:

  • An example of data inserted (INSERT operation)

    {
        "field1": "after_value1",
        "field2": "after_value2",
        "__deleted": "false"
    }
    
  • An example of data updated (UPDATE operation)

    {
        "field1": "after_value1",
        "field2": "after_value2",
        "__deleted": "false"
    }
    
  • An example of data deleted (DELETE operation)

    {
        "field1": "after_value1",
        "field2": "after_value2",
        "__deleted": "true"
    }
    

Avro JSON message format

When data is synchronized from a MySQL tenant of OceanBase Database to a Kafka topic, the data is serialized in the Avro format and transmitted in the following JSON message format.

  • Full migration

    {
        "version": 1,
        "id": 0,
        "sourceTimestamp": 1702371565, // The timestamp of the safe point.
        "sourcePosition": "", // No position information in full migration.
        "safeSourcePosition": "",
        "sourceTxid": "",
        "source": {
            "sourceType": "MySQL", // Fixed value MySQL.
            "version": "OBMySQL" // Fixed value OBMySQL.
        },
        "operation": "INIT", // The operation type is INIT in full migration.
        "objectName": "test***",
        "processTimestamps": [
            1702371565238
        ], // Contains only the delivery time.
        "tags": {
            "pk_uk_info": "{\"PRIMARY\":[\"id\"]}" // Contains only the primary key.
        },
        "fields": [
            {
                "name": "id",
                "dataTypeNumber": 246
            }, // The type of each column.
            {
                "name": "bid",
                "dataTypeNumber": 3
            },
            {
                "name": "name",
                "dataTypeNumber": 15
            },
            {
                "name": "address",
                "dataTypeNumber": 254
            }
        ],
        "beforeImages": null, // The image before full migration is empty.
        "afterImages": [ // The image after full migration. The precision of an INTEGER value is 8, and the scale of a FLOAT value is 64.
            {
                "value": "1",
                "precision": 1,
                "scale": 0
            },
            {
                "precision": 8,
                "value": "11"
            },
            {
                "charset": "utf8mb4",
                "value": {
                    "bytes": "yyy"
                }
            },
            null
        ]
    }
    
  • Incremental synchronization of DML data

    • Here is an example of data inserted.

      {
        "version": 1, 
        "id": 170236922143600000, 
        "sourceTimestamp": 1702369092, 
        "sourcePosition": "1702369080",  // Checkpoint of OceanBase MySQL compatible mode
        "safeSourcePosition": "1702369080", // Checkpoint of OceanBase MySQL compatible mode
        "sourceTxid": "",                
        "source": {
          "sourceType": "MySQL", 
          "version": "OBMySQL"
        }, 
        "operation": "INSERT", 
        "objectName": "test***", 
        "processTimestamps": [1702369221480],  
        "tags": {
          "pk_uk_info": "{\"PRIMARY\":[\"id\"]}" 
        }, 
        "fields": [
          {"name": "id", "dataTypeNumber": 8},  
          {"name": "bid", "dataTypeNumber": 3}, 
          {"name": "name", "dataTypeNumber": 15}
        ], 
        "beforeImages": null,                // Pre-image is empty before INSERT.
        "afterImages": [
          {"precision": 8, "value": "2"},    
          {"precision": 8, "value": "12"}, 
          {"charset": "utf8mb4", "value": {"bytes": "xxx"}
          }
        ]
      }
      
    • Here is an example of data updated.

      {
        "version": 1,
        "id": 170236975822100001, 
        "sourceTimestamp": 1702369757, 
        "sourcePosition": "1702369756", 
        "safeSourcePosition": "1702369756", 
        "sourceTxid": "", 
        "source": {
          "sourceType": "MySQL", 
          "version": "OBMySQL"
        }, 
        "operation": "UPDATE", 
        "objectName": "test***", 
        "processTimestamps": [1702369758237], 
        "tags": {
          "pk_uk_info": "{\"PRIMARY\":[\"id\"]}"
        }, 
        "fields": [
          {"name": "id", "dataTypeNumber": 8}, 
          {"name": "bid", "dataTypeNumber": 3}, 
          {"name": "name", "dataTypeNumber": 15}
        ], 
        "beforeImages": [   // UPDATE has both before-image and after-image.
          {"precision": 8, "value": "3"}, 
          {"precision": 8, "value": "22"}, 
          {"charset": "utf8mb4", "value": {"bytes": "xxx"}}
        ], 
        "afterImages": [
          {"precision": 8, "value": "3"}, 
          {"precision": 8, "value": "44"}, 
          {"charset": "utf8mb4", "value": {"bytes": "xxx"}}
        ]
      }
      
    • Here is an example of data deleted.

      {
        "version": 1,
        "id": 170236976527500000,
        "sourceTimestamp": 1702369764,
        "sourcePosition": "1702369763",
        "safeSourcePosition": "1702369763",
        "sourceTxid": "",
        "source": {
          "sourceType": "MySQL",
          "version": "OBMySQL"
        },
        "operation": "DELETE",
        "objectName": "test***",
        "processTimestamps": [1702369765287],
        "tags": {
          "pk_uk_info": "{\"PRIMARY\":[\"id\"]}"
        },
        "fields": [
          {"name": "id", "dataTypeNumber": 8},
          {"name": "bid", "dataTypeNumber": 3},
          {"name": "name", "dataTypeNumber": 15}
        ],
        "beforeImages": [
          {"precision": 8, "value": "3"},
          {"precision": 8, "value": "44"},
          {"charset": "utf8mb4", "value": {"bytes": "xxx"}}
        ],
        "afterImages": null // The after-image of DELETE is empty.
      }
      
  • Incremental synchronization of DDL data

    {
      "version": 1, 
      "id": 170236979372400000, 
      "sourceTimestamp": 1702369793, 
      "sourcePosition": "1702369792", 
      "safeSourcePosition": "1702369792", 
      "sourceTxid": "", 
      "source": {
        "sourceType": "MySQL", 
        "version": "OBMySQL"
      }, 
      "operation": "DDL", 
      "objectName": "test***", 
      "processTimestamps": [1702369794543], 
      "tags": {}, 
      "fields": null,    // No fields or beforeImages for incremental synchronization of DDL
      "beforeImages": null, 
      "afterImages": "alter table multi_db_multi_tbl add column address char(20) default null" // The STRING-type afterImages contain DDL statements.
    }
    

Format of data transmitted from OceanBase Database to text protocols

When you synchronize data from OceanBase Database to Kafka, DataHub (blob type), or RocketMQ:

  • If the serialization format is Default, Canal, DataWorks (V2.0 supported), SharePlex, or DefaultExtendColumnType, the data types and mappings for MySQL and Oracle compatible tenants of OceanBase Database are described as follows.

    • OceanBase MySQL compatible mode

      Data Type Mapping type Description
      TINYINT
      SMALLINT
      MEDIUMINT
      INT
      INTEGER
      YEAR
      BOOL
      BOOLEAN
      Long Integer types below 64-bit.
      Regular numbers (e.g., 1000) are displayed without scientific notation.
      For BOOL/BOOLEAN: true = 1, false = 0.
      DECIMAL
      NUMERIC
      BigDecimal Exact decimal numeric types and integers above 64-bit. Integer values display without decimal points. Decimal values retain original precision (with trailing zeros) and use scientific notation.
      FLOAT
      DOUBLE
      Double Floating-point numbers.
      Precision follows source type: FLOAT (7 significant digits), DOUBLE (16 significant digits).
      CHAR
      VARCHAR
      TINYTEXT
      TEXT
      MEDIUMTEXT
      LONGTEXT
      ENUM
      SET
      String String values.
      TINYBLOB
      BLOB
      MEDIUMBLOB
      LONGBLOB
      BINARY
      VARBINARY
      BIT
      Bytes Byte arrays displayed in BASE64 encoding by default.
      Note: For fixed-length BIT types, incremental sync trims leading zeros (unlike full sync), but decoded results remain consistent.
      DATE Date Date format: YYYY-MM-DD.
      Invalid dates display original strings.
      TIME Time Time format: HH:mm:ss[.nnnnnnnnn].
      Sub-second precision displays up to 9 digits (showing all non-zero values).
      Invalid times display original strings.
      DATETIME DateTime DateTime with timezone: YYYY-MM-DD HH:mm:ss[.nnnnnnnnn] [zoneId].
      Sub-second rules same as TIME.
      Invalid datetimes display original strings.
      TIMESTAMP Timestamp Format: [Unix timestamp][.nnnnnnnnn].
      Sub-second rules same as TIME.
      Invalid timestamps display as 0000-00-00 00:00:00.
    • OceanBase Oracle compatible mode

      Data Type Mapping type Description
      INTEGER Long Integer types below 64-bit.
      Regular numbers (e.g., 1000) without scientific notation.
      NUMBER
      FLOAT
      BigDecimal Exact decimal numeric types and integers exceeding 64-bit.
      BINARY_FLOAT
      BINARY_DOUBLE
      Double Floating-point numbers.
      Precision follows source type: FLOAT (7 significant digits), DOUBLE (16 significant digits).
      VARCHAR2
      NVARCHAR2
      INTERVAL YEAR TO MONTH
      INTERVAL DAY TO SECOND
      CLOB
      NCLOB
      ROWID
      UROWID
      String String values.
      BLOB
      BFILE
      RAW
      Bytes Byte arrays displayed in BASE64 encoding.
      DATE
      TIMESTAMP
      TIMESTAMP WITH TIME ZONE
      TIMESTAMP WITH LOCAL TIME ZONE
      DateTime DateTime with timezone: YYYY-MM-DD HH:mm:ss[.nnnnnnnnn] [zoneId].
      Sub-second precision displays up to 9 digits (showing all non-zero values).
      Invalid values display original strings.
  • If the serialization method is Debezium, the data type mappings for MySQL-compatible tenants of OceanBase Database are described in the following table.

    Notice

    When you synchronize data from an Oracle-compatible tenant of OceanBase Database to Kafka, DataHub (blob type), or RocketMQ, you cannot set the serialization method to Debezium.

    Data type Mapping type Description
    BOOLEAN
    BOOL
    BOOLEAN The values are true and false.
    TINYINT
    SMALLINT
    MEDIUMINT
    INT/INTEGER
    BIGINT
    YEAR
    LONG Integers in the range of -2^63^ to 2^63^.
    BIGINT STRING The data is displayed in string format.
    FLOAT
    DOUBLE
    DOUBLE Floating-point numbers.
    DECIMAL
    NUMERIC
    STRING The data is displayed in string format. For decimal values, shows exact precision as received from the database (preserving trailing zeros) and always uses scientific notation.
    BIT
    BINARY
    VARBINARY
    TINYBLOB
    BLOB
    MEDIUMBLOB
    LONGBLOB
    BYTES Arrays of bytes. The data is displayed in base16-encoded format.
    CHAR
    VARCHAR
    TINYTEXT
    TEXT
    MEDIUMTEXT
    LONGTEXT
    ENUM
    SET
    STRING Strings.
    TIMESTAMP STRING The timestamp format is YYYY-MM-DDTHH:mm:ss[.nnnnnnnnn]Z, and the time zone is 0.
    DATE LONG The number of days since January 1, 1970.
    TIME LONG The value of time (in microseconds) since 00:00:00, not including the time zone information.
    DATETIME LONG The number of milliseconds since January 1, 1970, 00:00:00, not including the time zone information.
  • If the serialization format is Avro, the data types and mappings for the MySQL tenant of OceanBase Database are described as follows.

    Notice

    The serialization format Avro is supported only when you synchronize data from a MySQL tenant of OceanBase Database to Kafka.

    Type name Mapping type
    TINYINT
    BOOLEAN
    SMALLINT
    MEDIUMINT
    INT
    BIGINT
    BIT
    INTEGER
    FLOAT
    DOUBLE
    FLOAT
    DECIMAL
    NUMERIC
    DECIMAL
    VARCHAR
    CHAR
    TINYTEXT
    MEDIUMTEXT
    LONGTEXT
    TEXT
    CHARACTER
    BINARY
    VARBINARY
    TINYBLOB
    MEDIUMBLOB
    LONGBLOB
    BLOB
    BinaryObject
    TIMESTAMP TimestampObject
    Description For a TIMESTAMP type, full and incremental synchronizations both convert the data into a timestamp. Illegal timestamp is -9223372022400L.
    Except for the illegal timestamp, you can use the Instant.ofEpochSecond(ts, nanos) method of Java to obtain the correct wall-clock time.
    DATE
    TIME
    DATETIME
    YEAR
    DATETIME
    JSON
    ENUM
    SET
    TextObject
    GEOMETRY TextGeometry
    Description At present, data transmission uses the EWKT format to transmit geometry data. Therefore, the data is mapped to the TextGeometry type.

Previous topic

Column filtering
Last

Next topic

Overview
Next
What is on this page
Serialization formats
Default JSON message format
JSON format of Canal messages
JSON message format of DataWorks
JSON format of SharePlex messages
DefaultExtendColumnType JSON message format
Debezium JSON message format
DebeziumFlatten JSON message format
DebeziumSmt JSON format
Avro JSON message format
Format of data transmitted from OceanBase Database to text protocols