This topic describes the serialization formats of migrated data and the formats of data transmitted from a database to a text protocol.
Serialization formats
When you use the data migration service to migrate data from the source to a Kafka instance, you can control the message format of the migrated data by specifying the serialization method. The available serialization methods are Default, Canal, DataWorks (version 2.0 supported), SharePlex, DefaultExtendColumnType, Debezium, DebeziumFlatten, DebeziumSmt, and Avro.
Note
At present, only OceanBase Database in MySQL compatible mode supports the Debezium, DebeziumFlatten, and DebeziumSmt serialization methods.
Default JSON message format
When data is migrated to a Kafka instance, Default uses the following JSON message format for serialization.
{
"prevStruct": { // The before image of the changed data.
"col1": "val1" // Key-value pairs, including all keys and values.
},
"postStruct": { // The after image of the changed data.
"col1": "val1" // Key-value pairs, including all keys and values.
},
"allMetaData": {
"checkpoint": "STRING", // The current synchronization point. In incremental synchronization, it represents the timestamp (in seconds) to which the data is synchronized. In full migration, it represents the primary key value pairs.
"record_primary_key": "STRING", // The name of the primary key column. If there are multiple columns, they are separated with \u0001.
"record_primary_value": "STRING", // The primary key value. If there are multiple columns, they are separated with \u0001.
"source_identity": "STRING", // The identifier of the source. It is the subtopic in incremental synchronization and an irrelevant sequence number in full migration.
"dbType": "STRING", // The type of the database.
"storeDataSequence": "LONG", // This field exists only if sequenceEnabled=true (the default value) is configured in source.json for incremental synchronization. It is used for sorting, and its value is generated within a synchronization process in the format of {timestamp}{incremental number}, where {incremental number} does not exceed five digits.
"table_name": "STRING", // The name of the table on which the SQL change is performed.
"db": "STRING", // The name of the database on which the SQL change is performed. For OceanBase Database, the tenant name and database name are included, in the format of {tenant}.{database}.
"timestamp": "STRING", // The timestamp (in seconds) of data change. It is applicable only to incremental synchronization.
"uniqueId": "STRING", // The transaction sequence number transmitted by the store, which is used to identify incremental synchronization.
"transId": "STRING", // In OceanBase Database, it represents the transaction ID (inc is an auto-increment number, addr is the address of the coordinator, ts is the time when the transaction ID is generated, and hash is the hash value of inc, addr, and ts). If the transaction is incomplete, it is null.
"clusterId": "STRING", // In OceanBase Database, it represents the cluster ID.
"ddlType": "STRING" // The specific DDL type.
},
"recordType": "INSERT/UPDATE/DELETE/HEARTBEAT/DDL/ROW" // The change type.
}
In a DDL message, postStruct contains a single key, "ddl", whose value is the DDL statement.
The prevStruct and postStruct fields are described as follows:
prevStruct: the before image of the changed data, namely, the data before SQL execution.
postStruct: the after image of the changed data, namely, the data after SQL execution.
A DELETE message contains only the prevStruct field. An INSERT or DDL message contains only the postStruct field. An UPDATE message contains both the prevStruct and postStruct fields. A HEARTBEAT (periodic heartbeat) message does not contain the postStruct or prevStruct field.
The clusterId field is described as follows:
In OceanBase Database, the ob_org_cluster_id field is used to set the session-level CLUSTER_ID of OceanBase Database. The value is persisted in the transaction log. If you set the ob_org_cluster_id field when you write data, the set value takes precedence. Otherwise, the default value is used. This field is available only for incremental synchronization. For more information, see ob_org_cluster_id.
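The image rules above can be sketched as a small consistency check. This is a minimal Python sketch, not part of the migration service: the names EXPECTED_IMAGES, present_images, and images_match are hypothetical, and it assumes that an absent image is serialized as null (or omitted) and that ROW messages follow the INSERT rule.

```python
# Images each recordType is expected to carry, per the rules described above.
# Assumption: ROW (full data) behaves like INSERT.
EXPECTED_IMAGES = {
    "INSERT": {"postStruct"},
    "ROW": {"postStruct"},
    "DDL": {"postStruct"},
    "UPDATE": {"prevStruct", "postStruct"},
    "DELETE": {"prevStruct"},
    "HEARTBEAT": set(),
}


def present_images(message):
    """Return the names of the images that are present and not null."""
    return {
        name
        for name in ("prevStruct", "postStruct")
        if message.get(name) is not None
    }


def images_match(message):
    """True if the message carries exactly the images its recordType implies."""
    expected = EXPECTED_IMAGES.get(message.get("recordType"))
    return expected is not None and present_images(message) == expected


delete_msg = {"recordType": "DELETE", "prevStruct": {"col1": 3}, "postStruct": None}
bad_insert = {"recordType": "INSERT", "prevStruct": {"col1": 3}, "postStruct": {"col1": 3}}
print(images_match(delete_msg), images_match(bad_insert))
```

A consumer could run such a check before applying a change and route mismatching messages to a dead-letter topic instead of failing.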
Here are some data examples:
Example of data inserted (INSERT operation)
{
"allMetaData":{ "checkpoint": null, "record_primary_key": "int8\u0001int16", "uniqueId": "{tid:11039xxxx127, partition_id:0, part_cnt:0},5917,391,0", "transId": "{hash:123456, inc:1234, addr:\"1.2.3.4:2883\", t:123456}", "clusterId": "123456", "source_identity": null, "record_primary_value": "3\u0001129", "dbType": "OB_MYSQL", "table_name": "table", "db": "tenant.database", "timestamp": "1609344671" },
"prevStruct": null,
"recordType": "INSERT",
"postStruct":{ "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.999999, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.99999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345" }
}
When recordType is ROW, the data is full data and the format is the same as for INSERT data.
{
"allMetaData":{ "checkpoint": null, "record_primary_key": "int8\u0001int16", "uniqueId": "{tid:11039xxxx127, partition_id:0, part_cnt:0},5917,391,0", "transId": "{hash:123456, inc:1234, addr:\"1.2.3.4:2883\", t:123456}", "clusterId": "123456", "source_identity": null, "record_primary_value": "3\u0001129", "dbType": "OB_MYSQL", "table_name": "table", "db": "tenant.database", "timestamp": "1609344671" },
"prevStruct": null,
"recordType": "ROW",
"postStruct":{ "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.999999, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.99999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345" }
}
Example of data updated (UPDATE operation)
{
"allMetaData": { "checkpoint": null, "record_primary_key": "int8\u0001int16", "source_identity": null, "uniqueId": "{tid:11039xxxx27, partition_id:0, part_cnt:0},5917,391,0", "transId": "{hash:123456, inc:1234, addr:\"1.2.3.4:2883\", t:123456}", "clusterId": "123456", "record_primary_value": "3\u0001129", "dbType": "OB_MYSQL", "table_name": "table", "db": "tenant.database", "timestamp": "1609344671" },
"prevStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.999999999999, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.999999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345" },
"recordType": "UPDATE",
"postStruct": { "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.999999999999, "col8": "hello world 2020", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.999999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345" }
}
Example of data deleted (DELETE operation)
{
"allMetaData":{ "checkpoint": null, "record_primary_key": "int8\u0001int16", "source_identity": null, "uniqueId": "{tid:11039xxxx27, partition_id:0, part_cnt:0},5917,391,0", "transId": "{hash:123456, inc:1234, addr:\"1.2.3.4:2883\", t:123456}", "clusterId": "123456", "record_primary_value": "3\u0001129", "dbType": "OB_MYSQL", "table_name": "table", "db": "tenant.database", "timestamp": "1609344671" },
"prevStruct":{ "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.99999999, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345" },
"recordType": "DELETE",
"postStruct": null
}
DDL example
ALTER TABLE connector_test.all_mysql_type_test ADD column c90 VARCHAR(30) DEFAULT "test" COMMENT 'test';
{
"prevStruct": null,
"postStruct": { "ddl": "ALTER TABLE connector_test.all_mysql_type_test ADD column c90 VARCHAR(30) DEFAULT \"test\" COMMENT 'test'" },
"allMetaData": { "checkpoint": "1671177057", "dbType": "OB_MYSQL", "storeDataSequence": null, "db": "connector_test", "timestamp": "1671177057", "uniqueId": null, "ddlType": "ALTER_TABLE", "record_primary_key": null, "source_identity": null, "record_primary_value": null, "table_name": "all_mysql_type_test" },
"recordType": "DDL"
}
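A consumer of the Default format usually has to unpack the \u0001-separated primary key fields and the {tenant}.{database} value of db. The following Python sketch shows one way to do that under stated assumptions: the helper names primary_key and split_db are hypothetical, and a db value without a dot (as in the DDL example) is treated as having no tenant part.

```python
import json

PK_SEPARATOR = "\u0001"  # separator used in record_primary_key / record_primary_value


def primary_key(meta):
    """Pair primary key column names with their values (both are U+0001-separated)."""
    names = meta.get("record_primary_key")
    values = meta.get("record_primary_value")
    if not names or values is None:
        return {}  # for example, DDL messages carry null in both fields
    return dict(zip(names.split(PK_SEPARATOR), values.split(PK_SEPARATOR)))


def split_db(db):
    """Split '{tenant}.{database}' into (tenant, database); tenant is None if absent."""
    tenant, dot, database = db.partition(".")
    return (tenant, database) if dot else (None, db)


# Raw string so that the JSON escape \u0001 reaches json.loads untouched.
raw = r'{"allMetaData": {"record_primary_key": "int8\u0001int16", "record_primary_value": "3\u0001129", "db": "tenant.database"}, "recordType": "INSERT"}'
message = json.loads(raw)
pk = primary_key(message["allMetaData"])
tenant, database = split_db(message["allMetaData"]["db"])
print(pk, tenant, database)
```

Primary key values arrive as strings in this field; converting them back to column types is left to the consumer.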
Canal JSON message format
When data is migrated to a Kafka instance, Canal uses the following JSON message format for serialization.
{
"database": "STRING", // The name of the database where the change occurred. In OceanBase Database, only the database name is provided without the tenant name.
"sqlType": {
"col1": "INTEGER" // The type codes of the changed columns, as defined in java.sql.Types.
},
"data": [ // The key-value pairs of the changed data. Currently, only one message is generated.
{
"col1": "val1"
}
],
"pkNames": [ // The names of primary key columns.
"col1"
],
"old": [ // Present only in UPDATE messages. The values, before the change, of the columns modified by the UPDATE statement.
{
"col1": "val1"
}
],
"mysqlType": { // The descriptions of column types.
"col": "STRING"
},
"type": "STRING", // The change type.
"table": "STRING", // The name of the table where the change occurred.
"es": "LONG", // The timestamp of the change in milliseconds.
"isDdl": "BOOLEAN", // Indicates whether the message is generated from a DDL event.
"ts": "LONG", // The timestamp when the message is written to the target.
"sql": "STRING" // Currently empty.
}
Here are some data examples:
Example of data inserted (INSERT operation)
{
"database":"database",
"sqlType":{ "col1":93, "col2":12, "col3":6, "col4":8, "col5":5, "col6":92, "col7":4, "col8":-5, "col9":2004, "col10":-6, "col11":91, "col12":3, "col13":-5, "col14":93 },
"data":[ { "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":1.2222, "col4":9.999999999999999093266253, "col5":129, "col6":"00:01:02", "col7":2147483646, "col8":9223372036854775806, "col9":"aGVsbG8gd29ybGQ=", "col10":3, "col11":"2020-11-25", "col12":9.9999999999999990, "col13":10223372036854775806, "col14":"1606233662.012345" } ],
"pkNames":[ "col1", "col2" ],
"old":null,
"mysqlType":{ "col1":"datetime", "col2":"varchar", "col3":"float", "col4":"double", "col5":"smallint", "col6":"time", "col7":"int", "col8":"int64", "col9":"blob", "col10":"tinyint", "col11":"date", "col12":"decimal", "col13":"bigint", "col14":"timestamp" },
"type":"INSERT",
"table":"table",
"es":1609344671000,
"isDdl":false,
"ts":1618323429026,
"sql":""
}
Example of data updated (UPDATE operation)
{
"database":"database",
"sqlType":{ "col1":93, "col2":12, "col3":6, "col4":8, "col5":5, "col6":92, "col7":4, "col8":-5, "col9":2004, "col10":-6, "col11":91, "col12":3, "col13":-5, "col14":93 },
"data":[ { "col1":"2020-11-25 00:01:02", "col2":"hello world 2020", "col3":1.2222, "col4":9.999999999999999093266253372484, "col5":129, "col6":"00:01:02", "col7":2147483646, "col8":9223372036854775806, "col9":"aGVsbG8gd29ybGQ=", "col10":3, "col11":"2020-11-25", "col12":9.9999999999999990932662, "col13":10223372036854775806, "col14":"1606233662.012345" } ],
"pkNames":[ "col1", "col2" ],
"old":[ { "string":"hello world" } ],
"mysqlType":{ "col1":"datetime", "col2":"varchar", "col3":"float", "col4":"double", "col5":"smallint", "col6":"time", "col7":"int", "col8":"int64", "col9":"blob", "col10":"tinyint", "col11":"date", "col12":"decimal", "col13":"bigint", "col14":"timestamp" },
"type":"UPDATE",
"table":"table",
"es":1609344671000,
"isDdl":false,
"ts":1618364572908,
"sql":""
}
Example of data deleted (DELETE operation)
{
"database":"database",
"sqlType":{ "col1":93, "col2":12, "col3":6, "col4":8, "col5":5, "col6":92, "col7":4, "col8":-5, "col9":2004, "col10":-6, "col11":91, "col12":3, "col13":-5, "col14":93 },
"data":[ { "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":1.2222, "col4":9.99999999999999909326625, "col5":129, "col6":"00:01:02", "col7":2147483646, "col8":9223372036854775806, "col9":"aGVsbG8gd29ybGQ=", "col10":3, "col11":"2020-11-25", "col12":9.99999999999999909326625, "col13":10223372036854775806, "col14":"1606233662.012345" } ],
"pkNames":[ "int8", "int16" ],
"old":null,
"mysqlType":{ "col1":"datetime", "col2":"varchar", "col3":"float", "col4":"double", "col5":"smallint", "col6":"time", "col7":"int", "col8":"int64", "col9":"blob", "col10":"tinyint", "col11":"date", "col12":"decimal", "col13":"bigint", "col14":"timestamp" },
"type":"DELETE",
"table":"table",
"es":1609344671000,
"isDdl":false,
"ts":1618364660278,
"sql":""
}
DDL example
ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT "test" COMMENT 'test';
{
"database": "connector_test",
"sqlType": null,
"data": null,
"pkNames": null,
"old": null,
"mysqlType": null,
"type": "ALTER",
"table": "all_mysql_type_test",
"es": 1671177209000,
"isDdl": true,
"ts": 1671177291475,
"sql": "ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT \"test\" COMMENT 'test'"
}
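The numeric sqlType codes in the Canal examples are java.sql.Types constants, and an UPDATE message pairs the old values with the new row in data. The Python sketch below decodes both. It is illustrative only: JDBC_TYPE_NAMES covers just the codes that occur in the examples above, the function names are hypothetical, and pairing old[0] with data[0] assumes the single-record layout described in the format.

```python
# Subset of java.sql.Types constants that appear in the examples above.
JDBC_TYPE_NAMES = {
    -6: "TINYINT", 5: "SMALLINT", 4: "INTEGER", -5: "BIGINT",
    6: "FLOAT", 8: "DOUBLE", 3: "DECIMAL", 12: "VARCHAR",
    91: "DATE", 92: "TIME", 93: "TIMESTAMP", 2004: "BLOB",
}


def column_types(msg):
    """Map each column to its java.sql.Types name; sqlType is null for DDL."""
    sql_types = msg.get("sqlType") or {}
    return {
        col: JDBC_TYPE_NAMES.get(code, f"UNKNOWN({code})")
        for col, code in sql_types.items()
    }


def changed_columns(msg):
    """For an UPDATE message, map column -> (old value, new value)."""
    if msg.get("type") != "UPDATE" or not msg.get("old"):
        return {}
    new_row = msg["data"][0]  # assumption: one record per message
    return {col: (old, new_row.get(col)) for col, old in msg["old"][0].items()}


update_msg = {
    "type": "UPDATE",
    "sqlType": {"col2": 12, "col10": -6},
    "data": [{"col2": "hello world 2020", "col10": 3}],
    "old": [{"col2": "hello world"}],
}
print(column_types(update_msg))
print(changed_columns(update_msg))
```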
DataWorks JSON message format
When data is migrated to a Kafka instance, DataWorks uses the following JSON message format for serialization.
{
"version":"2.0", // The protocol version. Currently, only DataWorks 2.0 is supported.
"schema": { // The metadata of changes. Only the column names and types are specified.
"source": {// The source of changes.
"dbType": "mysql", // The data source type.
"dbVersion": "5.7.35", // The database version.
"dbName": "myDatabase", // The database name.
"schema": "mySchema", // The schema name. It is required in a database system that supports schemas.
"table": "tableName" // The table name.
},
"column": [// The changed data columns. The records of the target table are updated based on the changed data.
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "varchar(20)"
},
{
"name": "mydata",
"type": "binary"
},
{
"name": "ts",
"type": "datetime"
}
],
"pk": [// The primary keys or unique keys are required if any. Otherwise, this field can be left empty.
"pkName1",
"pkName2"
]
},
"payload": {
"before": {
"data":{
"id": 111,
"name":"scooter",
"mydata": "[base64 string]", // If the data type is binary, the data is Base64-encoded.
"ts": 1590315269000.123456789 // The timestamp. The integer part is 13 digits long and the fractional part is 9 digits long.
}
},
"after": {
"data":{
"id": 222,
"name":"donald",
"mydata": "[base64 string]",
"ts": 1590315269000
}
},
"op":"INSERT/UPDATE/DELETE/HEARTBEAT/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/...",// Case-sensitive.
"timestamp": {
"eventTime": 1620457659000 // Timestamp (13-digit, millisecond precision) when change occurred in source database.
},
"ddl": {
"text": "ADD COLUMN ..."
},
"scn": "An auto-increment ID"
},
"extend": { // Reserved for future extension. You can ignore this field if you do not use it.
"load_fm":"CIBS" // The source system that records originate from. For example, "CIBS".
}
}
A heartbeat message for the migration task is sent as follows:
{
"version": "2.0", // The protocol version.
"payload": {
"timestamp": {
"eventTime": 1620457659000 // The timestamp when the heartbeat message is sent.
},
"op": "HEARTBEAT" // Identifies the heartbeat message.
}
}
Here are some data examples:
Example of data inserted (INSERT operation)
{
"version":"2.0",
"schema":{
"source":{ "dbType":"ob_mysql", "dbVersion":null, "dbName":"db", "schema":null, "table":"tab" },
"column":[ { "name":"int8", "type":"TINYINT" }, { "name":"int16", "type":"SMALLINT" }, { "name":"int32", "type":"INT" }, { "name":"int64", "type":"INT64" }, { "name":"float32", "type":"FLOAT" }, { "name":"float64", "type":"DOUBLE" }, { "name":"bigInt", "type":"BIGINT" }, { "name":"boolean", "type":"BOOLEAN" }, { "name":"string", "type":"VARCHAR" }, { "name":"bytes", "type":"BLOB" }, { "name":"decimal", "type":"DECIMAL" }, { "name":"localDate", "type":"DATE" }, { "name":"localTime", "type":"TIME" }, { "name":"localDateTime", "type":"DATETIME" }, { "name":"timestamp", "type":"TIMESTAMP" }, { "name":"zonedDateTime", "type":"ZONED_DATETIME" }, { "name":"intervalDayToSecond", "type":"INTERVAL_DAY_TO_SECOND" }, { "name":"intervalYearToMonth", "type":"INTERVAL_YEAR_TO_MONTH" } ],
"pk":[ "pkName1", "pkName2" ]
},
"payload":{
"before":null,
"after":{ "data":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":1.2222, "col6":0.0000000000000008125, "col7":10223372036854775806, "col8":1, "col9":"hello world", "col10":"aGVsbG8gd29ybGQ=", "col11":0.000000000000125, "col12":"2020-11-25", "col13":"00:01:02", "col14":"2020-11-25 00:01:02", "col15":"1606233662.012345", "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col17":"INTERVAL '3' DAY", "col18":"INTERVAL '4' YEAR" } },
"op":"INSERT",
"timestamp":{ "eventTime":1647581000000, "systemTime":1647581000795, "checkpointTime":1647581000 },
"ddl":null,
"scn":"null"
},
"extend":{ "load_fm": "test" }
}
Example of data updated (UPDATE operation)
{
"version":"2.0",
"schema":{
"source":{ "dbType":"ob_mysql", "dbVersion":null, "dbName":"db", "schema":null, "table":"tab" },
"column":[ { "name":"int8", "type":"TINYINT" }, { "name":"int16", "type":"SMALLINT" }, { "name":"int32", "type":"INT" }, { "name":"int64", "type":"INT64" }, { "name":"float32", "type":"FLOAT" }, { "name":"float64", "type":"DOUBLE" }, { "name":"bigInt", "type":"BIGINT" }, { "name":"boolean", "type":"BOOLEAN" }, { "name":"string", "type":"VARCHAR" }, { "name":"bytes", "type":"BLOB" }, { "name":"decimal", "type":"DECIMAL" }, { "name":"localDate", "type":"DATE" }, { "name":"localTime", "type":"TIME" }, { "name":"localDateTime", "type":"DATETIME" }, { "name":"timestamp", "type":"TIMESTAMP" }, { "name":"zonedDateTime", "type":"ZONED_DATETIME" }, { "name":"intervalDayToSecond", "type":"INTERVAL_DAY_TO_SECOND" }, { "name":"intervalYearToMonth", "type":"INTERVAL_YEAR_TO_MONTH" } ],
"pk":[ "pkName1", "pkName2" ]
},
"payload":{
"before":{ "data":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":1.2222, "col6":0.000000000000000125, "col7":10223372036854775806, "col8":1, "col9":"hello world", "col10":"aGVsbG8gd29ybGQ=", "col11":0.0000000000125, "col12":"2020-11-25", "col13":"00:01:02", "col14":"2020-11-25 00:01:02", "col15":"1606233662.012345", "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col17":"INTERVAL '3' DAY", "col18":"INTERVAL '4' YEAR" } },
"after":{ "data":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":1.2222, "col6":0.00000000008125, "col7":10223372036854775806, "col8":1, "col9":"hello world 2020", "col10":"aGVsbG8gd29ybGQ=", "col11":0.000000000125, "col12":"2020-11-25", "col13":"00:01:02", "col14":"2020-11-25 00:01:02", "col15":"1606233662.012345", "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col17":"INTERVAL '3' DAY", "col18":"INTERVAL '4' YEAR" } },
"op":"UPDATE",
"timestamp":{ "eventTime":1647581038000, "systemTime":1647581038674, "checkpointTime":1647581038 },
"ddl":null,
"scn":"null"
},
"extend":{ "load_fm": "test" }
}
Example of data deleted (DELETE operation)
{
"version":"2.0",
"schema":{
"source":{ "dbType":"ob_mysql", "dbVersion":null, "dbName":"db", "schema":null, "table":"tab" },
"column":[ { "name":"int8", "type":"TINYINT" }, { "name":"int16", "type":"SMALLINT" }, { "name":"int32", "type":"INT" }, { "name":"int64", "type":"INT64" }, { "name":"float32", "type":"FLOAT" }, { "name":"float64", "type":"DOUBLE" }, { "name":"bigInt", "type":"BIGINT" }, { "name":"boolean", "type":"BOOLEAN" }, { "name":"string", "type":"VARCHAR" }, { "name":"bytes", "type":"BLOB" }, { "name":"decimal", "type":"DECIMAL" }, { "name":"localDate", "type":"DATE" }, { "name":"localTime", "type":"TIME" }, { "name":"localDateTime", "type":"DATETIME" }, { "name":"timestamp", "type":"TIMESTAMP" }, { "name":"zonedDateTime", "type":"ZONED_DATETIME" }, { "name":"intervalDayToSecond", "type":"INTERVAL_DAY_TO_SECOND" }, { "name":"intervalYearToMonth", "type":"INTERVAL_YEAR_TO_MONTH" } ],
"pk":[ "pkName1", "pkName2" ]
},
"payload":{
"before":{ "data":{ "col1":3, "col2":129, "col3":2147483646, "col4":9223372036854775806, "col5":1.2222, "col6":0.0000000000125, "col7":10223372036854775806, "col8":1, "col9":"hello world", "col10":"aGVsbG8gd29ybGQ=", "col11":0.0000000000125, "col12":"2020-11-25", "col13":"00:01:02", "col14":"2020-11-25 00:01:02", "col15":"1606233662.012345", "col16":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col17":"INTERVAL '3' DAY", "col18":"INTERVAL '4' YEAR" } },
"after":null,
"op":"DELETE",
"timestamp":{ "eventTime":1647581072000, "systemTime":1647581072976, "checkpointTime":1647581072 },
"ddl":null,
"scn":"null"
},
"extend":{ "load_fm": "test" }
}
DDL example
ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT "test" COMMENT 'test';
{
"version": "2.0",
"schema": {
"source": { "dbType": "ob_mysql", "dbVersion": null, "dbName": "connector_test", "schema": null, "table": "all_mysql_type_test" },
"column": null,
"pk": null
},
"payload": {
"before": null,
"after": null,
"op": "ALTER",
"timestamp": { "eventTime": 1671177209000, "systemTime": 1671177291485, "checkpointTime": 1671177200 },
"ddl": { "text": "ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT \"test\" COMMENT 'test'" },
"scn": "null"
},
"extend": {}
}
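A DataWorks consumer typically branches on payload.op and tolerates the parts that heartbeat and DDL messages leave out or set to null. The following Python sketch flattens a message into a small summary. The function name summarize and the shape of the returned dictionary are hypothetical, and eventTime is interpreted as milliseconds since the Unix epoch, as the format description states.

```python
from datetime import datetime, timezone


def summarize(msg):
    """Flatten a DataWorks 2.0 message into a small dict for downstream handling."""
    payload = msg["payload"]
    event_ms = payload["timestamp"]["eventTime"]  # 13-digit millisecond timestamp
    summary = {
        "op": payload["op"],
        "event_time": datetime.fromtimestamp(event_ms / 1000, tz=timezone.utc),
    }
    if payload["op"] == "HEARTBEAT":
        return summary  # heartbeat messages carry no schema, images, or DDL
    source = (msg.get("schema") or {}).get("source") or {}
    summary["table"] = source.get("table")
    # before, after, and ddl may each be null depending on the operation.
    summary["before"] = (payload.get("before") or {}).get("data")
    summary["after"] = (payload.get("after") or {}).get("data")
    summary["ddl"] = (payload.get("ddl") or {}).get("text")
    return summary


heartbeat = {"version": "2.0", "payload": {"timestamp": {"eventTime": 1620457659000}, "op": "HEARTBEAT"}}
delete = {
    "version": "2.0",
    "schema": {"source": {"table": "tab"}, "column": [], "pk": []},
    "payload": {
        "before": {"data": {"col1": 3}},
        "after": None,
        "op": "DELETE",
        "timestamp": {"eventTime": 1647581072000},
        "ddl": None,
    },
}
hb_summary = summarize(heartbeat)
del_summary = summarize(delete)
print(hb_summary["op"], del_summary["before"], del_summary["after"])
```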
SharePlex JSON message format
When data is migrated to a Kafka instance, SharePlex uses the following JSON message format for serialization.
{
"data": { // The key-value pairs of changed data. For INSERT/DELETE operations, all values are included. For UPDATE operations, only changed values are included.
"col1": "val1"
},
"meta": {
"time": "YYYY-MM-DDTHH:mm:ss", // The time when the change occurred.
"op": "", // The change type. Valid values: ins, upd, del, and ddl.
"posttime": "YYYY-MM-DDTHH:mm:ss", // The time when the data was written to the target.
"idx": "STRING", // The index of the message in the transaction or the total number of messages in the transaction. This field is deprecated.
"size": "NUMBER", // The number of messages in the transaction. This field is deprecated.
"seq": "STRING", // The sequence number. This field is returned only when transactionEnabled is set to true in the source.
"table": "STRING", // The name of the changed table in the SQL database. The format is {database}.{table}.
"rowid": "STRING", // {source_table_name}-{primary_key_values_separated_by_\u0001}
"trans": "STRING", // The transaction ID.
"scn": "STRING" // This field exists only if sequenceEnabled=true (the default value) is configured in source.json for incremental synchronization. It is used for sorting, and its value is generated within a synchronization process in the format of {timestamp}{incremental number}, where {incremental number} does not exceed five digits.
},
"key": { // This field is returned only for UPDATE operations. It indicates the values of the data before the change.
},
"sql": {
"ddl": ""
} // This field is returned only for DDL operations. If op = ddl, the DDL statement is written.
}
Here are some data examples:
Example of data inserted (INSERT operation)
{
"data":{ "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":"INTERVAL '3' DAY", "col4":1.2222, "col5":9.999999999999999308, "col6":129, "col7":"00:01:02", "col8":1, "col9":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col10":2147483646, "col11":9223372036854775806, "col12":"aGVsbG8gd29ybGQ=", "col13":"INTERVAL '4' YEAR", "col14":3, "col15":"2020-11-25", "col16":9.9999999999308, "col17":10223372036854775806, "col18":"1606233662.012345" },
"meta":{ "posttime":"2020-12-07T13:22:00", "op":"ins", "size":10, "time":"2020-11-25T00:01:02", "idx":"1/10", "seq":1, "table":"mock_database.mock_table", "rowid":"mock_database.mock_table-3\u0001129", "trans":"shareplex_transaction_id", "scn":"123456789" }
}
Example of data updated (UPDATE operation)
{
"data":{ "string":"hello world 2020" },
"meta":{ "posttime":"2020-12-07T13:59:09", "op":"upd", "size":10, "time":"2020-11-25T00:01:02", "idx":"1/10", "seq":1, "table":"mock_database.mock_table", "rowid":"mock_database.mock_table-3\u0001129", "trans":"shareplex_transaction_id", "scn":"123456789" },
"key":{ "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":"INTERVAL '3' DAY", "col4":1.2222, "col5":9.9999999999999308, "col6":129, "col7":"00:01:02", "col8":1, "col9":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col10":2147483646, "col11":9223372036854775806, "col12":"aGVsbG8gd29ybGQ=", "col13":"INTERVAL '4' YEAR", "col14":3, "col15":"2020-11-25", "col16":9.9999999999308, "col17":10223372036854775806, "col18":"1606233662.012345" }
}
Example of data deleted (DELETE operation)
{
"data":{ "col1":"2020-11-25 00:01:02", "col2":"hello world", "col3":"INTERVAL '3' DAY", "col4":1.2222, "col5":9.9999999999308, "col6":129, "col7":"00:01:02", "col8":1, "col9":"2020-11-25 00:01:02.012345 Asia/Shanghai", "col10":2147483646, "col11":9223372036854775806, "col12":"aGVsbG8gd29ybGQ=", "col13":"INTERVAL '4' YEAR", "col14":3, "col15":"2020-11-25", "col16":9.9999999308, "col17":10223372036854775806, "col18":"1606233662.012345" },
"meta":{ "posttime":"2020-12-07T13:34:10", "op":"del", "size":10, "time":"2020-11-25T00:01:02", "idx":"1/10", "seq":1, "table":"mock_database.mock_table", "rowid":"mock_database.mock_table-3\u0001129", "trans":"shareplex_transaction_id", "scn":"123456789" }
}
DDL example
ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT "test" COMMENT 'test';
{
"data": {},
"meta": { "posttime": "2022-12-16T15:54:51", "op": "ddl", "size": 0, "time": "2022-12-16T15:53:29", "idx": "0/0", "seq": 0, "table": "connector_test.all_mysql_type_test", "rowid": "connector_test.all_mysql_type_test-", "trans": null, "scn": "null" },
"sql": { "ddl": "ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT \"test\" comment 'test'" }
}
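Because a SharePlex UPDATE message carries only the changed columns in data and the before values in key, a consumer that needs the full after-row has to merge the two. The Python sketch below shows that merge and also recovers the primary key values from rowid. The function names are hypothetical, and the merge assumes that key holds the complete before-row, as it does in the example above.

```python
PK_SEPARATOR = "\u0001"  # separator between primary key values in rowid


def after_image(msg):
    """Rebuild the row as it looks after the change, or None if there is none."""
    op = msg["meta"]["op"]
    if op == "ins":
        return dict(msg["data"])
    if op == "upd":
        # Assumption: key is the full before-row; changed values override it.
        return {**msg.get("key", {}), **msg["data"]}
    return None  # del and ddl messages have no after image


def rowid_values(meta):
    """Extract primary key values from '{table}-{values separated by U+0001}'."""
    prefix = meta["table"] + "-"
    rowid = meta["rowid"]
    if not rowid.startswith(prefix):
        return []
    rest = rowid[len(prefix):]
    return rest.split(PK_SEPARATOR) if rest else []


update = {
    "data": {"col2": "hello world 2020"},
    "meta": {"op": "upd", "table": "mock_database.mock_table",
             "rowid": "mock_database.mock_table-3\u0001129"},
    "key": {"col2": "hello world", "col6": 129, "col14": 3},
}
print(after_image(update))
print(rowid_values(update["meta"]))
```

Stripping the table prefix with meta.table, rather than splitting on the first hyphen, keeps the sketch correct for table names that themselves contain a hyphen.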
DefaultExtendColumnType JSON message format
When data is migrated to a Kafka instance, DefaultExtendColumnType uses the following JSON message format for serialization.
The DefaultExtendColumnType JSON message format is based on the Default format. It adds a __light_type field to each image to indicate the data type of each column.
{
"prevStruct": { // The before image of the changed data.
},
"postStruct": { // The after image of the changed data.
"__light_type": {
"col": { // The field name.
"schemaType": "type" // The value type.
}
}
},
"allMetaData": {
}
}
Here are some data examples:
Example of data inserted (
INSERToperation){ "allMetaData":{ "checkpoint": null, "record_primary_key": "int8\u0001int16", "source_identity": null, "uniqueId": "{tid:11039xxxx127, partition_id:0, part_cnt:0},5917,391,0", "transId": "{hash:123456, inc:1234, addr:\"1.2.3.4:2883\", t:123456}", "clusterId": "123456", "record_primary_value": "3\u0001129", "dbType": "OB_MYSQL", "table_name": "table", "db": "tenant.database", "timestamp": "1609344671" }, "prevStruct": null, "recordType": "INSERT", "postStruct":{ "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.99999999999999909326625337248, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.99999999999999909326625337248461995470488734032045693707225049338, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345", "__light_type":{ "int8":{ "schemaType":"TINYINT" }, "int16":{ "schemaType":"SMALLINT" }, "int32":{ "schemaType":"INT" }, "int64":{ "schemaType":"INT64" }, "bigInt":{ "schemaType":"BIGINT" }, "float32":{ "schemaType":"FLOAT" }, "float64":{ "schemaType":"DOUBLE" }, "string":{ "schemaType":"VARCHAR" }, "bytes":{ "schemaType":"BLOB" }, "decimal":{ "schemaType":"DECIMAL" }, "localDate":{ "schemaType":"DATE" }, "localTime":{ "schemaType":"TIME" }, "localDateTime":{ "schemaType":"DATETIME" }, "timestamp_in_long":{ "schemaType":"TIMESTAMP" } } } }When
recordTypeisROW, the data is full data and the format is the same as forINSERTdata.{ "allMetaData":{ "checkpoint": null, "record_primary_key": "int8\u0001int16", "source_identity": null, "uniqueId": null, "transId": null, "clusterId": null, "record_primary_value": "3\u0001129", "dbType": "OB_MYSQL", "table_name": "table", "db": "tenant.database", "timestamp": null }, "prevStruct": null, "recordType": "ROW", "postStruct":{ "col1": 3, "col2": 129, "col3": 2147483646, "col4": 9223372036854775806, "col5": 10223372036854775806, "col6": 1.2222, "col7": 9.999999, "col8": "hello world", "col9": "aGVsbG8gd29ybGQ=", "col10": 9.99999999999, "col11": "2020-11-25", "col12": "00:01:02", "col13": "2020-11-25 00:01:02", "col14": "1606233662.012345", "__light_type":{ "int8":{ "schemaType":"TINYINT" }, "int16":{ "schemaType":"SMALLINT" }, "int32":{ "schemaType":"INT" }, "int64":{ "schemaType":"INT64" }, "bigInt":{ "schemaType":"BIGINT" }, "float32":{ "schemaType":"FLOAT" }, "float64":{ "schemaType":"DOUBLE" }, "string":{ "schemaType":"VARCHAR" }, "bytes":{ "schemaType":"BLOB" }, "decimal":{ "schemaType":"DECIMAL" }, "localDate":{ "schemaType":"DATE" }, "localTime":{ "schemaType":"TIME" }, "localDateTime":{ "schemaType":"DATETIME" }, "timestamp_in_long":{ "schemaType":"TIMESTAMP" } } } }Example of data updated (
UPDATE operation)
{
  "allMetaData": {
    "checkpoint": null,
    "record_primary_key": "int8\u0001int16",
    "source_identity": null,
    "uniqueId": "{tid:1103909342127, partition_id:0, part_cnt:0},5917,391,0",
    "transId": "{hash:123456, inc:1234, addr:\"1.2.3.4:2883\", t:123456}",
    "clusterId": "123456",
    "record_primary_value": "3\u0001129",
    "dbType": "OB_MYSQL",
    "table_name": "table",
    "db": "tenant.database",
    "timestamp": "1609344671"
  },
  "prevStruct": {
    "col1": 3,
    "col2": 129,
    "col3": 2147483646,
    "col4": 9223372036854775806,
    "col5": 10223372036854775806,
    "col6": 1.2222,
    "col7": 9.999999999999,
    "col8": "hello world 2020",
    "col9": "aGVsbG8gd29ybGQ=",
    "col10": 9.999999999999,
    "col11": "2020-11-25",
    "col12": "00:01:02",
    "col13": "2020-11-25 00:01:02",
    "col14": "1606233662.012345",
    "__light_type": {
      "int8": { "schemaType": "TINYINT" },
      "int16": { "schemaType": "SMALLINT" },
      "int32": { "schemaType": "INT" },
      "int64": { "schemaType": "INT64" },
      "bigInt": { "schemaType": "BIGINT" },
      "float32": { "schemaType": "FLOAT" },
      "float64": { "schemaType": "DOUBLE" },
      "string": { "schemaType": "VARCHAR" },
      "bytes": { "schemaType": "BLOB" },
      "decimal": { "schemaType": "DECIMAL" },
      "localDate": { "schemaType": "DATE" },
      "localTime": { "schemaType": "TIME" },
      "localDateTime": { "schemaType": "DATETIME" },
      "timestamp_in_long": { "schemaType": "TIMESTAMP" }
    }
  },
  "recordType": "UPDATE",
  "postStruct": {
    "col1": 3,
    "col2": 129,
    "col3": 2147483646,
    "col4": 9223372036854775806,
    "col5": 10223372036854775806,
    "col6": 1.2222,
    "col7": 9.999999999999,
    "col8": "hello world 2020",
    "col9": "aGVsbG8gd29ybGQ=",
    "col10": 9.999999999999,
    "col11": "2020-11-25",
    "col12": "00:01:02",
    "col13": "2020-11-25 00:01:02",
    "col14": "1606233662.012345",
    "__light_type": {
      "int8": { "schemaType": "TINYINT" },
      "int16": { "schemaType": "SMALLINT" },
      "int32": { "schemaType": "INT" },
      "int64": { "schemaType": "INT64" },
      "bigInt": { "schemaType": "BIGINT" },
      "float32": { "schemaType": "FLOAT" },
      "float64": { "schemaType": "DOUBLE" },
      "string": { "schemaType": "VARCHAR" },
      "bytes": { "schemaType": "BLOB" },
      "decimal": { "schemaType": "DECIMAL" },
      "localDate": { "schemaType": "DATE" },
      "localTime": { "schemaType": "TIME" },
      "localDateTime": { "schemaType": "DATETIME" },
      "timestamp_in_long": { "schemaType": "TIMESTAMP" }
    }
  }
}
Example of data deleted (DELETE operation)
{
  "allMetaData": {
    "checkpoint": null,
    "record_primary_key": "int8\u0001int16",
    "source_identity": null,
    "uniqueId": "{tid:1103xxxx127, partition_id:0, part_cnt:0},5917,391,0",
    "transId": "{hash:123456, inc:1234, addr:\"1.2.3.4:2883\", t:123456}",
    "clusterId": "123456",
    "record_primary_value": "3\u0001129",
    "dbType": "OB_MYSQL",
    "table_name": "table",
    "db": "tenant.database",
    "timestamp": "1609344671"
  },
  "prevStruct": {
    "col1": 3,
    "col2": 129,
    "col3": 2147483646,
    "col4": 9223372036854775806,
    "col5": 10223372036854775806,
    "col6": 1.2222,
    "col7": 9.999999999999,
    "col8": "hello world 2020",
    "col9": "aGVsbG8gd29ybGQ=",
    "col10": 9.999999999999,
    "col11": "2020-11-25",
    "col12": "00:01:02",
    "col13": "2020-11-25 00:01:02",
    "col14": "1606233662.012345",
    "__light_type": {
      "int8": { "schemaType": "TINYINT" },
      "int16": { "schemaType": "SMALLINT" },
      "int32": { "schemaType": "INT" },
      "int64": { "schemaType": "INT64" },
      "bigInt": { "schemaType": "BIGINT" },
      "float32": { "schemaType": "FLOAT" },
      "float64": { "schemaType": "DOUBLE" },
      "string": { "schemaType": "VARCHAR" },
      "bytes": { "schemaType": "BLOB" },
      "decimal": { "schemaType": "DECIMAL" },
      "localDate": { "schemaType": "DATE" },
      "localTime": { "schemaType": "TIME" },
      "localDateTime": { "schemaType": "DATETIME" },
      "timestamp_in_long": { "schemaType": "TIMESTAMP" }
    }
  },
  "recordType": "DELETE",
  "postStruct": null
}
DDL example
ALTER TABLE connector_test.all_mysql_type_test ADD column c90 VARCHAR(30) DEFAULT "test" COMMENT 'test';
{
  "prevStruct": null,
  "postStruct": {
    "ddl": "ALTER TABLE connector_test.all_mysql_type_test ADD column c90 VARCHAR(30) DEFAULT \"test\" COMMENT 'test'",
    "__light_type": {
      "ddl": { "schemaType": "VAR_STRING" }
    }
  },
  "allMetaData": {
    "checkpoint": "1671177200",
    "dbType": "OB_MYSQL",
    "storeDataSequence": null,
    "db": "connector_test",
    "timestamp": "1671177209",
    "uniqueId": null,
    "ddlType": "ALTER_TABLE",
    "record_primary_key": null,
    "source_identity": null,
    "record_primary_value": null,
    "table_name": "all_mysql_type_test"
  },
  "recordType": "DDL"
}
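In the Default examples above, composite primary keys are packed into record_primary_key and record_primary_value with \u0001 as the separator. The following Python sketch shows one way a consumer might unpack them. The function name primary_key_of and the trimmed sample message are illustrative assumptions, not part of the data migration service.

```python
import json

# Separator documented for record_primary_key and record_primary_value.
PK_SEPARATOR = "\u0001"


def primary_key_of(message: str) -> dict:
    """Return {pk_column: pk_value} for one Default-format message."""
    meta = json.loads(message)["allMetaData"]
    names = meta.get("record_primary_key")
    values = meta.get("record_primary_value")
    if names is None or values is None:
        # DDL records carry null in both fields.
        return {}
    return dict(zip(names.split(PK_SEPARATOR), values.split(PK_SEPARATOR)))


# Trimmed-down version of the UPDATE example (hypothetical test input).
sample = json.dumps({
    "allMetaData": {
        "record_primary_key": "int8\u0001int16",
        "record_primary_value": "3\u0001129",
    },
    "recordType": "UPDATE",
})
print(primary_key_of(sample))
```

The values stay strings because the metadata fields are typed as STRING. Converting them to column types is left to the caller.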
Debezium JSON message format
When data is migrated from an OceanBase database in MySQL compatible mode to a Kafka instance, Debezium uses the following JSON message formats for serialization. A message can have two parts: schema and payload. By default, only the payload part is present.
The schema and payload parts are both present.
{
  "schema": { // Describes the information of the payload field. The default value is an empty object.
    "type": "struct", // struct indicates that this field contains a nested structure.
    "optional": false, // Specifies whether this field is required.
    "fields": [
      {
        "type": "int64", // The field type.
        "optional": false, // Specifies whether this field is required.
        "field": "ts_ms" // The field name.
      }
    ]
  },
  "payload": {
    "op": "c", // The type of the data change. Valid values: c (insert), u (update), d (delete), and HEARTBEAT (heartbeat message).
    "source": {
      "version": "", // The version of the data migration service.
      "connector": "OB_MYSQL", // The type of the data source.
      "name": "OMS", // A fixed value, which is OMS.
      "ts_ms": 0, // The timestamp of data change in seconds. This field is present only in incremental synchronization.
      "db": "test", // The name of the database to which the changed data belongs. In OceanBase Database, only the database name is provided without the tenant name.
      "table": "testTab", // The name of the table to which the changed data belongs.
      "pos": "553132@1668496109" // The position in the binlog file in the format of [binlog file name]@[binlog file offset].
    },
    "before": { // The before image of the changed data.
      "column": "value" // The key-value pairs, which contain complete key-value data.
    },
    "after": { // The after image of the changed data.
      "column": "value" // The key-value pairs, which contain complete key-value data.
    },
    "ts_ms": 1668497367188 // The timestamp when the data is processed.
  }
}
Only the payload part is present.
{
  "payload": {
    "op": "c", // The type of the data change. Valid values: c (insert), u (update), d (delete), and HEARTBEAT (heartbeat message).
    "source": {
      "version": "", // The version of the data migration service.
      "connector": "OB_MYSQL", // The type of the data source.
      "name": "OMS", // A fixed value, which is OMS.
      "ts_ms": 0, // The timestamp of data change in seconds. This field is present only in incremental synchronization.
      "db": "test", // The name of the database to which the changed data belongs. In OceanBase Database, only the database name is provided without the tenant name.
      "table": "testTab", // The name of the table to which the changed data belongs.
      "pos": "553132@16684****" // The position in the binlog file in the format of [binlog file name]@[binlog file offset].
    },
    "before": { // The before image of the changed data.
      "column": "value" // The key-value pairs, which contain complete key-value data.
    },
    "after": { // The after image of the changed data.
      "column": "value" // The key-value pairs, which contain complete key-value data.
    },
    "ts_ms": 1668497367188 // The timestamp when the data is processed.
  }
}
Here are some data examples:
Example of data inserted (INSERT operation)
{
  "schema": {
    "optional": false,
    "type": "STRUCT",
    "fields": [
      {
        "field": "before",
        "optional": false,
        "type": "struct",
        "fields": [
          { "field": "c01", "optional": false, "type": "int32" },
          { "field": "c02", "optional": false, "type": "string" },
          { "field": "c03", "optional": false, "type": "string" },
          { "field": "c04", "optional": false, "type": "bytes" },
          { "field": "c05", "optional": false, "type": "int16" },
          { "field": "c06", "optional": false, "type": "int16" },
          { "field": "c07", "optional": false, "type": "int32" },
          { "field": "c08", "optional": false, "type": "int64" },
          { "field": "c09", "optional": false, "type": "float64" },
          { "field": "c10", "optional": false, "type": "float64" },
          { "field": "c11", "optional": false, "type": "string" },
          { "field": "c12", "optional": false, "type": "string" },
          { "field": "c13", "optional": false, "type": "string" },
          { "field": "c14", "optional": false, "type": "string" },
          { "field": "c15", "optional": false, "type": "bytes" },
          { "field": "c16", "optional": false, "type": "string" },
          { "field": "c17", "optional": false, "type": "bytes" },
          { "field": "c18", "optional": false, "type": "bytes" },
          { "field": "c19", "optional": false, "type": "bytes" },
          { "field": "c20", "optional": false, "type": "bytes" },
          { "field": "c21", "optional": false, "type": "string" },
          { "field": "c22", "optional": false, "type": "int32" },
          { "field": "c23", "optional": false, "type": "int64" },
          { "field": "c24", "optional": false, "type": "string" },
          { "field": "c25", "optional": false, "type": "int32" },
          { "field": "c26", "optional": false, "type": "bytes" }
        ]
      },
      {
        "field": "after",
        "optional": false,
        "type": "struct",
        "fields": [
          { "field": "c01", "optional": false, "type": "int32" },
          { "field": "c02", "optional": false, "type": "string" },
          { "field": "c03", "optional": false, "type": "string" },
          { "field": "c04", "optional": false, "type": "bytes" },
          { "field": "c05", "optional": false, "type": "int16" },
          { "field": "c06", "optional": false, "type": "int16" },
          { "field": "c07", "optional": false, "type": "int32" },
          { "field": "c08", "optional": false, "type": "int64" },
          { "field": "c09", "optional": false, "type": "float64" },
          { "field": "c10", "optional": false, "type": "float64" },
          { "field": "c11", "optional": false, "type": "string" },
          { "field": "c12", "optional": false, "type": "string" },
          { "field": "c13", "optional": false, "type": "string" },
          { "field": "c14", "optional": false, "type": "string" },
          { "field": "c15", "optional": false, "type": "bytes" },
          { "field": "c16", "optional": false, "type": "string" },
          { "field": "c17", "optional": false, "type": "bytes" },
          { "field": "c18", "optional": false, "type": "bytes" },
          { "field": "c19", "optional": false, "type": "bytes" },
          { "field": "c20", "optional": false, "type": "bytes" },
          { "field": "c21", "optional": false, "type": "string" },
          { "field": "c22", "optional": false, "type": "int32" },
          { "field": "c23", "optional": false, "type": "int64" },
          { "field": "c24", "optional": false, "type": "string" },
          { "field": "c25", "optional": false, "type": "int32" },
          { "field": "c26", "optional": false, "type": "bytes" }
        ]
      },
      {
        "field": "source",
        "optional": false,
        "type": "struct",
        "fields": [
          { "field": "version", "optional": false, "type": "string" },
          { "field": "connector", "optional": false, "type": "string" },
          { "field": "name", "optional": false, "type": "string" },
          { "field": "ts_ms", "optional": false, "type": "int64" },
          { "field": "db", "optional": false, "type": "string" },
          { "field": "table", "optional": false, "type": "string" },
          { "field": "server_id", "optional": false, "type": "int64" },
          { "field": "pos", "optional": false, "type": "string" }
        ]
      },
      { "field": "op", "optional": false, "type": "string" },
      { "field": "ts_ms", "optional": false, "type": "int64" }
    ]
  },
  "payload": {
    "op": "c",
    "source": {
      "connector": "OB_MYSQL",
      "pos": "703223@166849****",
      "name": "OMS",
      "version": "",
      "ts_ms": 1668491621000,
      "db": "test",
      "table": "table_name"
    },
    "after": {
      "c11": "a", "c10": 2.4212412, "c13": "c", "c12": "b", "c15": "65", "c14": "d",
      "c17": "67", "c16": "f", "c19": "69000000000000", "c18": "68", "c20": "6A",
      "c22": 19311, "c21": "2022-11-15T05:12:11Z", "c02": "12312", "c24": 1668489131000,
      "c01": 2, "c23": 36060000000, "c04": "61", "c26": "6B", "c03": "1241.41000",
      "c25": 2022, "c06": 141, "c05": 11, "c08": 412124124, "c07": 4241, "c09": 2.11111
    },
    "ts_ms": 1668495423594
  }
}
Example of data updated (UPDATE operation)
{
  "schema": {
    "optional": false,
    "type": "STRUCT",
    "fields": [
      {
        "field": "before",
        "optional": false,
        "type": "struct",
        "fields": [
          { "field": "c01", "optional": false, "type": "int32" },
          { "field": "c02", "optional": false, "type": "string" },
          { "field": "c03", "optional": false, "type": "string" },
          { "field": "c04", "optional": false, "type": "bytes" },
          { "field": "c05", "optional": false, "type": "int16" },
          { "field": "c06", "optional": false, "type": "int16" },
          { "field": "c07", "optional": false, "type": "int32" },
          { "field": "c08", "optional": false, "type": "int64" },
          { "field": "c09", "optional": false, "type": "float64" },
          { "field": "c10", "optional": false, "type": "float64" },
          { "field": "c11", "optional": false, "type": "string" },
          { "field": "c12", "optional": false, "type": "string" },
          { "field": "c13", "optional": false, "type": "string" },
          { "field": "c14", "optional": false, "type": "string" },
          { "field": "c15", "optional": false, "type": "bytes" },
          { "field": "c16", "optional": false, "type": "string" },
          { "field": "c17", "optional": false, "type": "bytes" },
          { "field": "c18", "optional": false, "type": "bytes" },
          { "field": "c19", "optional": false, "type": "bytes" },
          { "field": "c20", "optional": false, "type": "bytes" },
          { "field": "c21", "optional": false, "type": "string" },
          { "field": "c22", "optional": false, "type": "int32" },
          { "field": "c23", "optional": false, "type": "int64" },
          { "field": "c24", "optional": false, "type": "string" },
          { "field": "c25", "optional": false, "type": "int32" },
          { "field": "c26", "optional": false, "type": "bytes" }
        ]
      },
      {
        "field": "after",
        "optional": false,
        "type": "struct",
        "fields": [
          { "field": "c01", "optional": false, "type": "int32" },
          { "field": "c02", "optional": false, "type": "string" },
          { "field": "c03", "optional": false, "type": "string" },
          { "field": "c04", "optional": false, "type": "bytes" },
          { "field": "c05", "optional": false, "type": "int16" },
          { "field": "c06", "optional": false, "type": "int16" },
          { "field": "c07", "optional": false, "type": "int32" },
          { "field": "c08", "optional": false, "type": "int64" },
          { "field": "c09", "optional": false, "type": "float64" },
          { "field": "c10", "optional": false, "type": "float64" },
          { "field": "c11", "optional": false, "type": "string" },
          { "field": "c12", "optional": false, "type": "string" },
          { "field": "c13", "optional": false, "type": "string" },
          { "field": "c14", "optional": false, "type": "string" },
          { "field": "c15", "optional": false, "type": "bytes" },
          { "field": "c16", "optional": false, "type": "string" },
          { "field": "c17", "optional": false, "type": "bytes" },
          { "field": "c18", "optional": false, "type": "bytes" },
          { "field": "c19", "optional": false, "type": "bytes" },
          { "field": "c20", "optional": false, "type": "bytes" },
          { "field": "c21", "optional": false, "type": "string" },
          { "field": "c22", "optional": false, "type": "int32" },
          { "field": "c23", "optional": false, "type": "int64" },
          { "field": "c24", "optional": false, "type": "string" },
          { "field": "c25", "optional": false, "type": "int32" },
          { "field": "c26", "optional": false, "type": "bytes" }
        ]
      },
      {
        "field": "source",
        "optional": false,
        "type": "struct",
        "fields": [
          { "field": "version", "optional": false, "type": "string" },
          { "field": "connector", "optional": false, "type": "string" },
          { "field": "name", "optional": false, "type": "string" },
          { "field": "ts_ms", "optional": false, "type": "int64" },
          { "field": "db", "optional": false, "type": "string" },
          { "field": "table", "optional": false, "type": "string" },
          { "field": "server_id", "optional": false, "type": "int64" },
          { "field": "pos", "optional": false, "type": "string" }
        ]
      },
      { "field": "op", "optional": false, "type": "string" },
      { "field": "ts_ms", "optional": false, "type": "int64" }
    ]
  },
  "payload": {
    "op": "u",
    "before": {
      "c11": "a", "c10": 2.4212412, "c13": "c", "c12": "b", "c15": "65", "c14": "d",
      "c17": "67", "c16": "f", "c19": "6900000000", "c18": "68", "c20": "6A",
      "c22": 19311, "c21": "2022-11-15T05:12:11Z", "c02": "12312", "c24": 1668489131000,
      "c01": 1, "c23": 36060000000, "c04": "61", "c26": "6B", "c03": "1241.41000",
      "c25": 2022, "c06": 141, "c05": 11, "c08": 412124124, "c07": 4241, "c09": 2.11111
    },
    "source": {
      "connector": "OB_MYSQL",
      "pos": "436999@166849****",
      "name": "OMS",
      "version": "",
      "ts_ms": 1668495861000,
      "db": "test",
      "table": "table_name"
    },
    "after": {
      "c11": "aa", "c10": 2.4212412, "c13": "c", "c12": "b", "c15": "65", "c14": "d",
      "c17": "67", "c16": "f", "c19": "69000000000", "c18": "68", "c20": "6A",
      "c22": 19311, "c21": "2022-11-15T05:12:11Z", "c02": "12312", "c24": 1668489131000,
      "c01": 1, "c23": 36060000000, "c04": "61", "c26": "6B", "c03": "1241.41000",
      "c25": 2022, "c06": 141, "c05": 11, "c08": 412124124, "c07": 4241, "c09": 2.11111
    },
    "ts_ms": 1668495906356
  }
}
Example of data deleted (DELETE operation)
{
  "schema": {
    "optional": false,
    "type": "STRUCT",
    "fields": [
      {
        "field": "before",
        "optional": false,
        "type": "struct",
        "fields": [
          { "field": "c01", "optional": false, "type": "int32" },
          { "field": "c02", "optional": false, "type": "string" },
          { "field": "c03", "optional": false, "type": "string" },
          { "field": "c04", "optional": false, "type": "bytes" },
          { "field": "c05", "optional": false, "type": "int16" },
          { "field": "c06", "optional": false, "type": "int16" },
          { "field": "c07", "optional": false, "type": "int32" },
          { "field": "c08", "optional": false, "type": "int64" },
          { "field": "c09", "optional": false, "type": "float64" },
          { "field": "c10", "optional": false, "type": "float64" },
          { "field": "c11", "optional": false, "type": "string" },
          { "field": "c12", "optional": false, "type": "string" },
          { "field": "c13", "optional": false, "type": "string" },
          { "field": "c14", "optional": false, "type": "string" },
          { "field": "c15", "optional": false, "type": "bytes" },
          { "field": "c16", "optional": false, "type": "string" },
          { "field": "c17", "optional": false, "type": "bytes" },
          { "field": "c18", "optional": false, "type": "bytes" },
          { "field": "c19", "optional": false, "type": "bytes" },
          { "field": "c20", "optional": false, "type": "bytes" },
          { "field": "c21", "optional": false, "type": "string" },
          { "field": "c22", "optional": false, "type": "int32" },
          { "field": "c23", "optional": false, "type": "int64" },
          { "field": "c24", "optional": false, "type": "string" },
          { "field": "c25", "optional": false, "type": "int32" },
          { "field": "c26", "optional": false, "type": "bytes" }
        ]
      },
      {
        "field": "after",
        "optional": false,
        "type": "struct",
        "fields": [
          { "field": "c01", "optional": false, "type": "int32" },
          { "field": "c02", "optional": false, "type": "string" },
          { "field": "c03", "optional": false, "type": "string" },
          { "field": "c04", "optional": false, "type": "bytes" },
          { "field": "c05", "optional": false, "type": "int16" },
          { "field": "c06", "optional": false, "type": "int16" },
          { "field": "c07", "optional": false, "type": "int32" },
          { "field": "c08", "optional": false, "type": "int64" },
          { "field": "c09", "optional": false, "type": "float64" },
          { "field": "c10", "optional": false, "type": "float64" },
          { "field": "c11", "optional": false, "type": "string" },
          { "field": "c12", "optional": false, "type": "string" },
          { "field": "c13", "optional": false, "type": "string" },
          { "field": "c14", "optional": false, "type": "string" },
          { "field": "c15", "optional": false, "type": "bytes" },
          { "field": "c16", "optional": false, "type": "string" },
          { "field": "c17", "optional": false, "type": "bytes" },
          { "field": "c18", "optional": false, "type": "bytes" },
          { "field": "c19", "optional": false, "type": "bytes" },
          { "field": "c20", "optional": false, "type": "bytes" },
          { "field": "c21", "optional": false, "type": "string" },
          { "field": "c22", "optional": false, "type": "int32" },
          { "field": "c23", "optional": false, "type": "int64" },
          { "field": "c24", "optional": false, "type": "string" },
          { "field": "c25", "optional": false, "type": "int32" },
          { "field": "c26", "optional": false, "type": "bytes" }
        ]
      },
      {
        "field": "source",
        "optional": false,
        "type": "struct",
        "fields": [
          { "field": "version", "optional": false, "type": "string" },
          { "field": "connector", "optional": false, "type": "string" },
          { "field": "name", "optional": false, "type": "string" },
          { "field": "ts_ms", "optional": false, "type": "int64" },
          { "field": "db", "optional": false, "type": "string" },
          { "field": "table", "optional": false, "type": "string" },
          { "field": "server_id", "optional": false, "type": "int64" },
          { "field": "pos", "optional": false, "type": "string" }
        ]
      },
      { "field": "op", "optional": false, "type": "string" },
      { "field": "ts_ms", "optional": false, "type": "int64" }
    ]
  },
  "payload": {
    "op": "d",
    "before": {
      "c11": "aa", "c10": 2.4212412, "c13": "c", "c12": "b", "c15": "65", "c14": "d",
      "c17": "67", "c16": "f", "c19": "69000000000", "c18": "68", "c20": "6A",
      "c22": 19311, "c21": "2022-11-15T05:12:11Z", "c02": "12312", "c24": 1668489131000,
      "c01": 1, "c23": 36060000000, "c04": "61", "c26": "6B", "c03": "1241.41000",
      "c25": 2022, "c06": 141, "c05": 11, "c08": 412124124, "c07": 4241, "c09": 2.11111
    },
    "source": {
      "connector": "OB_MYSQL",
      "pos": "553132@1668****",
      "name": "OMS",
      "version": "",
      "ts_ms": 1668496109000,
      "db": "test",
      "table": "table_name"
    },
    "ts_ms": 1668496119717
  }
}
DebeziumFlatten JSON message format
When data is migrated from an OceanBase database in MySQL compatible mode to a Kafka instance, DebeziumFlatten uses the following JSON message format for serialization. Unlike the Debezium format, this format has no schema or payload part: the change event fields appear at the top level of the message.
{
"op": "c", // The type of the data change. Valid values: c (insert), u (update), d (delete), and HEARTBEAT (heartbeat message).
"source": {
"version": "", // The version of the data migration service.
"connector": "OB_MYSQL", // The type of the data source.
"name": "OMS", // A fixed value, which is OMS.
"ts_ms": 0, // The timestamp of data change in seconds. This field is present only in incremental synchronization.
"db": "test", // The name of the database to which the changed data belongs. In OceanBase Database, only the database name is provided without the tenant name.
"table": "testTab", // The name of the table to which the changed data belongs.
"pos": "553132@16684****" // The position in the binlog file in the format of [binlog file name]@[binlog file offset].
},
"before": { // The before image of the changed data.
"column": "value" // The key-value pairs, which contain complete key-value data.
},
"after": { // The after image of the changed data.
"column": "value" // The key-value pairs, which contain complete key-value data.
},
"ts_ms": 1668497367188 // The timestamp when the data is processed.
}
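Because a DebeziumFlatten message carries the same fields as the Debezium payload, one consumer can handle both formats by unwrapping payload when it is present. The following Python sketch illustrates this under stated assumptions: the function names unwrap and apply_event, the in-memory dict used as a target table, and the choice of c01 as the key column are all hypothetical.

```python
import json


def unwrap(message: str) -> dict:
    """Return the change event whether or not it is wrapped in "payload"."""
    event = json.loads(message)
    return event.get("payload", event)


def apply_event(table: dict, event: dict, key: str) -> None:
    """Apply one change event to an in-memory table keyed by column `key`."""
    op = event["op"]
    if op in ("c", "u"):
        before = event.get("before")
        if op == "u" and before:
            # Drop the old row first in case the key value itself changed.
            table.pop(before[key], None)
        row = event["after"]
        table[row[key]] = row
    elif op == "d":
        table.pop(event["before"][key], None)
    # Other values, such as HEARTBEAT, carry no row change and are ignored.


rows = {}
# Debezium style (wrapped in payload), then DebeziumFlatten style (top level).
apply_event(rows, unwrap('{"payload": {"op": "c", "after": {"c01": 2, "c11": "a"}}}'), "c01")
apply_event(rows, unwrap('{"op": "u", "before": {"c01": 2, "c11": "a"}, '
                         '"after": {"c01": 2, "c11": "aa"}}'), "c01")
print(rows)
```

Real messages do not contain the // comments shown in the format description, so json.loads can parse them directly.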
Here are some data examples:
Example of data inserted (INSERT operation)
{
  "op": "c",
  "source": {
    "connector": "OB_MYSQL",
    "pos": "703223@166849****",
    "name": "OMS",
    "version": "",
    "ts_ms": 1668491621000,
    "db": "test",
    "table": "table_name"
  },
  "after": {
    "c11": "a", "c10": 2.4212412, "c13": "c", "c12": "b", "c15": "65", "c14": "d",
    "c17": "67", "c16": "f", "c19": "69000000000000000000", "c18": "68", "c20": "6A",
    "c22": 19311, "c21": "2022-11-15T05:12:11Z", "c02": "12312", "c24": 1668489131000,
    "c01": 2, "c23": 36060000000, "c04": "61", "c26": "6B", "c03": "1241.41000",
    "c25": 2022, "c06": 141, "c05": 11, "c08": 412124124, "c07": 4241, "c09": 2.11111
  },
  "ts_ms": 1668495423594
}
Example of data updated (UPDATE operation)
{
  "op": "u",
  "before": {
    "c11": "a", "c10": 2.4212412, "c13": "c", "c12": "b", "c15": "65", "c14": "d",
    "c17": "67", "c16": "f", "c19": "690000000000000000000000000000000", "c18": "68", "c20": "6A",
    "c22": 19311, "c21": "2022-11-15T05:12:11Z", "c02": "12312", "c24": 1668489131000,
    "c01": 1, "c23": 36060000000, "c04": "61", "c26": "6B", "c03": "1241.41000",
    "c25": 2022, "c06": 141, "c05": 11, "c08": 412124124, "c07": 4241, "c09": 2.11111
  },
  "source": {
    "connector": "OB_MYSQL",
    "pos": "436999@166849****",
    "name": "OMS",
    "version": "",
    "ts_ms": 1668495861000,
    "db": "test",
    "table": "table_name"
  },
  "after": {
    "c11": "aa", "c10": 2.4212412, "c13": "c", "c12": "b", "c15": "65", "c14": "d",
    "c17": "67", "c16": "f", "c19": "69000000000000000000000000", "c18": "68", "c20": "6A",
    "c22": 19311, "c21": "2022-11-15T05:12:11Z", "c02": "12312", "c24": 1668489131000,
    "c01": 1, "c23": 36060000000, "c04": "61", "c26": "6B", "c03": "1241.41000",
    "c25": 2022, "c06": 141, "c05": 11, "c08": 412124124, "c07": 4241, "c09": 2.11111
  },
  "ts_ms": 1668495906356
}
Example of data deleted (DELETE operation)
{
  "op": "d",
  "before": {
    "c11": "aa", "c10": 2.4212412, "c13": "c", "c12": "b", "c15": "65", "c14": "d",
    "c17": "67", "c16": "f", "c19": "69000000000000000000000000000", "c18": "68", "c20": "6A",
    "c22": 19311, "c21": "2022-11-15T05:12:11Z", "c02": "12312", "c24": 1668489131000,
    "c01": 1, "c23": 36060000000, "c04": "61", "c26": "6B", "c03": "1241.41000",
    "c25": 2022, "c06": 141, "c05": 11, "c08": 412124124, "c07": 4241, "c09": 2.11111
  },
  "source": {
    "connector": "OB_MYSQL",
    "pos": "553132@1668****",
    "name": "OMS",
    "version": "",
    "ts_ms": 1668496109000,
    "db": "test",
    "table": "table_name"
  },
  "ts_ms": 1668496119717
}
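Several values in the examples above arrive as plain numbers or hexadecimal strings. According to the Debezium type mappings described later in this topic, DATE is the number of days since January 1st, 1970, TIME is in microseconds, DATETIME is in milliseconds, and byte arrays are Base16-encoded. The following Python sketch decodes such values. It assumes that c22, c23, and c24 hold DATE, TIME, and DATETIME values (the table definition is not shown in the source) and that DebeziumFlatten uses the same value encoding as Debezium, which the matching example values suggest. All function names are illustrative.

```python
from datetime import date, datetime, timedelta


def decode_date(days: int) -> date:
    """DATE: number of days since 1970-01-01."""
    return date(1970, 1, 1) + timedelta(days=days)


def decode_time(micros: int) -> timedelta:
    """TIME: microseconds since 00:00:00, without a time zone."""
    return timedelta(microseconds=micros)


def decode_datetime(millis: int) -> datetime:
    """DATETIME: milliseconds since 1970-01-01 00:00:00, without a time zone."""
    return datetime(1970, 1, 1) + timedelta(milliseconds=millis)


def decode_bytes(hex_text: str) -> bytes:
    """BYTES: Base16-encoded byte array."""
    return bytes.fromhex(hex_text)


print(decode_date(19311))              # value of c22 in the examples
print(decode_time(36060000000))        # value of c23 in the examples
print(decode_datetime(1668489131000))  # value of c24 in the examples
print(decode_bytes("6A"))              # value of c20 in the examples
```

The returned datetime is naive on purpose, because the mapping states that the value excludes the time zone.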
DebeziumSmt JSON message format
DebeziumSmt is based on a configuration option provided by Debezium: a single message transformation (SMT) that flattens each change event into a simple record. When data is migrated from an OceanBase database in MySQL compatible mode to a Kafka instance, the DebeziumSmt JSON message contains only the key-value pairs of the after section.
The following example shows an update event serialized with the Debezium method:
{
"op": "u",
"source": {
"connector": "OB_MYSQL",
"name": "OMS"
},
"ts_ms": 1668496119717,
"before": {
"field1": "before_value1",
"field2": "before_value2"
},
"after": {
"field1": "after_value1",
"field2": "after_value2"
}
}
After SMT processes the preceding example, the message format is simplified. That is, when the serialization method is set to DebeziumSmt, the JSON message format is as follows:
{
"field1": "after_value1",
"field2": "after_value2"
}
Here are some data examples:
Example of data inserted (INSERT operation)
{
  "field1": "after_value1",
  "field2": "after_value2",
  "__deleted": "false"
}
Example of data updated (UPDATE operation)
{
  "field1": "after_value1",
  "field2": "after_value2",
  "__deleted": "false"
}
Example of data deleted (DELETE operation)
{
  "field1": "after_value1",
  "field2": "after_value2",
  "__deleted": "true"
}
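The flattening described in this section can be sketched in Python as follows. This is an illustration of the idea rather than the actual transformation code. The function name flatten is hypothetical, and using the before image for delete events is an assumption: the source only shows placeholder values for the DELETE example.

```python
def flatten(event: dict) -> dict:
    """Reduce a Debezium-style change event to a flat record with __deleted."""
    is_delete = event["op"] == "d"
    # Assumption: a delete has no after image, so fall back to the before image.
    image = event.get("before") if is_delete else event.get("after")
    record = dict(image or {})
    record["__deleted"] = "true" if is_delete else "false"
    return record


update_event = {
    "op": "u",
    "source": {"connector": "OB_MYSQL", "name": "OMS"},
    "ts_ms": 1668496119717,
    "before": {"field1": "before_value1", "field2": "before_value2"},
    "after": {"field1": "after_value1", "field2": "after_value2"},
}
print(flatten(update_event))
```

Note that __deleted is a string ("true" or "false"), matching the examples above, not a JSON boolean.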
Avro JSON message format
When data is migrated from an OceanBase database in MySQL compatible mode to a Kafka instance, Avro uses the following JSON message format for serialization.
Full migration
{
  "version": 1,
  "id": 0,
  "sourceTimestamp": 1702371565, // The timestamp of the safe point.
  "sourcePosition": "", // No position information in full migration.
  "safeSourcePosition": "",
  "sourceTxid": "",
  "source": {
    "sourceType": "MySQL", // A fixed value, which is MySQL.
    "version": "OBMySQL" // A fixed value, which is OBMySQL.
  },
  "operation": "INIT", // The operation type is INIT in full migration.
  "objectName": "test***",
  "processTimestamps": [ 1702371565238 ], // Contains only the delivery time.
  "tags": {
    "pk_uk_info": "{\"PRIMARY\":[\"id\"]}" // Contains only the primary key.
  },
  "fields": [
    { "name": "id", "dataTypeNumber": 246 }, // The type of each column.
    { "name": "bid", "dataTypeNumber": 3 },
    { "name": "name", "dataTypeNumber": 15 },
    { "name": "address", "dataTypeNumber": 254 }
  ],
  "beforeImages": null, // The before image in full migration is empty.
  "afterImages": [ // The after image. The precision of an INTEGER value or FLOAT value is always 8. The scale of a FLOAT value is always 64.
    { "value": "1", "precision": 1, "scale": 0 },
    { "precision": 8, "value": "11" },
    { "charset": "utf8mb4", "value": { "bytes": "yyy" } },
    null
  ]
}
Incremental DML synchronization
Example of data inserted (INSERT operation)
{
  "version": 1,
  "id": 170236922143600000,
  "sourceTimestamp": 1702369092,
  "sourcePosition": "1702369080", // A checkpoint for OceanBase Database in MySQL compatible mode.
  "safeSourcePosition": "1702369080", // A checkpoint for OceanBase Database in MySQL compatible mode.
  "sourceTxid": "",
  "source": {
    "sourceType": "MySQL",
    "version": "OBMySQL"
  },
  "operation": "INSERT",
  "objectName": "test***",
  "processTimestamps": [1702369221480],
  "tags": {
    "pk_uk_info": "{\"PRIMARY\":[\"id\"]}"
  },
  "fields": [
    {"name": "id", "dataTypeNumber": 8},
    {"name": "bid", "dataTypeNumber": 3},
    {"name": "name", "dataTypeNumber": 15}
  ],
  "beforeImages": null, // The before image of INSERT is empty.
  "afterImages": [
    {"precision": 8, "value": "2"},
    {"precision": 8, "value": "12"},
    {"charset": "utf8mb4", "value": {"bytes": "xxx"}}
  ]
}
Example of data updated (UPDATE operation)
{
  "version": 1,
  "id": 170236975822100001,
  "sourceTimestamp": 1702369757,
  "sourcePosition": "1702369756",
  "safeSourcePosition": "1702369756",
  "sourceTxid": "",
  "source": {
    "sourceType": "MySQL",
    "version": "OBMySQL"
  },
  "operation": "UPDATE",
  "objectName": "test***",
  "processTimestamps": [1702369758237],
  "tags": {
    "pk_uk_info": "{\"PRIMARY\":[\"id\"]}"
  },
  "fields": [
    {"name": "id", "dataTypeNumber": 8},
    {"name": "bid", "dataTypeNumber": 3},
    {"name": "name", "dataTypeNumber": 15}
  ],
  "beforeImages": [ // UPDATE has both the before image and after image.
    {"precision": 8, "value": "3"},
    {"precision": 8, "value": "22"},
    {"charset": "utf8mb4", "value": {"bytes": "xxx"}}
  ],
  "afterImages": [
    {"precision": 8, "value": "3"},
    {"precision": 8, "value": "44"},
    {"charset": "utf8mb4", "value": {"bytes": "xxx"}}
  ]
}
Example of data deleted (DELETE operation)
{
  "version": 1,
  "id": 170236976527500000,
  "sourceTimestamp": 1702369764,
  "sourcePosition": "1702369763",
  "safeSourcePosition": "1702369763",
  "sourceTxid": "",
  "source": {
    "sourceType": "MySQL",
    "version": "OBMySQL"
  },
  "operation": "DELETE",
  "objectName": "test***",
  "processTimestamps": [1702369765287],
  "tags": {
    "pk_uk_info": "{\"PRIMARY\":[\"id\"]}"
  },
  "fields": [
    {"name": "id", "dataTypeNumber": 8},
    {"name": "bid", "dataTypeNumber": 3},
    {"name": "name", "dataTypeNumber": 15}
  ],
  "beforeImages": [
    {"precision": 8, "value": "3"},
    {"precision": 8, "value": "44"},
    {"charset": "utf8mb4", "value": {"bytes": "xxx"}}
  ],
  "afterImages": null // The after image of DELETE is empty.
}
Incremental DDL synchronization
{
  "version": 1,
  "id": 170236979372400000,
  "sourceTimestamp": 1702369793,
  "sourcePosition": "1702369792",
  "safeSourcePosition": "1702369792",
  "sourceTxid": "",
  "source": {
    "sourceType": "MySQL",
    "version": "OBMySQL"
  },
  "operation": "DDL",
  "objectName": "test***",
  "processTimestamps": [1702369794543],
  "tags": {},
  "fields": null, // No fields or beforeImages for incremental synchronization of DDL.
  "beforeImages": null,
  "afterImages": "alter table multi_db_multi_tbl add column address char(20) default null" // The STRING-type afterImages contain DDL statements.
}
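In the Avro examples above, column names are listed in fields and column values in beforeImages or afterImages, in the same order. The following Python sketch pairs them into a row. It works on the JSON rendering shown in this topic, not on binary Avro data, and the function name image_to_row is hypothetical.

```python
def image_to_row(fields, images):
    """Pair column names from `fields` with values from an image list."""
    if fields is None or not isinstance(images, list):
        # DDL records (string afterImages) and empty images have no row.
        return None
    row = {}
    for field, image in zip(fields, images):
        if image is None:
            value = None  # the column value is null
        else:
            value = image["value"]
            if isinstance(value, dict):
                # Character data is wrapped as {"bytes": ...} in the examples.
                value = value["bytes"]
        row[field["name"]] = value
    return row


# Data taken from the full migration example.
fields = [{"name": "id", "dataTypeNumber": 246}, {"name": "bid", "dataTypeNumber": 3},
          {"name": "name", "dataTypeNumber": 15}, {"name": "address", "dataTypeNumber": 254}]
after_images = [{"value": "1", "precision": 1, "scale": 0}, {"precision": 8, "value": "11"},
                {"charset": "utf8mb4", "value": {"bytes": "yyy"}}, None]
print(image_to_row(fields, after_images))
```

Values are kept as the strings found in the message. Converting them according to dataTypeNumber is outside the scope of this sketch.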
Format of data transmitted from a database to a text protocol
When you migrate data from an OceanBase database to a Kafka instance:
If the serialization method is Default, Canal, DataWorks (version 2.0 supported), SharePlex, or DefaultExtendColumnType, the data type mappings for OceanBase Database in MySQL and Oracle compatible modes are described as follows:
OceanBase Database in MySQL compatible mode
| Data type | Mapped-to type | Description |
| --- | --- | --- |
| TINYINT, SMALLINT, MEDIUMINT, INT, INTEGER, YEAR, BOOL, BOOLEAN | Long | Integer types with fewer than 64 bits. Regular numbers, such as 1000, are displayed without scientific notation. For BOOL/BOOLEAN: true = 1, false = 0. |
| DECIMAL, NUMERIC | BigDecimal | Exact decimal numeric types and integer types with more than 64 bits. An integer is displayed without the decimal point or a decimal place. A decimal numeral is displayed based on the data passed to the database instance without removing zeros at the end. Scientific notation is used. |
| FLOAT, DOUBLE | Double | Floating-point numbers. Precision follows the source type: FLOAT (7 significant digits), DOUBLE (16 significant digits). |
| CHAR, VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, ENUM, SET | String | Strings. |
| TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY, BIT | Bytes | Byte arrays. By default, a byte array is displayed as a Base64-encoded string. Note: For fixed-length BIT types, incremental synchronization trims leading zeros of byte arrays while full migration does not. Therefore, the displayed Base64-encoded strings may be inconsistent. However, the actual results obtained after decoding are consistent. |
| DATE | Date | The date type in the format of YYYY-MM-DD. If a value of this data type is invalid, its original string is displayed. |
| TIME | Time | The time type in the format of HH:mm:ss[.nnnnnnnnn]. The fractional seconds part can contain a maximum of 9 digits. Zeros at the end of the fractional seconds part are not displayed. If a value of this data type is invalid, its original string is displayed. |
| DATETIME | DateTime | The datetime type in the format of YYYY-MM-DD HH:mm:ss[.nnnnnnnnn] [zoneId], including the time zone. The fractional seconds part can contain a maximum of 9 digits. Zeros at the end of the fractional seconds part are not displayed. If a value of this data type is invalid, its original string is displayed. |
| TIMESTAMP | Timestamp | The timestamp type in the format of [timestamp in seconds][.nnnnnnnnn]. The fractional seconds part can contain a maximum of 9 digits. Zeros at the end of the fractional seconds part are not displayed. If a value of this data type is invalid, 0000-00-00 00:00:00 is displayed. |

OceanBase Database in Oracle compatible mode
| Data type | Mapped-to type | Description |
| --- | --- | --- |
| INTEGER | Long | Integer types with fewer than 64 bits. Regular numbers, such as 1000, are displayed without scientific notation. |
| NUMBER, FLOAT | BigDecimal | Exact decimal numeric types and integer types with more than 64 bits. |
| BINARY_FLOAT, BINARY_DOUBLE | Double | Floating-point numbers. Precision follows the source type: FLOAT (7 significant digits), DOUBLE (16 significant digits). |
| VARCHAR2, NVARCHAR2, INTERVAL YEAR TO MONTH, INTERVAL DAY TO SECOND, CLOB, NCLOB, ROWID, UROWID | String | Strings. |
| BLOB, BFILE, RAW | Bytes | Byte arrays. By default, a byte array is displayed as a Base64-encoded string. |
| DATE, TIMESTAMP, TIMESTAMP WITH TIME ZONE, TIMESTAMP WITH LOCAL TIME ZONE | DateTime | The datetime type in the format of YYYY-MM-DD HH:mm:ss[.nnnnnnnnn] [zoneId], including the time zone. The fractional seconds part can contain a maximum of 9 digits. Zeros at the end of the fractional seconds part are not displayed. If a value of this data type is invalid, its original string is displayed. |
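For the serialization methods covered by the two mappings above, byte arrays are displayed as Base64-encoded strings by default, and MySQL-mode TIMESTAMP values use the format [timestamp in seconds][.nnnnnnnnn]. The following Python sketch decodes both. The function names are illustrative, and negative timestamps are not handled.

```python
import base64
from typing import Optional, Tuple

# Placeholder the mapping documents for invalid TIMESTAMP values.
INVALID_TIMESTAMP = "0000-00-00 00:00:00"


def decode_bytes(text: str) -> bytes:
    """Decode a Base64-encoded byte array."""
    return base64.b64decode(text)


def split_timestamp(text: str) -> Optional[Tuple[int, int]]:
    """Split "[seconds][.nnnnnnnnn]" into (seconds, nanoseconds)."""
    if text == INVALID_TIMESTAMP:
        return None
    seconds, _, fraction = text.partition(".")
    # Trailing zeros are not displayed, so pad the fraction back to 9 digits.
    nanos = int(fraction.ljust(9, "0")) if fraction else 0
    return int(seconds), nanos


print(decode_bytes("aGVsbG8gd29ybGQ="))      # col9 in the Default examples
print(split_timestamp("1606233662.012345"))  # col14 in the Default examples
```

Keeping seconds and nanoseconds as two integers avoids the precision loss that a float conversion would introduce for nine fractional digits.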
If the serialization method is Debezium, the data type mappings for OceanBase databases in MySQL compatible mode are described as follows:
Notice
When you migrate data from an OceanBase database in Oracle compatible mode to a Kafka instance, you cannot set the serialization method to Debezium.
| Data type | Mapped-to type | Description |
| --- | --- | --- |
| BOOLEAN, BOOL | BOOLEAN | Valid values: true and false. |
| TINYINT, SMALLINT, MEDIUMINT, INT/INTEGER, BIGINT, YEAR | LONG | Integers in the range of -2^63 to 2^63. |
| BIGINT | STRING | Strings that display the data in full. |
| FLOAT, DOUBLE | DOUBLE | Floating-point numbers. |
| DECIMAL, NUMERIC | STRING | Strings that display the data in full. A decimal numeral is displayed based on the data passed to the database instance without removing zeros at the end. Scientific notation is used. |
| BIT, BINARY, VARBINARY, TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB | BYTES | Base16-encoded byte arrays. |
| CHAR, VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, ENUM, SET | STRING | Strings. |
| TIMESTAMP | STRING | The timestamp is in the format of YYYY-MM-DDTHH:mm:ss[.nnnnnnnnn]Z. The time zone is UTC+0. |
| DATE | LONG | The number of days since January 1st, 1970. |
| TIME | LONG | The temporal value, in microseconds, since 00:00:00, excluding the time zone. |
| DATETIME | LONG | The temporal value, in milliseconds, since 1970-01-01 00:00:00, excluding the time zone. |

If the serialization method is Avro, the data type mappings for OceanBase databases in MySQL compatible mode are described as follows:
Notice
The Avro serialization method is supported only when you migrate data from an OceanBase database in MySQL compatible mode to a Kafka instance.
| Data type | Mapped-to type |
| --- | --- |
| TINYINT, BOOLEAN, SMALLINT, MEDIUMINT, INT, BIGINT, BIT | INTEGER |
| FLOAT, DOUBLE | FLOAT |
| DECIMAL, NUMERIC | DECIMAL |
| VARCHAR, CHAR, TINYTEXT, MEDIUMTEXT, LONGTEXT, TEXT | CHARACTER |
| BINARY, VARBINARY, TINYBLOB, MEDIUMBLOB, LONGBLOB, BLOB | BinaryObject |
| TIMESTAMP | TimestampObject. Note: For full migration and incremental synchronization, a value of the TIMESTAMP type will be converted into a timestamp. -9223372022400L is the invalid time. Except for the invalid time, you can call the Instant.ofEpochSecond(ts, nanos) method of Java to obtain the correct wall clock time. |
| DATE, TIME, DATETIME, YEAR | DATETIME |
| JSON, ENUM, SET | TextObject |
| GEOMETRY | TextGeometry. Note: At present, the data migration service uses the EWKT format for transparent data transmission. Therefore, the mapped-to type is TextGeometry. |
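The TIMESTAMP note in the Avro mapping refers to the Java call Instant.ofEpochSecond(ts, nanos). A rough Python counterpart is sketched below. The parameters ts and nanos mirror the arguments of the Java call; how they are laid out inside TimestampObject is not described in this topic, so extracting them is left to the caller. The function name to_utc is hypothetical.

```python
from datetime import datetime, timedelta, timezone

# Value the mapping documents as the invalid time.
INVALID_TIME = -9223372022400

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_utc(ts: int, nanos: int = 0):
    """Convert epoch seconds plus nanoseconds to a UTC datetime, or None if invalid."""
    if ts == INVALID_TIME:
        return None
    # datetime stores microseconds only, so sub-microsecond digits are dropped.
    return EPOCH + timedelta(seconds=ts, microseconds=nanos // 1000)


print(to_utc(86400, 1_500_000))
print(to_utc(INVALID_TIME))
```

Unlike Java's Instant, the result here has microsecond resolution. Keep ts and nanos separately if full nanosecond precision matters.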