This topic describes the serialization format and the data format for database transmission when data is transferred from a database to a text protocol.
Serialization formats
When you use OceanBase Migration Service (OMS) Community Edition to synchronize data from the source to a Kafka, DataHub (Blob type), or RocketMQ instance, you can use the serialization format to control the message format of data synchronization to the target. The following table describes the serialization formats supported: Default, Canal, Dataworks (V2.0), SharePlex, DefaultExtendColumnType, Debezium, DebeziumFlatten, DebeziumSmt, and Maxwell (supported only when the target is a Kafka instance).
Default JSON message format
The following JSON message format is used when data is synchronized to a Kafka, DataHub instance of the Blob data type, or RocketMQ instance, and the serialization method is set to Default.
{
"prevStruct": { // The image before the change.
"col1": "val1" // A key-value pair that contains all keys.
},
"postStruct": { // The image after the change.
"col1": "val1" // A key-value pair that contains all keys.
},
"allMetaData" {
"checkpoint": "STRING", // The current synchronization checkpoint. In incremental synchronization, this field indicates the timestamp (in seconds) to which the data is synchronized. In full synchronization, this field contains key-value pairs of primary keys.
"record_primary_key": "STRING", // The name of the primary key column. The names of multiple primary key columns are separated with \u0001.
"record_primary_value" "STRING", // The value of the primary key. Multiple primary key values are separated with \u0001.
"source_identity": "STRING", // The identifier of the source. In incremental synchronization, this field contains the subtopic. In full synchronization, this field contains meaningless serial numbers.
"dbType": "STRING", // The type of the database. Valid values: MYSQL, ORACLE, OB_IN_ORACLE_MODE (compatible and for use only with the old mode), OB_MYSQL, OB_ORACLE, and DB2 (compatible and for use only with the old mode).
"storeDataSequence": LONG, // The sequence number of the data stored in the source.json file. The sequence number is generated based on the timestamp and a incremental number not exceeding five digits. The sequenceEnabled parameter must be set to true in the source.json file for the field to exist. The default value is true. This field is used for sorting.
"table_name": "STRING", // The name of the table whose data is changed by using an SQL statement.
"db": "STRING", // The name of the database whose data is changed by using an SQL statement. For an OceanBase database, the database name is in the format of {tenant name}.{database name}.
"timestamp": "STRING", // The timestamp of the data change in seconds. This field exists only in incremental synchronization.
"uniqueId": "STRING", // The transaction sequence identifier transmitted from the store. This field exists only in incremental synchronization.
"transId": "STRING", // The transaction ID. For more information, see the descriptions of the transId field in [DDL operations](url-placeholder/50000000-0000-0000-0000-000000000000) and [DML operations](url-placeholder/50000000-0000-0000-0000-000000000001).
"clusterId": "STRING", // The cluster identifier for an OceanBase database, the thread identifier for a MySQL database, or a null value for an Oracle database.
"ddlType": "STRING", // The DDL type. This field is supported in OMS Community Edition V4.1.1-CE.
},
"recordType": "INSERT/UPDATE/DELETE/HEARTBEAT/DDL" // The type of the change.
}
In the case of a DDL change, the "ddl" column exists, and its value is an DDL statement.
prevStructandpostStruct:prevStruct: the image before the change, which is the data before the SQL statement is executed.postStruct: the image after the change, which is the data after the SQL statement is executed.
DELETEchanges contain onlyprevStruct.INSERTandDDLchanges contain onlypostStruct.UPDATEchanges contain bothprevStructandpostStruct.HEARTBEATchanges, which are periodic heartbeat messages, contain neitherpostStructnorpostStruct.The
clusterIdfield is described as follows:In OceanBase Database,
ob_org_cluster_idspecifies a session-level cluster ID, which will be persisted to the transaction log. If you specifyob_org_cluster_idwhen you write data, the specified value is applied. Otherwise, the default value is applied. This field is available only for incremental synchronization. For more information, see ob_org_cluster_id.In a MySQL database, the
clusterIdfield indicates the threadId in a MySQL binlog event. The MySQL server assigns an incremental threadId to each thread in a connection.
The following examples show sample data.
Example of inserted data
{ "allMetaData":{ "checkpoint":null, "record_primary_key":"id1\u0001id2", "source_identity":null, "record_primary_value":"3\u0001129", "dbType":"OB_MYSQL", "table_name":"table_name", "db":"tenant.database", "timestamp":"1609344671" }, "prevStruct":null, "recordType":"INSERT", "postStruct":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":10223372036854775806, "col6":1.2222, "col7":9.999999, "col8":"hello world", "col9":"aGVsbG8gd29ybGQ=", "col10":9.99999999999, "col11":"2020-11-25", "col12":"00:01:02", "col13":"2020-11-25 00:01:02", "col14":"1606233662.012345", } }Example of updated data
{ "allMetaData": { "checkpoint": null, "record_primary_key": "id1\u0001id2", "source_identity": null, "record_primary_value": "3\u0001129", "dbType":"OB_MYSQL", "table_name": "table_name", "db": "tenant.database", "timestamp": "1609344671" }, "prevStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.999999999999, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.999999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345", }, "recordType": "UPDATE", "postStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.999999999999, "col8": "hello world 2020", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.999999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345", } }Example of deleted data
{ "allMetaData":{ "checkpoint":null, "record_primary_key":"id1\u0001id2", "source_identity":null, "record_primary_value":"3\u0001129", "dbType":"OB_MYSQL", "table_name":"table_name", "db":"tenant.database", "timestamp":"1609344671" }, "prevStruct":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":10223372036854775806, "col16":1.2222, "col7":9.99999999, "col8":"hello world", "col9":"aGVsbG8gd29ybGQ=", "col10":9.999999999, "col11":"2020-11-25", "col12":"00:01:02", "col13":"2020-11-25 00:01:02", "col14":"1606233662.012345" }, "recordType":"DELETE", "postStruct":null }DDL example
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';{ "prevStruct": null, "postStruct": { "ddl": "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'" }, "allMetaData": { "checkpoint": "1671177057", "dbType": "OB_MYSQL", "storeDataSequence": null, "db": "connector_test", "timestamp": "1671177057", "uniqueId": null, "ddlType": "ALTER_TABLE", "record_primary_key": null, "source_identity": null, "record_primary_value": null, "table_name": "all_mysql_type_test" }, "recordType": "DDL" }Kafka heartbeat data example
{ "prevStruct": null, "postStruct": null, "allMetaData": { "checkpoint": "1744170715", "dbType": "OB_MYSQL", "storeDataSequence": 174417071548000000, "db": null, "timestamp": "1744170715", "uniqueId": null, "transId": null, "clusterId": "1", "ddlType": null, "record_primary_key": null, "source_identity": "OB_MYSQL_CE_ten_1_6n1****r4-1-0", "record_primary_value": null, "table_name": null }, "recordType": "HEARTBEAT" }
JSON format of Canal messages
The following JSON message format is used for serialization when data is synchronized to a Kafka, DataHub instance of the Blob data type, or RocketMQ instance.
{
"database": "STRING", // The name of the database in which the changed data is located. For OceanBase Database, only the database name is provided without the tenant name.
"sqlType": {
"col1": INTEGER, // The type of the changed column. The reference values are the same as those in java.sql.Types.
},
"data": [ // The key-value pairs of the changed data. At present, only one message is included.
{
"col1": "val1"
}
],
"pkNames": [ // The names of the primary key columns.
"col1"
],
"old": [ // This field exists only in UPDATE messages. It indicates the columns changed by the UPDATE statement, namely, the values before the changes.
{
"col1": "val1"
}
],
"mysqlType": { // The description of the column type.
"col": "STRING"
},
"type": "STRING", // The change type.
"table": "STRING", // The name of the table in which the changed data is located.
"es": LONG, // The change time in milliseconds.
"isDdl": BOOLEAN, // Whether the message is a DDL message.
"ts": LONG, // The timestamp when the data is written to the target.
"sql": "STRING", // This field is reserved. The value is always empty.
}
The following examples show the data in the preceding JSON format.
Example of
INSERT-related data{ "database":"database", "sqlType":{ "col1":93, "col2":12, "col3":6, "col4":8, "col5":5, "col6":92, "col7":4, "col8":-5, "col9":2004, "col10":-6, "col11":91, "col12":3, "col13":-5, "col14":93 }, "data":[ { "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":1.2222, "col4":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col5":129, "col6":"00:01:02", "col7":2147483646, "col8":9223372036854775806, "col9":"aGVsbG8gd29ybGQ=", "col10":3, "col11":"2020-11-25", "col12":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col13":10223372036854775806, "col14":"1606233662.012345" } ], "pkNames":[ "col1", "col2" ], "old":null, "mysqlType":{ "col1":"datetime", "col2":"varchar", "col3":"float", "col4":"double", "col5":"smallint", "col6":"time", "col7":"int", "col8":"int64", "col9":"blob", "col10":"tinyint", "col11":"date", "col12":"decimal", "col13":"bigint", "col14":"timestamp" }, "type":"INSERT", "table":"table", "es":1609344671000, "isDdl":false, "ts":1618323429026, "sql":"" }Example of
UPDATE-related data{ "database":"database", "sqlType":{ "col1":93, "col2":12, "col3":6, "col4":8, "col5":5, "col6":92, "col7":4, "col8":-5, "col9":2004, "col10":-6, "col11":91, "col12":3, "col13":-5, "col14":93 }, "data":[ { "col1":"2020-11-25 00:01:02", "col2":"hello world 2020", "col3":1.2222, "col4":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col5":129, "col6":"00:01:02", "col7":2147483646, "col8":9223372036854775806, "col9":"aGVsbG8gd29ybGQ=", "col10":3, "col11":"2020-11-25", "col12":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col13":10223372036854775806, "col14":"1606233662.012345" } ], "pkNames":[ "col1", "col2" ], "old":[ { "col":"hello world" } ], "mysqlType":{ "col1":"datetime", "col2":"varchar", "col3":"float", "col4":"double", "col5":"smallint", "col6":"time", "col7":"int", "col8":"int64", "col9":"blob", "col10":"tinyint", "col11":"date", "col12":"decimal", "col13":"bigint", "col14":"timestamp" }, "type":"UPDATE", "table":"table", "es":1609344671000, "isDdl":false, "ts":1618364572908, "sql":"" }Example of
DELETE-related data{ "database":"database", "sqlType":{ "col1":93, "col2":12, "col3":6, "col4":8, "col5":5, "col6":92, "col7":4, "col8":-5, "col9":2004, "col10":-6, "col11":91, "col12":3, "col13":-5, "col14":93 }, "data":[ { "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":1.2222, "col4":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col5":129, "col6":"00:01:02", "col7":2147483646, "col8":9223372036854775806, "col9":"aGVsbG8gd29ybGQ=", "col10":3, "col11":"2020-11-25", "col12":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col13":10223372036854775806, "col14":"1606233662.012345" } ], "pkNames":[ "int8", "int16" ], "old":null, "mysqlType":{ "col1":"datetime", "col2":"varchar", "col3":"float", "col4":"double", "col5":"smallint", "col6":"time", "col7":"int", "col8":"int64", "col9":"blob", "col10":"tinyint", "col11":"date", "col12":"decimal", "col13":"bigint", "col14":"timestamp" }, "type":"DELETE", "table":"table", "es":1609344671000, "isDdl":false, "ts":1618364660278, "sql":"" }DDL example
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';{ "database": "connector_test", "sqlType": null, "data": null, "pkNames": null, "old": null, "mysqlType": null, "type": "ALTER", "table": "all_mysql_type_test", "es": 1671177209000, "isDdl": true, "ts": 1671177291475, "sql": "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'" }Example of Kafka heartbeat data
{ "table": null, "database": null, "type": "MHEARTBEAT", "ts": 1744177995404, "es": 1744177994000, "isDdl": false, "sql": "", "data": null, "old": null, "pkNames": null, "mysqlType": null, "sqlType": null }
DataWorks JSON message format
When data is synchronized to a Kafka, DataHub (Blob), or RocketMQ instance, DataWorks uses the following JSON message format for serialization.
{
"version":"2.0", // The protocol version. Set the value to DataWorks V2.0.
"schema": { // The metadata of the changed data. The schema contains only the column names and types.
"source": {// The change source.
"dbType": "mysql", // The data source type.
"dbVersion": "5.7.35", // The database version.
"dbName": "myDatabase", // The database name.
"schema": "mySchema", // The schema name. This parameter is required if the system supports schemas.
"table": "tableName" // The table name.
}
"column": [// The changed data columns. This parameter specifies the content of the target table records.
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "varchar(20)"
},
{
"name": "mydata",
"type": "binary"
},
{
"name": "ts",
"type": "datetime"
}
],
"pk": [// The primary keys or unique keys. This parameter is required if the keys exist. Otherwise, this parameter is optional.
"pkName1",
"pkName2"
]
},
"payload": {
"before": {
"data":{
"id": 111,
"name":"scooter",
"mydata": "[base64 string]", // If the data type is binary, Base64 encode the data.
"ts": 1590315269000.123456789 // The timestamp. The integer part contains 13 digits, and the fractional part contains 9 digits.
}
},
"after": {
"data":{
"id": 222,
"name":"donald",
"mydata": "[base64 string]",
"ts": 1590315269000
}
},
"op":"INSERT/UPDATE/DELETE/HEARTBEAT/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/...",// Case sensitive
"timestamp": {
"eventTime": 1620457659000 // The time when the change was made in the source database. The timestamp has a precision of microseconds.
},
"ddl": {
"text": "ADD COLUMN ..."
},
"scn": "auto-increment ID"
},
"extend": { // The extend fields. This parameter is reserved for future use. If it is not specified, this parameter is optional.
"load_fm":"CIBS", // The source system. For example, "CIBS".
}
}
A synchronization task sends a heartbeat message as follows:
{
"version":"2.0", // The protocol version.
"payload": {
"timestamp": {
"eventTime": 1620457659000 // The time when the heartbeat message was sent.
},
"op": "HEARTBEAT" // The operation. The value of this parameter indicates that the message is a heartbeat message.
}
}
The following examples show how to use the preceding JSON template:
Example of
INSERT(insert) operation{ "version":"2.0", "schema":{ "source":{ "dbType":"ob_mysql", "dbVersion":null, "dbName":"db", "schema":null, "table":"tab" }, "column":[ { "name":"int8", "type":"TINYINT" }, { "name":"int16", "type":"SMALLINT" }, { "name":"int32", "type":"INT" }, { "name":"int64", "type":"INT64" }, { "name":"float32", "type":"FLOAT" }, { "name":"float64", "type":"DOUBLE" }, { "name":"bigInt", "type":"BIGINT" }, { "name":"boolean", "type":"BOOLEAN" }, { "name":"string", "type":"VARCHAR" }, { "name":"bytes", "type":"BLOB" }, { "name":"decimal", "type":"DECIMAL" }, { "name":"localDate", "type":"DATE" }, { "name":"localTime", "type":"TIME" }, { "name":"localDateTime", "type":"DATETIME" }, { "name":"timestamp", "type":"TIMESTAMP" }, { "name":"zonedDateTime", "type":"ZONED_DATETIME" }, { "name":"intervalDayToSecond", "type":"INTERVAL_DAY_TO_SECOND" }, { "name":"intervalYearToMonth", "type":"INTERVAL_YEAR_TO_MONTH" } ], "pk":[ "pkName1", "pkName12" ] }, "payload":{ "before":null, "after":{ "data":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":1.2222, "col6":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col7":10223372036854775806, "col8":1, "col9":"hello world", "col10":"aGVsbG8gd29ybGQ=", "col11":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col12":"2020-11-25", "col13":"00:01:02", "col14":"2020-11-25 00:01:02", "col15":"1606233662.012345", "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col17":"INTERVAL '3' DAY", "col18":"INTERVAL '4' YEAR" } }, "op":"INSERT", "timestamp":{ "eventTime":1647581000000, "systemTime":1647581000795, "checkpointTime":1647581000 }, "ddl":null, "scn":"null" }, "extend":{ "load_fm": "test" } }Example of
UPDATE(update) operation{ "version":"2.0", "schema":{ "source":{ "dbType":"ob_mysql", "dbVersion":null, "dbName":"db", "schema":null, "table":"tab" }, "column":[ { "name":"int8", "type":"TINYINT" }, { "name":"int16", "type":"SMALLINT" }, { "name":"int32", "type":"INT" }, { "name":"int64", "type":"INT64" }, { "name":"float32", "type":"FLOAT" }, { "name":"float64", "type":"DOUBLE" }, { "name":"bigInt", "type":"BIGINT" }, { "name":"boolean", "type":"BOOLEAN" }, { "name":"string", "type":"VARCHAR" }, { "name":"bytes", "type":"BLOB" }, { "name":"decimal", "type":"DECIMAL" }, { "name":"localDate", "type":"DATE" }, { "name":"localTime", "type":"TIME" }, { "name":"localDateTime", "type":"DATETIME" }, { "name":"timestamp", "type":"TIMESTAMP" }, { "name":"zonedDateTime", "type":"ZONED_DATETIME" }, { "name":"intervalDayToSecond", "type":"INTERVAL_DAY_TO_SECOND" }, { "name":"intervalYearToMonth", "type":"INTERVAL_YEAR_TO_MONTH" } ], "pk":[ "pkName1", "pkName2" ] }, "payload":{ "before":{ "data":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":1.2222, "col6":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col7":10223372036854775806, "col8":1, "col9":"hello world", "col10":"aGVsbG8gd29ybGQ=", "col11":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col12":"2020-11-25", "col13":"00:01:02", "col14":"2020-11-25 00:01:02", "col15":"1606233662.012345", "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col17":"INTERVAL '3' DAY", "col18":"INTERVAL '4' YEAR" } }, "after":{ "data":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":1.2222, "col6":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col7":10223372036854775806, "col8":1, "col9":"hello world 2020", "col10":"aGVsbG8gd29ybGQ=", "col11":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col12":"2020-11-25", "col13":"00:01:02", "col14":"2020-11-25 00:01:02", "col15":"1606233662.012345", "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col17":"INTERVAL '3' DAY", "col18":"INTERVAL '4' YEAR" } }, "op":"UPDATE", "timestamp":{ "eventTime":1647581038000, "systemTime":1647581038674, "checkpointTime":1647581038 }, "ddl":null, "scn":"null" }, "extend":{ "load_fm": "test" } }Example of
DELETE(delete) operation{ "version":"2.0", "schema":{ "source":{ "dbType":"ob_mysql", "dbVersion":null, "dbName":"db", "schema":null, "table":"tab" }, "column":[ { "name":"int8", "type":"TINYINT" }, { "name":"int16", "type":"SMALLINT" }, { "name":"int32", "type":"INT" }, { "name":"int64", "type":"INT64" }, { "name":"float32", "type":"FLOAT" }, { "name":"float64", "type":"DOUBLE" }, { "name":"bigInt", "type":"BIGINT" }, { "name":"boolean", "type":"BOOLEAN" }, { "name":"string", "type":"VARCHAR" }, { "name":"bytes", "type":"BLOB" }, { "name":"decimal", "type":"DECIMAL" }, { "name":"localDate", "type":"DATE" }, { "name":"localTime", "type":"TIME" }, { "name":"localDateTime", "type":"DATETIME" }, { "name":"timestamp", "type":"TIMESTAMP" }, { "name":"zonedDateTime", "type":"ZONED_DATETIME" }, { "name":"intervalDayToSecond", "type":"INTERVAL_DAY_TO_SECOND" }, { "name":"intervalYearToMonth", "type":"INTERVAL_YEAR_TO_MONTH" } ], "pk":[ "pkName1", "pkName2" ] }, "payload":{ "before":{ "data":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":1.2222, "col6":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col7":10223372036854775806, "col8":1, "col9":"hello world", "col10":"aGVsbG8gd29ybGQ=", "col11":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col12":"2020-11-25", "col13":"00:01:02", "col14":"2020-11-25 00:01:02", "col15":"1606233662.012345", "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col17":"INTERVAL '3' DAY", "col18":"INTERVAL '4' YEAR" } }, "after":null, "op":"DELETE", "timestamp":{ "eventTime":1647581072000, "systemTime":1647581072976, "checkpointTime":1647581072 }, "ddl":null, "scn":"null" }, "extend":{ "load_fm": "test" } }DDL example
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';{ "version": "2.0", "schema": { "source": { "dbType": "ob_mysql", "dbVersion": null, "dbName": "connector_test", "schema": null, "table": "all_mysql_type_test" }, "column": null, "pk": null }, "payload": { "before": null, "after": null, "op": "ALTER", "timestamp": { "eventTime": 1671177209000, "systemTime": 1671177291485, "checkpointTime": 1671177200 }, "ddl": { "text": "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'" }, "scn": "null" }, "extend": { } }Example of Kafka heartbeat data
{ "version": "2.0", "schema": { "source": { "dbType": "ob_mysql", "dbVersion": null, "dbName": null, "schema": null, "table": null }, "column": null, "pk": null }, "payload": { "before": null, "after": null, "op": "HEARTBEAT", "timestamp": { "eventTime": 1744179056998, "systemTime": 1744179057419, "checkpointTime": 1744179056 }, "ddl": null, "scn": "174417905734900000" }, "extend": {} }
JSON message format of SharePlex
The following JSON message format is used for serializing data for synchronization to a Kafka, DataHub instance of the BLOB data type, or RocketMQ instance.
{
"data": { // Key-value pairs of change data. For INSERT and DELETE operations, the key-value pairs are complete values. For UPDATE operations, only the changed values are included.
"col1": "val1"
},
"meta": {
"time": "YYYY-MM-DDTHH:mm:ss", // Change time
"op": "", // Change type. Valid values: ins, upd, del, and ddl.
"posttime": "YYYY-MM-DDTHH:mm:ss", // Time when data is written to the target
"idx":"STRING", // The index of the message in the transaction or the number of indexed messages. At present, OMS does not count the total number of transactions. Therefore, this parameter is invalid.
"size": NUMBER, // Number of messages in the transaction. This parameter is deprecated.
"seq": "STRING", // Sequence number. This parameter is returned only when transactionEnabled is set to true in the source database.
"table": "STRING", // Name of the source table in the SQL database. The value of this parameter is in the {database}.{table} format.
"rowid": "STRING", // {database name}.{table name}-{primary key value}. If the primary key consists of multiple columns, the columns are separated by the character \u0001.
"trans": "STRING", // Transaction ID
"scn": "STRING", // This field is returned only when sequenceEnabled=true is specified in the source.json configuration file in incremental synchronization. The default value is true. The field is used for sorting. The generation rule is that in a synchronization process, the timestamp is combined with an incremental sequence number not exceeding five digits.
},
"key": { // This field exists only for UPDATE operations and indicates the values before the update.
},
"sql": {"ddl": ""} // This field exists only for DDL operations. If op is ddl, the DDL statement is written to this field.
}
The following examples show data in different types of change events:
Example of
INSERTdata{ "data":{ "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":"INTERVAL '3' DAY", "col4":1.2222, "col5":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col6":129, "col7":"00:01:02", "col8":1, "col9":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col10":2147483646, "col11":9223372036854775806, "col12":"aGVsbG8gd29ybGQ=", "col13":"INTERVAL '4' YEAR", "col14":3, "col15":"2020-11-25", "col16":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col17":10223372036854775806, "col18":"1606233662.012345" }, "meta":{ "posttime":"2020-12-07T13:22:00", "op":"ins", "size":10, "time":"2020-11-25T00:01:02", "idx":"1/10", "seq":1, "table":"mock_database.mock_table", "rowid":"mock_database.mock_table-3\u0001129", "trans":"shareplex_transaction_id", "scn":"123456789" } }Example of
UPDATEdata{ "data":{ "col":"hello world 2020" }, "meta":{ "posttime":"2020-12-07T13:59:09", "op":"upd", "size":10, "time":"2020-11-25T00:01:02", "idx":"1/10", "seq":1, "table":"mock_database.mock_table", "rowid":"mock_database.mock_table-3\u0001129", "trans":"shareplex_transaction_id", "scn":"123456789" }, "key":{ "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":"INTERVAL '3' DAY", "col4":1.2222, "col5":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col6":129, "col7":"00:01:02", "col8":1, "col9":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col10":2147483646, "col11":9223372036854775806, "col12":"aGVsbG8gd29ybGQ=", "col13":"INTERVAL '4' YEAR", "col14":3, "col15":"2020-11-25", "col16":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col17":10223372036854775806, "col18":"1606233662.012345" } }Example of
DELETEdata{ "data":{ "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":"INTERVAL '3' DAY", "col4":1.2222, "col5":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col6":129, "col7":"00:01:02", "col8":1, "col9":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col10":2147483646, "col11":9223372036854775806, "col12":"aGVsbG8gd29ybGQ=", "col13":"INTERVAL '4' YEAR", "col14":3, "col15":"2020-11-25", "col16":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col17":10223372036854775806, "col18":"1606233662.012345" }, "meta":{ "posttime":"2020-12-07T13:34:10", "op":"del", "size":10, "time":"2020-11-25T00:01:02", "idx":"1/10", "seq":1, "table":"mock_database.mock_table", "rowid":"mock_database.mock_table-3\u0001129", "trans":"shareplex_transaction_id", "scn":"123456789" } }Examples of DDL statements
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';{ "data": {}, "meta": { "posttime": "2022-12-16T15:54:51", "op": "ddl", "size": 0, "time": "2022-12-16T15:53:29", "idx": "0/0", "seq": 0, "table": "connector_test.all_mysql_type_test", "rowid": "connector_test.all_mysql_type_test-", "trans": null, "scn": "null" }, "sql": { "ddl": "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'" } }Example of Kafka heartbeat data
{ "meta": { "time": "2025-04-09T14:17:19", "op": "heartbeat", "rowid": null, "trans": null, "seq": 0, "size": 0, "table": null, "idx": "0/0", "posttime": "2025-04-09T14:17:19", "scn": "174417943989400000" }, "data": {} }
DefaultExtendColumnType JSON message format
When data is synchronized to a Kafka, DataHub instance of the Blob data type, or a RocketMQ instance, the DefaultExtendColumnType serialization method uses the following JSON message format.
The DefaultExtendColumnType JSON message format adds the __light_type field to the image based on the DEFAULT structure. The __light_type field indicates the data type of the fields.
{
"prevStruct": { // The structure before the change.
},
"postStruct": { // The structure after the change.
"__light_type": {
"col":{ // The name of the field.
"schemaType":"type" // The value type.
}
}
},
"allMetaData" {},
}
Here are some data examples:
Example of
INSERT(insert) data{ "allMetaData":{ "checkpoint":null, "record_primary_key":"id1\u0001id2", "source_identity":null, "record_primary_value":"3\u0001129", "dbType":"OB_MYSQL", "table_name":"table", "db":"database", "timestamp":"1609344671" }, "prevStruct":null, "recordType":"INSERT", "postStruct":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":10223372036854775806, "col6":1.2222, "col7":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col8":"hello world", "col9":"aGVsbG8gd29ybGQ=", "col10":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col11":"2020-11-25", "col12":"00:01:02", "col13":"2020-11-25 00:01:02", "col14":"1606233662.012345", "__light_type":{ "int8":{ "schemaType":"TINYINT" }, "int16":{ "schemaType":"SMALLINT" }, "int32":{ "schemaType":"INT" }, "int64":{ "schemaType":"INT64" }, "bigInt":{ "schemaType":"BIGINT" }, "float32":{ "schemaType":"FLOAT" }, "float64":{ "schemaType":"DOUBLE" }, "string":{ "schemaType":"VARCHAR" }, "bytes":{ "schemaType":"BLOB" }, "decimal":{ "schemaType":"DECIMAL" }, "localDate":{ "schemaType":"DATE" }, "localTime":{ "schemaType":"TIME" }, "localDateTime":{ "schemaType":"DATETIME" }, "timestamp_in_long":{ "schemaType":"TIMESTAMP" } } } }Example of
UPDATE(update) data{ "allMetaData": { "checkpoint": null, "record_primary_key": "id1\u0001id2", "source_identity": null, "record_primary_value": "3\u0001129", "dbType":"OB_MYSQL", "table_name": "table", "db": "database", "timestamp": "1609344671" }, "prevStruct": { "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":10223372036854775806, "col6":1.2222, "col7":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col8":"hello world", "col9":"aGVsbG8gd29ybGQ=", "col10":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col11":"2020-11-25", "col12":"00:01:02", "col13":"2020-11-25 00:01:02", "col14":"1606233662.012345", "__light_type": { "int8": { "schemaType": "TINYINT" }, "int16": { "schemaType": "SMALLINT" }, "int32": { "schemaType": "INT" }, "int64": { "schemaType": "INT64" }, "bigInt": { "schemaType": "BIGINT" }, "float32": { "schemaType": "FLOAT" }, "float64": { "schemaType": "DOUBLE" }, "string": { "schemaType": "VARCHAR" }, "bytes": { "schemaType": "BLOB" }, "decimal": { "schemaType": "DECIMAL" }, "localDate": { "schemaType": "DATE" }, "localTime": { "schemaType": "TIME" }, "localDateTime": { "schemaType": "DATETIME" }, "timestamp_in_long": { "schemaType": "TIMESTAMP" } } }, "recordType": "UPDATE", "postStruct": { "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":10223372036854775806, "col6":1.2222, "col7":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col8":"hello world 2020", "col9":"aGVsbG8gd29ybGQ=", "col10":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col11":"2020-11-25", "col12":"00:01:02", "col13":"2020-11-25 00:01:02", "col14":"1606233662.012345", "__light_type": { "int8": { "schemaType": "TINYINT" }, "int16": { "schemaType": "SMALLINT" }, "int32": { "schemaType": "INT" }, "int64": { "schemaType": "INT64" }, "bigInt": { "schemaType": "BIGINT" }, "float32": { "schemaType": "FLOAT" }, "float64": { "schemaType": "DOUBLE" }, "string": { "schemaType": "VARCHAR" }, "bytes": { "schemaType": "BLOB" }, "decimal": { "schemaType": "DECIMAL" }, "localDate": { "schemaType": "DATE" }, "localTime": { "schemaType": "TIME" }, "localDateTime": { "schemaType": "DATETIME" }, "timestamp_in_long": { "schemaType": "TIMESTAMP" } } } }Example of
DELETE(delete) data{ "allMetaData":{ "checkpoint":null, "record_primary_key":"id1\u0001id2", "source_identity":null, "record_primary_value":"3\u0001129", "dbType":"OB_MYSQL", "table_name":"table", "db":"database", "timestamp":"1609344671" }, "prevStruct":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":10223372036854775806, "col6":1.2222, "col7":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col8":"hello world", "col9":"aGVsbG8gd29ybGQ=", "col10":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col11":"2020-11-25", "col12":"00:01:02", "col13":"2020-11-25 00:01:02", "col14":"1606233662.012345", "__light_type":{ "int8":{ "schemaType":"TINYINT" }, "int16":{ "schemaType":"SMALLINT" }, "int32":{ "schemaType":"INT" }, "int64":{ "schemaType":"INT64" }, "bigInt":{ "schemaType":"BIGINT" }, "float32":{ "schemaType":"FLOAT" }, "float64":{ "schemaType":"DOUBLE" }, "string":{ "schemaType":"VARCHAR" }, "bytes":{ "schemaType":"BLOB" }, "decimal":{ "schemaType":"DECIMAL" }, "localDate":{ "schemaType":"DATE" }, "localTime":{ "schemaType":"TIME" }, "localDateTime":{ "schemaType":"DATETIME" }, "timestamp_in_long": { "schemaType": "TIMESTAMP" } } }, "recordType":"DELETE", "postStruct":null }DDL examples
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';{ "prevStruct": null, "postStruct": { "ddl": "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'", "__light_type": { "ddl": { "schemaType": "VAR_STRING" } } }, "allMetaData": { "checkpoint": "1671177200", "dbType": "OB_MYSQL", "storeDataSequence": null, "db": "connector_test", "timestamp": "1671177209", "uniqueId": null, "ddlType": "ALTER_TABLE", "record_primary_key": null, "source_identity": null, "record_primary_value": null, "table_name": "all_mysql_type_test" }, "recordType": "DDL" }Example of Kafka heartbeat data
{ "prevStruct": null, "postStruct": null, "allMetaData": { "checkpoint": "1744179731", "dbType": "OB_MYSQL", "storeDataSequence": 174417973200600000, "db": null, "timestamp": "1744179731", "uniqueId": null, "transId": null, "clusterId": "1", "ddlType": null, "record_primary_key": null, "source_identity": "OB_MYSQL_CE_ten_1_6n1****r4-1-0", "record_primary_value": null, "table_name": null }, "recordType": "HEARTBEAT" }
Debezium JSON message format
When you synchronize data from the MySQL compatible mode of OceanBase Database to a Kafka, DataHub (with the Blob data type), or RocketMQ instance, Debezium uses the following JSON message format. The format contains two types, and generally, only the structure in the payload field is displayed.
Both
schemaandpayloadfields are present. {#2f02ea300egch}{ "schema":{// Describes the structure of the payload fields. This structure is not provided by default. "type": "struct",// struct indicates that the field contains subfields. "optional": false,// specifies whether the field is required. "fields": [ { "type": "int64",// The type of the field. "optional": false,// specifies whether the field is required. "field": "ts_ms"// The name of the field. } ... ] }, "payload":{ "op":"c", // The type of data modification. Valid values: c (full insert), u (update), d (delete), and HEARTBEAT (heartbeat message). "source":{ "version":"" // The version of OMS. "connector":"OB_MYSQL", // The type of the data source. "name":"OMS", // The fixed value OMS. "ts_ms":0, // The second timestamp of data modification. This field is provided only for incremental data. "db":"test", // The name of the database whose data is changed by using SQL statements. For an OceanBase database, only the database name is provided without the tenant name. "table":"testTab" // The name of the table whose data is changed by using SQL statements. "pos":"553132@1668496109" // The position of the binlog file in the format of [binlog file name]@[binlog file offset]. }, "before":{ // The image of the data before the change. "column":"value" // The key-value pairs of the full data. } "after":{ // The image of the data after the change. "column":"value" // The key-value pairs of the full data. }, "ts_ms":1668497367188 // The data processing timestamp. } }Only the
payloadfield is present.{ "payload":{ "op":"c", // The type of data modification. Valid values: c (full insert), u (update), d (delete), and HEARTBEAT (heartbeat message). "source":{ "version":"" // The version of OMS. "connector":"OB_MYSQL", // The type of the data source. "name":"OMS", // The fixed value OMS. "ts_ms":0, // The second timestamp of data modification. This field is provided only for incremental data. "db":"test", // The name of the database whose data is changed by using SQL statements. For an OceanBase database, only the database name is provided without the tenant name. "table":"testTab" // The name of the table whose data is changed by using SQL statements. "pos":"553132@16684****" // The position of the binlog file in the format of [binlog file name]@[binlog file offset]. }, "before":{ // The image of the data before the change. "column":"value" // The key-value pairs of the full data. } "after":{ // The image of the data after the change. "column":"value" // The key-value pairs of the full data. }, "ts_ms":1668497367188 // The data processing timestamp. } }
Sample data is as follows:
Sample data for the
INSERToperation{ "schema":{ "optional":false, "type":"STRUCT", "fields":[ { "field":"before", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"after", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"source", "optional":false, "type":"struct", "fields":[ { "field":"version", "optional":false, "type":"string" }, { "field":"connector", "optional":false, "type":"string" }, { "field":"name", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" }, { "field":"db", "optional":false, "type":"string" }, { "field":"table", "optional":false, "type":"string" }, { "field":"server_id", "optional":false, "type":"int64" }, { "field":"pos", "optional":false, "type":"string" } ] }, { "field":"op", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" } ] }, "payload":{ "op":"c", "source":{ "connector":"OB_MYSQL", "pos":"703223@166849****", "name":"OMS", "version":"", "ts_ms":1668491621000, "db":"test", "table":"table_name" }, "after":{ "c11":"a", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":2, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "ts_ms":1668495423594 } }Sample data for the
UPDATEoperation{ "schema":{ "optional":false, "type":"STRUCT", "fields":[ { "field":"before", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"after", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"source", "optional":false, "type":"struct", "fields":[ { "field":"version", "optional":false, "type":"string" }, { "field":"connector", "optional":false, "type":"string" }, { "field":"name", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" }, { "field":"db", "optional":false, "type":"string" }, { "field":"table", "optional":false, "type":"string" }, { "field":"server_id", "optional":false, "type":"int64" }, { "field":"pos", "optional":false, "type":"string" } ] }, { "field":"op", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" } ] }, "payload":{ "op":"u", "before":{ "c11":"a", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "source":{ "connector":"OB_MYSQL", "pos":"436999@166849****", "name":"OMS", "version":"", "ts_ms":1668495861000, "db":"test", "table":"table_name" }, "after":{ "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "ts_ms":1668495906356 } }Sample data for the
DELETEoperation{ "schema":{ "optional":false, "type":"STRUCT", "fields":[ { "field":"before", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"after", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"source", "optional":false, "type":"struct", "fields":[ { "field":"version", "optional":false, "type":"string" }, { "field":"connector", "optional":false, "type":"string" }, { "field":"name", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" }, { "field":"db", "optional":false, "type":"string" }, { "field":"table", "optional":false, "type":"string" }, { "field":"server_id", "optional":false, "type":"int64" }, { "field":"pos", "optional":false, "type":"string" } ] }, { "field":"op", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" } ] }, "payload":{ "op":"d", "before":{ "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "source":{ "connector":"OB_MYSQL", "pos":"553132@1668****", "name":"OMS", "version":"", "ts_ms":1668496109000, "db":"test", "table":"table_name" }, "ts_ms":1668496119717 } }Sample Kafka heartbeat data
{ "payload": { "source": { "version": "2.0.1.Final", "connector": "OB_MYSQL", "name": "OMS", "ts_ms": 1744180390000, "pos": 1744180390 }, "op": "HEARTBEAT", "ts_ms": 1744180390881, "before": {}, "after": {} } }
DebeziumFlatten JSON message format
When you synchronize data from the MySQL compatible mode of OceanBase Database to a Kafka, DataHub (with the Blob data type), or RocketMQ instance, the JSON message is serialized in the following format using the DebeziumFlatten method. Compared with the Debezium method, this method does not include the schema and payload fields.
{
"op":"c", //The data modification type. c indicates full data transmission or insertion, u indicates update, and d indicates deletion. HEARTBEAT indicates a heartbeat message.
"source":{
"version":"" //The version of OMS.
"connector":"OB_MYSQL", //The type of the data source.
"name":"OMS", //The fixed value OMS.
"ts_ms":0, //The second-level timestamp of data changes. This field is included only in incremental data.
"db":"test", //The name of the database that changes data by using SQL statements. For OceanBase Database, only the database name is included without the tenant name.
"table":"testTab" //The name of the table that changes data by using SQL statements.
"pos":"553132@16684****" //The position of the binlog file in the format of [binlog file name]@[binlog file name offset].
},
"before":{ //The image of the data before the change.
"column":"value" //Key-value pairs that contain complete key values.
}
"after":{ //The image of the data after the change.
"column":"value" //Key-value pairs that contain complete key values.
},
"ts_ms":1668497367188 //The timestamp when the data is processed.
}
The following examples show data in different scenarios:
Example of
INSERTdata{ "op":"c", "source":{ "connector":"OB_MYSQL", "pos":"703223@166849****", "name":"OMS", "version":"", "ts_ms":1668491621000, "db":"test", "table":"table_name" }, "after":{ "c11":"a", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":2, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "ts_ms":1668495423594 }Example of
UPDATEdata{ "op":"u", "before":{ "c11":"a", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "source":{ "connector":"OB_MYSQL", "pos":"436999@166849****", "name":"OMS", "version":"", "ts_ms":1668495861000, "db":"test", "table":"table_name" }, "after":{ "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "ts_ms":1668495906356 }Example of
DELETEdata{ "op":"d", "before":{ "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "source":{ "connector":"OB_MYSQL", "pos":"553132@1668****", "name":"OMS", "version":"", "ts_ms":1668496109000, "db":"test", "table":"table_name" }, "ts_ms":1668496119717 }Kafka heartbeat data example
{ "source": { "version": "2.0.1.Final", "connector": "OB_MYSQL", "name": "OMS", "ts_ms": 1744180605000, "pos": 1744180605 }, "op": "HEARTBEAT", "ts_ms": 1744180606079, "before": {}, "after": {} }
DebeziumSmt JSON message format
DebeziumSmt is a configuration method provided by Debezium for converting and processing a single message using Single Message Transform (SMT). When you synchronize data from the MySQL compatible mode of OceanBase Database to a Kafka, DataHub (Blob type), or RocketMQ instance, the JSON message format of the DebeziumSmt serialization method only displays key:value in the after field.
For example, update data by using the serialization method of Debezium:
{
"op": "u",
"source": {
"connector":"OB_MYSQL",
"name":"OMS",
...
},
"ts_ms": 1668496119717,
"before": {
"field1": "before_value1",
"field2": "before_value2"
},
"after": {
"field1": "after_value1",
"field2": "after_value2"
}
}
After SMT processes the preceding example message, it simplifies the message format. Specifically, the JSON message format is as follows when DebeziumSMT is used for serialization.
{
"field1": "after_value1",
"field2": "after_value2"
}
Here is an example:
Example of inserting data
{ "field1": "after_value1", "field2": "after_value2", "__deleted": "false" }Example of updating data with the
UPDATEstatement{ "field1": "after_value1", "field2": "after_value2", "__deleted": "false" }Example of deleting data with the
DELETEstatement{ "field1": "after_value1", "field2": "after_value2", "__deleted": "true" }Example of Kafka heartbeat data
{ "payload": { "source": { "version": "2.0.1.Final", "connector": "OB_MYSQL", "name": "OMS", "ts_ms": 1744181363000, "pos": 1744181363 }, "op": "HEARTBEAT", "ts_ms": 1744181364560, "before": {}, "after": {} } }
Maxwell JSON message format
When data is synchronized to Kafka, Maxwell uses the following JSON message format for serialization.
{
"database":"STRING", //Name of the database.
"table":"STRING", //Name of the table.
"type":"STRING", // The operation type. Valid values: insert, update, delete, table-create, table-alter, and table-drop.
"ts":LONG, // Time stamp. The value is 1600000000L in full synchronization.
"xid":"STRING", // In OceanBase Database Community Edition V4.x and earlier, xid represents the transaction ID. inc is an auto-increment number, addr is the address of the coordinator, ts is the time when the transaction ID is generated, and hash is the hash value of the three preceding elements. If the transaction is incomplete, xid is null. In OceanBase Database Community Edition V4.x and later, xid is in the format of "tenant ID.transaction ID". For example, "1002_336195". In a MySQL database, xid represents a checkpoint.
"commit":BOOLEAN, // Whether to commit the transaction, fixed as true.
"position":"STRING",//Position, which stores a timestamp. For example, "1684224949".
"primary_key":OBJECT[],//Primary key value. It is null in DDL statements and OBJECT[] in DML statements. For example, [1,"val1"], or [] if there is no primary key.
"primary_key_columns":STRING[],//Primary key columns. It is null in DDL statements, and OBJECT[] in DML statements, corresponding one-to-one with primary key values. For example, ["key1","key2"], or [] if there is no primary key.
"data": { // Data after DML changes.
"col1":"val1",//Key-value pair.
},
"old":{//DML change data before the change.
"col1":"val1",//Key-value pair.
}
"sql":"STRING",//DDL executes SQL statements.
"def":{//DDL change object information.
"database":"STRING",//The name of the database.
"table":"STRING",//The name of the table.
"primary-key":OBJECT[],//Primary key value, fixed as [].
"columns":STRING[]//The primary key columns. The value is always [].
},
"uuid":"STRING"//Unique identifier.
"mesh":"prd/gray" // Gray scale identifier, this field will only be available after configuration.
}
Here is an example:
Example of inserting data with the
INSERTstatement{ "database":"test", "table":"test", "type":"insert", "ts":1684225073, "xid":"1002_336195", "commit":true, "position":"1684225073", "primary_key":[ "1" ], "primary_key_columns":[ "id" ], "data":{ "m":4.2341, "id":1, "c":"2016-10-21 05:33:37.523000" }, "old":null, "sql":null, "def":null }Example of updating data with the
UPDATEstatement{ "database":"test", "table":"test", "type":"update", "ts":1684225127, "xid":"1002_336403", "commit":true, "position":"1684225126", "primary_key":[ "1" ], "primary_key_columns":[ "id" ], "data":{ "m":5.444, "id":1, "c":"2016-10-21 05:33:54.63100" }, "old":{ "m":4.2341, "id":1, "c":"2016-10-21 05:33:37.523000" }, "sql":null, "def":null }Example of deleting data by using the
DELETEstatement{ "database":"test", "table":"test", "type":"delete", "ts":1684225163, "xid":"1002_336593", "commit":true, "position":"1684225163", "primary_key":[ "1" ], "primary_key_columns":[ "id" ], "data":{ "m":5.444, "id":1, "c":"2016-10-21 05:33:54.631000" }, "old":null, "sql":null, "def":null }Example of Kafka heartbeat data
{ "database": null, "table": null, "type": "heartbeat", "ts": 1744181717000, "xid": null, "commit": true, "position": "1744181717", "data": null, "old": null, "sql": null, "def": null, "primary_key": null, "primary_key_columns": null, "uuid": "451****a-ed42-4486-aec4-a****d" }
Data transmission from a database to a text protocol
When you synchronize data from OceanBase Database Community Edition to a Kafka, DataHub (Blob type), or RocketMQ instance:
If the serialization method is Default, Canal, Dataworks (V2.0 and later), SharePlex, or DefaultExtendColumnType, the mapping description for OceanBase Community Edition is as follows.
Data type Mapped type Description TINYINT
SMALLINT
MEDIUMINT
INT/INTEGER
YEAR
BOOL/BOOLEANLong An integer type that stores integers of less than 64 bits.
Normal numbers, such as 1000, are not represented in scientific notation.
For BOOL/BOOLEAN, true is equivalent to 1 and false is equivalent to 0.DECIMAL
NUMERICBigDecimal A precise decimal data type and an integer type that exceeds 64 bits.
For integer values, the decimal point and digits after the decimal point are not displayed.
For values with decimals, the number of digits is determined by the data passed from the database. Trailing zeros are not removed, and scientific notation is used.FLOAT
DOUBLEDouble A floating-point value. The number of significant digits is determined by the source data type, FLOAT or DOUBLE.
FLOAT has 7 significant digits, and DOUBLE has 16 significant digits.CHAR
VARCHAR
TINYTEXT
TEXT
MEDIUMTEXT
LONGTEXT
ENUM
SETString String TINYBLOB
BLOB
MEDIUMBLOB
LONGBLOB
BINARY
VARBINARY
BITBytes Byte array, displayed in BASE64 encoding by default.
Note:
For the BIT fixed-length type, after the incremental data receives the byte array, leading zeros will be removed, but this does not happen in the full data. Therefore, the BASE64 encoding may appear inconsistent. However, the actual result is consistent, and the decoded result will be the same.DATE Date Date type, formatted as YYYY-MM-DD. If it's an invalid time, the original string will be displayed.TIME Time The time type. The format is HH:mm:ss[.nnnnnnnnn].
At most nine digits are displayed for subsecond values. All non-zero digits are displayed for subsecond values. If the time is invalid, the original string is displayed.DATETIME DateTime The date and time type that includes the time zone. The format is YYYY-MM-DD HH:mm:ss[.nnnnnnnnn] [zoneId].
At most nine digits are displayed for subsecond values. All non-zero digits are displayed for subsecond values. If the date and time are invalid, the original string is displayed.TIMESTAMP Timestamp The timestamp type. The format is [seconds since the Unix epoch].[nnnnnnnnn].
The least 9 digits after the decimal point are displayed for timestamps with a precision lower than seconds. For timestamps with non-zero digits below the second precision, all non-zero digits are displayed. An illegal timestamp is displayed as0000-00-00 00:00:00.The changes in the tasks created by using OMS Community Edition V3.3.0-CE and later versions compared with those in OMS Community Edition versions earlier than V3.3.0-CE:
Note
Before upgrading a task created in a version earlier than V3.3.0-CE in the OMS Community Edition to V3.3.0-CE, note that the task retains its behavior before the upgrade and is not modified.
In the Community Edition of OceanBase Database, the BIT type is mapped to base64-encoded bytes values instead of LONG values. You can use base64 decoding to obtain the same results.
In the OceanBase Community Edition, the Timestamp type is passed in the
sec.millformat, and in OMS Community Edition V3.3.0-CE and earlier, it was passed in the integer format ofsec.Other time types are in the
YYYY-MM-DD HH:mm:ss[.nnnnnnnnn]format. In OMS Community Edition V3.3.0-CE and earlier, the format for other time types wasYYYY-MM-DDTHH:mm[:ss].
If the serialization method is Debezium, the mapping description for OceanBase Community Edition is as follows.
Data type Character type Description BOOLEAN
BOOLBOOLEAN Can take the values true and false. TINYINT
SMALLINT
MEDIUMINT
INT/INTEGER
BIGINT
YEARLONG The BIGINT type represents an integer in the range of -2^63^ to 2^63^. BIGINT STRING Displays the data in full string format. FLOAT DOUBLE Floating-point number. DECIMAL
NUMERICSTRING Displays the data as a string. For numeric values with decimal places, the number of digits is determined by the data passed from the database, and trailing zeros are not removed. Scientific notation is used. BIT
BINARY
VARBINARY
TINYBLOB
BLOB
MEDIUMBLOB
LONGBLOBBYTES An array of bytes encoded in base16. CHAR
VARCHAR
TINYTEXT
TEXT
MEDIUMTEXT
LONGTEXT
ENUM
SETSTRING A sequence of characters. TIMESTAMP STRING The timestamp is in the YYYY-MM-DDTHH:mm:ss[.nnnnnnnnn]Z format, where the time zone is UTC. DATE LONG The number of days since January 1, 1970. TIME LONG indicates the time value in microseconds since 00:00:00, without time zone information. DATETIME LONG The number of milliseconds since 1970-01-01 00:00:00 without time zone information. If the serialization format is Maxwell, the mapping description for OceanBase Community Edition is as follows.
Data type Mapped type Description TINYINT
SMALLINT
MEDIUMINT
INT/INTEGER
YEAR
BOOL/BOOLEANLong A 64-bit integer.
Normal numbers, such as 1000, are not represented in scientific notation.
For BOOL/BOOLEAN, true = 1 and false = 0.DECIMAL
NUMERICBigDecimal A precise decimal data type and an integer type that exceeds 64 bits.
For integer values, the decimal point and digits after the decimal point are not displayed.
For values with decimals, the number of digits is determined by the data passed from the database. Trailing zeros are not removed, and scientific notation is used.FLOAT
DOUBLEDouble Floating-point numbers. The number of significant digits is determined by the source data type, which is either FLOAT or DOUBLE.
FLOAT has 7 significant digits, and DOUBLE has 16 significant digits.CHAR
VARCHAR
TINYTEXT
TEXT
MEDIUMTEXT
LONGTEXT
ENUMString A string. SET JSON arrays JSON arrays. For example, ["a","b"]. TINYBLOB
BLOB
MEDIUMBLOB
LONGBLOB
BINARY
VARBINARY
BITBytes Byte array, displayed in BASE64 encoding by default.
Note:
For the BIT fixed-length type, after the incremental reception of the byte array, the leading zeros will be removed, but this does not apply to full data. Therefore, the BASE64 encoding may appear inconsistent. However, the actual result is consistent, and the decoded result will be the same.DATE Date The date type. The format is YYYY-MM-DD. If the date is invalid, the original string is displayed.TIME Time The time type. The format is HH:mm:ss[.nnnnnnnnn].
The time is displayed with at most nine digits if it is in units smaller than a second. For example, if the time is in milliseconds, all non-zero digits are displayed. If the time is invalid, the original string is displayed.DATETIME DateTime Date and time type, excluding time zone information, with the time zone being the one at the time of business application writing. The format is YYYY-MM-DD HH:mm:ss[.nnnnnn].
Time below the second level is displayed with 6 digits by default. If the time below the second level has more than 6 digits and the last three digits are not all 0, a 9-digit value will be displayed.
If it is an invalid time, the original string will be displayed.TIMESTAMP Timestamp Date and time type without time zone information, with the 0 time zone. The format is YYYY-MM-DD HH:mm:ss[.nnnnnn].
By default, a timestamp with a precision lower than seconds is displayed with 6 digits. If such a timestamp has more than 6 digits and its last three digits are not all 0, it is displayed with 9 digits.
If the value is an invalid timestamp, the original string is displayed.