This topic describes the serialization formats of synchronized data and the formats of data transmitted from a database over the text protocol.
Serialization formats
When you use OceanBase Migration Service (OMS) to synchronize data from a source database to a Kafka, DataHub (BLOB type), or RocketMQ topic, you can specify a serialization format to control the format of the synchronized data. The available serialization formats are Default, Canal, DataWorks (V2.0 supported), SharePlex, DefaultExtendColumnType, Debezium, DebeziumFlatten, DebeziumSmt, and Avro.
Note
Currently, only the MySQL compatible mode of OceanBase Database supports the Debezium, DebeziumFlatten, and DebeziumSmt serialization formats.
Currently, the Avro serialization format is supported only for data synchronization from the MySQL compatible mode of OceanBase Database to a Kafka topic.
Default JSON message format
When data is synchronized to Kafka, DataHub (BLOB type), or RocketMQ, the default serialization format is JSON and the JSON message contains the following fields.
{
"prevStruct": { // The pre-image of the changed data.
"col1": "val1" // Key-value pairs, including all keys and values.
},
"postStruct": { // The post-image of the changed data.
"col1": "val1" // Key-value pairs, including all keys and values.
},
"allMetaData": {
"checkpoint": "STRING", // The current synchronization point. In incremental synchronization, it represents the timestamp (in seconds) to which the data is synchronized. In full data synchronization, it represents the primary key value pairs.
"record_primary_key": "STRING", // The name of the primary key column. If there are multiple columns, they are separated with \u0001.
"record_primary_value": "STRING", // The primary key value. If there are multiple columns, they are separated with \u0001.
"source_identity": "STRING", // The identifier of the source. It is the subtopic in incremental synchronization and a sequence number with no business meaning in full data synchronization.
"dbType": "STRING", // The type of the database. It can be MYSQL, Oracle, OCEANBASE (legacy mode, compatible use), OB_IN_ORACLE_MODE (legacy mode, compatible use), DB2 (legacy mode, compatible use), OB_MYSQL, OB_ORACLE, or DB2 LUW.
"storeDataSequence": "LONG", // The sequence number used for sorting. It is generated only in incremental synchronization, where sequenceEnabled is set to true by default in the source.json configuration file. It is not applicable to full data synchronization. The value is the timestamp followed by an incremental number of at most five digits, in the format of {timestamp}{incremental number}.
"table_name": "STRING", // The name of the table on which the SQL change is performed.
"db": "STRING", // The name of the database on which the SQL change is performed. For OceanBase Database, the tenant name and database name are included, in the format of {tenant}.{database}.
"timestamp": "STRING", // The timestamp (in seconds) of data change. It is applicable only to incremental synchronization.
"uniqueId": "STRING", // The transaction sequence number transmitted by the store, which is used to identify incremental synchronization.
"transId": "STRING", // In OceanBase Database earlier than V4.x, the transaction ID, where inc is an auto-increment number, addr is the address of the coordinator, t is the time when the transaction ID is generated, and hash is the hash value of the preceding three. If the transaction is incomplete, the value is null. Example: "transId":"{hash:614827533592190202, inc:1384316437, addr:\"xxx.xxx.xxx.xxx:2882\", t:1756190533588977}". In OceanBase Database V4.x and later, the value is ${tenant ID}_${transaction ID}. Example: "transId":"1xxx_3152xxx".
"clusterId": "STRING", // In OceanBase Database, it represents the cluster ID. In MySQL Database, it represents the thread ID. It is not applicable in Oracle Database.
"ddlType": "STRING" // The specific DDL type. It is supported since OMS V4.0.1.
},
"recordType": "INSERT/UPDATE/DELETE/HEARTBEAT/DDL/ROW" // The change type.
}
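The record_primary_key and record_primary_value fields pack several columns into one string, separated by \u0001. The following minimal Python sketch shows one way a consumer might pair them and split a V4.x transId. It assumes the message body has already been read from the topic as a JSON string. primary_key_pairs and split_trans_id_v4 are hypothetical helper names, not part of OMS, and the sample transId value is made up.

```python
import json

# Multiple primary key columns and values are joined with U+0001.
PK_SEPARATOR = "\u0001"


def primary_key_pairs(message: dict) -> dict:
    """Pair primary key column names with their values in a Default-format message.

    Returns an empty dict when either field is null, for example in DDL messages.
    """
    meta = message.get("allMetaData") or {}
    names = meta.get("record_primary_key")
    values = meta.get("record_primary_value")
    if not names or values is None:
        return {}
    name_list = names.split(PK_SEPARATOR)
    value_list = values.split(PK_SEPARATOR)
    if len(name_list) != len(value_list):
        raise ValueError("record_primary_key and record_primary_value do not line up")
    return dict(zip(name_list, value_list))


def split_trans_id_v4(trans_id: str) -> tuple:
    """Split a V4.x transId (${tenant ID}_${transaction ID}) at the first underscore."""
    tenant_id, _, transaction_id = trans_id.partition("_")
    return tenant_id, transaction_id


# Hypothetical message; json.loads turns the \u0001 escapes into real U+0001 characters.
raw = '{"allMetaData": {"record_primary_key": "int8\\u0001int16", "record_primary_value": "3\\u0001129", "transId": "1001_3152"}}'
msg = json.loads(raw)
print(primary_key_pairs(msg))  # {'int8': '3', 'int16': '129'}
print(split_trans_id_v4(msg["allMetaData"]["transId"]))  # ('1001', '3152')
```

The values stay strings because the message itself carries them as strings. Converting them back to column types would need the table schema.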
In a DDL message, postStruct contains a single field, ddl, whose value is the DDL statement.
The prevStruct and postStruct fields are described as follows:
prevStruct: the pre-image of the changed data, namely, the data before the SQL statement is executed.
postStruct: the post-image of the changed data, namely, the data after the SQL statement is executed.
A DELETE message contains only the prevStruct field. An INSERT or DDL message contains only the postStruct field. An UPDATE message contains both the prevStruct and postStruct fields. A HEARTBEAT (periodic heartbeat) message contains neither the prevStruct nor the postStruct field.
The clusterId field is described as follows:
In OceanBase Database, the ob_org_cluster_id variable sets the session-level CLUSTER_ID of OceanBase Database, and the value is persisted in the transaction log. If you set ob_org_cluster_id when you write data, the value you set takes precedence. Otherwise, the default value is used. The clusterId field is applicable only to incremental synchronization. For more information, see ob_org_cluster_id.
In MySQL Database, the clusterId field indicates the thread ID in the MySQL binlog event. The MySQL server assigns an increasing thread ID to each connection thread.
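The image rules for each recordType can be expressed as a small dispatch. This Python sketch assumes the message has already been decoded with json.loads. row_images and ddl_text are hypothetical helper names. ROW is treated like INSERT because full data uses the INSERT layout.

```python
def row_images(message: dict):
    """Return (before, after) images of a Default-format message based on recordType."""
    record_type = message.get("recordType")
    before = message.get("prevStruct")
    after = message.get("postStruct")
    if record_type == "HEARTBEAT":
        return None, None  # heartbeats carry neither image
    if record_type == "DELETE":
        return before, None  # only the pre-image
    if record_type in ("INSERT", "ROW", "DDL"):
        return None, after  # only the post-image; ROW is full data shaped like INSERT
    if record_type == "UPDATE":
        return before, after  # both images
    raise ValueError(f"unexpected recordType: {record_type!r}")


def ddl_text(message: dict):
    """Return the DDL statement carried in postStruct["ddl"], or None for other messages."""
    if message.get("recordType") != "DDL":
        return None
    return (message.get("postStruct") or {}).get("ddl")


update = {
    "recordType": "UPDATE",
    "prevStruct": {"col8": "hello world"},
    "postStruct": {"col8": "hello world 2020"},
}
before, after = row_images(update)
print(before["col8"], "->", after["col8"])  # hello world -> hello world 2020
```

Raising on an unknown recordType is a design choice. A consumer that must never stop could log and skip the message instead.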
Here are some data examples:
Example of data inserted
{ "allMetaData":{ "checkpoint": null, "record_primary_key": "int8\u0001int16", "uniqueId": "{tid:11039xxxx127, partition_id:0, part_cnt:0},5917,391,0", "transId":"1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database of a version earlier than V4.x, the example is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
"clusterId": "123456", "source_identity": null, "record_primary_value": "3\u0001129", "dbType": "OB_MYSQL", "table_name": "table", "db": "tenant.database", "timestamp": "1609344671" }, "prevStruct": null, "recordType": "INSERT", "postStruct":{ "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.999999, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.99999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345" } }
When recordType is ROW, the record comes from full data synchronization and has the same format as INSERT data.
{ "allMetaData":{ "checkpoint": null, "record_primary_key": "int8\u0001int16", "uniqueId": "{tid:11039xxxx127, partition_id:0, part_cnt:0},5917,391,0", "transId":"1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database of a version earlier than V4.x, the example is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
"clusterId": "123456", "source_identity": null, "record_primary_value": "3\u0001129", "dbType": "OB_MYSQL", "table_name": "table", "db": "tenant.database", "timestamp": "1609344671" }, "prevStruct": null, "recordType": "ROW", "postStruct":{ "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.999999, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.99999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345" } }
Example of data updated
{ "allMetaData": { "checkpoint": null, "record_primary_key": "int8\u0001int16", "source_identity": null, "uniqueId": "{tid:11039xxxx27, partition_id:0, part_cnt:0},5917,391,0", "transId":"1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database of a version earlier than V4.x, the example is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
"clusterId": "123456", "record_primary_value": "3\u0001129", "dbType": "OB_MYSQL", "table_name": "table", "db": "tenant.database", "timestamp": "1609344671" }, "prevStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.999999999999, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.999999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345" }, "recordType": "UPDATE", "postStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.999999999999, "col8": "hello world 2020", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.999999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345" } }
Example of data deleted
{ "allMetaData":{ "checkpoint": null, "record_primary_key": "int8\u0001int16", "source_identity": null, "uniqueId": "{tid:11039xxxx27, partition_id:0, part_cnt:0},5917,391,0", "transId":"1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database of a version earlier than V4.x, the example is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
"clusterId": "123456", "record_primary_value": "3\u0001129", "dbType": "OB_MYSQL", "table_name": "table", "db": "tenant.database", "timestamp": "1609344671" }, "prevStruct":{ "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.99999999, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345" }, "recordType": "DELETE", "postStruct": null }
DDL example
ALTER TABLE connector_test.all_mysql_type_test ADD column c90 VARCHAR(30) DEFAULT "test" COMMENT 'test';
{ "prevStruct": null, "postStruct": { "ddl": "ALTER TABLE connector_test.all_mysql_type_test ADD column c90 VARCHAR(30) DEFAULT \"test\" COMMENT 'test'" }, "allMetaData": { "checkpoint": "1671177057", "dbType": "OB_MYSQL", "storeDataSequence": null, "db": "connector_test", "timestamp": "1671177057", "uniqueId": null, "ddlType": "ALTER_TABLE", "record_primary_key": null, "source_identity": null, "record_primary_value": null, "table_name": "all_mysql_type_test" }, "recordType": "DDL" }
JSON format of Canal messages
When data is synchronized to Kafka, DataHub (BLOB type), or RocketMQ, the Canal serialization format uses the following JSON message format.
{
"database": "STRING", // The name of the database where the change occurred. In OceanBase Database, only the database name is provided, without the tenant name.
"sqlType": {
"col1": "INTEGER" // The types of the changed columns, expressed as java.sql.Types codes (for example, 4 for INTEGER and 12 for VARCHAR).
},
"data": [ // The key-value pairs of the changed data. Currently, each message contains only one row.
{
"col1": "val1"
}
],
"pkNames": [ // The names of primary key columns
"col1"
],
"old": [ // Present only in UPDATE messages. The pre-change values of the columns modified by the UPDATE statement.
{
"col1": "val1"
}
],
"mysqlType": { // The MySQL type names of the columns.
"col": "STRING"
},
"type": "STRING", // The change type.
"table": "STRING", // The name of the table where the change occurred.
"es": "LONG", // The time when the change occurred, as a timestamp in milliseconds.
"isDdl": "BOOLEAN", // Whether the message is generated from a DDL event.
"ts": "LONG", // The time when the message was written to the target, as a timestamp in milliseconds.
"sql": "STRING" // The DDL statement in DDL messages. Empty in DML messages.
}
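Because "old" holds only the pre-change values of the modified columns, a consumer that needs a full before-image has to rebuild it. The following Python sketch is one possible approach, not the OMS or Canal client API. It overlays "old" on the post-change row, maps sqlType codes to java.sql.Types names (the table lists only a subset of those constants), and converts "es" from milliseconds to a UTC datetime. canal_rows, column_type_names, and change_time are hypothetical names.

```python
from datetime import datetime, timezone

# Subset of java.sql.Types constants covering the sqlType codes used in the examples.
JAVA_SQL_TYPES = {
    -7: "BIT", -6: "TINYINT", 5: "SMALLINT", 4: "INTEGER", -5: "BIGINT",
    6: "FLOAT", 7: "REAL", 8: "DOUBLE", 2: "NUMERIC", 3: "DECIMAL",
    1: "CHAR", 12: "VARCHAR", 91: "DATE", 92: "TIME", 93: "TIMESTAMP",
    2004: "BLOB", 2005: "CLOB", 16: "BOOLEAN",
}


def canal_rows(message: dict):
    """Yield (type, before, after) for each row of a Canal-format DML message."""
    if message.get("isDdl"):
        return  # DDL messages carry the statement in "sql" and no row data
    kind = message["type"]
    rows = message.get("data") or []
    old = message.get("old") or [{} for _ in rows]
    for row, changed in zip(rows, old):
        if kind == "INSERT":
            yield kind, None, row
        elif kind == "UPDATE":
            # "old" holds only the pre-change values of modified columns,
            # so overlay them on the post-change row to rebuild the pre-image.
            yield kind, {**row, **changed}, row
        elif kind == "DELETE":
            yield kind, row, None


def column_type_names(message: dict) -> dict:
    """Translate sqlType codes into java.sql.Types names."""
    return {
        col: JAVA_SQL_TYPES.get(code, f"UNKNOWN({code})")
        for col, code in (message.get("sqlType") or {}).items()
    }


def change_time(message: dict) -> datetime:
    """Convert "es" (milliseconds since the epoch) to a UTC datetime."""
    return datetime.fromtimestamp(message["es"] / 1000, tz=timezone.utc)


msg = {
    "type": "UPDATE", "isDdl": False, "es": 1609344671000,
    "sqlType": {"col1": 93, "col2": 12},
    "data": [{"col1": "2020-11-25 00:01:02", "col2": "hello world 2020"}],
    "old": [{"col2": "hello world"}],
}
for kind, before, after in canal_rows(msg):
    print(kind, before["col2"], "->", after["col2"])  # UPDATE hello world -> hello world 2020
print(column_type_names(msg))  # {'col1': 'TIMESTAMP', 'col2': 'VARCHAR'}
print(change_time(msg).isoformat())  # 2020-12-30T16:11:11+00:00
```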
Here are some examples:
An example of an INSERT message
{ "database":"database", "sqlType":{ "col1":93, "col2":12, "col3":6, "col4":8, "col5":5, "col6":92, "col7":4, "col8":-5, "col9":2004, "col10":-6, "col11":91, "col12":3, "col13":-5, "col14":93 }, "data":[ { "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":1.2222, "col4":9.999999999999999093266253, "col5":129, "col6":"00:01:02", "col7":2147483646, "col8":9223372036854775806, "col9":"aGVsbG8gd29ybGQ=", "col10":3, "col11":"2020-11-25", "col12":9.9999999999999990, "col13":10223372036854775806, "col14":"1606233662.012345" } ], "pkNames":[ "col1", "col2" ], "old":null, "mysqlType":{ "col1":"datetime", "col2":"varchar", "col3":"float", "col4":"double", "col5":"smallint", "col6":"time", "col7":"int", "col8":"int64", "col9":"blob", "col10":"tinyint", "col11":"date", "col12":"decimal", "col13":"bigint", "col14":"timestamp" }, "type":"INSERT", "table":"table", "es":1609344671000, "isDdl":false, "ts":1618323429026, "sql":"" }
An example of an UPDATE message
{ "database":"database", "sqlType":{ "col1":93, "col2":12, "col3":6, "col4":8, "col5":5, "col6":92, "col7":4, "col8":-5, "col9":2004, "col10":-6, "col11":91, "col12":3, "col13":-5, "col14":93 }, "data":[ { "col1":"2020-11-25 00:01:02", "col2":"hello world 2020", "col3":1.2222, "col4":9.999999999999999093266253372484, "col5":129, "col6":"00:01:02", "col7":2147483646, "col8":9223372036854775806, "col9":"aGVsbG8gd29ybGQ=", "col10":3, "col11":"2020-11-25", "col12":9.9999999999999990932662, "col13":10223372036854775806, "col14":"1606233662.012345" } ], "pkNames":[ "col1", "col2" ], "old":[ { "col2":"hello world" } ], "mysqlType":{ "col1":"datetime", "col2":"varchar", "col3":"float", "col4":"double", "col5":"smallint", "col6":"time", "col7":"int", "col8":"int64", "col9":"blob", "col10":"tinyint", "col11":"date", "col12":"decimal", "col13":"bigint", "col14":"timestamp" }, "type":"UPDATE", "table":"table", "es":1609344671000, "isDdl":false, "ts":1618364572908, "sql":"" }
An example of a DELETE message
{ "database":"database", "sqlType":{ "col1":93, "col2":12, "col3":6, "col4":8, "col5":5, "col6":92, "col7":4, "col8":-5, "col9":2004, "col10":-6, "col11":91, "col12":3, "col13":-5, "col14":93 }, "data":[ { "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":1.2222, "col4":9.99999999999999909326625, "col5":129, "col6":"00:01:02", "col7":2147483646, "col8":9223372036854775806, "col9":"aGVsbG8gd29ybGQ=", "col10":3, "col11":"2020-11-25", "col12":9.99999999999999909326625, "col13":10223372036854775806, "col14":"1606233662.012345" } ], "pkNames":[ "int8", "int16" ], "old":null, "mysqlType":{ "col1":"datetime", "col2":"varchar", "col3":"float", "col4":"double", "col5":"smallint", "col6":"time", "col7":"int", "col8":"int64", "col9":"blob", "col10":"tinyint", "col11":"date", "col12":"decimal", "col13":"bigint", "col14":"timestamp" }, "type":"DELETE", "table":"table", "es":1609344671000, "isDdl":false, "ts":1618364660278, "sql":"" }
A DDL example
ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT "test" COMMENT 'test';
{ "database": "connector_test", "sqlType": null, "data": null, "pkNames": null, "old": null, "mysqlType": null, "type": "ALTER", "table": "all_mysql_type_test", "es": 1671177209000, "isDdl": true, "ts": 1671177291475, "sql": "ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT \"test\" COMMENT 'test'" }
JSON message format of DataWorks
When data is synchronized to Kafka, DataHub (BLOB type), or RocketMQ, the DataWorks serialization format uses the following JSON message format.
{
"version":"2.0", //The protocol version. Currently, only DataWorks 2.0 is supported.
"schema": { //The metadata of changes. Only the column names and types are specified.
"source": {//The source of changes.
"dbType": "mysql", //The data source type.
"dbVersion": "5.7.35", //The database version.
"dbName": "myDatabase", //The database name.
"schema": "mySchema", //The schema name. It is required in a database system that supports schemas.
"table": "tableName" //The table name.
},
"column": [//The changed data columns. The records of the target table are updated based on the changed data.
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "varchar(20)"
},
{
"name": "mydata",
"type": "binary"
},
{
"name": "ts",
"type": "datetime"
}
],
"pk": [ //The primary key or unique key columns, if any. Otherwise, this field can be left empty.
"pkName1",
"pkName2"
]
},
"payload": {
"before": {
"data":{
"id": 111,
"name":"scooter",
"mydata": "[base64 string]", //If the data type is binary, the data is Base64-encoded.
"ts": 1590315269000.123456789 //The timestamp. The integer part is 13 digits long and the fractional part is 9 digits long.
}
},
"after": {
"data":{
"id": 222,
"name":"donald",
"mydata": "[base64 string]",
"ts": 1590315269000
}
},
"op":"INSERT/UPDATE/DELETE/HEARTBEAT/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/...", //The change type. Case-sensitive.
"timestamp": {
"eventTime": 1620457659000 // The time when the change occurred in the source database, as a 13-digit timestamp in milliseconds.
},
"ddl": {
"text": "ADD COLUMN ..."
},
"scn": "An auto-increment ID"
},
"extend": { //Reserved for extension. You can ignore this field if you do not use it.
"load_fm":"CIBS" //The source system from which the records originate, for example, "CIBS".
}
}
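A DataWorks message carries both images under payload and declares column types under schema.column, and binary values are Base64-encoded. The following Python sketch picks out the images and decodes such columns. It assumes, as a labeled guess based on the examples, that columns typed binary or blob are the Base64-encoded ones. dataworks_images is a hypothetical name.

```python
import base64

# Assumption: columns declared with these types carry Base64-encoded values.
BASE64_TYPES = {"binary", "blob"}


def dataworks_images(message: dict):
    """Return (op, before, after) for a DataWorks 2.0 message, decoding Base64 columns."""
    payload = message.get("payload") or {}
    columns = (message.get("schema") or {}).get("column") or []
    base64_columns = {
        col["name"] for col in columns
        if str(col.get("type", "")).lower() in BASE64_TYPES
    }

    def decode(image):
        if image is None:
            return None
        data = dict(image.get("data") or {})
        for name in base64_columns.intersection(data):
            if data[name] is not None:
                data[name] = base64.b64decode(data[name])
        return data

    return payload.get("op"), decode(payload.get("before")), decode(payload.get("after"))


msg = {
    "version": "2.0",
    "schema": {"column": [{"name": "id", "type": "bigint"}, {"name": "mydata", "type": "binary"}]},
    "payload": {"before": None, "after": {"data": {"id": 222, "mydata": "aGVsbG8gd29ybGQ="}}, "op": "INSERT"},
}
print(dataworks_images(msg))  # ('INSERT', None, {'id': 222, 'mydata': b'hello world'})
```

Columns are matched by name between schema.column and the data map. If the two use different names, nothing is decoded, and the values pass through unchanged.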
A heartbeat message is sent as follows:
{
"version": "2.0", //The protocol version.
"payload": {
"timestamp": {
"eventTime": 1620457659000 //The timestamp when the heartbeat message is sent.
},
"op": "HEARTBEAT" //Identifies the heartbeat message.
}
}
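A consumer can use heartbeat messages to estimate how far it lags behind the source when no data is changing. This Python sketch is an illustration under stated assumptions, not an OMS feature. It treats lag as the consumer's clock minus eventTime, so any clock skew between hosts is included. heartbeat_lag_seconds is a hypothetical name.

```python
import time
from typing import Optional


def heartbeat_lag_seconds(message: dict, now_ms: Optional[int] = None) -> Optional[float]:
    """Return the consumer-side lag for a DataWorks heartbeat message, else None.

    The lag is the consumer's clock minus eventTime, so clock skew between hosts is included.
    """
    payload = message.get("payload") or {}
    if payload.get("op") != "HEARTBEAT":  # op values are case-sensitive
        return None
    event_ms = payload["timestamp"]["eventTime"]
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    return (now_ms - event_ms) / 1000.0


heartbeat = {"version": "2.0", "payload": {"timestamp": {"eventTime": 1620457659000}, "op": "HEARTBEAT"}}
print(heartbeat_lag_seconds(heartbeat, now_ms=1620457662500))  # 3.5
```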
Here are some examples:
An INSERT operation
{ "version":"2.0", "schema":{ "source":{ "dbType":"ob_mysql", "dbVersion":null, "dbName":"db", "schema":null, "table":"tab" }, "column":[ { "name":"int8", "type":"TINYINT" }, { "name":"int16", "type":"SMALLINT" }, { "name":"int32", "type":"INT" }, { "name":"int64", "type":"INT64" }, { "name":"float32", "type":"FLOAT" }, { "name":"float64", "type":"DOUBLE" }, { "name":"bigInt", "type":"BIGINT" }, { "name":"boolean", "type":"BOOLEAN" }, { "name":"string", "type":"VARCHAR" }, { "name":"bytes", "type":"BLOB" }, { "name":"decimal", "type":"DECIMAL" }, { "name":"localDate", "type":"DATE" }, { "name":"localTime", "type":"TIME" }, { "name":"localDateTime", "type":"DATETIME" }, { "name":"timestamp", "type":"TIMESTAMP" }, { "name":"zonedDateTime", "type":"ZONED_DATETIME" }, { "name":"intervalDayToSecond", "type":"INTERVAL_DAY_TO_SECOND" }, { "name":"intervalYearToMonth", "type":"INTERVAL_YEAR_TO_MONTH" } ], "pk":[ "pkName1", "pkName2" ] }, "payload":{ "before":null, "after":{ "data":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":1.2222, "col6":0.0000000000000008125, "col7":10223372036854775806, "col8":1, "col9":"hello world", "col10":"aGVsbG8gd29ybGQ=", "col11":0.000000000000125, "col12":"2020-11-25", "col13":"00:01:02", "col14":"2020-11-25 00:01:02", "col15":"1606233662.012345", "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col17":"INTERVAL '3' DAY", "col18":"INTERVAL '4' YEAR" } }, "op":"INSERT", "timestamp":{ "eventTime":1647581000000, "systemTime":1647581000795, "checkpointTime":1647581000 }, "ddl":null, "scn":"null" }, "extend":{ "load_fm": "test" } }
An UPDATE operation
{ "version":"2.0", "schema":{ "source":{ "dbType":"ob_mysql", "dbVersion":null, "dbName":"db", "schema":null, "table":"tab" }, "column":[ { "name":"int8", "type":"TINYINT" }, { "name":"int16", "type":"SMALLINT" }, { "name":"int32", "type":"INT" }, { "name":"int64", "type":"INT64" }, { "name":"float32", "type":"FLOAT" }, { "name":"float64", "type":"DOUBLE" }, { "name":"bigInt", "type":"BIGINT" }, { "name":"boolean", "type":"BOOLEAN" }, { "name":"string", "type":"VARCHAR" }, { "name":"bytes", "type":"BLOB" }, { "name":"decimal", "type":"DECIMAL" }, { "name":"localDate", "type":"DATE" }, { "name":"localTime", "type":"TIME" }, { "name":"localDateTime", "type":"DATETIME" }, { "name":"timestamp", "type":"TIMESTAMP" }, { "name":"zonedDateTime", "type":"ZONED_DATETIME" }, { "name":"intervalDayToSecond", "type":"INTERVAL_DAY_TO_SECOND" }, { "name":"intervalYearToMonth", "type":"INTERVAL_YEAR_TO_MONTH" } ], "pk":[ "pkName1", "pkName2" ] }, "payload":{ "before":{ "data":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":1.2222, "col6":0.000000000000000125, "col7":10223372036854775806, "col8":1, "col9":"hello world", "col10":"aGVsbG8gd29ybGQ=", "col11":0.0000000000125, "col12":"2020-11-25", "col13":"00:01:02", "col14":"2020-11-25 00:01:02", "col15":"1606233662.012345", "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col17":"INTERVAL '3' DAY", "col18":"INTERVAL '4' YEAR" } }, "after":{ "data":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":1.2222, "col6":0.00000000008125, "col7":10223372036854775806, "col8":1, "col9":"hello world 2020", "col10":"aGVsbG8gd29ybGQ=", "col11":0.000000000125, "col12":"2020-11-25", "col13":"00:01:02", "col14":"2020-11-25 00:01:02", "col15":"1606233662.012345", "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col17":"INTERVAL '3' DAY", "col18":"INTERVAL '4' YEAR" } }, "op":"UPDATE", "timestamp":{ "eventTime":1647581038000, "systemTime":1647581038674, "checkpointTime":1647581038 }, "ddl":null, "scn":"null" }, "extend":{ "load_fm": "test" } }
A DELETE operation
{ "version":"2.0", "schema":{ "source":{ "dbType":"ob_mysql", "dbVersion":null, "dbName":"db", "schema":null, "table":"tab" }, "column":[ { "name":"int8", "type":"TINYINT" }, { "name":"int16", "type":"SMALLINT" }, { "name":"int32", "type":"INT" }, { "name":"int64", "type":"INT64" }, { "name":"float32", "type":"FLOAT" }, { "name":"float64", "type":"DOUBLE" }, { "name":"bigInt", "type":"BIGINT" }, { "name":"boolean", "type":"BOOLEAN" }, { "name":"string", "type":"VARCHAR" }, { "name":"bytes", "type":"BLOB" }, { "name":"decimal", "type":"DECIMAL" }, { "name":"localDate", "type":"DATE" }, { "name":"localTime", "type":"TIME" }, { "name":"localDateTime", "type":"DATETIME" }, { "name":"timestamp", "type":"TIMESTAMP" }, { "name":"zonedDateTime", "type":"ZONED_DATETIME" }, { "name":"intervalDayToSecond", "type":"INTERVAL_DAY_TO_SECOND" }, { "name":"intervalYearToMonth", "type":"INTERVAL_YEAR_TO_MONTH" } ], "pk":[ "pkName1", "pkName2" ] }, "payload":{ "before":{ "data":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":1.2222, "col6":0.0000000000125, "col7":10223372036854775806, "col8":1, "col9":"hello world", "col10":"aGVsbG8gd29ybGQ=", "col11":0.0000000000125, "col12":"2020-11-25", "col13":"00:01:02", "col14":"2020-11-25 00:01:02", "col15":"1606233662.012345", "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col17":"INTERVAL '3' DAY", "col18":"INTERVAL '4' YEAR" } }, "after":null, "op":"DELETE", "timestamp":{ "eventTime":1647581072000, "systemTime":1647581072976, "checkpointTime":1647581072 }, "ddl":null, "scn":"null" }, "extend":{ "load_fm": "test" } }
A DDL operation
ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT "test" COMMENT 'test';
{ "version": "2.0", "schema": { "source": { "dbType": "ob_mysql", "dbVersion": null, "dbName": "connector_test", "schema": null, "table": "all_mysql_type_test" }, "column": null, "pk": null }, "payload": { "before": null, "after": null, "op": "ALTER", "timestamp": { "eventTime": 1671177209000, "systemTime": 1671177291485, "checkpointTime": 1671177200 }, "ddl": { "text": "ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT \"test\" COMMENT 'test'" }, "scn": "null" }, "extend": {} }
JSON format of SharePlex messages
When data is synchronized to Kafka, DataHub (BLOB type), or RocketMQ, the SharePlex serialization format uses the following JSON message format.
{
"data": { // The key-value pairs of the changed data. For INSERT and DELETE operations, all values are included. For UPDATE operations, only the changed values are included.
"col1": "val1"
},
"meta": {
"time": "YYYY-MM-DDTHH:mm:ss", // The time when the change occurred.
"op": "", // The change type. Valid values: ins, upd, del, and ddl.
"posttime": "YYYY-MM-DDTHH:mm:ss", // The time when the data was written to the target.
"idx": "STRING", // The index of the message within the transaction and the total number of messages in the transaction, in the format of {index}/{total}, for example, 1/10. This parameter is deprecated.
"size": "NUMBER", // The number of messages in the transaction. This parameter is deprecated.
"seq": "STRING", // The sequence number. This parameter is returned only when transactionEnabled is set to true in the source.
"table": "STRING", // The name of the changed table, in the format of {database}.{table}.
"rowid": "STRING", // {source_table_name}-{primary_key_values_separated_by_\u0001}
"trans": "STRING", // The transaction ID.
"scn": "STRING" // Returned only in incremental synchronization when sequenceEnabled is set to true in the source.json configuration file (the default is true). It is used for sorting. The value is the timestamp followed by an incremental number of at most five digits.
},
"key": { // This parameter is returned only for UPDATE operations. It indicates the values of the data before the change.
},
"sql": {
"ddl": ""
} // This parameter is returned only for DDL operations. If op = ddl, the DDL statement is written.
}
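In an upd message, data contains only the changed columns and key contains the values before the change. The following Python sketch rebuilds both images from these two fields and splits meta.rowid into primary key values. It assumes, as in the examples, that key holds the full pre-change row and that rowid starts with meta.table followed by a hyphen. rowid_key_values and shareplex_images are hypothetical names.

```python
# Primary key values in rowid are joined with U+0001.
PK_SEPARATOR = "\u0001"


def rowid_key_values(meta: dict) -> list:
    """Extract primary key values from meta["rowid"] ({table}-{values joined by U+0001})."""
    prefix = meta["table"] + "-"
    rowid = meta["rowid"]
    if not rowid.startswith(prefix):
        raise ValueError(f"rowid {rowid!r} does not start with {prefix!r}")
    rest = rowid[len(prefix):]
    return rest.split(PK_SEPARATOR) if rest else []


def shareplex_images(message: dict):
    """Return (before, after) for a SharePlex-format message."""
    op = message["meta"]["op"]
    data = message.get("data") or {}
    if op == "ins":
        return None, dict(data)
    if op == "del":
        return dict(data), None
    if op == "upd":
        before = dict(message.get("key") or {})
        return before, {**before, **data}  # data holds only the changed columns
    return None, None  # ddl: the statement is in message["sql"]["ddl"]


update = {
    "data": {"col2": "hello world 2020"},
    "meta": {"op": "upd", "table": "mock_database.mock_table",
             "rowid": "mock_database.mock_table-3\u0001129"},
    "key": {"col1": 3, "col2": "hello world"},
}
print(rowid_key_values(update["meta"]))  # ['3', '129']
print(shareplex_images(update)[1])  # {'col1': 3, 'col2': 'hello world 2020'}
```

Matching on the full table prefix, rather than splitting at the first hyphen, keeps the sketch correct for table names that themselves contain a hyphen.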
Here are some data examples:
Example of data inserted (INSERT operation)
{ "data":{ "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":"INTERVAL '3' DAY", "col4":1.2222, "col5":9.999999999999999308, "col6":129, "col7":"00:01:02", "col8":1, "col9":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col10":2147483646, "col11":9223372036854775806, "col12":"aGVsbG8gd29ybGQ=", "col13":"INTERVAL '4' YEAR", "col14":3, "col15":"2020-11-25", "col16":9.9999999999308, "col17":10223372036854775806, "col18":"1606233662.012345" }, "meta":{ "posttime":"2020-12-07T13:22:00", "op":"ins", "size":10, "time":"2020-11-25T00:01:02", "idx":"1/10", "seq":1, "table":"mock_database.mock_table", "rowid":"mock_database.mock_table-3\u0001129", "trans":"shareplex_transaction_id", "scn":"123456789" } }
Example of data updated (UPDATE operation)
{ "data":{ "col2":"hello world 2020" }, "meta":{ "posttime":"2020-12-07T13:59:09", "op":"upd", "size":10, "time":"2020-11-25T00:01:02", "idx":"1/10", "seq":1, "table":"mock_database.mock_table", "rowid":"mock_database.mock_table-3\u0001129", "trans":"shareplex_transaction_id", "scn":"123456789" }, "key":{ "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":"INTERVAL '3' DAY", "col4":1.2222, "col5":9.9999999999999308, "col6":129, "col7":"00:01:02", "col8":1, "col9":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col10":2147483646, "col11":9223372036854775806, "col12":"aGVsbG8gd29ybGQ=", "col13":"INTERVAL '4' YEAR", "col14":3, "col15":"2020-11-25", "col16":9.9999999999308, "col17":10223372036854775806, "col18":"1606233662.012345" } }
Example of data deleted (DELETE operation)
{ "data":{ "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":"INTERVAL '3' DAY", "col4":1.2222, "col5":9.9999999999308, "col6":129, "col7":"00:01:02", "col8":1, "col9":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col10":2147483646, "col11":9223372036854775806, "col12":"aGVsbG8gd29ybGQ=", "col13":"INTERVAL '4' YEAR", "col14":3, "col15":"2020-11-25", "col16":9.9999999308, "col17":10223372036854775806, "col18":"1606233662.012345" }, "meta":{ "posttime":"2020-12-07T13:34:10", "op":"del", "size":10, "time":"2020-11-25T00:01:02", "idx":"1/10", "seq":1, "table":"mock_database.mock_table", "rowid":"mock_database.mock_table-3\u0001129", "trans":"shareplex_transaction_id", "scn":"123456789" } }
DDL example
ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT "test" COMMENT 'test';
{ "data": {}, "meta": { "posttime": "2022-12-16T15:54:51", "op": "ddl", "size": 0, "time": "2022-12-16T15:53:29", "idx": "0/0", "seq": 0, "table": "connector_test.all_mysql_type_test", "rowid": "connector_test.all_mysql_type_test-", "trans": null, "scn": "null" }, "sql": { "ddl": "ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT \"test\" comment 'test'" } }
DefaultExtendColumnType JSON message format
When data is synchronized to Kafka, DataHub (BLOB type), or RocketMQ, the DefaultExtendColumnType serialization format uses the following JSON message format.
Based on the Default format, the DefaultExtendColumnType format adds a __light_type field to each image to indicate the data type of each column.
{
"prevStruct": { // The image before the change
},
"postStruct": { // The image after the change
"__light_type": {
"col": { // The name of the field
"schemaType": "type" // The type of the value
}
}
},
"allMetaData": {
}
}
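Because __light_type sits next to the column values inside the same image, a consumer usually separates the two before applying the row. This Python sketch does that for one image. split_light_type is a hypothetical name. The sketch only reads the schemaType entries and does not check that every value has a matching type entry.

```python
TYPE_FIELD = "__light_type"


def split_light_type(image):
    """Split one DefaultExtendColumnType image into (values, types).

    values: the column values without __light_type.
    types: column name -> schemaType, for example {"int8": "TINYINT"}.
    """
    if image is None:  # e.g. prevStruct of an INSERT message
        return None, {}
    values = {name: value for name, value in image.items() if name != TYPE_FIELD}
    types = {
        name: spec.get("schemaType")
        for name, spec in (image.get(TYPE_FIELD) or {}).items()
    }
    return values, types


post = {
    "int8": 3,
    "string": "hello world",
    "__light_type": {"int8": {"schemaType": "TINYINT"}, "string": {"schemaType": "VARCHAR"}},
}
values, types = split_light_type(post)
print(values)  # {'int8': 3, 'string': 'hello world'}
print(types)   # {'int8': 'TINYINT', 'string': 'VARCHAR'}
```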
Here are some examples:
Example of INSERT data
{ "allMetaData":{ "checkpoint": null, "record_primary_key": "int8\u0001int16", "source_identity": null, "uniqueId": "{tid:11039xxxx127, partition_id:0, part_cnt:0},5917,391,0", "transId":"1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database of a version earlier than V4.x, the example is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
"clusterId": "123456", "record_primary_value": "3\u0001129", "dbType": "OB_MYSQL", "table_name": "table", "db": "tenant.database", "timestamp": "1609344671" }, "prevStruct": null, "recordType": "INSERT", "postStruct":{ "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.99999999999999909326625337248, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.99999999999999909326625337248461995470488734032045693707225049338, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345", "__light_type":{ "int8":{ "schemaType":"TINYINT" }, "int16":{ "schemaType":"SMALLINT" }, "int32":{ "schemaType":"INT" }, "int64":{ "schemaType":"INT64" }, "bigInt":{ "schemaType":"BIGINT" }, "float32":{ "schemaType":"FLOAT" }, "float64":{ "schemaType":"DOUBLE" }, "string":{ "schemaType":"VARCHAR" }, "bytes":{ "schemaType":"BLOB" }, "decimal":{ "schemaType":"DECIMAL" }, "localDate":{ "schemaType":"DATE" }, "localTime":{ "schemaType":"TIME" }, "localDateTime":{ "schemaType":"DATETIME" }, "timestamp_in_long":{ "schemaType":"TIMESTAMP" } } } }
When recordType is ROW, the record comes from full data synchronization and has the same format as INSERT data.
{ "allMetaData":{ "checkpoint": null, "record_primary_key": "int8\u0001int16", "source_identity": null, "uniqueId": null, "transId":"1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database of a version earlier than V4.x, the example is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
"clusterId": null, "record_primary_value": "3\u0001129", "dbType": "OB_MYSQL", "table_name": "table", "db": "tenant.database", "timestamp": null }, "prevStruct": null, "recordType": "ROW", "postStruct":{ "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.999999, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.99999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345", "__light_type":{ "int8":{ "schemaType":"TINYINT" }, "int16":{ "schemaType":"SMALLINT" }, "int32":{ "schemaType":"INT" }, "int64":{ "schemaType":"INT64" }, "bigInt":{ "schemaType":"BIGINT" }, "float32":{ "schemaType":"FLOAT" }, "float64":{ "schemaType":"DOUBLE" }, "string":{ "schemaType":"VARCHAR" }, "bytes":{ "schemaType":"BLOB" }, "decimal":{ "schemaType":"DECIMAL" }, "localDate":{ "schemaType":"DATE" }, "localTime":{ "schemaType":"TIME" }, "localDateTime":{ "schemaType":"DATETIME" }, "timestamp_in_long":{ "schemaType":"TIMESTAMP" } } } }
Example of UPDATE data
{ "allMetaData": { "checkpoint": null, "record_primary_key": "int8\u0001int16", "source_identity": null, "uniqueId": "{tid:1103909342127, partition_id:0, part_cnt:0},5917,391,0", "transId":"1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database of a version earlier than V4.x, the example is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
"clusterId": "123456", "record_primary_value": "3\u0001129", "dbType":"OB_MYSQL", "table_name": "table", "db": "tenant.database", "timestamp": "1609344671" }, "prevStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.999999999999, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.999999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345", "__light_type": { "int8": { "schemaType": "TINYINT" }, "int16": { "schemaType": "SMALLINT" }, "int32": { "schemaType": "INT" }, "int64": { "schemaType": "INT64" }, "bigInt": { "schemaType": "BIGINT" }, "float32": { "schemaType": "FLOAT" }, "float64": { "schemaType": "DOUBLE" }, "string": { "schemaType": "VARCHAR" }, "bytes": { "schemaType": "BLOB" }, "decimal": { "schemaType": "DECIMAL" }, "localDate": { "schemaType": "DATE" }, "localTime": { "schemaType": "TIME" }, "localDateTime": { "schemaType": "DATETIME" }, "timestamp_in_long": { "schemaType": "TIMESTAMP" } } }, "recordType": "UPDATE", "postStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.999999999999, "col8": "hello world 2020", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.999999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345", "__light_type": { "int8": { "schemaType": "TINYINT" }, "int16": { "schemaType": "SMALLINT" }, "int32": { "schemaType": "INT" }, "int64": { "schemaType": "INT64" }, "bigInt": { "schemaType": "BIGINT" }, "float32": { "schemaType": "FLOAT" }, "float64": { "schemaType": "DOUBLE" }, "string": { "schemaType": "VARCHAR" }, "bytes": { "schemaType": "BLOB" }, "decimal": { "schemaType": "DECIMAL" }, "localDate": { "schemaType": "DATE" }, "localTime": { "schemaType": "TIME" }, "localDateTime": { "schemaType": "DATETIME" }, "timestamp_in_long": { "schemaType": "TIMESTAMP" } } } }
Example of DELETE data
{ "allMetaData":{ "checkpoint": null, "record_primary_key": "int8\u0001int16", "source_identity": null, "uniqueId": "{tid:1103xxxx127, partition_id:0, part_cnt:0},5917,391,0", "transId":"1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database of a version earlier than V4.x, the example is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
"clusterId": "123456", "record_primary_value": "3\u0001129", "dbType": "OB_MYSQL", "table_name": "table", "db": "tenant.database", "timestamp": "1609344671" }, "prevStruct":{ "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.999999999999, "col8": "hello world 2020", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.999999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345", "__light_type":{ "int8":{ "schemaType":"TINYINT" }, "int16":{ "schemaType":"SMALLINT" }, "int32":{ "schemaType":"INT" }, "int64":{ "schemaType":"INT64" }, "bigInt":{ "schemaType":"BIGINT" }, "float32":{ "schemaType":"FLOAT" }, "float64":{ "schemaType":"DOUBLE" }, "string":{ "schemaType":"VARCHAR" }, "bytes":{ "schemaType":"BLOB" }, "decimal":{ "schemaType":"DECIMAL" }, "localDate":{ "schemaType":"DATE" }, "localTime":{ "schemaType":"TIME" }, "localDateTime":{ "schemaType":"DATETIME" }, "timestamp_in_long": { "schemaType": "TIMESTAMP" } } }, "recordType":"DELETE", "postStruct":null }
DDL example
ALTER TABLE connector_test.all_mysql_type_test ADD column c90 VARCHAR(30) DEFAULT "test" COMMENT 'test';{ "prevStruct": null, "postStruct": { "ddl": "ALTER TABLE connector_test.all_mysql_type_test ADD column c90 VARCHAR(30) DEFAULT \"test\" COMMENT 'test'", "__light_type": { "ddl": { "schemaType": "VAR_STRING" } } }, "allMetaData": { "checkpoint": "1671177200", "dbType": "OB_MYSQL", "storeDataSequence": null, "db": "connector_test", "timestamp": "1671177209", "uniqueId": null, "ddlType": "ALTER_TABLE", "record_primary_key": null, "source_identity": null, "record_primary_value": null, "table_name": "all_mysql_type_test" }, "recordType": "DDL" }
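The following Python sketch shows one way a consumer might read a Default-format message. It splits record_primary_key and record_primary_value on the \u0001 separator, then returns the image that reflects the row: the pre-image for DELETE and the post-image otherwise. It assumes the messages on the topic are plain JSON without the explanatory comments shown in the examples. The function name and its return layout are hypothetical and are not part of OMS.

```python
import json
from typing import Any, Dict, Optional, Tuple

SEPARATOR = "\u0001"  # separator between multiple primary key columns/values


def parse_default_message(raw: str) -> Tuple[str, Dict[str, str], Optional[Dict[str, Any]]]:
    """Return (recordType, primary key dict, row image) for a Default-format message.

    Hypothetical helper: DELETE returns the pre-image, all other types the post-image.
    """
    msg = json.loads(raw)
    record_type = msg["recordType"]
    meta = msg.get("allMetaData") or {}
    key_names = meta.get("record_primary_key")
    key_values = meta.get("record_primary_value")
    primary_key: Dict[str, str] = {}
    if key_names and key_values:
        # Both fields list the key columns in the same order, separated by \u0001.
        primary_key = dict(zip(key_names.split(SEPARATOR), key_values.split(SEPARATOR)))
    image = msg.get("prevStruct") if record_type == "DELETE" else msg.get("postStruct")
    if image is not None:
        # __light_type holds type metadata, not column data, so drop it.
        image = {k: v for k, v in image.items() if k != "__light_type"}
    return record_type, primary_key, image
```

A downstream writer could use the returned key to upsert or delete the corresponding row. DDL records have no primary key, so they come back with an empty key and the ddl text in the image.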
Debezium JSON message format
When you synchronize data from a MySQL-compatible tenant of OceanBase Database to a Kafka, DataHub (BLOB type), or RocketMQ instance with the Debezium serialization format, the JSON message takes one of the following forms. By default, only the payload part is included.
The schema and payload parts are both present:
{
  "schema": { // Describes the fields in payload. The default value is an empty object.
    "type": "struct", // struct indicates that the field contains a nested structure.
    "optional": false, // Specifies whether the field is optional.
    "fields": [
      {
        "type": "int64", // The type of the field.
        "optional": false, // Specifies whether the field is optional.
        "field": "ts_ms" // The name of the field.
      }
    ]
  },
  "payload": {
    "op": "c", // The type of data change. Valid values: c (insert), u (update), d (delete), and HEARTBEAT (heartbeat message).
    "source": {
      "version": "", // The version of OMS.
      "connector": "OB_MYSQL", // The type of the data source.
      "name": "OMS", // Fixed value: OMS.
      "ts_ms": 0, // The timestamp of the data change, in milliseconds with second precision. Present only in incremental synchronization.
      "db": "test", // The database of the changed data. For OceanBase Database, only the database name is given, without the tenant name.
      "table": "testTab", // The table of the changed data.
      "pos": "553132@1668496109" // The binlog position, in the format [binlog file name]@[binlog file offset].
    },
    "before": { // The image of the data before the change.
      "column": "value" // Key-value pairs containing the complete row data.
    },
    "after": { // The image of the data after the change.
      "column": "value" // Key-value pairs containing the complete row data.
    },
    "ts_ms": 1668497367188 // The timestamp (in milliseconds) at which the data was processed.
  }
}
Only the payload part is present:
{
  "payload": {
    "op": "c", // The type of data change. Valid values: c (insert), u (update), d (delete), and HEARTBEAT (heartbeat message).
    "source": {
      "version": "", // The version of OMS.
      "connector": "OB_MYSQL", // The type of the data source.
      "name": "OMS", // Fixed value: OMS.
      "ts_ms": 0, // The timestamp of the data change, in milliseconds with second precision. Present only in incremental synchronization.
      "db": "test", // The database of the changed data. For OceanBase Database, only the database name is given, without the tenant name.
      "table": "testTab", // The table of the changed data.
      "pos": "553132@16684****" // The binlog position, in the format [binlog file name]@[binlog file offset].
    },
    "before": { // The image of the data before the change.
      "column": "value" // Key-value pairs containing the complete row data.
    },
    "after": { // The image of the data after the change.
      "column": "value" // Key-value pairs containing the complete row data.
    },
    "ts_ms": 1668497367188 // The timestamp (in milliseconds) at which the data was processed.
  }
}
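Because the payload part is present in both forms, a consumer can handle either form by reading only payload. The sketch below does that, assuming the messages are plain JSON without the explanatory comments shown above. The function name, the OP_NAMES labels, and the returned dict layout are hypothetical.

```python
import json
from typing import Any, Dict

# Readable labels for the op codes listed above; other values (e.g. HEARTBEAT) pass through.
OP_NAMES = {"c": "insert", "u": "update", "d": "delete"}


def unwrap_debezium(raw: str) -> Dict[str, Any]:
    """Extract the change from a Debezium-format message, with or without a schema part."""
    msg = json.loads(raw)
    payload = msg["payload"]  # present in both forms
    source = payload.get("source") or {}
    op = payload.get("op")
    return {
        "op": OP_NAMES.get(op, op),
        "db": source.get("db"),
        "table": source.get("table"),
        "before": payload.get("before"),  # absent for inserts, so None
        "after": payload.get("after"),    # absent for deletes, so None
    }
```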
Here are some examples:
An example of data inserted (INSERT operation):
{
  "schema": {
    "optional": false,
    "type": "STRUCT",
    "fields": [
      {
        "field": "before",
        "optional": false,
        "type": "struct",
        "fields": [
          { "field": "c01", "optional": false, "type": "int32" },
          { "field": "c02", "optional": false, "type": "string" },
          { "field": "c03", "optional": false, "type": "string" },
          { "field": "c04", "optional": false, "type": "bytes" },
          { "field": "c05", "optional": false, "type": "int16" },
          { "field": "c06", "optional": false, "type": "int16" },
          { "field": "c07", "optional": false, "type": "int32" },
          { "field": "c08", "optional": false, "type": "int64" },
          { "field": "c09", "optional": false, "type": "float64" },
          { "field": "c10", "optional": false, "type": "float64" },
          { "field": "c11", "optional": false, "type": "string" },
          { "field": "c12", "optional": false, "type": "string" },
          { "field": "c13", "optional": false, "type": "string" },
          { "field": "c14", "optional": false, "type": "string" },
          { "field": "c15", "optional": false, "type": "bytes" },
          { "field": "c16", "optional": false, "type": "string" },
          { "field": "c17", "optional": false, "type": "bytes" },
          { "field": "c18", "optional": false, "type": "bytes" },
          { "field": "c19", "optional": false, "type": "bytes" },
          { "field": "c20", "optional": false, "type": "bytes" },
          { "field": "c21", "optional": false, "type": "string" },
          { "field": "c22", "optional": false, "type": "int32" },
          { "field": "c23", "optional": false, "type": "int64" },
          { "field": "c24", "optional": false, "type": "string" },
          { "field": "c25", "optional": false, "type": "int32" },
          { "field": "c26", "optional": false, "type": "bytes" }
        ]
      },
      {
        "field": "after",
        "optional": false,
        "type": "struct",
        "fields": [
          { "field": "c01", "optional": false, "type": "int32" },
          { "field": "c02", "optional": false, "type": "string" },
          { "field": "c03", "optional": false, "type": "string" },
          { "field": "c04", "optional": false, "type": "bytes" },
          { "field": "c05", "optional": false, "type": "int16" },
          { "field": "c06", "optional": false, "type": "int16" },
          { "field": "c07", "optional": false, "type": "int32" },
          { "field": "c08", "optional": false, "type": "int64" },
          { "field": "c09", "optional": false, "type": "float64" },
          { "field": "c10", "optional": false, "type": "float64" },
          { "field": "c11", "optional": false, "type": "string" },
          { "field": "c12", "optional": false, "type": "string" },
          { "field": "c13", "optional": false, "type": "string" },
          { "field": "c14", "optional": false, "type": "string" },
          { "field": "c15", "optional": false, "type": "bytes" },
          { "field": "c16", "optional": false, "type": "string" },
          { "field": "c17", "optional": false, "type": "bytes" },
          { "field": "c18", "optional": false, "type": "bytes" },
          { "field": "c19", "optional": false, "type": "bytes" },
          { "field": "c20", "optional": false, "type": "bytes" },
          { "field": "c21", "optional": false, "type": "string" },
          { "field": "c22", "optional": false, "type": "int32" },
          { "field": "c23", "optional": false, "type": "int64" },
          { "field": "c24", "optional": false, "type": "string" },
          { "field": "c25", "optional": false, "type": "int32" },
          { "field": "c26", "optional": false, "type": "bytes" }
        ]
      },
      {
        "field": "source",
        "optional": false,
        "type": "struct",
        "fields": [
          { "field": "version", "optional": false, "type": "string" },
          { "field": "connector", "optional": false, "type": "string" },
          { "field": "name", "optional": false, "type": "string" },
          { "field": "ts_ms", "optional": false, "type": "int64" },
          { "field": "db", "optional": false, "type": "string" },
          { "field": "table", "optional": false, "type": "string" },
          { "field": "server_id", "optional": false, "type": "int64" },
          { "field": "pos", "optional": false, "type": "string" }
        ]
      },
      { "field": "op", "optional": false, "type": "string" },
      { "field": "ts_ms", "optional": false, "type": "int64" }
    ]
  },
  "payload": {
    "op": "c",
    "source": {
      "connector": "OB_MYSQL",
      "pos": "703223@166849****",
      "name": "OMS",
      "version": "",
      "ts_ms": 1668491621000,
      "db": "test",
      "table": "table_name"
    },
    "after": {
      "c11": "a", "c10": 2.4212412, "c13": "c", "c12": "b", "c15": "65", "c14": "d",
      "c17": "67", "c16": "f", "c19": "69000000000000", "c18": "68", "c20": "6A", "c22": 19311,
      "c21": "2022-11-15T05:12:11Z", "c02": "12312", "c24": 1668489131000, "c01": 2, "c23": 36060000000,
      "c04": "61", "c26": "6B", "c03": "1241.41000", "c25": 2022, "c06": 141, "c05": 11,
      "c08": 412124124, "c07": 4241, "c09": 2.11111
    },
    "ts_ms": 1668495423594
  }
}
An example of data updated (UPDATE operation):
{
  "schema": {
    "optional": false,
    "type": "STRUCT",
    "fields": [
      {
        "field": "before",
        "optional": false,
        "type": "struct",
        "fields": [
          { "field": "c01", "optional": false, "type": "int32" },
          { "field": "c02", "optional": false, "type": "string" },
          { "field": "c03", "optional": false, "type": "string" },
          { "field": "c04", "optional": false, "type": "bytes" },
          { "field": "c05", "optional": false, "type": "int16" },
          { "field": "c06", "optional": false, "type": "int16" },
          { "field": "c07", "optional": false, "type": "int32" },
          { "field": "c08", "optional": false, "type": "int64" },
          { "field": "c09", "optional": false, "type": "float64" },
          { "field": "c10", "optional": false, "type": "float64" },
          { "field": "c11", "optional": false, "type": "string" },
          { "field": "c12", "optional": false, "type": "string" },
          { "field": "c13", "optional": false, "type": "string" },
          { "field": "c14", "optional": false, "type": "string" },
          { "field": "c15", "optional": false, "type": "bytes" },
          { "field": "c16", "optional": false, "type": "string" },
          { "field": "c17", "optional": false, "type": "bytes" },
          { "field": "c18", "optional": false, "type": "bytes" },
          { "field": "c19", "optional": false, "type": "bytes" },
          { "field": "c20", "optional": false, "type": "bytes" },
          { "field": "c21", "optional": false, "type": "string" },
          { "field": "c22", "optional": false, "type": "int32" },
          { "field": "c23", "optional": false, "type": "int64" },
          { "field": "c24", "optional": false, "type": "string" },
          { "field": "c25", "optional": false, "type": "int32" },
          { "field": "c26", "optional": false, "type": "bytes" }
        ]
      },
      {
        "field": "after",
        "optional": false,
        "type": "struct",
        "fields": [
          { "field": "c01", "optional": false, "type": "int32" },
          { "field": "c02", "optional": false, "type": "string" },
          { "field": "c03", "optional": false, "type": "string" },
          { "field": "c04", "optional": false, "type": "bytes" },
          { "field": "c05", "optional": false, "type": "int16" },
          { "field": "c06", "optional": false, "type": "int16" },
          { "field": "c07", "optional": false, "type": "int32" },
          { "field": "c08", "optional": false, "type": "int64" },
          { "field": "c09", "optional": false, "type": "float64" },
          { "field": "c10", "optional": false, "type": "float64" },
          { "field": "c11", "optional": false, "type": "string" },
          { "field": "c12", "optional": false, "type": "string" },
          { "field": "c13", "optional": false, "type": "string" },
          { "field": "c14", "optional": false, "type": "string" },
          { "field": "c15", "optional": false, "type": "bytes" },
          { "field": "c16", "optional": false, "type": "string" },
          { "field": "c17", "optional": false, "type": "bytes" },
          { "field": "c18", "optional": false, "type": "bytes" },
          { "field": "c19", "optional": false, "type": "bytes" },
          { "field": "c20", "optional": false, "type": "bytes" },
          { "field": "c21", "optional": false, "type": "string" },
          { "field": "c22", "optional": false, "type": "int32" },
          { "field": "c23", "optional": false, "type": "int64" },
          { "field": "c24", "optional": false, "type": "string" },
          { "field": "c25", "optional": false, "type": "int32" },
          { "field": "c26", "optional": false, "type": "bytes" }
        ]
      },
      {
        "field": "source",
        "optional": false,
        "type": "struct",
        "fields": [
          { "field": "version", "optional": false, "type": "string" },
          { "field": "connector", "optional": false, "type": "string" },
          { "field": "name", "optional": false, "type": "string" },
          { "field": "ts_ms", "optional": false, "type": "int64" },
          { "field": "db", "optional": false, "type": "string" },
          { "field": "table", "optional": false, "type": "string" },
          { "field": "server_id", "optional": false, "type": "int64" },
          { "field": "pos", "optional": false, "type": "string" }
        ]
      },
      { "field": "op", "optional": false, "type": "string" },
      { "field": "ts_ms", "optional": false, "type": "int64" }
    ]
  },
  "payload": {
    "op": "u",
    "before": {
      "c11": "a", "c10": 2.4212412, "c13": "c", "c12": "b", "c15": "65", "c14": "d",
      "c17": "67", "c16": "f", "c19": "6900000000", "c18": "68", "c20": "6A", "c22": 19311,
      "c21": "2022-11-15T05:12:11Z", "c02": "12312", "c24": 1668489131000, "c01": 1, "c23": 36060000000,
      "c04": "61", "c26": "6B", "c03": "1241.41000", "c25": 2022, "c06": 141, "c05": 11,
      "c08": 412124124, "c07": 4241, "c09": 2.11111
    },
    "source": {
      "connector": "OB_MYSQL",
      "pos": "436999@166849****",
      "name": "OMS",
      "version": "",
      "ts_ms": 1668495861000,
      "db": "test",
      "table": "table_name"
    },
    "after": {
      "c11": "aa", "c10": 2.4212412, "c13": "c", "c12": "b", "c15": "65", "c14": "d",
      "c17": "67", "c16": "f", "c19": "69000000000", "c18": "68", "c20": "6A", "c22": 19311,
      "c21": "2022-11-15T05:12:11Z", "c02": "12312", "c24": 1668489131000, "c01": 1, "c23": 36060000000,
      "c04": "61", "c26": "6B", "c03": "1241.41000", "c25": 2022, "c06": 141, "c05": 11,
      "c08": 412124124, "c07": 4241, "c09": 2.11111
    },
    "ts_ms": 1668495906356
  }
}
An example of data deleted (DELETE operation):
{
  "schema": {
    "optional": false,
    "type": "STRUCT",
    "fields": [
      {
        "field": "before",
        "optional": false,
        "type": "struct",
        "fields": [
          { "field": "c01", "optional": false, "type": "int32" },
          { "field": "c02", "optional": false, "type": "string" },
          { "field": "c03", "optional": false, "type": "string" },
          { "field": "c04", "optional": false, "type": "bytes" },
          { "field": "c05", "optional": false, "type": "int16" },
          { "field": "c06", "optional": false, "type": "int16" },
          { "field": "c07", "optional": false, "type": "int32" },
          { "field": "c08", "optional": false, "type": "int64" },
          { "field": "c09", "optional": false, "type": "float64" },
          { "field": "c10", "optional": false, "type": "float64" },
          { "field": "c11", "optional": false, "type": "string" },
          { "field": "c12", "optional": false, "type": "string" },
          { "field": "c13", "optional": false, "type": "string" },
          { "field": "c14", "optional": false, "type": "string" },
          { "field": "c15", "optional": false, "type": "bytes" },
          { "field": "c16", "optional": false, "type": "string" },
          { "field": "c17", "optional": false, "type": "bytes" },
          { "field": "c18", "optional": false, "type": "bytes" },
          { "field": "c19", "optional": false, "type": "bytes" },
          { "field": "c20", "optional": false, "type": "bytes" },
          { "field": "c21", "optional": false, "type": "string" },
          { "field": "c22", "optional": false, "type": "int32" },
          { "field": "c23", "optional": false, "type": "int64" },
          { "field": "c24", "optional": false, "type": "string" },
          { "field": "c25", "optional": false, "type": "int32" },
          { "field": "c26", "optional": false, "type": "bytes" }
        ]
      },
      {
        "field": "after",
        "optional": false,
        "type": "struct",
        "fields": [
          { "field": "c01", "optional": false, "type": "int32" },
          { "field": "c02", "optional": false, "type": "string" },
          { "field": "c03", "optional": false, "type": "string" },
          { "field": "c04", "optional": false, "type": "bytes" },
          { "field": "c05", "optional": false, "type": "int16" },
          { "field": "c06", "optional": false, "type": "int16" },
          { "field": "c07", "optional": false, "type": "int32" },
          { "field": "c08", "optional": false, "type": "int64" },
          { "field": "c09", "optional": false, "type": "float64" },
          { "field": "c10", "optional": false, "type": "float64" },
          { "field": "c11", "optional": false, "type": "string" },
          { "field": "c12", "optional": false, "type": "string" },
          { "field": "c13", "optional": false, "type": "string" },
          { "field": "c14", "optional": false, "type": "string" },
          { "field": "c15", "optional": false, "type": "bytes" },
          { "field": "c16", "optional": false, "type": "string" },
          { "field": "c17", "optional": false, "type": "bytes" },
          { "field": "c18", "optional": false, "type": "bytes" },
          { "field": "c19", "optional": false, "type": "bytes" },
          { "field": "c20", "optional": false, "type": "bytes" },
          { "field": "c21", "optional": false, "type": "string" },
          { "field": "c22", "optional": false, "type": "int32" },
          { "field": "c23", "optional": false, "type": "int64" },
          { "field": "c24", "optional": false, "type": "string" },
          { "field": "c25", "optional": false, "type": "int32" },
          { "field": "c26", "optional": false, "type": "bytes" }
        ]
      },
      {
        "field": "source",
        "optional": false,
        "type": "struct",
        "fields": [
          { "field": "version", "optional": false, "type": "string" },
          { "field": "connector", "optional": false, "type": "string" },
          { "field": "name", "optional": false, "type": "string" },
          { "field": "ts_ms", "optional": false, "type": "int64" },
          { "field": "db", "optional": false, "type": "string" },
          { "field": "table", "optional": false, "type": "string" },
          { "field": "server_id", "optional": false, "type": "int64" },
          { "field": "pos", "optional": false, "type": "string" }
        ]
      },
      { "field": "op", "optional": false, "type": "string" },
      { "field": "ts_ms", "optional": false, "type": "int64" }
    ]
  },
  "payload": {
    "op": "d",
    "before": {
      "c11": "aa", "c10": 2.4212412, "c13": "c", "c12": "b", "c15": "65", "c14": "d",
      "c17": "67", "c16": "f", "c19": "69000000000", "c18": "68", "c20": "6A", "c22": 19311,
      "c21": "2022-11-15T05:12:11Z", "c02": "12312", "c24": 1668489131000, "c01": 1, "c23": 36060000000,
      "c04": "61", "c26": "6B", "c03": "1241.41000", "c25": 2022, "c06": 141, "c05": 11,
      "c08": 412124124, "c07": 4241, "c09": 2.11111
    },
    "source": {
      "connector": "OB_MYSQL",
      "pos": "553132@1668****",
      "name": "OMS",
      "version": "",
      "ts_ms": 1668496109000,
      "db": "test",
      "table": "table_name"
    },
    "ts_ms": 1668496119717
  }
}
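Under the Debezium format, several MySQL types arrive as numbers or hexadecimal strings, as described in the Debezium data type mapping later in this topic. The sketch below decodes sample values from the examples above back into Python objects. The column assignments are inferred from the sample values and schema rather than stated by OMS: c22 as DATE, c23 as TIME, c24 as DATETIME, c21 as TIMESTAMP, and c04/c26 as binary columns. The helper names are hypothetical.

```python
from datetime import date, datetime, time, timedelta, timezone

EPOCH_DATE = date(1970, 1, 1)
EPOCH = datetime(1970, 1, 1)  # naive on purpose: DATETIME carries no time zone


def decode_date(days: int) -> date:
    """DATE: number of days since 1970-01-01."""
    return EPOCH_DATE + timedelta(days=days)


def decode_time(micros: int) -> time:
    """TIME: microseconds since 00:00:00 (assumes a value within a single day)."""
    return (datetime.min + timedelta(microseconds=micros)).time()


def decode_datetime(millis: int) -> datetime:
    """DATETIME: milliseconds since 1970-01-01 00:00:00, without time zone."""
    return EPOCH + timedelta(milliseconds=millis)


def decode_timestamp(text: str) -> datetime:
    """TIMESTAMP: 'YYYY-MM-DDTHH:mm:ss[.nnnnnnnnn]Z' in UTC; digits beyond microseconds are dropped."""
    base, _, frac = text.rstrip("Z").partition(".")
    dt = datetime.strptime(base, "%Y-%m-%dT%H:%M:%S").replace(tzinfo=timezone.utc)
    if frac:
        dt += timedelta(microseconds=int(frac[:6].ljust(6, "0")))
    return dt


def decode_bytes(hex_text: str) -> bytes:
    """BYTES: base16 (hexadecimal) text."""
    return bytes.fromhex(hex_text)
```

With these helpers, c22 = 19311 decodes to 2022-11-15 and c23 = 36060000000 to 10:01:00. Both c24 = 1668489131000 and c21 = "2022-11-15T05:12:11Z" decode to the same wall-clock value, 2022-11-15 05:12:11, which is consistent across the sample row.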
DebeziumFlatten JSON message format
When you synchronize data from a MySQL-compatible tenant of OceanBase Database to Kafka, DataHub (BLOB type), or RocketMQ with the DebeziumFlatten serialization format, the JSON message has the following structure. Compared with the Debezium format, it does not include the outer schema and payload fields; the payload content is placed at the top level.
{
"op": "c", // The data modification type. Valid values include c (full insert), u (update), d (delete), and HEARTBEAT (heartbeat message).
"source": {
"version": "", // The version of OMS.
"connector": "OB_MYSQL", // The type of the data source.
"name": "OMS", // Fixed value OMS.
"ts_ms": 0, // The timestamp of the data change, in milliseconds with second precision. This field is present only in incremental synchronization.
"db": "test", // The name of the database to which the change is applied. For OceanBase Database, only the database name is provided without the tenant name.
"table": "testTab", // The name of the table to which the change is applied.
"pos": "553132@16684****" // The position of the binlog file in the format of [binlog filename]@[binlog file offset]
},
"before": { // The image of the data before the change.
"column": "value" // The key-value pairs, including full key-value data.
},
"after": { // The image of the data after the change.
"column": "value" // The key-value pairs, including full key-value data.
},
"ts_ms": 1668497367188 // The timestamp when the data is processed.
}
Here are some examples:
An example of data inserted (INSERT operation):
{
  "op": "c",
  "source": {
    "connector": "OB_MYSQL",
    "pos": "703223@166849****",
    "name": "OMS",
    "version": "",
    "ts_ms": 1668491621000,
    "db": "test",
    "table": "table_name"
  },
  "after": {
    "c11": "a", "c10": 2.4212412, "c13": "c", "c12": "b", "c15": "65", "c14": "d",
    "c17": "67", "c16": "f", "c19": "69000000000000000000", "c18": "68", "c20": "6A", "c22": 19311,
    "c21": "2022-11-15T05:12:11Z", "c02": "12312", "c24": 1668489131000, "c01": 2, "c23": 36060000000,
    "c04": "61", "c26": "6B", "c03": "1241.41000", "c25": 2022, "c06": 141, "c05": 11,
    "c08": 412124124, "c07": 4241, "c09": 2.11111
  },
  "ts_ms": 1668495423594
}
An example of data updated (UPDATE operation):
{
  "op": "u",
  "before": {
    "c11": "a", "c10": 2.4212412, "c13": "c", "c12": "b", "c15": "65", "c14": "d",
    "c17": "67", "c16": "f", "c19": "690000000000000000000000000000000", "c18": "68", "c20": "6A", "c22": 19311,
    "c21": "2022-11-15T05:12:11Z", "c02": "12312", "c24": 1668489131000, "c01": 1, "c23": 36060000000,
    "c04": "61", "c26": "6B", "c03": "1241.41000", "c25": 2022, "c06": 141, "c05": 11,
    "c08": 412124124, "c07": 4241, "c09": 2.11111
  },
  "source": {
    "connector": "OB_MYSQL",
    "pos": "436999@166849****",
    "name": "OMS",
    "version": "",
    "ts_ms": 1668495861000,
    "db": "test",
    "table": "table_name"
  },
  "after": {
    "c11": "aa", "c10": 2.4212412, "c13": "c", "c12": "b", "c15": "65", "c14": "d",
    "c17": "67", "c16": "f", "c19": "69000000000000000000000000", "c18": "68", "c20": "6A", "c22": 19311,
    "c21": "2022-11-15T05:12:11Z", "c02": "12312", "c24": 1668489131000, "c01": 1, "c23": 36060000000,
    "c04": "61", "c26": "6B", "c03": "1241.41000", "c25": 2022, "c06": 141, "c05": 11,
    "c08": 412124124, "c07": 4241, "c09": 2.11111
  },
  "ts_ms": 1668495906356
}
An example of data deleted (DELETE operation):
{
  "op": "d",
  "before": {
    "c11": "aa", "c10": 2.4212412, "c13": "c", "c12": "b", "c15": "65", "c14": "d",
    "c17": "67", "c16": "f", "c19": "69000000000000000000000000000", "c18": "68", "c20": "6A", "c22": 19311,
    "c21": "2022-11-15T05:12:11Z", "c02": "12312", "c24": 1668489131000, "c01": 1, "c23": 36060000000,
    "c04": "61", "c26": "6B", "c03": "1241.41000", "c25": 2022, "c06": 141, "c05": 11,
    "c08": 412124124, "c07": 4241, "c09": 2.11111
  },
  "source": {
    "connector": "OB_MYSQL",
    "pos": "553132@1668****",
    "name": "OMS",
    "version": "",
    "ts_ms": 1668496109000,
    "db": "test",
    "table": "table_name"
  },
  "ts_ms": 1668496119717
}
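Because DebeziumFlatten messages carry op, before, and after at the top level, a consumer can replay them directly. The sketch below applies events to an in-memory dict. The messages themselves carry no primary key information, so the sketch assumes, purely for illustration, that column c01 identifies a row. The function name and the key parameter are hypothetical.

```python
from typing import Any, Dict


def apply_flatten_event(table: Dict[Any, Dict[str, Any]], event: Dict[str, Any], key: str = "c01") -> None:
    """Apply one DebeziumFlatten event to an in-memory table keyed by `key` (hypothetical key column)."""
    op = event.get("op")
    if op == "c":
        row = event["after"]
        table[row[key]] = row
    elif op == "u":
        # Remove the old row first in case the key value itself changed.
        table.pop(event["before"][key], None)
        row = event["after"]
        table[row[key]] = row
    elif op == "d":
        table.pop(event["before"][key], None)
    # Any other op (for example HEARTBEAT) carries no row change and is ignored.
```

Keying the update on the before image keeps the table correct even when an update changes the key column.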
DebeziumSmt JSON message format
DebeziumSmt follows the Single Message Transform (SMT) approach used with Debezium, in which each message is transformed individually; here, the change event is flattened. When you synchronize data from a MySQL-compatible tenant of OceanBase Database to Kafka, DataHub (BLOB type), or RocketMQ with the DebeziumSmt serialization format, each JSON message contains only the key:value fields of the after image.
The following example shows an update message in the Debezium format:
{
"op": "u",
"source": {
"connector": "OB_MYSQL",
"name": "OMS"
},
"ts_ms": 1668496119717,
"before": {
"field1": "before_value1",
"field2": "before_value2"
},
"after": {
"field1": "after_value1",
"field2": "after_value2"
}
}
After the SMT processes the preceding message, only the fields of the after image are kept. That is, when the serialization format is set to DebeziumSmt, the JSON message looks like this:
{
"field1": "after_value1",
"field2": "after_value2"
}
Here are some examples:
An example of data inserted (INSERT operation):
{
  "field1": "after_value1",
  "field2": "after_value2",
  "__deleted": "false"
}
An example of data updated (UPDATE operation):
{
  "field1": "after_value1",
  "field2": "after_value2",
  "__deleted": "false"
}
An example of data deleted (DELETE operation):
{
  "field1": "after_value1",
  "field2": "after_value2",
  "__deleted": "true"
}
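The flattening described above can be sketched as a small function that turns a Debezium-style change event into a DebeziumSmt-style message. This illustrates the idea and is not OMS code. For deletes, the sketch copies the before image because a delete carries no after image. That choice mirrors Debezium's ExtractNewRecordState transform in its rewrite delete-handling mode and is an assumption about what OMS emits. The function name is hypothetical.

```python
from typing import Any, Dict, Optional


def to_smt_message(event: Dict[str, Any]) -> Optional[Dict[str, Any]]:
    """Flatten a Debezium-style change event into column key:value pairs plus __deleted."""
    op = event.get("op")
    if op in ("c", "u"):
        row = event.get("after") or {}
        deleted = "false"
    elif op == "d":
        # Assumption: a delete has no after image, so the before image is used.
        row = event.get("before") or {}
        deleted = "true"
    else:
        return None  # e.g. HEARTBEAT: no row to flatten
    flat = dict(row)             # copy so the original event is not modified
    flat["__deleted"] = deleted  # string value, as in the examples above
    return flat
```

Applied to the update message shown earlier in this section, the function returns the two after_value fields plus "__deleted": "false".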
Avro JSON message format
When data is synchronized from a MySQL-compatible tenant of OceanBase Database to a Kafka topic with the Avro serialization format, each message is serialized in Avro. The following examples show the message content rendered as JSON.
Full migration
{
  "version": 1,
  "id": 0,
  "sourceTimestamp": 1702371565, // The timestamp of the safe point.
  "sourcePosition": "", // No position information in full migration.
  "safeSourcePosition": "",
  "sourceTxid": "",
  "source": {
    "sourceType": "MySQL", // Fixed value: MySQL.
    "version": "OBMySQL" // Fixed value: OBMySQL.
  },
  "operation": "INIT", // The operation type is INIT in full migration.
  "objectName": "test***",
  "processTimestamps": [1702371565238], // Contains only the delivery time.
  "tags": {
    "pk_uk_info": "{\"PRIMARY\":[\"id\"]}" // Contains only the primary key.
  },
  "fields": [ // The type of each column.
    { "name": "id", "dataTypeNumber": 246 },
    { "name": "bid", "dataTypeNumber": 3 },
    { "name": "name", "dataTypeNumber": 15 },
    { "name": "address", "dataTypeNumber": 254 }
  ],
  "beforeImages": null, // The before-image is empty in full migration.
  "afterImages": [ // The after-image in full migration. The precision of an INTEGER value is 8, and the scale of a FLOAT value is 64.
    { "value": "1", "precision": 1, "scale": 0 },
    { "precision": 8, "value": "11" },
    { "charset": "utf8mb4", "value": { "bytes": "yyy" } },
    null
  ]
}
Incremental synchronization of DML data
Here is an example of data inserted.
{
  "version": 1,
  "id": 170236922143600000,
  "sourceTimestamp": 1702369092,
  "sourcePosition": "1702369080", // Checkpoint of the MySQL compatible mode of OceanBase Database.
  "safeSourcePosition": "1702369080", // Checkpoint of the MySQL compatible mode of OceanBase Database.
  "sourceTxid": "",
  "source": { "sourceType": "MySQL", "version": "OBMySQL" },
  "operation": "INSERT",
  "objectName": "test***",
  "processTimestamps": [1702369221480],
  "tags": { "pk_uk_info": "{\"PRIMARY\":[\"id\"]}" },
  "fields": [
    { "name": "id", "dataTypeNumber": 8 },
    { "name": "bid", "dataTypeNumber": 3 },
    { "name": "name", "dataTypeNumber": 15 }
  ],
  "beforeImages": null, // The before-image of INSERT is empty.
  "afterImages": [
    { "precision": 8, "value": "2" },
    { "precision": 8, "value": "12" },
    { "charset": "utf8mb4", "value": { "bytes": "xxx" } }
  ]
}
Here is an example of data updated.
{
  "version": 1,
  "id": 170236975822100001,
  "sourceTimestamp": 1702369757,
  "sourcePosition": "1702369756",
  "safeSourcePosition": "1702369756",
  "sourceTxid": "",
  "source": { "sourceType": "MySQL", "version": "OBMySQL" },
  "operation": "UPDATE",
  "objectName": "test***",
  "processTimestamps": [1702369758237],
  "tags": { "pk_uk_info": "{\"PRIMARY\":[\"id\"]}" },
  "fields": [
    { "name": "id", "dataTypeNumber": 8 },
    { "name": "bid", "dataTypeNumber": 3 },
    { "name": "name", "dataTypeNumber": 15 }
  ],
  "beforeImages": [ // UPDATE has both a before-image and an after-image.
    { "precision": 8, "value": "3" },
    { "precision": 8, "value": "22" },
    { "charset": "utf8mb4", "value": { "bytes": "xxx" } }
  ],
  "afterImages": [
    { "precision": 8, "value": "3" },
    { "precision": 8, "value": "44" },
    { "charset": "utf8mb4", "value": { "bytes": "xxx" } }
  ]
}
Here is an example of data deleted.
{
  "version": 1,
  "id": 170236976527500000,
  "sourceTimestamp": 1702369764,
  "sourcePosition": "1702369763",
  "safeSourcePosition": "1702369763",
  "sourceTxid": "",
  "source": { "sourceType": "MySQL", "version": "OBMySQL" },
  "operation": "DELETE",
  "objectName": "test***",
  "processTimestamps": [1702369765287],
  "tags": { "pk_uk_info": "{\"PRIMARY\":[\"id\"]}" },
  "fields": [
    { "name": "id", "dataTypeNumber": 8 },
    { "name": "bid", "dataTypeNumber": 3 },
    { "name": "name", "dataTypeNumber": 15 }
  ],
  "beforeImages": [
    { "precision": 8, "value": "3" },
    { "precision": 8, "value": "44" },
    { "charset": "utf8mb4", "value": { "bytes": "xxx" } }
  ],
  "afterImages": null // The after-image of DELETE is empty.
}
Incremental synchronization of DDL data
{
  "version": 1,
  "id": 170236979372400000,
  "sourceTimestamp": 1702369793,
  "sourcePosition": "1702369792",
  "safeSourcePosition": "1702369792",
  "sourceTxid": "",
  "source": { "sourceType": "MySQL", "version": "OBMySQL" },
  "operation": "DDL",
  "objectName": "test***",
  "processTimestamps": [1702369794543],
  "tags": {},
  "fields": null, // Incremental DDL records have no fields or beforeImages.
  "beforeImages": null,
  "afterImages": "alter table multi_db_multi_tbl add column address char(20) default null" // For DDL, afterImages is a string that contains the DDL statement.
}
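In the Avro examples above, fields and beforeImages/afterImages are parallel lists, and afterImages is a plain string for DDL. The sketch below pairs column names with values from a record that has already been decoded from Avro into Python dicts; the Avro decoding step itself is out of scope. The dataTypeNumber labels assume the codes follow MySQL's protocol column type constants (3 = LONG, 8 = LONGLONG, 15 = VARCHAR, 246 = NEWDECIMAL, 254 = STRING); this topic does not state that. The function names are hypothetical.

```python
from typing import Any, Dict, List, Optional

# Assumption: dataTypeNumber uses MySQL protocol column type codes (subset shown).
MYSQL_TYPE_NAMES = {3: "LONG", 8: "LONGLONG", 15: "VARCHAR", 246: "NEWDECIMAL", 254: "STRING"}


def _image_value(cell: Optional[Dict[str, Any]]) -> Any:
    """Extract the raw value from one image cell; None means SQL NULL."""
    if cell is None:
        return None
    value = cell.get("value")
    if isinstance(value, dict) and "bytes" in value:
        return value["bytes"]  # character data appears as {"bytes": ...} in the examples
    return value


def describe_record(record: Dict[str, Any]) -> Dict[str, Any]:
    """Summarize a decoded Avro record: DDL text, or column-to-value rows plus type labels."""
    if record["operation"] == "DDL":
        return {"operation": "DDL", "ddl": record["afterImages"]}
    names = [f["name"] for f in record["fields"]]
    types = {f["name"]: MYSQL_TYPE_NAMES.get(f["dataTypeNumber"], str(f["dataTypeNumber"]))
             for f in record["fields"]}

    def to_row(images: Optional[List[Any]]) -> Optional[Dict[str, Any]]:
        if images is None:
            return None
        return {name: _image_value(cell) for name, cell in zip(names, images)}

    return {
        "operation": record["operation"],
        "types": types,
        "before": to_row(record["beforeImages"]),
        "after": to_row(record["afterImages"]),
    }
```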
Format of data transmitted from OceanBase Database to text protocols
When you synchronize data from OceanBase Database to Kafka, DataHub (BLOB type), or RocketMQ, the data type mappings depend on the serialization format.
If the serialization format is Default, Canal, DataWorks (V2.0 supported), SharePlex, or DefaultExtendColumnType, the data types of the MySQL and Oracle compatible modes of OceanBase Database are mapped as follows.
OceanBase MySQL compatible mode
| Data type | Mapping type | Description |
| --- | --- | --- |
| TINYINT, SMALLINT, MEDIUMINT, INT, INTEGER, YEAR, BOOL, BOOLEAN | Long | Integer types below 64 bits. Regular numbers (for example, 1000) are displayed without scientific notation. For BOOL/BOOLEAN, true is 1 and false is 0. |
| DECIMAL, NUMERIC | BigDecimal | Exact decimal numeric types and integers wider than 64 bits. Integer values are displayed without a decimal point. Decimal values keep the original precision (including trailing zeros) and use scientific notation. |
| FLOAT, DOUBLE | Double | Floating-point numbers. Precision follows the source type: FLOAT (7 significant digits), DOUBLE (16 significant digits). |
| CHAR, VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, ENUM, SET | String | String values. |
| TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY, BIT | Bytes | Byte arrays, displayed in BASE64 encoding by default. Note: For fixed-length BIT types, incremental synchronization trims leading zeros (unlike full synchronization), but the decoded results are the same. |
| DATE | Date | Format: YYYY-MM-DD. Invalid dates are displayed as the original strings. |
| TIME | Time | Format: HH:mm:ss[.nnnnnnnnn]. Fractional seconds are displayed with up to 9 digits (all non-zero digits are shown). Invalid times are displayed as the original strings. |
| DATETIME | DateTime | Date and time with time zone: YYYY-MM-DD HH:mm:ss[.nnnnnnnnn] [zoneId]. Fractional seconds follow the same rules as TIME. Invalid datetimes are displayed as the original strings. |
| TIMESTAMP | Timestamp | Format: [Unix timestamp][.nnnnnnnnn]. Fractional seconds follow the same rules as TIME. Invalid timestamps are displayed as 0000-00-00 00:00:00. |
OceanBase Oracle compatible mode
| Data type | Mapping type | Description |
| --- | --- | --- |
| INTEGER | Long | Integer types below 64 bits. Regular numbers (for example, 1000) are displayed without scientific notation. |
| NUMBER, FLOAT | BigDecimal | Exact decimal numeric types and integers wider than 64 bits. |
| BINARY_FLOAT, BINARY_DOUBLE | Double | Floating-point numbers. Precision follows the source type: BINARY_FLOAT (7 significant digits), BINARY_DOUBLE (16 significant digits). |
| VARCHAR2, NVARCHAR2, INTERVAL YEAR TO MONTH, INTERVAL DAY TO SECOND, CLOB, NCLOB, ROWID, UROWID | String | String values. |
| BLOB, BFILE, RAW | Bytes | Byte arrays, displayed in BASE64 encoding. |
| DATE, TIMESTAMP, TIMESTAMP WITH TIME ZONE, TIMESTAMP WITH LOCAL TIME ZONE | DateTime | Date and time with time zone: YYYY-MM-DD HH:mm:ss[.nnnnnnnnn] [zoneId]. Fractional seconds are displayed with up to 9 digits (all non-zero digits are shown). Invalid values are displayed as the original strings. |
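Two of the representations in the preceding tables need decoding on the consumer side: Bytes values are BASE64 text, and MySQL TIMESTAMP values are "[Unix timestamp][.nnnnnnnnn]" strings. The sketch below handles both, using the col9 and col14 values from the Default-format examples earlier in this topic. Returning None for 0000-00-00 00:00:00 is an illustrative choice. The helper names are hypothetical, and the timestamp helper assumes non-negative timestamps.

```python
import base64
from datetime import datetime, timedelta, timezone
from typing import Optional

_EPOCH_UTC = datetime(1970, 1, 1, tzinfo=timezone.utc)


def decode_bytes_value(text: str) -> bytes:
    """Bytes mapping: BASE64-encoded byte array."""
    return base64.b64decode(text)


def decode_timestamp_value(text: str) -> Optional[datetime]:
    """Timestamp mapping: '<unix seconds>[.<up to 9 digits>]', returned in UTC (non-negative input assumed)."""
    if text == "0000-00-00 00:00:00":
        return None  # marker for an invalid timestamp
    seconds, _, fraction = text.partition(".")
    # Parse the fraction as text to avoid float rounding; keep microsecond precision.
    micros = int(fraction[:6].ljust(6, "0")) if fraction else 0
    return _EPOCH_UTC + timedelta(seconds=int(seconds), microseconds=micros)
```

For example, "aGVsbG8gd29ybGQ=" decodes to b"hello world". "1606233662.012345" decodes to 2020-11-24 16:01:02.012345 UTC, which is 2020-11-25 00:01:02.012345 in UTC+8 and matches the col13 value in the same example apart from the fractional part.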
If the serialization method is Debezium, the data type mappings for MySQL-compatible tenants of OceanBase Database are described in the following table.
Notice
When you synchronize data from an Oracle-compatible tenant of OceanBase Database to Kafka, DataHub (blob type), or RocketMQ, you cannot set the serialization method to Debezium.
| Data type | Mapping type | Description |
| --- | --- | --- |
| BOOLEAN, BOOL | BOOLEAN | The values are true and false. |
| TINYINT, SMALLINT, MEDIUMINT, INT/INTEGER, BIGINT, YEAR | LONG | Integers in the range of -2^63 to 2^63 - 1. |
| BIGINT | STRING | The data is displayed in string format. |
| FLOAT, DOUBLE | DOUBLE | Floating-point numbers. |
| DECIMAL, NUMERIC | STRING | The data is displayed in string format. Decimal values keep the exact precision received from the database (including trailing zeros) and always use scientific notation. |
| BIT, BINARY, VARBINARY, TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB | BYTES | Byte arrays, displayed in base16 (hexadecimal) encoding. |
| CHAR, VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, ENUM, SET | STRING | Strings. |
| TIMESTAMP | STRING | Format: YYYY-MM-DDTHH:mm:ss[.nnnnnnnnn]Z, in UTC (time zone offset 0). |
| DATE | LONG | The number of days since January 1, 1970. |
| TIME | LONG | The time of day in microseconds since 00:00:00, without time zone information. |
| DATETIME | LONG | The number of milliseconds since January 1, 1970, 00:00:00, without time zone information. |
If the serialization format is Avro, the data types of MySQL-compatible tenants of OceanBase Database are mapped as follows.
Notice
The serialization format Avro is supported only when you synchronize data from a MySQL tenant of OceanBase Database to Kafka.
| Type name | Mapping type | Description |
| --- | --- | --- |
| TINYINT, BOOLEAN, SMALLINT, MEDIUMINT, INT, BIGINT, BIT | INTEGER | |
| FLOAT, DOUBLE | FLOAT | |
| DECIMAL, NUMERIC | DECIMAL | |
| VARCHAR, CHAR, TINYTEXT, MEDIUMTEXT, LONGTEXT, TEXT | CHARACTER | |
| BINARY, VARBINARY, TINYBLOB, MEDIUMBLOB, LONGBLOB, BLOB | BinaryObject | |
| TIMESTAMP | TimestampObject | Full and incremental synchronization both convert TIMESTAMP data into a timestamp. An illegal timestamp is represented as -9223372022400L. For all other values, you can use Java's Instant.ofEpochSecond(ts, nanos) method to obtain the correct wall-clock time. |
| DATE, TIME, DATETIME, YEAR | DATETIME | |
| JSON, ENUM, SET | TextObject | |
| GEOMETRY | TextGeometry | Geometry data is currently transmitted in EWKT format, so it is mapped to the TextGeometry type. |
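The TIMESTAMP row above refers to Java's Instant.ofEpochSecond(ts, nanos) and to the illegal-timestamp value -9223372022400L. The sketch below is a rough Python equivalent, plus a helper that splits the optional "SRID=...;" prefix off an EWKT geometry string. This topic does not describe how ts and nanos are read out of a TimestampObject, so the function takes them as plain arguments (an assumption). The helper names and the sample geometry string are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple

ILLEGAL_TIMESTAMP = -9223372022400  # documented marker for an illegal TIMESTAMP


def timestamp_to_utc(ts: int, nanos: int = 0) -> Optional[datetime]:
    """Rough Python analogue of Instant.ofEpochSecond(ts, nanos); None for the illegal marker."""
    if ts == ILLEGAL_TIMESTAMP:
        return None  # checked first: this value is outside Python's datetime range
    # datetime has microsecond resolution, so sub-microsecond digits are dropped.
    return datetime(1970, 1, 1, tzinfo=timezone.utc) + timedelta(seconds=ts, microseconds=nanos // 1000)


def split_ewkt(text: str) -> Tuple[Optional[int], str]:
    """Split EWKT such as 'SRID=4326;POINT(1 2)' into (srid, wkt); srid is None when absent."""
    if text.upper().startswith("SRID="):
        srid, _, wkt = text.partition(";")
        return int(srid[5:]), wkt
    return None, text
```

The illegal marker is checked before any conversion because the corresponding date falls before year 1, which Python's datetime cannot represent.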