This topic describes the data formats used in serialization methods and when data is transmitted from OceanBase Database to a message queue system.
Data formats used in serialization methods
When you use OceanBase Migration Service (OMS) Community Edition to synchronize data from the source to a Kafka, DataHub (Blob type), or RocketMQ instance, you can use a serialization method to control the format of the data to be synchronized to the destination. The serialization methods include Default, Canal, Dataworks (V2.0 supported), SharePlex, DefaultExtendColumnType, Debezium, DebeziumFlatten, DebeziumSmt, and Maxwell (supported only when the destination is a Kafka instance).
Default JSON message format
When you synchronize data to a Kafka, DataHub (Blob type), or RocketMQ instance, the following JSON message format is used for the Default serialization method:
{
"prevStruct": { // The image before modification.
"col1": "val1" // The key-value pair that contains the full key value.
},
"postStruct": { // The image after modification.
"col1": "val1" // The key-value pair that contains the full key value.
},
"allMetaData" {
"checkpoint": "STRING", // The current synchronization checkpoint, or the end synchronization timestamp during incremental synchronization, in seconds. It is indicated by using a primary key-value pair during full synchronization.
"record_primary_key": "STRING", // The name of the primary key column. If the primary key has multiple columns. Separate the column names with \u0001.
"record_primary_value" "STRING", // The primary key value. If the primary key has multiple columns. Separate the column names with \u0001.
"source_identity": "STRING", // The source ID. The source ID is a subtopic in the case of incremental synchronization. It is a meaningless sequence in the case of full synchronization.
"dbType": "STRING", // The database type. Valid values: MYSQL, ORACLE, OCEANBASE, OB_IN_ORACLE_MODE, DB2, OB_MYSQL, OB_ORACLE, and DB2_LUW. Note that OCEANBASE, OB_IN_ORACLE_MODE, and DB2 are used by earlier versions of the data transmission service and are compatible with the current version.
"storeDataSequence": LONG, // This field is available only when the value of sequenceEnabled is true in the source.json configuration file in incremental synchronization scenarios. Default value: true. This field is used for sorting. A sequence is generated during the synchronization process in the timestamp + sequence number format. The sequence number increments and does not exceed five digits.
"table_name": "STRING", // The name of the table that is changed by using an SQL statement.
"db": "STRING", // The name of the database that is changed by using an SQL statement. If the specified database is an OceanBase database, the database name includes the tenant name in the {tenant}.{database} format.
"timestamp": "STRING", // The timestamp in seconds for data modification, which is available only for incremental synchronization.
"uniqueId": "STRING", // The unique identifier of a transaction that is passed from the Store component as a sequence number during incremental migration.
"transId": "STRING", // In OceanBase Database Community Edition earlier than V4.x, transId indicates the ID of a transaction. In a transaction ID, inc is an auto-increment number, addr is the address of the coordinator, ts is the time when the transaction ID is generated, and hash is the HASH value of the preceding three items. For an incomplete transaction, the transaction ID is null. In OceanBase Database Community Edition V4.x and later versions, transId is specified in the format of "tenant ID_transaction ID", for example, "1002_336195". In MySQL or Oracle databases, transId indicates a checkpoint.
"clusterId": "STRING", // clusterId indicates a cluster ID in OceanBase Database and a thread ID in MySQL databases. It does not exist in Oracle databases.
"ddlType": "STRING", // The DDL operation type. This field is added in OMS Community Edition V4.1.1.
},
"recordType": "INSERT/UPDATE/DELETE/HEARTBEAT/DDL" // The change type.
}
In DDL records, the key is "ddl", which indicates the column name, and the values are DDL statements.
Beforeimage and afterimage:
prevStruct: The beforeimage of incremental data, which is the data before an SQL statement is executed.postStruct: The afterimage of incremental data, which is the data after an SQL statement is executed.
For the
DELETEtype, onlyprevStructis available. For theINSERTandDDLtypes, onlypostStructis available. For theUPDATEtype, bothprevStructandpostStructare available. For theHEARTBEAT(periodic heartbeat message) type, neitherpostStructnorpostStructis available.clusterIdis described as follows:In OceanBase Database,
ob_org_cluster_idspecifies a session-level cluster ID, which will be persisted to the transaction log. If you specifyob_org_cluster_idwhen you write data, the specified value is applied. Otherwise, the default value is applied. This field is available only for incremental synchronization. For more information, see ob_org_cluster_id.In a MySQL database,
clusterIdindicates a thread ID in a MySQL binlog event. The MySQL server assigns incremented thread IDs to threads on each connection.
Examples:
Example of data insertion (INSERT)
{ "allMetaData":{ "checkpoint":null, "record_primary_key":"int8\u0001int16", "source_identity":null, "record_primary_value":"3\u0001129", "dbType":"OB_MYSQL", "table_name":"table_name", "db":"tenant.database", "timestamp":"1609344671" }, "prevStruct":null, "recordType":"INSERT", "postStruct":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":10223372036854775806, "col6":1.2222, "col7":9.999999, "col8":"hello world", "col9":"aGVsbG8gd29ybGQ=", "col10":9.99999999999, "col11":"2020-11-25", "col12":"00:01:02", "col13":"2020-11-25 00:01:02", "col14":"1606233662.012345", } }Example of data update (UPDATE)
{ "allMetaData": { "checkpoint": null, "record_primary_key": "int8\u0001int16", "source_identity": null, "record_primary_value": "3\u0001129", "dbType":"OB_MYSQL", "table_name": "table_name", "db": "tenant.database", "timestamp": "1609344671" }, "prevStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.999999999999, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.999999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345", }, "recordType": "UPDATE", "postStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.999999999999, "col8": "hello world 2020", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.999999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345", } }Example of data deletion (DELETE)
{ "allMetaData":{ "checkpoint":null, "record_primary_key":"int8\u0001int16", "source_identity":null, "record_primary_value":"3\u0001129", "dbType":"OB_MYSQL", "table_name":"table_name", "db":"tenant.database", "timestamp":"1609344671" }, "prevStruct":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":10223372036854775806, "col16":1.2222, "col7":9.99999999, "col8":"hello world", "col9":"aGVsbG8gd29ybGQ=", "col10":9.999999999, "col11":"2020-11-25", "col12":"00:01:02", "col13":"2020-11-25 00:01:02", "col14":"1606233662.012345" }, "recordType":"DELETE", "postStruct":null }Example of a DDL operation
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';{ "prevStruct": null, "postStruct": { "ddl": "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'" }, "allMetaData": { "checkpoint": "1671177057", "dbType": "OB_MYSQL", "storeDataSequence": null, "db": "connector_test", "timestamp": "1671177057", "uniqueId": null, "ddlType": "ALTER_TABLE", "record_primary_key": null, "source_identity": null, "record_primary_value": null, "table_name": "all_mysql_type_test" }, "recordType": "DDL" }
Canal JSON message format
When you synchronize data to a Kafka, DataHub (Blob type), or RocketMQ instance, the following JSON message format is used for the Canal serialization method:
{
"database": "STRING", // The name of the database that is changed by using an SQL statement. For an OceanBase database, you can specify only the database name and omit the tenant name.
"sqlType": {
"col1": INTEGER, // Indicates that the INTEGER column type is modified. For more information, see java.sql.Types.
},
"data": [ // The modified data key-value pair. This field exists in only one message.
{
"col1": "val1"
}
],
"pkNames": [ // The primary key column name.
"col1"
],
"old": [ // This field exists only in update messages and indicates the column modified by using an UPDATE statement. It indicates the column value before the modification.
{
"col1": "val1"
}
],
"mysqlType": { // The description of the column type.
"col": "STRING"
},
"type": "STRING", // The modification type.
"table": "STRING", // The name of the table that is changed by using an SQL statement.
"es": LONG, // The modification time, with a millisecond-level timestamp.
"isDdl": BOOLEAN, // Indicates whether it is a DDL statement.
"ts": LONG, // The timestamp written into the destination database.
"sql": "STRING", // It is empty.
}
Examples:
Example of data insertion (
INSERT){ "database":"database", "sqlType":{ "col1":93, "col2":12, "col3":6, "col4":8, "col5":5, "col6":92, "col7":4, "col8":-5, "col9":2004, "col10":-6, "col11":91, "col12":3, "col13":-5, "col14":93 }, "data":[ { "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":1.2222, "col4":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col5":129, "col6":"00:01:02", "col7":2147483646, "col8":9223372036854775806, "col9":"aGVsbG8gd29ybGQ=", "col10":3, "col11":"2020-11-25", "col12":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col13":10223372036854775806, "col14":"1606233662.012345" } ], "pkNames":[ "col1", "col2" ], "old":null, "mysqlType":{ "col1":"datetime", "col2":"varchar", "col3":"float", "col4":"double", "col5":"smallint", "col6":"time", "col7":"int", "col8":"int64", "col9":"blob", "col10":"tinyint", "col11":"date", "col12":"decimal", "col13":"bigint", "col14":"timestamp" }, "type":"INSERT", "table":"table", "es":1609344671000, "isDdl":false, "ts":1618323429026, "sql":"" }Example of data update (
UPDATE){ "database":"database", "sqlType":{ "col1":93, "col2":12, "col3":6, "col4":8, "col5":5, "col6":92, "col7":4, "col8":-5, "col9":2004, "col10":-6, "col11":91, "col12":3, "col13":-5, "col14":93 }, "data":[ { "col1":"2020-11-25 00:01:02", "col2":"hello world 2020", "col3":1.2222, "col4":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col5":129, "col6":"00:01:02", "col7":2147483646, "col8":9223372036854775806, "col9":"aGVsbG8gd29ybGQ=", "col10":3, "col11":"2020-11-25", "col12":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col13":10223372036854775806, "col14":"1606233662.012345" } ], "pkNames":[ "col1", "col2" ], "old":[ { "col":"hello world" } ], "mysqlType":{ "col1":"datetime", "col2":"varchar", "col3":"float", "col4":"double", "col5":"smallint", "col6":"time", "col7":"int", "col8":"int64", "col9":"blob", "col10":"tinyint", "col11":"date", "col12":"decimal", "col13":"bigint", "col14":"timestamp" }, "type":"UPDATE", "table":"table", "es":1609344671000, "isDdl":false, "ts":1618364572908, "sql":"" }Example of data deletion (
DELETE){ "database":"database", "sqlType":{ "col1":93, "col2":12, "col3":6, "col4":8, "col5":5, "col6":92, "col7":4, "col8":-5, "col9":2004, "col10":-6, "col11":91, "col12":3, "col13":-5, "col14":93 }, "data":[ { "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":1.2222, "col4":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col5":129, "col6":"00:01:02", "col7":2147483646, "col8":9223372036854775806, "col9":"aGVsbG8gd29ybGQ=", "col10":3, "col11":"2020-11-25", "col12":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col13":10223372036854775806, "col14":"1606233662.012345" } ], "pkNames":[ "int8", "int16" ], "old":null, "mysqlType":{ "col1":"datetime", "col2":"varchar", "col3":"float", "col4":"double", "col5":"smallint", "col6":"time", "col7":"int", "col8":"int64", "col9":"blob", "col10":"tinyint", "col11":"date", "col12":"decimal", "col13":"bigint", "col14":"timestamp" }, "type":"DELETE", "table":"table", "es":1609344671000, "isDdl":false, "ts":1618364660278, "sql":"" }Example of a DDL operation
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';{ "database": "connector_test", "sqlType": null, "data": null, "pkNames": null, "old": null, "mysqlType": null, "type": "ALTER", "table": "all_mysql_type_test", "es": 1671177209000, "isDdl": true, "ts": 1671177291475, "sql": "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'" }
Dataworks JSON message format
When you synchronize data to a Kafka, DataHub (blob type), or RocketMQ instance, the following JSON message format is used for the Dataworks serialization method:
{
"version":"2.0", // The protocol version. Only DataWorks 2.0 is supported.
"schema": { // The modified metadata information, with only the column name and column type specified.
"source": {// The source of the modification.
"dbType": "mysql", // The source type.
"dbVersion": "5.7.35", // The database version.
"dbName": "myDatabase", // The database name.
"schema": "mySchema", // The schema name. This field is required in a system with schemas.
"table": "tableName" // The table name.
}
"column": [// The modified data column. This field indicates the updated record content in the target table.
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "varchar(20)"
},
{
"name": "mydata",
"type": "binary"
},
{
"name": "ts",
"type": "datetime"
}
],
"pk": [// This field is required if a primary key or unique key is available. Otherwise, it is optional.
"pkName1",
"pkName2"
]
},
"payload": {
"before": {
"data":{
"id": 111,
"name":"scooter",
"mydata": "[base64 string]", // Base64 encoding is required for the binary type.
"ts": 1590315269000.123456789 // The timestamp with an integer part of 13 digits and a fractional part of 9 digits.
}
},
"after": {
"data":{
"id": 222,
"name":"donald",
"mydata": "[base64 string]",
"ts": 1590315269000
}
},
"op":"INSERT/UPDATE/DELETE/HEARTBEAT/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTE
R/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/...",// Case sensitive.
"timestamp": {
"eventTime": 1620457659000 // The modification time on the source database, which is a 13-digit timestamp in milliseconds.
},
"ddl": {
"text": "ADD COLUMN ..."
},
"scn": "The system change number (SCN) that increments."
},
"extend": { // The extend field that can be used for meeting extension requirements in the future. This field can be left unspecified if no extension is available.
"load_fm":"CIBS", // The source system, for example, "CIBS".
}
}
Heartbeat message of a synchronization task:
{
"version":"2.0", // The protocol version.
"payload": {
"timestamp": {
"eventTime": 1620457659000 // The timestamp of the heartbeat packet.
},
"op": "HEARTBEAT" // Indicates a heartbeat packet.
}
}
Examples:
Example of data insertion (
INSERT){ "version":"2.0", "schema":{ "source":{ "dbType":"ob_mysql", "dbVersion":null, "dbName":"db", "schema":null, "table":"tab" }, "column":[ { "name":"int8", "type":"TINYINT" }, { "name":"int16", "type":"SMALLINT" }, { "name":"int32", "type":"INT" }, { "name":"int64", "type":"INT64" }, { "name":"float32", "type":"FLOAT" }, { "name":"float64", "type":"DOUBLE" }, { "name":"bigInt", "type":"BIGINT" }, { "name":"boolean", "type":"BOOLEAN" }, { "name":"string", "type":"VARCHAR" }, { "name":"bytes", "type":"BLOB" }, { "name":"decimal", "type":"DECIMAL" }, { "name":"localDate", "type":"DATE" }, { "name":"localTime", "type":"TIME" }, { "name":"localDateTime", "type":"DATETIME" }, { "name":"timestamp", "type":"TIMESTAMP" }, { "name":"zonedDateTime", "type":"ZONED_DATETIME" }, { "name":"intervalDayToSecond", "type":"INTERVAL_DAY_TO_SECOND" }, { "name":"intervalYearToMonth", "type":"INTERVAL_YEAR_TO_MONTH" } ], "pk":[ "pkName1", "pkName12" ] }, "payload":{ "before":null, "after":{ "data":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":1.2222, "col6":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col7":10223372036854775806, "col8":1, "col9":"hello world", "col10":"aGVsbG8gd29ybGQ=", "col11":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col12":"2020-11-25", "col13":"00:01:02", "col14":"2020-11-25 00:01:02", "col15":"1606233662.012345", "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col17":"INTERVAL '3' DAY", "col18":"INTERVAL '4' YEAR" } }, "op":"INSERT", "timestamp":{ "eventTime":1647581000000, "systemTime":1647581000795, "checkpointTime":1647581000 }, "ddl":null, "scn":"null" }, "extend":{ "load_fm": "test" } }Example of data update (
UPDATE){ "version":"2.0", "schema":{ "source":{ "dbType":"ob_mysql", "dbVersion":null, "dbName":"db", "schema":null, "table":"tab" }, "column":[ { "name":"int8", "type":"TINYINT" }, { "name":"int16", "type":"SMALLINT" }, { "name":"int32", "type":"INT" }, { "name":"int64", "type":"INT64" }, { "name":"float32", "type":"FLOAT" }, { "name":"float64", "type":"DOUBLE" }, { "name":"bigInt", "type":"BIGINT" }, { "name":"boolean", "type":"BOOLEAN" }, { "name":"string", "type":"VARCHAR" }, { "name":"bytes", "type":"BLOB" }, { "name":"decimal", "type":"DECIMAL" }, { "name":"localDate", "type":"DATE" }, { "name":"localTime", "type":"TIME" }, { "name":"localDateTime", "type":"DATETIME" }, { "name":"timestamp", "type":"TIMESTAMP" }, { "name":"zonedDateTime", "type":"ZONED_DATETIME" }, { "name":"intervalDayToSecond", "type":"INTERVAL_DAY_TO_SECOND" }, { "name":"intervalYearToMonth", "type":"INTERVAL_YEAR_TO_MONTH" } ], "pk":[ "pkName1", "pkName2" ] }, "payload":{ "before":{ "data":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":1.2222, "col6":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col7":10223372036854775806, "col8":1, "col9":"hello world", "col10":"aGVsbG8gd29ybGQ=", "col11":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col12":"2020-11-25", "col13":"00:01:02", "col14":"2020-11-25 00:01:02", "col15":"1606233662.012345", "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col17":"INTERVAL '3' DAY", "col18":"INTERVAL '4' YEAR" } }, "after":{ "data":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":1.2222, "col6":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col7":10223372036854775806, "col8":1, "col9":"hello world 2020", "col10":"aGVsbG8gd29ybGQ=", "col11":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col12":"2020-11-25", "col13":"00:01:02", "col14":"2020-11-25 00:01:02", "col15":"1606233662.012345", "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col17":"INTERVAL '3' DAY", "col18":"INTERVAL '4' YEAR" } }, "op":"UPDATE", "timestamp":{ "eventTime":1647581038000, "systemTime":1647581038674, "checkpointTime":1647581038 }, "ddl":null, "scn":"null" }, "extend":{ "load_fm": "test" } }Example of data deletion (
DELETE){ "version":"2.0", "schema":{ "source":{ "dbType":"ob_mysql", "dbVersion":null, "dbName":"db", "schema":null, "table":"tab" }, "column":[ { "name":"int8", "type":"TINYINT" }, { "name":"int16", "type":"SMALLINT" }, { "name":"int32", "type":"INT" }, { "name":"int64", "type":"INT64" }, { "name":"float32", "type":"FLOAT" }, { "name":"float64", "type":"DOUBLE" }, { "name":"bigInt", "type":"BIGINT" }, { "name":"boolean", "type":"BOOLEAN" }, { "name":"string", "type":"VARCHAR" }, { "name":"bytes", "type":"BLOB" }, { "name":"decimal", "type":"DECIMAL" }, { "name":"localDate", "type":"DATE" }, { "name":"localTime", "type":"TIME" }, { "name":"localDateTime", "type":"DATETIME" }, { "name":"timestamp", "type":"TIMESTAMP" }, { "name":"zonedDateTime", "type":"ZONED_DATETIME" }, { "name":"intervalDayToSecond", "type":"INTERVAL_DAY_TO_SECOND" }, { "name":"intervalYearToMonth", "type":"INTERVAL_YEAR_TO_MONTH" } ], "pk":[ "pkName1", "pkName2" ] }, "payload":{ "before":{ "data":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":1.2222, "col6":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col7":10223372036854775806, "col8":1, "col9":"hello world", "col10":"aGVsbG8gd29ybGQ=", "col11":0.0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000999999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125, "col12":"2020-11-25", "col13":"00:01:02", "col14":"2020-11-25 00:01:02", "col15":"1606233662.012345", "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col17":"INTERVAL '3' DAY", "col18":"INTERVAL '4' YEAR" } }, "after":null, "op":"DELETE", "timestamp":{ "eventTime":1647581072000, "systemTime":1647581072976, "checkpointTime":1647581072 }, "ddl":null, "scn":"null" }, "extend":{ "load_fm": "test" } }Example of a DDL operation
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';{ "version": "2.0", "schema": { "source": { "dbType": "ob_mysql", "dbVersion": null, "dbName": "connector_test", "schema": null, "table": "all_mysql_type_test" }, "column": null, "pk": null }, "payload": { "before": null, "after": null, "op": "ALTER", "timestamp": { "eventTime": 1671177209000, "systemTime": 1671177291485, "checkpointTime": 1671177200 }, "ddl": { "text": "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'" }, "scn": "null" }, "extend": { } }
SharePlex JSON message format
When you synchronize data to a Kafka, DataHub (Blob type), or RocketMQ instance, the following JSON message format is used for the SharePlex serialization method:
{
"data": { // The modified data key-value pair. If an INSERT or a DELETE statement is executed, the value is the full value. If an UPDATE statement is executed, the value is the modified value.
"col1": "val1"
},
"meta": {
"time": "YYYY-MM-DDTHH:mm:ss", // The modification time.
"op": "", // The change type. Valid values: ins, upd, del, and ddl.
"posttime": "YYYY-MM-DDTHH:mm:ss", // The time at which data is written to the destination.
"idx":"STRING", // Indexes of messages in the transaction/Number of index messages. At present, OMS does not measure the total number of transactions. Therefore, this field is invalid.
"size": NUMBER, // The number of messages in the transaction. This field is deprecated.
"seq": "STRING", // The sorting sequence. This field is available only when transactionEnabled is set to true on the source database.
"table": "STRING", // The database and table name in the {database}.{table} format modified by using an SQL statement.
"rowid": "STRING", // The row ID in the {database name.table name}-{primary key value} format. If the primary key has multiple columns, separate the column names with \u0001.
"trans": "STRING", // The transaction ID.
"scn": "STRING", // This field is available only when the value of sequenceEnabled is true in the source.json configuration file during incremental synchronization. Default value: true. This field is used for sorting. A sequence is generated during the synchronization process in the timestamp + sequence number format. The sequence number increments and does not exceed five digits.
},
"key": { // The key value before modification, which is available only for modifications by using an UPDATE statement.
},
"sql": {"ddl": ""} // This field is available only for DDL statements. If op is set to ddl, a DDL statement is written.
}
Examples:
Example of data insertion (
INSERT){ "data":{ "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":"INTERVAL '3' DAY", "col4":1.2222, "col5":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col6":129, "col7":"00:01:02", "col8":1, "col9":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col10":2147483646, "col11":9223372036854775806, "col12":"aGVsbG8gd29ybGQ=", "col13":"INTERVAL '4' YEAR", "col14":3, "col15":"2020-11-25", "col16":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col17":10223372036854775806, "col18":"1606233662.012345" }, "meta":{ "posttime":"2020-12-07T13:22:00", "op":"ins", "size":10, "time":"2020-11-25T00:01:02", "idx":"1/10", "seq":1, "table":"mock_database.mock_table", "rowid":"mock_database.mock_table-3\u0001129", "trans":"shareplex_transaction_id", "scn":"123456789" } }Example of data update (
UPDATE){ "data":{ "col":"hello world 2020" }, "meta":{ "posttime":"2020-12-07T13:59:09", "op":"upd", "size":10, "time":"2020-11-25T00:01:02", "idx":"1/10", "seq":1, "table":"mock_database.mock_table", "rowid":"mock_database.mock_table-3\u0001129", "trans":"shareplex_transaction_id", "scn":"123456789" }, "key":{ "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":"INTERVAL '3' DAY", "col4":1.2222, "col5":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col6":129, "col7":"00:01:02", "col8":1, "col9":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col10":2147483646, "col11":9223372036854775806, "col12":"aGVsbG8gd29ybGQ=", "col13":"INTERVAL '4' YEAR", "col14":3, "col15":"2020-11-25", "col16":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col17":10223372036854775806, "col18":"1606233662.012345" } }Example of data deletion (
DELETE){ "data":{ "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":"INTERVAL '3' DAY", "col4":1.2222, "col5":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col6":129, "col7":"00:01:02", "col8":1, "col9":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col10":2147483646, "col11":9223372036854775806, "col12":"aGVsbG8gd29ybGQ=", "col13":"INTERVAL '4' YEAR", "col14":3, "col15":"2020-11-25", "col16":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col17":10223372036854775806, "col18":"1606233662.012345" }, "meta":{ "posttime":"2020-12-07T13:34:10", "op":"del", "size":10, "time":"2020-11-25T00:01:02", "idx":"1/10", "seq":1, "table":"mock_database.mock_table", "rowid":"mock_database.mock_table-3\u0001129", "trans":"shareplex_transaction_id", "scn":"123456789" } }Example of a DDL operation
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';{ "data": {}, "meta": { "posttime": "2022-12-16T15:54:51", "op": "ddl", "size": 0, "time": "2022-12-16T15:53:29", "idx": "0/0", "seq": 0, "table": "connector_test.all_mysql_type_test", "rowid": "connector_test.all_mysql_type_test-", "trans": null, "scn": "null" }, "sql": { "ddl": "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'" } }
DefaultExtendColumnType JSON message format
When you synchronize data to a Kafka, DataHub (Blob type), or RocketMQ instance, the following JSON message format is used for the DefaultExtendColumnType serialization method.
The DefaultExtendColumnType JSON messages are created in the Default format with a __light_type field. This field specifies the data type of fields.
{
"prevStruct": { // The image before modification.
},
"postStruct": { // The image after modification.
"__light_type": {
"col":{ // The field name.
"schemaType":"type" // The value type.
}
}
},
"allMetaData" {},
}
Examples:
Example of data insertion (
INSERT){ "allMetaData":{ "checkpoint":null, "record_primary_key":"int8\u0001int16", "source_identity":null, "record_primary_value":"3\u0001129", "dbType":"OB_MYSQL", "table_name":"table", "db":"database", "timestamp":"1609344671" }, "prevStruct":null, "recordType":"INSERT", "postStruct":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":10223372036854775806, "col6":1.2222, "col7":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col8":"hello world", "col9":"aGVsbG8gd29ybGQ=", "col10":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col11":"2020-11-25", "col12":"00:01:02", "col13":"2020-11-25 00:01:02", "col14":"1606233662.012345", "__light_type":{ "int8":{ "schemaType":"TINYINT" }, "int16":{ "schemaType":"SMALLINT" }, "int32":{ "schemaType":"INT" }, "int64":{ "schemaType":"INT64" }, "bigInt":{ "schemaType":"BIGINT" }, "float32":{ "schemaType":"FLOAT" }, "float64":{ "schemaType":"DOUBLE" }, "string":{ "schemaType":"VARCHAR" }, "bytes":{ "schemaType":"BLOB" }, "decimal":{ "schemaType":"DECIMAL" }, "localDate":{ "schemaType":"DATE" }, "localTime":{ "schemaType":"TIME" }, "localDateTime":{ "schemaType":"DATETIME" }, "timestamp_in_long":{ "schemaType":"TIMESTAMP" } } } }Example of data update (
UPDATE){ "allMetaData": { "checkpoint": null, "record_primary_key": "int8\u0001int16", "source_identity": null, "record_primary_value": "3\u0001129", "dbType":"OB_MYSQL", "table_name": "table", "db": "database", "timestamp": "1609344671" }, "prevStruct": { "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":10223372036854775806, "col6":1.2222, "col7":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col8":"hello world", "col9":"aGVsbG8gd29ybGQ=", "col10":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col11":"2020-11-25", "col12":"00:01:02", "col13":"2020-11-25 00:01:02", "col14":"1606233662.012345", "__light_type": { "int8": { "schemaType": "TINYINT" }, "int16": { "schemaType": "SMALLINT" }, "int32": { "schemaType": "INT" }, "int64": { "schemaType": "INT64" }, "bigInt": { "schemaType": "BIGINT" }, "float32": { "schemaType": "FLOAT" }, "float64": { "schemaType": "DOUBLE" }, "string": { "schemaType": "VARCHAR" }, "bytes": { "schemaType": "BLOB" }, "decimal": { "schemaType": "DECIMAL" }, "localDate": { "schemaType": "DATE" }, "localTime": { "schemaType": "TIME" }, "localDateTime": { "schemaType": "DATETIME" }, "timestamp_in_long": { "schemaType": "TIMESTAMP" } } }, "recordType": "UPDATE", "postStruct": { "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":10223372036854775806, "col6":1.2222, "col7":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col8":"hello world 2020", "col9":"aGVsbG8gd29ybGQ=", "col10":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col11":"2020-11-25", "col12":"00:01:02", "col13":"2020-11-25 00:01:02", "col14":"1606233662.012345", "__light_type": { "int8": { "schemaType": "TINYINT" }, "int16": { "schemaType": "SMALLINT" }, "int32": { "schemaType": "INT" }, "int64": { "schemaType": "INT64" }, "bigInt": { "schemaType": "BIGINT" }, "float32": { "schemaType": "FLOAT" }, "float64": { "schemaType": "DOUBLE" }, "string": { "schemaType": "VARCHAR" }, "bytes": { "schemaType": "BLOB" }, "decimal": { "schemaType": "DECIMAL" }, "localDate": { "schemaType": "DATE" }, "localTime": { "schemaType": "TIME" }, "localDateTime": { "schemaType": "DATETIME" }, "timestamp_in_long": { "schemaType": "TIMESTAMP" } } } }Example of data deletion (
DELETE){ "allMetaData":{ "checkpoint":null, "record_primary_key":"int8\u0001int16", "source_identity":null, "record_primary_value":"3\u0001129", "dbType":"OB_MYSQL", "table_name":"table", "db":"database", "timestamp":"1609344671" }, "prevStruct":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":10223372036854775806, "col6":1.2222, "col7":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col8":"hello world", "col9":"aGVsbG8gd29ybGQ=", "col10":9.99999999999999909326625337248461995470488734032045693707225049331647881341002217023668530611028595157578301758491822824378438792553200763769833775473829862512856683413461939989729065436937279228852476622948659167943435544622149348072943613294167216662821737555414480159115639791276054897201420389770580351533960771506199055664889770260291710977826725024401716523031627390652604144008597950935492433262042405635563993262949691698930975461134804791235994697938405200089317860731205010159117711704697471514344499487123311264707354172378099538737850219826145102366279591379660471881259976727356521602405329789906247763521525981391443887618575275588619928089116905061711975308467857756405810961619074331866883961080943542712559830853980002984826569445431232452392578125E-308, "col11":"2020-11-25", "col12":"00:01:02", "col13":"2020-11-25 00:01:02", "col14":"1606233662.012345", "__light_type":{ "int8":{ "schemaType":"TINYINT" }, "int16":{ "schemaType":"SMALLINT" }, "int32":{ "schemaType":"INT" }, "int64":{ "schemaType":"INT64" }, "bigInt":{ "schemaType":"BIGINT" }, "float32":{ "schemaType":"FLOAT" }, "float64":{ "schemaType":"DOUBLE" }, "string":{ "schemaType":"VARCHAR" }, "bytes":{ "schemaType":"BLOB" }, "decimal":{ "schemaType":"DECIMAL" }, "localDate":{ "schemaType":"DATE" }, "localTime":{ "schemaType":"TIME" }, "localDateTime":{ "schemaType":"DATETIME" }, "timestamp_in_long": { "schemaType": "TIMESTAMP" } } }, "recordType":"DELETE", "postStruct":null }Example of a DDL operation
alter table connector_test.all_mysql_type_test add column c90 varchar(30) default "test" comment 'test';{ "prevStruct": null, "postStruct": { "ddl": "alter table connector_test.all_mysql_type_test add column c90 varchar(30) default \"test\" comment 'test'", "__light_type": { "ddl": { "schemaType": "VAR_STRING" } } }, "allMetaData": { "checkpoint": "1671177200", "dbType": "OB_MYSQL", "storeDataSequence": null, "db": "connector_test", "timestamp": "1671177209", "uniqueId": null, "ddlType": "ALTER_TABLE", "record_primary_key": null, "source_identity": null, "record_primary_value": null, "table_name": "all_mysql_type_test" }, "recordType": "DDL" }
Debezium JSON message format
When you synchronize data from a MySQL tenant of OceanBase Database to a Kafka, DataHub (Blob type), or RocketMQ instance, two JSON message formats are used for the Debezium serialization method. By default, only the struct in the payload field is displayed.
Format applied when both
schemaandpayload{#2f02ea300egch} are available{ "schema":{// Describes information about the payload field. By default, this struct is not displayed. "type": "struct",// struct indicates that the field contains a struct. "optional": false,// Specifies whether the field is required. "fields": [ { "type": "int64",// The type of the field. "optional": false,// Specifies whether the field is required. "field": "ts_ms"// The name of the field. } ... ] }, "payload":{ "op":"c", // The type of data modification. Valid values: c, u, d, and HEARTBEAT. c indicates full insert. u indicates update. d indicates delete. HEARTBEAT indicates a heartbeat message. "source":{ "version":"" // The OMS version. "connector":"OB_MYSQL", // The type of the data source. "name":"OMS", // A fixed value, which is OMS. "ts_ms":0, // The timestamp in seconds for data modification, which is available only for incremental synchronization. "db": "test", // The name of the database that is changed by using an SQL statement. For an OceanBase database, you can specify only the database name and omit the tenant name. "table":"testTab" // The name of the table that is changed by using an SQL statement. "pos":"553132@1668496109" // The log position in the binlog file. The value is in the format of [binlog file name]@[binlog file name offset]. }, "before":{ // The image before modification. "column":"value" // The key-value pair that contains the full key value. } "after":{ // The image after modification. "column":"value" // The key-value pair that contains the full key value. }, "ts_ms":1668497367188 // The timestamp of data processing. } }Format applied when only
payloadis available{ "payload":{ "op":"c", // The type of data modification. Valid values: c, u, d, and HEARTBEAT. c indicates full insert. u indicates update. d indicates delete. HEARTBEAT indicates a heartbeat message. "source":{ "version":"" // The OMS version. "connector":"OB_MYSQL", // The type of the data source. "name":"OMS", // A fixed value, which is OMS. "ts_ms":0, // The timestamp in seconds for data modification, which is available only for incremental synchronization. "db": "test", // The name of the database that is changed by using an SQL statement. For an OceanBase database, you can specify only the database name and omit the tenant name. "table":"testTab" // The name of the table that is changed by using an SQL statement. "pos":"553132@16684****" // The log position in the binlog file. The value is in the format of [binlog file name]@[binlog file name offset]. }, "before":{ // The image before modification. "column":"value" // The key-value pair that contains the full key value. } "after":{ // The image after modification. "column":"value" // The key-value pair that contains the full key value. }, "ts_ms":1668497367188 // The timestamp of data processing. } }
Examples:
Example of data insertion (
INSERT){ "schema":{ "optional":false, "type":"STRUCT", "fields":[ { "field":"before", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"after", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"source", "optional":false, "type":"struct", "fields":[ { "field":"version", "optional":false, "type":"string" }, { "field":"connector", "optional":false, "type":"string" }, { "field":"name", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" }, { "field":"db", "optional":false, "type":"string" }, { "field":"table", "optional":false, "type":"string" }, { "field":"server_id", "optional":false, "type":"int64" }, { "field":"pos", "optional":false, "type":"string" } ] }, { "field":"op", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" } ] }, "payload":{ "op":"c", "source":{ "connector":"OB_MYSQL", "pos":"703223@166849****", "name":"OMS", "version":"", "ts_ms":1668491621000, "db":"test", "table":"table_name" }, "after":{ "c11":"a", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":2, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "ts_ms":1668495423594 } }Example of data update (
UPDATE){ "schema":{ "optional":false, "type":"STRUCT", "fields":[ { "field":"before", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"after", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"source", "optional":false, "type":"struct", "fields":[ { "field":"version", "optional":false, "type":"string" }, { "field":"connector", "optional":false, "type":"string" }, { "field":"name", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" }, { "field":"db", "optional":false, "type":"string" }, { "field":"table", "optional":false, "type":"string" }, { "field":"server_id", "optional":false, "type":"int64" }, { "field":"pos", "optional":false, "type":"string" } ] }, { "field":"op", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" } ] }, "payload":{ "op":"u", "before":{ "c11":"a", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "source":{ "connector":"OB_MYSQL", "pos":"436999@166849****", "name":"OMS", "version":"", "ts_ms":1668495861000, "db":"test", "table":"table_name" }, "after":{ "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "ts_ms":1668495906356 } }Example of data deletion (
DELETE){ "schema":{ "optional":false, "type":"STRUCT", "fields":[ { "field":"before", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"after", "optional":false, "type":"struct", "fields":[ { "field":"c01", "optional":false, "type":"int32" }, { "field":"c02", "optional":false, "type":"string" }, { "field":"c03", "optional":false, "type":"string" }, { "field":"c04", "optional":false, "type":"bytes" }, { "field":"c05", "optional":false, "type":"int16" }, { "field":"c06", "optional":false, "type":"int16" }, { "field":"c07", "optional":false, "type":"int32" }, { "field":"c08", "optional":false, "type":"int64" }, { "field":"c09", "optional":false, "type":"float64" }, { "field":"c10", "optional":false, "type":"float64" }, { "field":"c11", "optional":false, "type":"string" }, { "field":"c12", "optional":false, "type":"string" }, { "field":"c13", "optional":false, "type":"string" }, { "field":"c14", "optional":false, "type":"string" }, { "field":"c15", "optional":false, "type":"bytes" }, { "field":"c16", "optional":false, "type":"string" }, { "field":"c17", "optional":false, "type":"bytes" }, { "field":"c18", "optional":false, "type":"bytes" }, { "field":"c19", "optional":false, "type":"bytes" }, { "field":"c20", "optional":false, "type":"bytes" }, { "field":"c21", "optional":false, "type":"string" }, { "field":"c22", "optional":false, "type":"int32" }, { "field":"c23", "optional":false, "type":"int64" }, { "field":"c24", "optional":false, "type":"string" }, { "field":"c25", "optional":false, "type":"int32" }, { "field":"c26", "optional":false, "type":"bytes" } ] }, { "field":"source", "optional":false, "type":"struct", "fields":[ { "field":"version", "optional":false, "type":"string" }, { "field":"connector", "optional":false, "type":"string" }, { "field":"name", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" }, { "field":"db", "optional":false, "type":"string" }, { "field":"table", "optional":false, "type":"string" }, { "field":"server_id", "optional":false, "type":"int64" }, { "field":"pos", "optional":false, "type":"string" } ] }, { "field":"op", "optional":false, "type":"string" }, { "field":"ts_ms", "optional":false, "type":"int64" } ] }, "payload":{ "op":"d", "before":{ "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "source":{ "connector":"OB_MYSQL", "pos":"553132@1668****", "name":"OMS", "version":"", "ts_ms":1668496109000, "db":"test", "table":"table_name" }, "ts_ms":1668496119717 } }
DebeziumFlatten JSON message format
When you synchronize data from a MySQL tenant of OceanBase Database to a Kafka, DataHub (blob type), or RocketMQ instance, the following JSON message format is used for the DebeziumFlatten serialization method. Different from the Debezium serialization method, the DebeziumFlatten serialization method does not have the schema or payload section.
{
"op":"c", // The type of data modification. Valid values: c, u, d, and HEARTBEAT. c indicates full insert. u indicates update. d indicates delete. HEARTBEAT indicates a heartbeat message.
"source":{
"version":"" // The OMS version.
"connector":"OB_MYSQL", // The type of the data source.
"name":"OMS", // A fixed value, which is OMS.
"ts_ms":0, // The timestamp in seconds for data modification, which is available only for incremental synchronization.
"db": "test", // The name of the database that is changed by using an SQL statement. For an OceanBase database, you can specify only the database name and omit the tenant name.
"table":"testTab" // The name of the table that is changed by using an SQL statement.
"pos":"553132@16684****" // The log position in the binlog file. The value is in the format of [binlog file name]@[binlog file name offset].
},
"before":{ // The image before modification.
"column":"value" // The key-value pair that contains the full key value.
}
"after":{ // The image after modification.
"column":"value" // The key-value pair that contains the full key value.
},
"ts_ms":1668497367188 // The timestamp of data processing.
}
Examples:
Example of data insertion (
INSERT){ "op":"c", "source":{ "connector":"OB_MYSQL", "pos":"703223@166849****", "name":"OMS", "version":"", "ts_ms":1668491621000, "db":"test", "table":"table_name" }, "after":{ "c11":"a", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":2, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "ts_ms":1668495423594 }Example of data update (
UPDATE){ "op":"u", "before":{ "c11":"a", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "source":{ "connector":"OB_MYSQL", "pos":"436999@166849****", "name":"OMS", "version":"", "ts_ms":1668495861000, "db":"test", "table":"table_name" }, "after":{ "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "ts_ms":1668495906356 }Example of data deletion (
DELETE){ "op":"d", "before":{ "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f", "c19":"690000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z", "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B", "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111 }, "source":{ "connector":"OB_MYSQL", "pos":"553132@1668****", "name":"OMS", "version":"", "ts_ms":1668496109000, "db":"test", "table":"table_name" }, "ts_ms":1668496119717 }
DebeziumSmt JSON message format
DebeziumSmt is a configuration method provided by Debezium for transforming and processing a single message through event flattening Single Message Transform (SMT). When you synchronize data from a MySQL tenant of OceanBase Database to a Kafka, DataHub (Blob type), or RocketMQ instance, only the key:value fields in after are displayed in the JSON message format for the DebeziumSmt serialization method.
The following example shows how to use the Debezium serialization method to update data:
{
"op": "u",
"source": {
"connector":"OB_MYSQL",
"name":"OMS",
...
},
"ts_ms": 1668496119717,
"before": {
"field1": "before_value1",
"field2": "before_value2"
},
"after": {
"field1": "after_value1",
"field2": "after_value2"
}
}
The format of the message in the preceding example is simplified through SMT. The following example shows the JSON message format for the DebeziumSmt serialization method:
{
"field1": "after_value1",
"field2": "after_value2"
}
Examples:
Example of data insertion (
INSERT){ "field1": "after_value1", "field2": "after_value2", "__deleted": "false" }Example of data update (
UPDATE){ "field1": "after_value1", "field2": "after_value2", "__deleted": "false" }Example of data deletion (
DELETE){ "field1": "after_value1", "field2": "after_value2", "__deleted": "true" }
Maxwell JSON message format
When you synchronize data to a Kafka instance, the following JSON message format is used for the Maxwell serialization method:
{
"database":"STRING", // The name of the database.
"table":"STRING", // The name of the table.
"type":"STRING", // The type of the operation. Valid values: insert, update, delete, table-create, table-alter, and table-drop.
"ts":LONG, // The timestamp. The fixed value for full migration is 1600000000L.
"xid":"STRING", // In OceanBase Database Community Edition earlier than V4.x, xid indicates the ID of a transaction. In a transaction ID, inc is an auto-increment number, addr is the address of the coordinator, ts is the time when the transaction ID is generated, and hash is the HASH value of the preceding three items. For an incomplete transaction, the transaction ID is null. In OceanBase Database Community Edition V4.x and later versions, xid is specified in the format of "tenant ID_transaction ID", for example, "1002_336195". In MySQL databases, xid indicates a checkpoint.
"commit":BOOLEAN, // Specifies whether to commit the transaction. The fixed value is true.
"position":"STRING", // The checkpoint, in the format of a timestamp, for example, "1684224949".
"primary_key":OBJECT[], // The primary key value. In a DDL statement, the value of this parameter is null. In a DML statement, the value of this field is OBJECT[], for example, [1,"val1"], or [] if no primary key exists.
"primary_key_columns":STRING[], // The primary key column. In a DDL statement, the value of this field is null. In a DML statement, the value of this field is OBJECT[], for example, ["key1","key2"], or [] if no primary key exists. Each listed item corresponds to one primary key value.
"data":{ // The data after a DML change.
"col1":"val1", // The key-value pair.
},
"old":{ // The data before the DML change.
"col1":"val1", // The key-value pair.
}
"sql":"STRING", // The SQL statement for executing the DDL change.
"def":{ // The information about the DDL change objects.
"database":"STRING", // The name of the database.
"table":"STRING", // The name of the table.
"primary-key":OBJECT[], // The primary key value. The fixed value is [].
"columns":STRING[] // The primary key column. The fixed value is [].
},
"uuid":"STRING" // The unique identifier.
"mesh":"prd/gray" // The grayscale identifier. This field is available only after you specify a grayscale identifier.
}
Examples:
Example of data insertion (
INSERT){ "database":"test", "table":"test", "type":"insert", "ts":1684225073, "xid":"1002_336195", "commit":true, "position":"1684225073", "primary_key":[ "1" ], "primary_key_columns":[ "id" ], "data":{ "m":4.2341, "id":1, "c":"2016-10-21 05:33:37.523000" }, "old":null, "sql":null, "def":null }Example of data update (
UPDATE){ "database":"test", "table":"test", "type":"update", "ts":1684225127, "xid":"1002_336403", "commit":true, "position":"1684225126", "primary_key":[ "1" ], "primary_key_columns":[ "id" ], "data":{ "m":5.444, "id":1, "c":"2016-10-21 05:33:54.63100" }, "old":{ "m":4.2341, "id":1, "c":"2016-10-21 05:33:37.523000" }, "sql":null, "def":null }Example of data deletion (
DELETE){ "database":"test", "table":"test", "type":"delete", "ts":1684225163, "xid":"1002_336593", "commit":true, "position":"1684225163", "primary_key":[ "1" ], "primary_key_columns":[ "id" ], "data":{ "m":5.444, "id":1, "c":"2016-10-21 05:33:54.631000" }, "old":null, "sql":null, "def":null }
Data formats used when data is transmitted from OceanBase Database to a message queue system
This section lists the possible data format mappings when you synchronize data from OceanBase Database Community Edition to a Kafka, DataHub (Blob type), or RocketMQ instance.
The following table describes the format mappings for OceanBase Database Community Edition when the serialization method is Default, Canal, DataWorks (V2.0 supported), SharePlex, or DefaultExtendColumnType.
Data type Mapped-to type Description TINYINT
SMALLINT
MEDIUMINT
INT/INTEGER
YEAR
BOOL/BOOLEANLong Integers that are shorter than 64 digits.
Scientific notation is not used for a normal value such as 1000.
For a BOOL or BOOLEAN value, true is converted to 1 and false is converted to 0.DECIMAL
NUMERICBigDecimal Exact decimal numerals and integers longer than 64 digits.
An integer is displayed without the decimal point or a decimal place.
A decimal numeral is displayed based on the data passed into the database instance without removing zeros at the end. Scientific notation is used.FLOAT
DOUBLEDouble Floating-point numbers. The number of significant digits is determined based on the source data type.
If the source data type is FLOAT, the converted value has 7 significant digits. If the source data type is DOUBLE, the converted value has 16 significant digits.CHAR
VARCHAR
TINYTEXT
TEXT
MEDIUMTEXT
LONGTEXT
ENUM
SETString Strings. TINYBLOB
BLOB
MEDIUMBLOB
LONGBLOB
BINARY
VARBINARY
BITBytes Byte arrays. By default, a byte array is displayed in a Base64-encoded string.
Note
For a fixed-length data type, after an incremental byte array is received, the zeros in the most significant bits are removed. However, such zeros are not removed for a full-byte array. Therefore, the Base64-encoded strings may be inconsistent, However, the actual results obtained after decoding are consistent.DATE Date The DATE type in the format of YYYY-MM-DD. If the value is invalid, the original string is displayed.TIME Time The TIME type in the format of HH:mm:ss[.nnnnnnnnn].
The fractional seconds part can contain a maximum of 9 digits. Zeros at the end of the fractional seconds part are not displayed. If the value is invalid, the original string is displayed.DATETIME DateTime The DATETIME type in the format of YYYY-MM-DD HH:mm:ss[.nnnnnnnnn] [zoneId], including the time zone.
The fractional seconds part can contain a maximum of 9 digits. Zeros at the end of the fractional seconds part are not displayed. If the value is invalid, the original string is displayed.TIMESTAMP Timestamp The TIMESTAMP type in the format of [second-level timestamp][.nnnnnnnnn].
The fractional seconds part can contain a maximum of 9 digits. Zeros at the end of the fractional seconds part are not displayed. If the time is invalid,0000-00-00 00:00:00is displayed.Compared with OMS Community Edition of earlier versions, a project in OMS Community Edition V3.3.0 and later has the following changes:
Note
After you upgrade OMS Community Edition to V3.3.0 or a later version, a project created in OMS Community Edition of an earlier version does not change.
The BIT data type of OceanBase Database Community Edition is converted into a Base64-encoded byte value rather than a value of the LONG data type. The Bytes value can be Base64-decoded to the source value.
The TIMESTAMP data type of OceanBase Database Community Edition is passed in the
sec.millformat. In OMS Community Edition earlier than V3.3.0, the TIMESTAMP data type is passed as integers in thesecformat.The format of other time data types is
YYYY-MM-DD HH:mm:ss[.nnnnnnnnn]in OMS Community Edition V3.3.0 and later versions, andYYYY-MM-DDTHH:mm[:ss]in OMS Community Edition earlier than V3.3.0.
The following table describes data format mappings for OceanBase Database Community Edition when the serialization method is Debezium.
Data type Mapped-to type Description BOOLEAN
BOOLBOOLEAN Valid values: true and false. TINYINT
SMALLINT
MEDIUMINT
INT/INTEGER
BIGINT
YEARLONG An integer that ranges from -2^63^ to 2^63^. BIGINT STRING Strings that display the data in full. FLOAT
DOUBLEDOUBLE Floating-point numbers. DECIMAL
NUMERICSTRING Strings that display the data in full. A decimal numeral is displayed based on the data passed to the database instance without removing zeros at the end. Scientific notation is used. BIT
BINARY
VARBINARY
TINYBLOB
BLOB
MEDIUMBLOB
LONGBLOBBYTES Base16-encoded byte arrays. CHAR
VARCHAR
TINYTEXT
TEXT
MEDIUMTEXT
LONGTEXT
ENUM
SETSTRING Strings. TIMESTAMP STRING The timestamp is in the YYYY-MM-DDTHH:mm:ss[.nnnnnnnnn]Z format. The time zone is UTC+0. DATE LONG The number of days since January 1st, 1970. TIME LONG The temporal value, in microseconds, since 00:00:00, excluding the time zone. DATETIME LONG The temporal value, in milliseconds, since 1970-01-01 00:00:00, excluding the time zone. The following table describes data format mappings for OceanBase Database Community Edition when the serialization method is Maxwell.
Data type Mapped-to type Description TINYINT
SMALLINT
MEDIUMINT
INT/INTEGER
YEAR
BOOL/BOOLEANLong Integers that are shorter than 64 digits.
Scientific notation is not used for a normal value such as 1000.
For a BOOL or BOOLEAN value, true is converted to 1 and false is converted to 0.DECIMAL
NUMERICBigDecimal Exact decimal numerals and integers longer than 64 digits.
An integer is displayed without the decimal point or a decimal place.
A decimal numeral is displayed based on the data passed into the database instance without removing zeros at the end. Scientific notation is used.FLOAT
DOUBLEDouble Floating-point numbers. The number of significant digits is determined based on the source data type.
If the source data type is FLOAT, the converted value has 7 significant digits. If the source data type is DOUBLE, the converted value has 16 significant digits.CHAR
VARCHAR
TINYTEXT
TEXT
MEDIUMTEXT
LONGTEXT
ENUMString Strings. SET JSON arrays JSON arrays, for example, ["a","b"]. TINYBLOB
BLOB
MEDIUMBLOB
LONGBLOB
BINARY
VARBINARY
BITBytes Byte arrays. By default, a byte array is displayed in a Base64-encoded string.
Note
For a fixed-length data type, after an incremental byte array is received, the zeros in the most significant bits are removed. However, such zeros are not removed for a full-byte array. Therefore, the Base64-encoded strings may be inconsistent, However, the actual results obtained after decoding are consistent.DATE Date The DATE type in the format of YYYY-MM-DD. If the value is invalid, the original string is displayed.TIME Time The TIME type in the format of HH:mm:ss[.nnnnnnnnn].
The fractional seconds part can contain a maximum of 9 digits. Zeros at the end of the fractional seconds part are not displayed. If the value is invalid, the original string is displayed.DATETIME DateTime The DATETIME type, excluding the time zone. The time zone is specified when data is written into the business application. The value of this field is in the format of YYYY-MM-DD HH:mm:ss[.nnnnnn].
By default, the fractional seconds part contains 6 digits. If the fractional seconds part contains more than 6 digits and the last 3 digits are not all zeros, 9 digits are displayed.
If the value is invalid, the original string is displayed.TIMESTAMP Timestamp The DATETIME type, excluding the time zone. The time zone is UTC+0. The value of this field is in the format of YYYY-MM-DD HH:mm:ss[.nnnnnn].
By default, the fractional seconds part contains 6 digits. If the fractional seconds part contains more than 6 digits and the last 3 digits are not all zeros, 9 digits are displayed.
If the value is invalid, the original string is displayed.