This topic describes the serialization methods and data formats for transmitting data from a database to a text protocol.
Format description of serialization methods
When you use OceanBase Migration Service (OMS) to synchronize data from a source to a destination, such as Kafka, DataHub (BLOB type), or RocketMQ, you can specify the serialization method to control the message format of the synchronized data. The supported serialization methods include Default, Canal, DataWorks (V2.0), SharePlex, DefaultExtendColumnType, Debezium, DebeziumFlatten, DebeziumSmt, and Avro.
Note
Currently, only OceanBase Database in MySQL compatible mode supports the Debezium, DebeziumFlatten, and DebeziumSmt serialization methods.
Currently, only data from OceanBase Database in MySQL compatible mode can be synchronized to Kafka by using the Avro serialization method.
Default JSON message format
When data is synchronized to Kafka, DataHub (BLOB type), or RocketMQ, the default JSON message format is used for serialization.
{
  "prevStruct": { // The previous image of the data.
    "col1": "val1" // The key-value pairs of all columns.
  },
  "postStruct": { // The current image of the data.
    "col1": "val1" // The key-value pairs of all columns.
  },
  "allMetaData": {
    "checkpoint": "STRING", // The synchronization point. In incremental synchronization, it represents the timestamp (in seconds) to which the data is synchronized. In full synchronization, it represents the primary key-value pairs.
    "record_primary_key": "STRING", // The name of the primary key column. If there are multiple columns, they are separated by \u0001.
    "record_primary_value": "STRING", // The value of the primary key. If there are multiple columns, they are separated by \u0001.
    "source_identity": "STRING", // The source identifier. In incremental synchronization, it is the subtopic. In full synchronization, it is an invalid sequence number.
    "dbType": "STRING", // The type of the database. Valid values: MYSQL, ORACLE, OCEANBASE (old mode, compatible), OB_IN_ORACLE_MODE (old mode, compatible), DB2 (old mode, compatible), OB_MYSQL, OB_ORACLE, and DB2_LUW.
    "storeDataSequence": "LONG", // This field exists only in incremental synchronization when sequenceEnabled=true is specified in the source.json file. The default value of sequenceEnabled is true. The field is used for sorting. The generation rule is a timestamp plus an incrementing number of no more than five digits. The format is {timestamp}{incrementing number}.
    "table_name": "STRING", // The name of the table that is changed by using an SQL statement.
    "db": "STRING", // The name of the database that is changed by using an SQL statement. If the database is OceanBase Database, it contains the tenant name. The format is {tenant}.{database}.
    "timestamp": "STRING", // The timestamp of the data change in seconds. This field exists only in incremental synchronization.
    "uniqueId": "STRING", // The transaction sequence number identifier passed down by STORE in incremental synchronization.
    "transId": "STRING", // In OceanBase Database earlier than V4.x, this field represents the transaction ID. inc is an auto-incrementing number, addr is the address of the coordinator, t is the timestamp when the transaction ID is generated, and hash is the hash value of the preceding three values. If the transaction is incomplete, this field is null. For example, "transId":"{hash:614827533592190202, inc:1384316437, addr:\"xxx.xxx.xxx.xxx:2882\", t:1756190533588977}". In OceanBase Database V4.x and later, this field represents the tenant ID and transaction ID. For example, "transId":"1xxx_3152xxx".
    "clusterId": "STRING", // In OceanBase Database, this field represents the cluster ID. In MySQL, it represents the thread ID. In Oracle, this field does not exist.
    "ddlType": "STRING" // The type of DDL operations. This field is supported in OMS V4.0.1 and later.
  },
  "recordType": "INSERT/UPDATE/DELETE/HEARTBEAT/DDL/ROW" // The type of the change.
}
In a DDL record, the image contains only one key, "ddl", and its value is the DDL statement.
The previous image and the current image:
prevStruct: The previous image of the incremental data, that is, the data before the SQL statement is executed.
postStruct: The current image of the incremental data, that is, the data after the SQL statement is executed.
DELETE data exists only in prevStruct, INSERT and DDL data exists only in postStruct, UPDATE data exists in both prevStruct and postStruct, and HEARTBEAT (heartbeat message) data does not exist in postStruct.
clusterId is described as follows:
In OceanBase Database, ob_org_cluster_id is used to set the CLUSTER_ID at the session level. The value is persisted to the transaction log. If you set ob_org_cluster_id when you write data, the value is used. Otherwise, the default value is used. This field exists only in incremental synchronization. For more information, see ob_org_cluster_id.
In MySQL, clusterId represents the thread ID in the MySQL Binlog Event. MySQL Server assigns an incrementing thread ID to each thread on a connection.
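The rules above for images and composite primary keys can be sketched in Python as follows. This is a minimal illustration rather than OMS code: the function name summarize_record and the returned dictionary layout are assumptions, and the message is assumed to arrive as a plain JSON string without the explanatory comments shown in the format description.

```python
import json

PK_SEPARATOR = "\u0001"  # separator documented for record_primary_key and record_primary_value


def summarize_record(raw: str) -> dict:
    """Hypothetical helper: extract the primary key and the relevant image."""
    msg = json.loads(raw)
    meta = msg.get("allMetaData") or {}
    record_type = msg.get("recordType")

    # Composite primary keys are joined with \u0001 in both name and value fields.
    names = (meta.get("record_primary_key") or "").split(PK_SEPARATOR)
    values = (meta.get("record_primary_value") or "").split(PK_SEPARATOR)
    primary_key = dict(zip(names, values)) if names != [""] else {}

    # DELETE data exists only in prevStruct. INSERT, ROW, and DDL data exists
    # only in postStruct. For UPDATE, this sketch returns the current image.
    if record_type == "DELETE":
        image = msg.get("prevStruct")
    else:
        image = msg.get("postStruct")
    return {"type": record_type, "primary_key": primary_key, "image": image}


sample = json.dumps({
    "allMetaData": {
        "record_primary_key": "int8\u0001int16",
        "record_primary_value": "3\u0001129",
    },
    "prevStruct": {"col1": 3, "col2": 129},
    "postStruct": None,
    "recordType": "DELETE",
})
print(summarize_record(sample))
```

Primary key values stay strings here because record_primary_value is a string field; converting them to column types is left to the consumer.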
Here are some examples:
Example of INSERT data
{
  "allMetaData": {
    "checkpoint": null,
    "record_primary_key": "int8\u0001int16",
    "uniqueId": "{tid:11039xxxx127, partition_id:0, part_cnt:0},5917,391,0",
    "transId": "1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database earlier than V4.x, the example is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
    "clusterId": "123456",
    "source_identity": null,
    "record_primary_value": "3\u0001129",
    "dbType": "OB_MYSQL",
    "table_name": "table",
    "db": "tenant.database",
    "timestamp": "1609344671"
  },
  "prevStruct": null,
  "recordType": "INSERT",
  "postStruct": {
    "col1": 3,
    "col2": 129,
    "col3": 2147483646,
    "col4": 9223372036854775806,
    "col5": 10223372036854775806,
    "col6": 1.2222,
    "col7": 9.999999,
    "col8": "hello world",
    "col9": "aGVsbG8gd29ybGQ=",
    "col10": 9.99999999999,
    "col11": "2020-11-25",
    "col12": "00:01:02",
    "col13": "2020-11-25 00:01:02",
    "col14": "1606233662.012345"
  }
}
If recordType is ROW, the data is full data, and the format is the same as that of INSERT.
{
  "allMetaData": {
    "checkpoint": null,
    "record_primary_key": "int8\u0001int16",
    "uniqueId": "{tid:11039xxxx127, partition_id:0, part_cnt:0},5917,391,0",
    "transId": "1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database earlier than V4.x, the example is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
    "clusterId": "123456",
    "source_identity": null,
    "record_primary_value": "3\u0001129",
    "dbType": "OB_MYSQL",
    "table_name": "table",
    "db": "tenant.database",
    "timestamp": "1609344671"
  },
  "prevStruct": null,
  "recordType": "ROW",
  "postStruct": {
    "col1": 3,
    "col2": 129,
    "col3": 2147483646,
    "col4": 9223372036854775806,
    "col5": 10223372036854775806,
    "col6": 1.2222,
    "col7": 9.999999,
    "col8": "hello world",
    "col9": "aGVsbG8gd29ybGQ=",
    "col10": 9.99999999999,
    "col11": "2020-11-25",
    "col12": "00:01:02",
    "col13": "2020-11-25 00:01:02",
    "col14": "1606233662.012345"
  }
}
Example of UPDATE data
{
  "allMetaData": {
    "checkpoint": null,
    "record_primary_key": "int8\u0001int16",
    "source_identity": null,
    "uniqueId": "{tid:11039xxxx27, partition_id:0, part_cnt:0},5917,391,0",
    "transId": "1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database earlier than V4.x, the example is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
    "clusterId": "123456",
    "record_primary_value": "3\u0001129",
    "dbType": "OB_MYSQL",
    "table_name": "table",
    "db": "tenant.database",
    "timestamp": "1609344671"
  },
  "prevStruct": {
    "col1": 3,
    "col2": 129,
    "col3": 2147483646,
    "col4": 9223372036854775806,
    "col5": 10223372036854775806,
    "col6": 1.2222,
    "col7": 9.999999999999,
    "col8": "hello world",
    "col9": "aGVsbG8gd29ybGQ=",
    "col10": 9.999999999999,
    "col11": "2020-11-25",
    "col12": "00:01:02",
    "col13": "2020-11-25 00:01:02",
    "col14": "1606233662.012345"
  },
  "recordType": "UPDATE",
  "postStruct": {
    "col1": 3,
    "col2": 129,
    "col3": 2147483646,
    "col4": 9223372036854775806,
    "col5": 10223372036854775806,
    "col6": 1.2222,
    "col7": 9.999999999999,
    "col8": "hello world 2020",
    "col9": "aGVsbG8gd29ybGQ=",
    "col10": 9.999999999999,
    "col11": "2020-11-25",
    "col12": "00:01:02",
    "col13": "2020-11-25 00:01:02",
    "col14": "1606233662.012345"
  }
}
Example of DELETE data
{
  "allMetaData": {
    "checkpoint": null,
    "record_primary_key": "int8\u0001int16",
    "source_identity": null,
    "uniqueId": "{tid:11039xxxx27, partition_id:0, part_cnt:0},5917,391,0",
    "transId": "1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database earlier than V4.x, the example is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
    "clusterId": "123456",
    "record_primary_value": "3\u0001129",
    "dbType": "OB_MYSQL",
    "table_name": "table",
    "db": "tenant.database",
    "timestamp": "1609344671"
  },
  "prevStruct": {
    "col1": 3,
    "col2": 129,
    "col3": 2147483646,
    "col4": 9223372036854775806,
    "col5": 10223372036854775806,
    "col6": 1.2222,
    "col7": 9.99999999,
    "col8": "hello world",
    "col9": "aGVsbG8gd29ybGQ=",
    "col10": 9.999999999,
    "col11": "2020-11-25",
    "col12": "00:01:02",
    "col13": "2020-11-25 00:01:02",
    "col14": "1606233662.012345"
  },
  "recordType": "DELETE",
  "postStruct": null
}
Example of DDL data
ALTER TABLE connector_test.all_mysql_type_test ADD column c90 VARCHAR(30) DEFAULT "test" COMMENT 'test';
{
  "prevStruct": null,
  "postStruct": {
    "ddl": "ALTER TABLE connector_test.all_mysql_type_test ADD column c90 VARCHAR(30) DEFAULT \"test\" COMMENT 'test'"
  },
  "allMetaData": {
    "checkpoint": "1671177057",
    "dbType": "OB_MYSQL",
    "storeDataSequence": null,
    "db": "connector_test",
    "timestamp": "1671177057",
    "uniqueId": null,
    "ddlType": "ALTER_TABLE",
    "record_primary_key": null,
    "source_identity": null,
    "record_primary_value": null,
    "table_name": "all_mysql_type_test"
  },
  "recordType": "DDL"
}
JSON message format of Canal
When data is synchronized to Kafka, DataHub (BLOB type), and RocketMQ, Canal uses the following JSON message format for serialization.
{
"database": "STRING", // The name of the database to which the SQL statement applies. For OceanBase Database, only the database name is provided, and the tenant name is not included.
"sqlType": {
"col1": "INTEGER" // The type of the column that is changed, which is a numeric value referencing java.sql.Types.
},
"data": [ // The key-value pairs of the data after the change. Currently, only one message exists.
{
"col1": "val1"
}
],
"pkNames": [ // The names of the primary key columns.
"col1"
],
"old": [ // This field exists only in UPDATE messages. It represents the column values before the change in the UPDATE statement.
{
"col1": "val1"
}
],
"mysqlType": { // The column type description.
"col": "STRING"
},
"type": "STRING", // The type of the change.
"table": "STRING", // The name of the table to which the SQL statement applies.
"es": "LONG", // The change time, in milliseconds.
"isDdl": "BOOLEAN", // Indicates whether the change is a DDL operation.
"ts": "LONG", // The timestamp when the data is written to the destination.
"sql": "STRING" // This field is empty.
}
Here are some examples:
Example of INSERT data
{
  "database": "database",
  "sqlType": {
    "col1": 93,
    "col2": 12,
    "col3": 6,
    "col4": 8,
    "col5": 5,
    "col6": 92,
    "col7": 4,
    "col8": -5,
    "col9": 2004,
    "col10": -6,
    "col11": 91,
    "col12": 3,
    "col13": -5,
    "col14": 93
  },
  "data": [
    {
      "col1": "2020-11-25 00:01:02",
      "col2": "hello world",
      "col3": 1.2222,
      "col4": 9.999999999999999093266253,
      "col5": 129,
      "col6": "00:01:02",
      "col7": 2147483646,
      "col8": 9223372036854775806,
      "col9": "aGVsbG8gd29ybGQ=",
      "col10": 3,
      "col11": "2020-11-25",
      "col12": 9.9999999999999990,
      "col13": 10223372036854775806,
      "col14": "1606233662.012345"
    }
  ],
  "pkNames": [ "col1", "col2" ],
  "old": null,
  "mysqlType": {
    "col1": "datetime",
    "col2": "varchar",
    "col3": "float",
    "col4": "double",
    "col5": "smallint",
    "col6": "time",
    "col7": "int",
    "col8": "int64",
    "col9": "blob",
    "col10": "tinyint",
    "col11": "date",
    "col12": "decimal",
    "col13": "bigint",
    "col14": "timestamp"
  },
  "type": "INSERT",
  "table": "table",
  "es": 1609344671000,
  "isDdl": false,
  "ts": 1618323429026,
  "sql": ""
}
Example of UPDATE data
{
  "database": "database",
  "sqlType": {
    "col1": 93,
    "col2": 12,
    "col3": 6,
    "col4": 8,
    "col5": 5,
    "col6": 92,
    "col7": 4,
    "col8": -5,
    "col9": 2004,
    "col10": -6,
    "col11": 91,
    "col12": 3,
    "col13": -5,
    "col14": 93
  },
  "data": [
    {
      "col1": "2020-11-25 00:01:02",
      "col2": "hello world 2020",
      "col3": 1.2222,
      "col4": 9.999999999999999093266253372484,
      "col5": 129,
      "col6": "00:01:02",
      "col7": 2147483646,
      "col8": 9223372036854775806,
      "col9": "aGVsbG8gd29ybGQ=",
      "col10": 3,
      "col11": "2020-11-25",
      "col12": 9.9999999999999990932662,
      "col13": 10223372036854775806,
      "col14": "1606233662.012345"
    }
  ],
  "pkNames": [ "col1", "col2" ],
  "old": [
    {
      "string": "hello world"
    }
  ],
  "mysqlType": {
    "col1": "datetime",
    "col2": "varchar",
    "col3": "float",
    "col4": "double",
    "col5": "smallint",
    "col6": "time",
    "col7": "int",
    "col8": "int64",
    "col9": "blob",
    "col10": "tinyint",
    "col11": "date",
    "col12": "decimal",
    "col13": "bigint",
    "col14": "timestamp"
  },
  "type": "UPDATE",
  "table": "table",
  "es": 1609344671000,
  "isDdl": false,
  "ts": 1618364572908,
  "sql": ""
}
Example of DELETE data
{
  "database": "database",
  "sqlType": {
    "col1": 93,
    "col2": 12,
    "col3": 6,
    "col4": 8,
    "col5": 5,
    "col6": 92,
    "col7": 4,
    "col8": -5,
    "col9": 2004,
    "col10": -6,
    "col11": 91,
    "col12": 3,
    "col13": -5,
    "col14": 93
  },
  "data": [
    {
      "col1": "2020-11-25 00:01:02",
      "col2": "hello world",
      "col3": 1.2222,
      "col4": 9.99999999999999909326625,
      "col5": 129,
      "col6": "00:01:02",
      "col7": 2147483646,
      "col8": 9223372036854775806,
      "col9": "aGVsbG8gd29ybGQ=",
      "col10": 3,
      "col11": "2020-11-25",
      "col12": 9.99999999999999909326625,
      "col13": 10223372036854775806,
      "col14": "1606233662.012345"
    }
  ],
  "pkNames": [ "int8", "int16" ],
  "old": null,
  "mysqlType": {
    "col1": "datetime",
    "col2": "varchar",
    "col3": "float",
    "col4": "double",
    "col5": "smallint",
    "col6": "time",
    "col7": "int",
    "col8": "int64",
    "col9": "blob",
    "col10": "tinyint",
    "col11": "date",
    "col12": "decimal",
    "col13": "bigint",
    "col14": "timestamp"
  },
  "type": "DELETE",
  "table": "table",
  "es": 1609344671000,
  "isDdl": false,
  "ts": 1618364660278,
  "sql": ""
}
Example of DDL data
ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT "test" COMMENT 'test';
{
  "database": "connector_test",
  "sqlType": null,
  "data": null,
  "pkNames": null,
  "old": null,
  "mysqlType": null,
  "type": "ALTER",
  "table": "all_mysql_type_test",
  "es": 1671177209000,
  "isDdl": true,
  "ts": 1671177291475,
  "sql": "ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT \"test\" COMMENT 'test'"
}
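A consumer of Canal-format messages typically needs to turn the numeric sqlType codes back into type names and the es field into a timestamp. The Python sketch below shows one way to do that. The function name describe_canal_message and the output layout are illustrative assumptions. The code table lists only the java.sql.Types constants that appear in the examples in this topic.

```python
import json
from datetime import datetime, timezone

# Subset of java.sql.Types constants used in the Canal examples of this topic.
JDBC_TYPE_NAMES = {
    -6: "TINYINT", -5: "BIGINT", 3: "DECIMAL", 4: "INTEGER", 5: "SMALLINT",
    6: "FLOAT", 8: "DOUBLE", 12: "VARCHAR", 91: "DATE", 92: "TIME",
    93: "TIMESTAMP", 2004: "BLOB",
}


def describe_canal_message(raw: str) -> dict:
    """Hypothetical helper: normalize one Canal-format message."""
    msg = json.loads(raw)
    if msg.get("isDdl"):
        # For DDL messages, the data fields are null and sql holds the statement.
        return {"kind": "ddl", "type": msg["type"], "sql": msg["sql"]}

    sql_types = msg.get("sqlType") or {}
    return {
        "kind": "dml",
        "type": msg["type"],
        "table": f'{msg["database"]}.{msg["table"]}',
        # es is the change time in milliseconds.
        "changed_at": datetime.fromtimestamp(msg["es"] / 1000, tz=timezone.utc),
        "rows": msg.get("data") or [],
        "before": msg.get("old") or [],  # present only for UPDATE messages
        "column_types": {
            col: JDBC_TYPE_NAMES.get(code, str(code))
            for col, code in sql_types.items()
        },
    }


sample = json.dumps({
    "database": "database", "table": "table", "type": "INSERT",
    "sqlType": {"col1": 93, "col2": 12},
    "data": [{"col1": "2020-11-25 00:01:02", "col2": "hello world"}],
    "pkNames": ["col1", "col2"], "old": None,
    "es": 1609344671000, "isDdl": False, "ts": 1618323429026, "sql": "",
})
print(describe_canal_message(sample)["column_types"])
```

Unknown codes fall back to their numeric string, so a type missing from the table does not stop the consumer.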
DataWorks JSON message format
When data is synchronized to Kafka, DataHub (BLOB type), or RocketMQ, DataWorks uses the following JSON message format for serialization.
{
"version":"2.0", //The protocol version. Only DataWorks 2.0 is supported.
"schema": { //Metadata information of changed columns. Only column names and column types are specified.
"source": {//Change source information
"dbType": "mysql", //Data source type
"dbVersion": "5.7.35", //Database version
"dbName": "myDatabase", //The name of the database.
"schema": "mySchema", // The schema name. This parameter is required if a schema exists.
"table": "tableName" // Table name
},
"column": [//The data column information about the modified data.
{
"name": "id",
"type": "bigint"
},
{
"name": "name",
"type": "varchar(20)"
},
{
"name": "mydata",
"type": "binary"
},
{
"name": "ts",
"type": "datetime"
}
],
"pk": [//This field is required if the object has a primary key or unique key, otherwise, it can be omitted.
"pkName1",
"pkName2"
]
},
"payload": {
"before": {
"data":{
"id": 111,
"name":"scooter",
"mydata": "[base64 string]", //Base64 encode if it is a binary type.
"ts": 1590315269000.123456789 // The timestamp consists of 13 integer digits and 9 decimal digits.
}
},
"after": {
"data":{
"id": 222,
"name":"donald",
"mydata": "[base64 string]",
"ts": 1590315269000
}
},
"op":"INSERT/UPDATE/DELETE/HEARTBEAT/TRANSACTION_BEGIN/TRANSACTION_END/CREATE/ALTER/ERASE/QUERY/TRUNCATE/RENAME/CINDEX/DINDEX/GTID/XACOMMIT/XAROLLBACK/...",//case-sensitive
"timestamp": {
"eventTime": 1620457659000 // The time when the change occurred at the source database in 13-digit timestamps with millisecond precision.
},
"ddl": {
"text": "ADD COLUMN ..."
},
"scn": "Auto-incremented ID"
},
"extend": { //extend indicates fields for future expansion. You can leave the field empty if you do not have expansion requirements.
"load_fm":"CIBS" //The name of the source system. For example, "CIBS".
}
}
Synchronization task heartbeat message:
{
"version": "2.0", //Protocol version
"payload": {
"timestamp": {
"eventTime": 1620457659000 // heartbeat timestamp
},
"op": "HEARTBEAT" // Indicates the heartbeat packet.
}
}
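Because heartbeat messages carry only version and payload, a consumer has to branch on op before it reads schema or row data. The following Python sketch illustrates that branching and the Base64 decoding of binary columns. The function name handle_dataworks_message and the binary_columns parameter are assumptions made for this illustration. In practice, the set of binary columns would be derived from schema.column.

```python
import base64
import json
from datetime import datetime, timezone


def handle_dataworks_message(raw: str, binary_columns=frozenset()) -> dict:
    """Hypothetical helper: normalize one DataWorks V2.0 message."""
    msg = json.loads(raw)
    payload = msg["payload"]
    # eventTime is a 13-digit timestamp with millisecond precision.
    event_time = datetime.fromtimestamp(
        payload["timestamp"]["eventTime"] / 1000, tz=timezone.utc
    )
    if payload["op"] == "HEARTBEAT":
        # Heartbeat messages have no schema, before, or after sections.
        return {"op": "HEARTBEAT", "event_time": event_time}

    def decode(image):
        # before and after may be null, depending on the operation.
        if not image or not image.get("data"):
            return None
        row = dict(image["data"])
        for name in binary_columns:
            if isinstance(row.get(name), str):
                row[name] = base64.b64decode(row[name])
        return row

    return {
        "op": payload["op"],
        "event_time": event_time,
        "before": decode(payload.get("before")),
        "after": decode(payload.get("after")),
        "ddl": (payload.get("ddl") or {}).get("text"),
    }


heartbeat = json.dumps({
    "version": "2.0",
    "payload": {"timestamp": {"eventTime": 1620457659000}, "op": "HEARTBEAT"},
})
print(handle_dataworks_message(heartbeat)["op"])
```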
Data samples are as follows:
Example of INSERT data
{
  "version": "2.0",
  "schema": {
    "source": {
      "dbType": "ob_mysql",
      "dbVersion": null,
      "dbName": "db",
      "schema": null,
      "table": "tab"
    },
    "column": [
      { "name": "int8", "type": "TINYINT" },
      { "name": "int16", "type": "SMALLINT" },
      { "name": "int32", "type": "INT" },
      { "name": "int64", "type": "INT64" },
      { "name": "float32", "type": "FLOAT" },
      { "name": "float64", "type": "DOUBLE" },
      { "name": "bigInt", "type": "BIGINT" },
      { "name": "boolean", "type": "BOOLEAN" },
      { "name": "string", "type": "VARCHAR" },
      { "name": "bytes", "type": "BLOB" },
      { "name": "decimal", "type": "DECIMAL" },
      { "name": "localDate", "type": "DATE" },
      { "name": "localTime", "type": "TIME" },
      { "name": "localDateTime", "type": "DATETIME" },
      { "name": "timestamp", "type": "TIMESTAMP" },
      { "name": "zonedDateTime", "type": "ZONED_DATETIME" },
      { "name": "intervalDayToSecond", "type": "INTERVAL_DAY_TO_SECOND" },
      { "name": "intervalYearToMonth", "type": "INTERVAL_YEAR_TO_MONTH" }
    ],
    "pk": [ "pkName1", "pkName2" ]
  },
  "payload": {
    "before": null,
    "after": {
      "data": {
        "col1": 3,
        "col2": 129,
        "col3": 2147483646,
        "col4": 9223372036854775806,
        "col5": 1.2222,
        "col6": 0.0000000000000008125,
        "col7": 10223372036854775806,
        "col8": 1,
        "col9": "hello world",
        "col10": "aGVsbG8gd29ybGQ=",
        "col11": 0.000000000000125,
        "col12": "2020-11-25",
        "col13": "00:01:02",
        "col14": "2020-11-25 00:01:02",
        "col15": "1606233662.012345",
        "col16": "2020-11-25 00:01:02.012345 Asia/Shanghai",
        "col17": "INTERVAL '3' DAY",
        "col18": "INTERVAL '4' YEAR"
      }
    },
    "op": "INSERT",
    "timestamp": {
      "eventTime": 1647581000000,
      "systemTime": 1647581000795,
      "checkpointTime": 1647581000
    },
    "ddl": null,
    "scn": "null"
  },
  "extend": {
    "load_fm": "test"
  }
}
Example of UPDATE data
{
  "version": "2.0",
  "schema": {
    "source": {
      "dbType": "ob_mysql",
      "dbVersion": null,
      "dbName": "db",
      "schema": null,
      "table": "tab"
    },
    "column": [
      { "name": "int8", "type": "TINYINT" },
      { "name": "int16", "type": "SMALLINT" },
      { "name": "int32", "type": "INT" },
      { "name": "int64", "type": "INT64" },
      { "name": "float32", "type": "FLOAT" },
      { "name": "float64", "type": "DOUBLE" },
      { "name": "bigInt", "type": "BIGINT" },
      { "name": "boolean", "type": "BOOLEAN" },
      { "name": "string", "type": "VARCHAR" },
      { "name": "bytes", "type": "BLOB" },
      { "name": "decimal", "type": "DECIMAL" },
      { "name": "localDate", "type": "DATE" },
      { "name": "localTime", "type": "TIME" },
      { "name": "localDateTime", "type": "DATETIME" },
      { "name": "timestamp", "type": "TIMESTAMP" },
      { "name": "zonedDateTime", "type": "ZONED_DATETIME" },
      { "name": "intervalDayToSecond", "type": "INTERVAL_DAY_TO_SECOND" },
      { "name": "intervalYearToMonth", "type": "INTERVAL_YEAR_TO_MONTH" }
    ],
    "pk": [ "pkName1", "pkName2" ]
  },
  "payload": {
    "before": {
      "data": {
        "col1": 3,
        "col2": 129,
        "col3": 2147483646,
        "col4": 9223372036854775806,
        "col5": 1.2222,
        "col6": 0.000000000000000125,
        "col7": 10223372036854775806,
        "col8": 1,
        "col9": "hello world",
        "col10": "aGVsbG8gd29ybGQ=",
        "col11": 0.0000000000125,
        "col12": "2020-11-25",
        "col13": "00:01:02",
        "col14": "2020-11-25 00:01:02",
        "col15": "1606233662.012345",
        "col16": "2020-11-25 00:01:02.012345 Asia/Shanghai",
        "col17": "INTERVAL '3' DAY",
        "col18": "INTERVAL '4' YEAR"
      }
    },
    "after": {
      "data": {
        "col1": 3,
        "col2": 129,
        "col3": 2147483646,
        "col4": 9223372036854775806,
        "col5": 1.2222,
        "col6": 0.00000000008125,
        "col7": 10223372036854775806,
        "col8": 1,
        "col9": "hello world 2020",
        "col10": "aGVsbG8gd29ybGQ=",
        "col11": 0.000000000125,
        "col12": "2020-11-25",
        "col13": "00:01:02",
        "col14": "2020-11-25 00:01:02",
        "col15": "1606233662.012345",
        "col16": "2020-11-25 00:01:02.012345 Asia/Shanghai",
        "col17": "INTERVAL '3' DAY",
        "col18": "INTERVAL '4' YEAR"
      }
    },
    "op": "UPDATE",
    "timestamp": {
      "eventTime": 1647581038000,
      "systemTime": 1647581038674,
      "checkpointTime": 1647581038
    },
    "ddl": null,
    "scn": "null"
  },
  "extend": {
    "load_fm": "test"
  }
}
Example of DELETE data
{
  "version": "2.0",
  "schema": {
    "source": {
      "dbType": "ob_mysql",
      "dbVersion": null,
      "dbName": "db",
      "schema": null,
      "table": "tab"
    },
    "column": [
      { "name": "int8", "type": "TINYINT" },
      { "name": "int16", "type": "SMALLINT" },
      { "name": "int32", "type": "INT" },
      { "name": "int64", "type": "INT64" },
      { "name": "float32", "type": "FLOAT" },
      { "name": "float64", "type": "DOUBLE" },
      { "name": "bigInt", "type": "BIGINT" },
      { "name": "boolean", "type": "BOOLEAN" },
      { "name": "string", "type": "VARCHAR" },
      { "name": "bytes", "type": "BLOB" },
      { "name": "decimal", "type": "DECIMAL" },
      { "name": "localDate", "type": "DATE" },
      { "name": "localTime", "type": "TIME" },
      { "name": "localDateTime", "type": "DATETIME" },
      { "name": "timestamp", "type": "TIMESTAMP" },
      { "name": "zonedDateTime", "type": "ZONED_DATETIME" },
      { "name": "intervalDayToSecond", "type": "INTERVAL_DAY_TO_SECOND" },
      { "name": "intervalYearToMonth", "type": "INTERVAL_YEAR_TO_MONTH" }
    ],
    "pk": [ "pkName1", "pkName2" ]
  },
  "payload": {
    "before": {
      "data": {
        "col1": 3,
        "col2": 129,
        "col3": 2147483646,
        "col4": 9223372036854775806,
        "col5": 1.2222,
        "col6": 0.0000000000125,
        "col7": 10223372036854775806,
        "col8": 1,
        "col9": "hello world",
        "col10": "aGVsbG8gd29ybGQ=",
        "col11": 0.0000000000125,
        "col12": "2020-11-25",
        "col13": "00:01:02",
        "col14": "2020-11-25 00:01:02",
        "col15": "1606233662.012345",
        "col16": "2020-11-25 00:01:02.012345 Asia/Shanghai",
        "col17": "INTERVAL '3' DAY",
        "col18": "INTERVAL '4' YEAR"
      }
    },
    "after": null,
    "op": "DELETE",
    "timestamp": {
      "eventTime": 1647581072000,
      "systemTime": 1647581072976,
      "checkpointTime": 1647581072
    },
    "ddl": null,
    "scn": "null"
  },
  "extend": {
    "load_fm": "test"
  }
}
Example of DDL data
ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT "test" COMMENT 'test';
{
  "version": "2.0",
  "schema": {
    "source": {
      "dbType": "ob_mysql",
      "dbVersion": null,
      "dbName": "connector_test",
      "schema": null,
      "table": "all_mysql_type_test"
    },
    "column": null,
    "pk": null
  },
  "payload": {
    "before": null,
    "after": null,
    "op": "ALTER",
    "timestamp": {
      "eventTime": 1671177209000,
      "systemTime": 1671177291485,
      "checkpointTime": 1671177200
    },
    "ddl": {
      "text": "ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT \"test\" COMMENT 'test'"
    },
    "scn": "null"
  },
  "extend": {}
}
SharePlex JSON message format
When you synchronize data to Kafka, DataHub (BLOB type), and RocketMQ, SharePlex uses the following JSON message format.
{
  "data": { // The key-value pairs of the changed data. For INSERT and DELETE operations, the values are the full values. For UPDATE operations, the values are the changed values.
    "col1": "val1"
  },
  "meta": {
    "time": "YYYY-MM-DDTHH:mm:ss", // The time when the change occurred.
    "op": "", // The type of the change. Valid values: ins, upd, del, and ddl.
    "posttime": "YYYY-MM-DDTHH:mm:ss", // The time when the data was written to the destination.
    "idx": "STRING", // The index of the message in the transaction or the number of indexed messages. This parameter is deprecated.
    "size": "NUMBER", // The number of messages in the transaction. This parameter is deprecated.
    "seq": "STRING", // The sequence number. This parameter is valid only when transactionEnabled is enabled on the source.
    "table": "STRING", // The name of the changed table, in the format of {database}.{table}.
    "rowid": "STRING", // The name of the changed table followed by the primary key value. Multiple primary key values are separated by \u0001.
    "trans": "STRING", // The transaction ID.
    "scn": "STRING" // This field exists only in incremental scenarios when the source.json configuration file contains sequenceEnabled=true. By default, sequenceEnabled is true. This field is used for sorting. The generation rule is a timestamp plus an incrementing sequence number of up to five digits in a synchronization process.
  },
  "key": { // This field exists only for UPDATE operations. It represents the values before the change.
  },
  "sql": { // This field exists only for DDL operations. If op is ddl, the DDL statement is written to this field.
    "ddl": ""
  }
}
Here are some examples of data:
Example of INSERT data
{
  "data": {
    "col1": "2020-11-25 00:01:02",
    "col2": "hello world",
    "col3": "INTERVAL '3' DAY",
    "col4": 1.2222,
    "col5": 9.999999999999999308,
    "col6": 129,
    "col7": "00:01:02",
    "col8": 1,
    "col9": "2020-11-25 00:01:02.012345 Asia/Shanghai",
    "col10": 2147483646,
    "col11": 9223372036854775806,
    "col12": "aGVsbG8gd29ybGQ=",
    "col13": "INTERVAL '4' YEAR",
    "col14": 3,
    "col15": "2020-11-25",
    "col16": 9.9999999999308,
    "col17": 10223372036854775806,
    "col18": "1606233662.012345"
  },
  "meta": {
    "posttime": "2020-12-07T13:22:00",
    "op": "ins",
    "size": 10,
    "time": "2020-11-25T00:01:02",
    "idx": "1/10",
    "seq": 1,
    "table": "mock_database.mock_table",
    "rowid": "mock_database.mock_table-3129",
    "trans": "shareplex_transaction_id",
    "scn": "123456789"
  }
}
Example of UPDATE data
{
  "data": {
    "string": "hello world 2020"
  },
  "meta": {
    "posttime": "2020-12-07T13:59:09",
    "op": "upd",
    "size": 10,
    "time": "2020-11-25T00:01:02",
    "idx": "1/10",
    "seq": 1,
    "table": "mock_database.mock_table",
    "rowid": "mock_database.mock_table-3\u0001129",
    "trans": "shareplex_transaction_id",
    "scn": "123456789"
  },
  "key": {
    "col1": "2020-11-25 00:01:02",
    "col2": "hello world",
    "col3": "INTERVAL '3' DAY",
    "col4": 1.2222,
    "col5": 9.9999999999999308,
    "col6": 129,
    "col7": "00:01:02",
    "col8": 1,
    "col9": "2020-11-25 00:01:02.012345 Asia/Shanghai",
    "col10": 2147483646,
    "col11": 9223372036854775806,
    "col12": "aGVsbG8gd29ybGQ=",
    "col13": "INTERVAL '4' YEAR",
    "col14": 3,
    "col15": "2020-11-25",
    "col16": 9.9999999999308,
    "col17": 10223372036854775806,
    "col18": "1606233662.012345"
  }
}
Example of DELETE data
{
  "data": {
    "col1": "2020-11-25 00:01:02",
    "col2": "hello world",
    "col3": "INTERVAL '3' DAY",
    "col4": 1.2222,
    "col5": 9.9999999999308,
    "col6": 129,
    "col7": "00:01:02",
    "col8": 1,
    "col9": "2020-11-25 00:01:02.012345 Asia/Shanghai",
    "col10": 2147483646,
    "col11": 9223372036854775806,
    "col12": "aGVsbG8gd29ybGQ=",
    "col13": "INTERVAL '4' YEAR",
    "col14": 3,
    "col15": "2020-11-25",
    "col16": 9.9999999308,
    "col17": 10223372036854775806,
    "col18": "1606233662.012345"
  },
  "meta": {
    "posttime": "2020-12-07T13:34:10",
    "op": "del",
    "size": 10,
    "time": "2020-11-25T00:01:02",
    "idx": "1/10",
    "seq": 1,
    "table": "mock_database.mock_table",
    "rowid": "mock_database.mock_table-3\u0001129",
    "trans": "shareplex_transaction_id",
    "scn": "123456789"
  }
}
Example of DDL data
ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT "test" COMMENT 'test';
{
  "data": {},
  "meta": {
    "posttime": "2022-12-16T15:54:51",
    "op": "ddl",
    "size": 0,
    "time": "2022-12-16T15:53:29",
    "idx": "0/0",
    "seq": 0,
    "table": "connector_test.all_mysql_type_test",
    "rowid": "connector_test.all_mysql_type_test-",
    "trans": null,
    "scn": "null"
  },
  "sql": {
    "ddl": "ALTER TABLE connector_test.all_mysql_type_test ADD COLUMN c90 VARCHAR(30) DEFAULT \"test\" comment 'test'"
  }
}
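Because a SharePlex UPDATE message carries only the changed values in data and the pre-change values in key, a consumer that needs the full row after the change has to merge the two. The Python sketch below shows one possible approach. The function name apply_shareplex_message is hypothetical. The sketch also assumes that column names in data match those in key, and that rowid has the form {table}-{primary key values} as seen in the examples.

```python
import json

PK_SEPARATOR = "\u0001"  # separator between primary key values in rowid


def apply_shareplex_message(raw: str) -> dict:
    """Hypothetical helper: derive before and after rows from one message."""
    msg = json.loads(raw)
    meta = msg["meta"]
    op, table = meta["op"], meta["table"]

    # Assumption: rowid is "{table}-{pk values}", as in the examples above.
    rowid = meta.get("rowid") or ""
    prefix = table + "-"
    pk_text = rowid[len(prefix):] if rowid.startswith(prefix) else ""
    pk_values = pk_text.split(PK_SEPARATOR) if pk_text else []

    if op == "ddl":
        return {"op": op, "table": table, "ddl": (msg.get("sql") or {}).get("ddl")}

    data = msg.get("data") or {}
    if op == "upd":
        before = msg.get("key") or {}
        after = {**before, **data}  # changed values override the old ones
    elif op == "ins":
        before, after = None, data
    elif op == "del":
        before, after = data, None
    else:
        raise ValueError(f"unexpected op: {op!r}")
    return {"op": op, "table": table, "pk_values": pk_values,
            "before": before, "after": after}


sample = json.dumps({
    "data": {"col2": "hello world 2020"},
    "meta": {"op": "upd", "table": "mock_database.mock_table",
             "rowid": "mock_database.mock_table-3\u0001129"},
    "key": {"col1": 3, "col2": "hello world"},
})
print(apply_shareplex_message(sample)["after"])
```

Stripping the table prefix instead of splitting on the first hyphen keeps the sketch working for table names that contain hyphens.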
DefaultExtendColumnType JSON message format
When data is synchronized to Kafka, DataHub (BLOB type), or RocketMQ, the DefaultExtendColumnType serialization method uses the following JSON message format.
The DefaultExtendColumnType JSON message format adds the __light_type field to the Default format. This field indicates the data type of each column.
{
"prevStruct": { // The image before the change.
},
"postStruct": { // The image after the change.
"__light_type": {
"col": { // The name of the field.
"schemaType": "type" // The type of the value.
}
}
},
"allMetaData": {
}
}
Here are some examples of data:
Example of INSERT data
{
  "allMetaData": {
    "checkpoint": null,
    "record_primary_key": "int8\u0001int16",
    "source_identity": null,
    "uniqueId": "{tid:11039xxxx127, partition_id:0, part_cnt:0},5917,391,0",
    "transId": "1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database earlier than V4.x, the value is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
    "clusterId": "123456",
    "record_primary_value": "3\u0001129",
    "dbType": "OB_MYSQL",
    "table_name": "table",
    "db": "tenant.database",
    "timestamp": "1609344671"
  },
  "prevStruct": null,
  "recordType": "INSERT",
  "postStruct": {
    "col1": 3,
    "col2": 129,
    "col3": 2147483646,
    "col4": 9223372036854775806,
    "col5": 10223372036854775806,
    "col6": 1.2222,
    "col7": 9.99999999999999909326625337248,
    "col8": "hello world",
    "col9": "aGVsbG8gd29ybGQ=",
    "col10": 9.99999999999999909326625337248461995470488734032045693707225049338,
    "col11": "2020-11-25",
    "col12": "00:01:02",
    "col13": "2020-11-25 00:01:02",
    "col14": "1606233662.012345",
    "__light_type": {
      "int8": { "schemaType": "TINYINT" },
      "int16": { "schemaType": "SMALLINT" },
      "int32": { "schemaType": "INT" },
      "int64": { "schemaType": "INT64" },
      "bigInt": { "schemaType": "BIGINT" },
      "float32": { "schemaType": "FLOAT" },
      "float64": { "schemaType": "DOUBLE" },
      "string": { "schemaType": "VARCHAR" },
      "bytes": { "schemaType": "BLOB" },
      "decimal": { "schemaType": "DECIMAL" },
      "localDate": { "schemaType": "DATE" },
      "localTime": { "schemaType": "TIME" },
      "localDateTime": { "schemaType": "DATETIME" },
      "timestamp_in_long": { "schemaType": "TIMESTAMP" }
    }
  }
}
When recordType is ROW, the data is full data, and the format is the same as that of INSERT.
{
  "allMetaData": {
    "checkpoint": null,
    "record_primary_key": "int8\u0001int16",
    "source_identity": null,
    "uniqueId": null,
    "transId": "1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database earlier than V4.x, the value is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
    "clusterId": null,
    "record_primary_value": "3\u0001129",
    "dbType": "OB_MYSQL",
    "table_name": "table",
    "db": "tenant.database",
    "timestamp": null
  },
  "prevStruct": null,
  "recordType": "ROW",
  "postStruct": {
    "col1": 3,
    "col2": 129,
    "col3": 2147483646,
    "col4": 9223372036854775806,
    "col5": 10223372036854775806,
    "col6": 1.2222,
    "col7": 9.999999,
    "col8": "hello world",
    "col9": "aGVsbG8gd29ybGQ=",
    "col10": 9.99999999999,
    "col11": "2020-11-25",
    "col12": "00:01:02",
    "col13": "2020-11-25 00:01:02",
    "col14": "1606233662.012345",
    "__light_type": {
      "int8": { "schemaType": "TINYINT" },
      "int16": { "schemaType": "SMALLINT" },
      "int32": { "schemaType": "INT" },
      "int64": { "schemaType": "INT64" },
      "bigInt": { "schemaType": "BIGINT" },
      "float32": { "schemaType": "FLOAT" },
      "float64": { "schemaType": "DOUBLE" },
      "string": { "schemaType": "VARCHAR" },
      "bytes": { "schemaType": "BLOB" },
      "decimal": { "schemaType": "DECIMAL" },
      "localDate": { "schemaType": "DATE" },
      "localTime": { "schemaType": "TIME" },
      "localDateTime": { "schemaType": "DATETIME" },
      "timestamp_in_long": { "schemaType": "TIMESTAMP" }
    }
  }
}
Example of UPDATE data
{
  "allMetaData": {
    "checkpoint": null,
    "record_primary_key": "int8\u0001int16",
    "source_identity": null,
    "uniqueId": "{tid:1103909342127, partition_id:0, part_cnt:0},5917,391,0",
    "transId": "1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database earlier than V4.x, the value is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
    "clusterId": "123456",
    "record_primary_value": "3\u0001129",
    "dbType": "OB_MYSQL",
    "table_name": "table",
    "db": "tenant.database",
    "timestamp": "1609344671"
  },
  "prevStruct": {
    "col1": 3,
    "col2": 129,
    "col3": 2147483646,
    "col4": 9223372036854775806,
    "col5": 10223372036854775806,
    "col6": 1.2222,
    "col7": 9.999999999999,
    "col8": "hello world 2020",
    "col9": "aGVsbG8gd29ybGQ=",
    "col10": 9.999999999999,
    "col11": "2020-11-25",
    "col12": "00:01:02",
    "col13": "2020-11-25 00:01:02",
    "col14": "1606233662.012345",
    "__light_type": {
      "int8": { "schemaType": "TINYINT" },
      "int16": { "schemaType": "SMALLINT" },
      "int32": { "schemaType": "INT" },
      "int64": { "schemaType": "INT64" },
      "bigInt": { "schemaType": "BIGINT" },
      "float32": { "schemaType": "FLOAT" },
      "float64": { "schemaType": "DOUBLE" },
      "string": { "schemaType": "VARCHAR" },
      "bytes": { "schemaType": "BLOB" },
      "decimal": { "schemaType": "DECIMAL" },
      "localDate": { "schemaType": "DATE" },
      "localTime": { "schemaType": "TIME" },
      "localDateTime": { "schemaType": "DATETIME" },
      "timestamp_in_long": { "schemaType": "TIMESTAMP" }
    }
  },
  "recordType": "UPDATE",
  "postStruct": {
    "col1": 3,
    "col2": 129,
    "col3": 2147483646,
    "col4": 9223372036854775806,
    "col5": 10223372036854775806,
    "col6": 1.2222,
    "col7": 9.999999999999,
    "col8": "hello world 2020",
    "col9": "aGVsbG8gd29ybGQ=",
    "col10": 9.999999999999,
    "col11": "2020-11-25",
    "col12": "00:01:02",
    "col13": "2020-11-25 00:01:02",
    "col14": "1606233662.012345",
    "__light_type": {
      "int8": { "schemaType": "TINYINT" },
      "int16": { "schemaType": "SMALLINT" },
      "int32": { "schemaType": "INT" },
      "int64": { "schemaType": "INT64" },
      "bigInt": { "schemaType": "BIGINT" },
      "float32": { "schemaType": "FLOAT" },
      "float64": { "schemaType": "DOUBLE" },
      "string": { "schemaType": "VARCHAR" },
      "bytes": { "schemaType": "BLOB" },
      "decimal": { "schemaType": "DECIMAL" },
      "localDate": { "schemaType": "DATE" },
      "localTime": { "schemaType": "TIME" },
      "localDateTime": { "schemaType": "DATETIME" },
      "timestamp_in_long": { "schemaType": "TIMESTAMP" }
    }
  }
}
Example of DELETE data
{
  "allMetaData": {
    "checkpoint": null,
    "record_primary_key": "int8\u0001int16",
    "source_identity": null,
    "uniqueId": "{tid:1103xxxx127, partition_id:0, part_cnt:0},5917,391,0",
    "transId": "1xxx_3152xxx", // This is an example for OceanBase Database V4.x and later. For OceanBase Database earlier than V4.x, the value is "transId": "{hash:123456, inc:1234, addr:\"xxx.xxx.xxx.xxx:2883\", t:123456}".
    "clusterId": "123456",
    "record_primary_value": "3\u0001129",
    "dbType": "OB_MYSQL",
    "table_name": "table",
    "db": "tenant.database",
    "timestamp": "1609344671"
  },
  "prevStruct": {
    "col1": 3,
    "col2": 129,
    "col3": 2147483646,
    "col4": 9223372036854775806,
    "col5": 10223372036854775806,
    "col6": 1.2222,
    "col7": 9.999999999999,
    "col8": "hello world 2020",
    "col9": "aGVsbG8gd29ybGQ=",
    "col10": 9.999999999999,
    "col11": "2020-11-25",
    "col12": "00:01:02",
    "col13": "2020-11-25 00:01:02",
    "col14": "1606233662.012345",
    "__light_type": {
      "int8": { "schemaType": "TINYINT" },
      "int16": { "schemaType": "SMALLINT" },
      "int32": { "schemaType": "INT" },
      "int64": { "schemaType": "INT64" },
      "bigInt": { "schemaType": "BIGINT" },
      "float32": { "schemaType": "FLOAT" },
      "float64": { "schemaType": "DOUBLE" },
      "string": { "schemaType": "VARCHAR" },
      "bytes": { "schemaType": "BLOB" },
      "decimal": { "schemaType": "DECIMAL" },
      "localDate": { "schemaType": "DATE" },
      "localTime": { "schemaType": "TIME" },
      "localDateTime": { "schemaType": "DATETIME" },
      "timestamp_in_long": { "schemaType": "TIMESTAMP" }
    }
  },
  "recordType": "DELETE",
  "postStruct": null
}
Example of DDL data
ALTER TABLE connector_test.all_mysql_type_test ADD column c90 VARCHAR(30) DEFAULT "test" COMMENT 'test';
{
  "prevStruct": null,
  "postStruct": {
    "ddl": "ALTER TABLE connector_test.all_mysql_type_test ADD column c90 VARCHAR(30) DEFAULT \"test\" COMMENT 'test'",
    "__light_type": {
      "ddl": { "schemaType": "VAR_STRING" }
    }
  },
  "allMetaData": {
    "checkpoint": "1671177200",
    "dbType": "OB_MYSQL",
    "storeDataSequence": null,
    "db": "connector_test",
    "timestamp": "1671177209",
    "uniqueId": null,
    "ddlType": "ALTER_TABLE",
    "record_primary_key": null,
    "source_identity": null,
    "record_primary_value": null,
    "table_name": "all_mysql_type_test"
  },
  "recordType": "DDL"
}
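A consumer of the Default format usually has to split the composite primary key on the \u0001 separator and decode Base64 byte values. The following Python sketch shows one possible way to do this. The helper name primary_key_of and the trimmed sample message are illustrative assumptions, not part of OMS.

```python
import base64
import json

# Separator that the Default format places between multiple primary key columns.
PK_SEPARATOR = "\u0001"

def primary_key_of(message: dict) -> dict:
    """Map primary key column names to their values (kept as strings)."""
    meta = message["allMetaData"]
    names = meta.get("record_primary_key")
    values = meta.get("record_primary_value")
    if not names or values is None:
        return {}  # for example, the DDL record above carries no primary key
    return dict(zip(names.split(PK_SEPARATOR), values.split(PK_SEPARATOR)))

# Trimmed sample based on the DELETE example above (hypothetical consumer input).
raw = r'''
{
  "allMetaData": {
    "record_primary_key": "int8\u0001int16",
    "record_primary_value": "3\u0001129",
    "dbType": "OB_MYSQL"
  },
  "prevStruct": {"col8": "hello world 2020", "col9": "aGVsbG8gd29ybGQ="},
  "recordType": "DELETE",
  "postStruct": null
}
'''
message = json.loads(raw)

pk = primary_key_of(message)
# A DELETE record carries only the previous image, so fall back to prevStruct.
image = message["postStruct"] or message["prevStruct"]
blob = base64.b64decode(image["col9"])

print(pk)    # {'int8': '3', 'int16': '129'}
print(blob)  # b'hello world'
```

The primary key values are left as strings because the message itself transmits them as a single delimited string.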
Debezium JSON message format
When you synchronize data from OceanBase Database in MySQL compatible mode to Kafka, DataHub (BLOB type), or RocketMQ, the Debezium serialization method uses the following JSON message format. The format has two variants. Usually, only the payload structure is displayed.
The schema and payload fields exist.
{
  "schema": { //The structure that describes the payload fields. This structure is not displayed by default.
    "type": "struct", //The value struct indicates that the field contains a structure.
    "optional": false, //Indicates whether the field is optional.
    "fields": [
      {
        "type": "int64", //The type of the field.
        "optional": false, //Indicates whether the field is optional.
        "field": "ts_ms" //The name of the field.
      }
    ]
  },
  "payload": {
    "op": "c", //The data modification type. Valid values: c (full or insert), u (update), d (delete), and HEARTBEAT (heartbeat message).
    "source": {
      "version": "", //The version of OMS.
      "connector": "OB_MYSQL", //The data source type.
      "name": "OMS", //The fixed value OMS.
      "ts_ms": 0, //The timestamp of the data modification, with second-level precision. This field exists only in incremental data.
      "db": "test", //The name of the database that is modified by using an SQL statement. If the database is OceanBase Database, only the database name is displayed, and the tenant name is not displayed.
      "table": "testTab", //The name of the table that is modified by using an SQL statement.
      "pos": "553132@1668496109" //The position in the binlog file. The value is in the [binlog file name]@[binlog file name offset] format.
    },
    "before": { //The before image.
      "column": "value" //Key-value pairs for all columns.
    },
    "after": { //The after image.
      "column": "value" //Key-value pairs for all columns.
    },
    "ts_ms": 1668497367188 //The timestamp of data processing.
  }
}
Only the payload field exists.
{
  "payload": {
    "op": "c", //The data modification type. Valid values: c (full or insert), u (update), d (delete), and HEARTBEAT (heartbeat message).
    "source": {
      "version": "", //The version of OMS.
      "connector": "OB_MYSQL", //The data source type.
      "name": "OMS", //The fixed value OMS.
      "ts_ms": 0, //The timestamp of the data modification, with second-level precision. This field exists only in incremental data.
      "db": "test", //The name of the database that is modified by using an SQL statement. If the database is OceanBase Database, only the database name is displayed, and the tenant name is not displayed.
      "table": "testTab", //The name of the table that is modified by using an SQL statement.
      "pos": "553132@16684****" //The position in the binlog file. The value is in the [binlog file name]@[binlog file name offset] format.
    },
    "before": { //The before image.
      "column": "value" //Key-value pairs for all columns.
    },
    "after": { //The after image.
      "column": "value" //Key-value pairs for all columns.
    },
    "ts_ms": 1668497367188 //The timestamp of data processing.
  }
}
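Because a message may or may not carry the schema field, a consumer can read the change event from payload and ignore schema when it is present. The following Python sketch illustrates this under stated assumptions: the function names change_event and describe are hypothetical, the sample messages are trimmed versions of the structures above, and labeling op "c" as an insert is a simplification because "c" also covers full data.

```python
import json

# Labels for the documented op codes; "c" covers both full data and inserts.
OPERATIONS = {"c": "insert", "u": "update", "d": "delete"}

def change_event(raw: str) -> dict:
    """Return the change event, unwrapping "payload" when it is present."""
    message = json.loads(raw)
    return message.get("payload", message)

def describe(event: dict) -> str:
    """Summarize a change event as "<operation> <db>.<table> <row image>"."""
    op = event["op"]
    if op == "HEARTBEAT":
        return "heartbeat"
    source = event["source"]
    # Prefer the after image and fall back to the before image when it is absent.
    row = event.get("after") or event.get("before")
    return f"{OPERATIONS[op]} {source['db']}.{source['table']} {row}"

event_body = (
    '{"op": "d", "source": {"db": "test", "table": "testTab"}, '
    '"before": {"column": "value"}, "ts_ms": 1668497367188}'
)
with_schema = (
    '{"schema": {"type": "struct", "optional": false, "fields": []}, '
    '"payload": ' + event_body + "}"
)
payload_only = '{"payload": ' + event_body + "}"

print(describe(change_event(with_schema)))   # delete test.testTab {'column': 'value'}
print(describe(change_event(payload_only)))  # delete test.testTab {'column': 'value'}
```

Falling back to the whole message when payload is missing also lets the same function accept messages that have no wrapper at all.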
Here are some examples:
Example of INSERT (insert) data
{
  "schema":{
    "optional":false,
    "type":"STRUCT",
    "fields":[
      {
        "field":"before",
        "optional":false,
        "type":"struct",
        "fields":[
          { "field":"c01", "optional":false, "type":"int32" },
          { "field":"c02", "optional":false, "type":"string" },
          { "field":"c03", "optional":false, "type":"string" },
          { "field":"c04", "optional":false, "type":"bytes" },
          { "field":"c05", "optional":false, "type":"int16" },
          { "field":"c06", "optional":false, "type":"int16" },
          { "field":"c07", "optional":false, "type":"int32" },
          { "field":"c08", "optional":false, "type":"int64" },
          { "field":"c09", "optional":false, "type":"float64" },
          { "field":"c10", "optional":false, "type":"float64" },
          { "field":"c11", "optional":false, "type":"string" },
          { "field":"c12", "optional":false, "type":"string" },
          { "field":"c13", "optional":false, "type":"string" },
          { "field":"c14", "optional":false, "type":"string" },
          { "field":"c15", "optional":false, "type":"bytes" },
          { "field":"c16", "optional":false, "type":"string" },
          { "field":"c17", "optional":false, "type":"bytes" },
          { "field":"c18", "optional":false, "type":"bytes" },
          { "field":"c19", "optional":false, "type":"bytes" },
          { "field":"c20", "optional":false, "type":"bytes" },
          { "field":"c21", "optional":false, "type":"string" },
          { "field":"c22", "optional":false, "type":"int32" },
          { "field":"c23", "optional":false, "type":"int64" },
          { "field":"c24", "optional":false, "type":"string" },
          { "field":"c25", "optional":false, "type":"int32" },
          { "field":"c26", "optional":false, "type":"bytes" }
        ]
      },
      {
        "field":"after",
        "optional":false,
        "type":"struct",
        "fields":[
          { "field":"c01", "optional":false, "type":"int32" },
          { "field":"c02", "optional":false, "type":"string" },
          { "field":"c03", "optional":false, "type":"string" },
          { "field":"c04", "optional":false, "type":"bytes" },
          { "field":"c05", "optional":false, "type":"int16" },
          { "field":"c06", "optional":false, "type":"int16" },
          { "field":"c07", "optional":false, "type":"int32" },
          { "field":"c08", "optional":false, "type":"int64" },
          { "field":"c09", "optional":false, "type":"float64" },
          { "field":"c10", "optional":false, "type":"float64" },
          { "field":"c11", "optional":false, "type":"string" },
          { "field":"c12", "optional":false, "type":"string" },
          { "field":"c13", "optional":false, "type":"string" },
          { "field":"c14", "optional":false, "type":"string" },
          { "field":"c15", "optional":false, "type":"bytes" },
          { "field":"c16", "optional":false, "type":"string" },
          { "field":"c17", "optional":false, "type":"bytes" },
          { "field":"c18", "optional":false, "type":"bytes" },
          { "field":"c19", "optional":false, "type":"bytes" },
          { "field":"c20", "optional":false, "type":"bytes" },
          { "field":"c21", "optional":false, "type":"string" },
          { "field":"c22", "optional":false, "type":"int32" },
          { "field":"c23", "optional":false, "type":"int64" },
          { "field":"c24", "optional":false, "type":"string" },
          { "field":"c25", "optional":false, "type":"int32" },
          { "field":"c26", "optional":false, "type":"bytes" }
        ]
      },
      {
        "field":"source",
        "optional":false,
        "type":"struct",
        "fields":[
          { "field":"version", "optional":false, "type":"string" },
          { "field":"connector", "optional":false, "type":"string" },
          { "field":"name", "optional":false, "type":"string" },
          { "field":"ts_ms", "optional":false, "type":"int64" },
          { "field":"db", "optional":false, "type":"string" },
          { "field":"table", "optional":false, "type":"string" },
          { "field":"server_id", "optional":false, "type":"int64" },
          { "field":"pos", "optional":false, "type":"string" }
        ]
      },
      { "field":"op", "optional":false, "type":"string" },
      { "field":"ts_ms", "optional":false, "type":"int64" }
    ]
  },
  "payload":{
    "op":"c",
    "source":{ "connector":"OB_MYSQL", "pos":"703223@166849****", "name":"OMS", "version":"", "ts_ms":1668491621000, "db":"test", "table":"table_name" },
    "after":{
      "c11":"a", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f",
      "c19":"69000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z",
      "c02":"12312", "c24":1668489131000, "c01":2, "c23":36060000000, "c04":"61", "c26":"6B",
      "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111
    },
    "ts_ms":1668495423594
  }
}
Example of UPDATE (update) data
{
  "schema":{
    "optional":false,
    "type":"STRUCT",
    "fields":[
      {
        "field":"before",
        "optional":false,
        "type":"struct",
        "fields":[
          { "field":"c01", "optional":false, "type":"int32" },
          { "field":"c02", "optional":false, "type":"string" },
          { "field":"c03", "optional":false, "type":"string" },
          { "field":"c04", "optional":false, "type":"bytes" },
          { "field":"c05", "optional":false, "type":"int16" },
          { "field":"c06", "optional":false, "type":"int16" },
          { "field":"c07", "optional":false, "type":"int32" },
          { "field":"c08", "optional":false, "type":"int64" },
          { "field":"c09", "optional":false, "type":"float64" },
          { "field":"c10", "optional":false, "type":"float64" },
          { "field":"c11", "optional":false, "type":"string" },
          { "field":"c12", "optional":false, "type":"string" },
          { "field":"c13", "optional":false, "type":"string" },
          { "field":"c14", "optional":false, "type":"string" },
          { "field":"c15", "optional":false, "type":"bytes" },
          { "field":"c16", "optional":false, "type":"string" },
          { "field":"c17", "optional":false, "type":"bytes" },
          { "field":"c18", "optional":false, "type":"bytes" },
          { "field":"c19", "optional":false, "type":"bytes" },
          { "field":"c20", "optional":false, "type":"bytes" },
          { "field":"c21", "optional":false, "type":"string" },
          { "field":"c22", "optional":false, "type":"int32" },
          { "field":"c23", "optional":false, "type":"int64" },
          { "field":"c24", "optional":false, "type":"string" },
          { "field":"c25", "optional":false, "type":"int32" },
          { "field":"c26", "optional":false, "type":"bytes" }
        ]
      },
      {
        "field":"after",
        "optional":false,
        "type":"struct",
        "fields":[
          { "field":"c01", "optional":false, "type":"int32" },
          { "field":"c02", "optional":false, "type":"string" },
          { "field":"c03", "optional":false, "type":"string" },
          { "field":"c04", "optional":false, "type":"bytes" },
          { "field":"c05", "optional":false, "type":"int16" },
          { "field":"c06", "optional":false, "type":"int16" },
          { "field":"c07", "optional":false, "type":"int32" },
          { "field":"c08", "optional":false, "type":"int64" },
          { "field":"c09", "optional":false, "type":"float64" },
          { "field":"c10", "optional":false, "type":"float64" },
          { "field":"c11", "optional":false, "type":"string" },
          { "field":"c12", "optional":false, "type":"string" },
          { "field":"c13", "optional":false, "type":"string" },
          { "field":"c14", "optional":false, "type":"string" },
          { "field":"c15", "optional":false, "type":"bytes" },
          { "field":"c16", "optional":false, "type":"string" },
          { "field":"c17", "optional":false, "type":"bytes" },
          { "field":"c18", "optional":false, "type":"bytes" },
          { "field":"c19", "optional":false, "type":"bytes" },
          { "field":"c20", "optional":false, "type":"bytes" },
          { "field":"c21", "optional":false, "type":"string" },
          { "field":"c22", "optional":false, "type":"int32" },
          { "field":"c23", "optional":false, "type":"int64" },
          { "field":"c24", "optional":false, "type":"string" },
          { "field":"c25", "optional":false, "type":"int32" },
          { "field":"c26", "optional":false, "type":"bytes" }
        ]
      },
      {
        "field":"source",
        "optional":false,
        "type":"struct",
        "fields":[
          { "field":"version", "optional":false, "type":"string" },
          { "field":"connector", "optional":false, "type":"string" },
          { "field":"name", "optional":false, "type":"string" },
          { "field":"ts_ms", "optional":false, "type":"int64" },
          { "field":"db", "optional":false, "type":"string" },
          { "field":"table", "optional":false, "type":"string" },
          { "field":"server_id", "optional":false, "type":"int64" },
          { "field":"pos", "optional":false, "type":"string" }
        ]
      },
      { "field":"op", "optional":false, "type":"string" },
      { "field":"ts_ms", "optional":false, "type":"int64" }
    ]
  },
  "payload":{
    "op":"u",
    "before":{
      "c11":"a", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f",
      "c19":"6900000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z",
      "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B",
      "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111
    },
    "source":{ "connector":"OB_MYSQL", "pos":"436999@166849****", "name":"OMS", "version":"", "ts_ms":1668495861000, "db":"test", "table":"table_name" },
    "after":{
      "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f",
      "c19":"69000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z",
      "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B",
      "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111
    },
    "ts_ms":1668495906356
  }
}
Example of DELETE (delete) data
{
  "schema":{
    "optional":false,
    "type":"STRUCT",
    "fields":[
      {
        "field":"before",
        "optional":false,
        "type":"struct",
        "fields":[
          { "field":"c01", "optional":false, "type":"int32" },
          { "field":"c02", "optional":false, "type":"string" },
          { "field":"c03", "optional":false, "type":"string" },
          { "field":"c04", "optional":false, "type":"bytes" },
          { "field":"c05", "optional":false, "type":"int16" },
          { "field":"c06", "optional":false, "type":"int16" },
          { "field":"c07", "optional":false, "type":"int32" },
          { "field":"c08", "optional":false, "type":"int64" },
          { "field":"c09", "optional":false, "type":"float64" },
          { "field":"c10", "optional":false, "type":"float64" },
          { "field":"c11", "optional":false, "type":"string" },
          { "field":"c12", "optional":false, "type":"string" },
          { "field":"c13", "optional":false, "type":"string" },
          { "field":"c14", "optional":false, "type":"string" },
          { "field":"c15", "optional":false, "type":"bytes" },
          { "field":"c16", "optional":false, "type":"string" },
          { "field":"c17", "optional":false, "type":"bytes" },
          { "field":"c18", "optional":false, "type":"bytes" },
          { "field":"c19", "optional":false, "type":"bytes" },
          { "field":"c20", "optional":false, "type":"bytes" },
          { "field":"c21", "optional":false, "type":"string" },
          { "field":"c22", "optional":false, "type":"int32" },
          { "field":"c23", "optional":false, "type":"int64" },
          { "field":"c24", "optional":false, "type":"string" },
          { "field":"c25", "optional":false, "type":"int32" },
          { "field":"c26", "optional":false, "type":"bytes" }
        ]
      },
      {
        "field":"after",
        "optional":false,
        "type":"struct",
        "fields":[
          { "field":"c01", "optional":false, "type":"int32" },
          { "field":"c02", "optional":false, "type":"string" },
          { "field":"c03", "optional":false, "type":"string" },
          { "field":"c04", "optional":false, "type":"bytes" },
          { "field":"c05", "optional":false, "type":"int16" },
          { "field":"c06", "optional":false, "type":"int16" },
          { "field":"c07", "optional":false, "type":"int32" },
          { "field":"c08", "optional":false, "type":"int64" },
          { "field":"c09", "optional":false, "type":"float64" },
          { "field":"c10", "optional":false, "type":"float64" },
          { "field":"c11", "optional":false, "type":"string" },
          { "field":"c12", "optional":false, "type":"string" },
          { "field":"c13", "optional":false, "type":"string" },
          { "field":"c14", "optional":false, "type":"string" },
          { "field":"c15", "optional":false, "type":"bytes" },
          { "field":"c16", "optional":false, "type":"string" },
          { "field":"c17", "optional":false, "type":"bytes" },
          { "field":"c18", "optional":false, "type":"bytes" },
          { "field":"c19", "optional":false, "type":"bytes" },
          { "field":"c20", "optional":false, "type":"bytes" },
          { "field":"c21", "optional":false, "type":"string" },
          { "field":"c22", "optional":false, "type":"int32" },
          { "field":"c23", "optional":false, "type":"int64" },
          { "field":"c24", "optional":false, "type":"string" },
          { "field":"c25", "optional":false, "type":"int32" },
          { "field":"c26", "optional":false, "type":"bytes" }
        ]
      },
      {
        "field":"source",
        "optional":false,
        "type":"struct",
        "fields":[
          { "field":"version", "optional":false, "type":"string" },
          { "field":"connector", "optional":false, "type":"string" },
          { "field":"name", "optional":false, "type":"string" },
          { "field":"ts_ms", "optional":false, "type":"int64" },
          { "field":"db", "optional":false, "type":"string" },
          { "field":"table", "optional":false, "type":"string" },
          { "field":"server_id", "optional":false, "type":"int64" },
          { "field":"pos", "optional":false, "type":"string" }
        ]
      },
      { "field":"op", "optional":false, "type":"string" },
      { "field":"ts_ms", "optional":false, "type":"int64" }
    ]
  },
  "payload":{
    "op":"d",
    "before":{
      "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f",
      "c19":"69000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z",
      "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B",
      "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111
    },
    "source":{ "connector":"OB_MYSQL", "pos":"553132@1668****", "name":"OMS", "version":"", "ts_ms":1668496109000, "db":"test", "table":"table_name" },
    "ts_ms":1668496119717
  }
}
DebeziumFlatten JSON message format
When you synchronize data from OceanBase Database in MySQL compatible mode to Kafka, DataHub (BLOB type), or RocketMQ, the JSON message format for the DebeziumFlatten serialization method is as follows. Compared with the Debezium serialization method, the schema and payload wrapper fields are omitted, and the fields that Debezium places inside payload appear at the top level of the message.
{
"op": "c", //The data modification type. Valid values: c (full data or insert), u (update), d (delete), and HEARTBEAT (heartbeat message).
"source": {
"version": "", //The version of OMS.
"connector": "OB_MYSQL", //The type of the data source.
"name": "OMS", //The fixed value OMS.
"ts_ms": 0, //The timestamp of the data change, with second-level precision. This field is present only in incremental data.
"db": "test", //The name of the database that is modified by using an SQL statement. If the database is OceanBase Database, only the database name is present, and the tenant name is omitted.
"table": "testTab", //The name of the table that is modified by using an SQL statement.
"pos": "553132@16684****" //The position in the binlog file. The value is in the [binlog file name]@[binlog file name offset] format.
},
"before": { //The image before the change.
"column": "value" //Key-value pairs for all columns.
},
"after": { //The image after the change.
"column": "value" //Key-value pairs for all columns.
},
"ts_ms": 1668497367188 //The timestamp of the data processing.
}
Here are some data examples:
Example of INSERT (insert) data
{
  "op":"c",
  "source":{ "connector":"OB_MYSQL", "pos":"703223@166849****", "name":"OMS", "version":"", "ts_ms":1668491621000, "db":"test", "table":"table_name" },
  "after":{
    "c11":"a", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f",
    "c19":"69000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z",
    "c02":"12312", "c24":1668489131000, "c01":2, "c23":36060000000, "c04":"61", "c26":"6B",
    "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111
  },
  "ts_ms":1668495423594
}
Example of UPDATE (update) data
{
  "op":"u",
  "before":{
    "c11":"a", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f",
    "c19":"690000000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z",
    "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B",
    "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111
  },
  "source":{ "connector":"OB_MYSQL", "pos":"436999@166849****", "name":"OMS", "version":"", "ts_ms":1668495861000, "db":"test", "table":"table_name" },
  "after":{
    "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f",
    "c19":"69000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z",
    "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B",
    "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111
  },
  "ts_ms":1668495906356
}
Example of DELETE (delete) data
{
  "op":"d",
  "before":{
    "c11":"aa", "c10":2.4212412, "c13":"c", "c12":"b", "c15":"65", "c14":"d", "c17":"67", "c16":"f",
    "c19":"69000000000000000000000000000", "c18":"68", "c20":"6A", "c22":19311, "c21":"2022-11-15T05:12:11Z",
    "c02":"12312", "c24":1668489131000, "c01":1, "c23":36060000000, "c04":"61", "c26":"6B",
    "c03":"1241.41000", "c25":2022, "c06":141, "c05":11, "c08":412124124, "c07":4241, "c09":2.11111
  },
  "source":{ "connector":"OB_MYSQL", "pos":"553132@1668****", "name":"OMS", "version":"", "ts_ms":1668496109000, "db":"test", "table":"table_name" },
  "ts_ms":1668496119717
}
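Several values in these examples are encoded rather than human readable: dates as day counts, times as microsecond counts, and byte arrays as base16 text, in line with the Debezium type mapping given later in this topic. The following Python sketch decodes a few values from the INSERT example. It assumes that c22 is a DATE column, c23 a TIME column, c24 a DATETIME column, and c04 a binary column, which the example does not state explicitly; the function names are hypothetical.

```python
from datetime import date, datetime, timedelta

EPOCH_DATE = date(1970, 1, 1)
EPOCH_DATETIME = datetime(1970, 1, 1)

def decode_date(days: int) -> date:
    """DATE: number of days elapsed since January 1, 1970."""
    return EPOCH_DATE + timedelta(days=days)

def decode_time(micros: int) -> timedelta:
    """TIME: number of microseconds elapsed since 00:00:00."""
    return timedelta(microseconds=micros)

def decode_datetime(millis: int) -> datetime:
    """DATETIME: milliseconds since 1970-01-01 00:00:00, no time zone."""
    return EPOCH_DATETIME + timedelta(milliseconds=millis)

def decode_bytes(hex_text: str) -> bytes:
    """BYTES: a byte array encoded in base16."""
    return bytes.fromhex(hex_text)

# Values copied from the "after" image of the INSERT example.
after = {"c22": 19311, "c23": 36060000000, "c24": 1668489131000, "c04": "61"}

print(decode_date(after["c22"]))      # 2022-11-15
print(decode_time(after["c23"]))      # 10:01:00
print(decode_datetime(after["c24"]))  # 2022-11-15 05:12:11
print(decode_bytes(after["c04"]))     # b'a'
```

The decoded DATETIME value agrees with the c21 string "2022-11-15T05:12:11Z" in the same example, which suggests the column assumptions are reasonable, but verify them against your own table definition.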
DebeziumSmt JSON message format
DebeziumSmt corresponds to a configuration option provided by Debezium that uses a Single Message Transform (SMT) to convert and process each message individually. When you synchronize data from OceanBase Database in MySQL compatible mode to Kafka, DataHub (BLOB type), or RocketMQ, the JSON message for the DebeziumSmt serialization method contains only the key-value pairs of the after field.
For example, the Debezium serialization method produces the following message for an UPDATE operation:
{
"op": "u",
"source": {
"connector": "OB_MYSQL",
"name": "OMS"
},
"ts_ms": 1668496119717,
"before": {
"field1": "before_value1",
"field2": "before_value2"
},
"after": {
"field1": "after_value1",
"field2": "after_value2"
}
}
After SMT processes this message, only the contents of the after field remain. The JSON message format for the DebeziumSmt serialization method is therefore as follows:
{
"field1": "after_value1",
"field2": "after_value2"
}
Here are some data examples:
Example of INSERT (insert) data
{
  "field1": "after_value1",
  "field2": "after_value2",
  "__deleted": "false"
}
Example of UPDATE (update) data
{
  "field1": "after_value1",
  "field2": "after_value2",
  "__deleted": "false"
}
Example of DELETE (delete) data
{
  "field1": "after_value1",
  "field2": "after_value2",
  "__deleted": "true"
}
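The flattening described in this section can be approximated in a few lines. The following Python sketch is only an illustration of the idea, not the OMS or Debezium implementation. The function name flatten_like_smt is hypothetical, and the sketch assumes that a DELETE record is built from the before image because the Debezium DELETE examples in this topic carry no after field; check real output before relying on that assumption.

```python
def flatten_like_smt(event: dict) -> dict:
    """Reduce a Debezium-style change event to a flat row plus "__deleted"."""
    deleted = event["op"] == "d"
    # Assumption: DELETE rows come from the before image, others from after.
    image = event.get("before") if deleted else event.get("after")
    record = dict(image or {})
    # The examples above show "__deleted" as a string, not a JSON boolean.
    record["__deleted"] = "true" if deleted else "false"
    return record

update_event = {
    "op": "u",
    "source": {"connector": "OB_MYSQL", "name": "OMS"},
    "ts_ms": 1668496119717,
    "before": {"field1": "before_value1", "field2": "before_value2"},
    "after": {"field1": "after_value1", "field2": "after_value2"},
}

flat = flatten_like_smt(update_event)
print(flat)
# {'field1': 'after_value1', 'field2': 'after_value2', '__deleted': 'false'}
```

Because the operation type and source metadata are dropped, the __deleted flag is the only way for a consumer of this format to tell a DELETE from other changes.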
Avro JSON message format
When you synchronize data from OceanBase Database in MySQL compatible mode to Kafka, the Avro serialization method uses the following JSON message format.
Full migration
{
  "version": 1,
  "id": 0,
  "sourceTimestamp": 1702371565, // The safe point timestamp.
  "sourcePosition": "", // Full migration does not contain position information.
  "safeSourcePosition": "",
  "sourceTxid": "",
  "source": {
    "sourceType": "MySQL", // Fixed value.
    "version": "OBMySQL" // Fixed value.
  },
  "operation": "INIT", // Full migration is of the INIT type.
  "objectName": "test***",
  "processTimestamps": [ 1702371565238 ], // Only the delivery time.
  "tags": {
    "pk_uk_info": "{\"PRIMARY\":[\"id\"]}" // Only the primary key.
  },
  "fields": [
    { "name": "id", "dataTypeNumber": 246 }, // The type of each column.
    { "name": "bid", "dataTypeNumber": 3 },
    { "name": "name", "dataTypeNumber": 15 },
    { "name": "address", "dataTypeNumber": 254 }
  ],
  "beforeImages": null, // The before image of full migration is empty.
  "afterImages": [ // The after image. For an INTEGER type, the precision is 8. For a FLOAT type, the precision is 8 and the scale is 64.
    { "value": "1", "precision": 1, "scale": 0 },
    { "precision": 8, "value": "11" },
    { "charset": "utf8mb4", "value": { "bytes": "yyy" } },
    null
  ]
}
Incremental synchronization of DML operations
INSERT (insert) operation
{
  "version": 1,
  "id": 170236922143600000,
  "sourceTimestamp": 1702369092,
  "sourcePosition": "1702369080", // The checkpoint of OceanBase Database in MySQL compatible mode.
  "safeSourcePosition": "1702369080", // The checkpoint of OceanBase Database in MySQL compatible mode.
  "sourceTxid": "",
  "source": {
    "sourceType": "MySQL",
    "version": "OBMySQL"
  },
  "operation": "INSERT",
  "objectName": "test***",
  "processTimestamps": [1702369221480],
  "tags": {
    "pk_uk_info": "{\"PRIMARY\":[\"id\"]}"
  },
  "fields": [
    {"name": "id", "dataTypeNumber": 8},
    {"name": "bid", "dataTypeNumber": 3},
    {"name": "name", "dataTypeNumber": 15}
  ],
  "beforeImages": null, // The before image of an INSERT operation is empty.
  "afterImages": [
    {"precision": 8, "value": "2"},
    {"precision": 8, "value": "12"},
    {"charset": "utf8mb4", "value": {"bytes": "xxx"} }
  ]
}
UPDATE (update) operation
{
  "version": 1,
  "id": 170236975822100001,
  "sourceTimestamp": 1702369757,
  "sourcePosition": "1702369756",
  "safeSourcePosition": "1702369756",
  "sourceTxid": "",
  "source": {
    "sourceType": "MySQL",
    "version": "OBMySQL"
  },
  "operation": "UPDATE",
  "objectName": "test***",
  "processTimestamps": [1702369758237],
  "tags": {
    "pk_uk_info": "{\"PRIMARY\":[\"id\"]}"
  },
  "fields": [
    {"name": "id", "dataTypeNumber": 8},
    {"name": "bid", "dataTypeNumber": 3},
    {"name": "name", "dataTypeNumber": 15}
  ],
  "beforeImages": [ // The before image and after image of an UPDATE operation exist.
    {"precision": 8, "value": "3"},
    {"precision": 8, "value": "22"},
    {"charset": "utf8mb4", "value": {"bytes": "xxx"}}
  ],
  "afterImages": [
    {"precision": 8, "value": "3"},
    {"precision": 8, "value": "44"},
    {"charset": "utf8mb4", "value": {"bytes": "xxx"}}
  ]
}
DELETE (delete) operation
{
  "version": 1,
  "id": 170236976527500000,
  "sourceTimestamp": 1702369764,
  "sourcePosition": "1702369763",
  "safeSourcePosition": "1702369763",
  "sourceTxid": "",
  "source": {
    "sourceType": "MySQL",
    "version": "OBMySQL"
  },
  "operation": "DELETE",
  "objectName": "test***",
  "processTimestamps": [1702369765287],
  "tags": {
    "pk_uk_info": "{\"PRIMARY\":[\"id\"]}"
  },
  "fields": [
    {"name": "id", "dataTypeNumber": 8},
    {"name": "bid", "dataTypeNumber": 3},
    {"name": "name", "dataTypeNumber": 15}
  ],
  "beforeImages": [
    {"precision": 8, "value": "3"},
    {"precision": 8, "value": "44"},
    {"charset": "utf8mb4", "value": {"bytes": "xxx"}}
  ],
  "afterImages": null // The after image of a DELETE operation is empty.
}
Incremental synchronization of DDL operations
{
  "version": 1,
  "id": 170236979372400000,
  "sourceTimestamp": 1702369793,
  "sourcePosition": "1702369792",
  "safeSourcePosition": "1702369792",
  "sourceTxid": "",
  "source": {
    "sourceType": "MySQL",
    "version": "OBMySQL"
  },
  "operation": "DDL",
  "objectName": "test***",
  "processTimestamps": [1702369794543],
  "tags": {},
  "fields": null, // Incremental synchronization of DDL operations does not contain fields and beforeImages.
  "beforeImages": null,
  "afterImages": "alter table multi_db_multi_tbl add column address char(20) default null" // The afterImages of a STRING type is the DDL statement.
}
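In this format, column names live in fields while values live in beforeImages and afterImages, matched by position, and pk_uk_info is itself a JSON document stored as a string. The following Python sketch shows one way to turn a record into name-to-value rows. The helper names image_to_row and primary_key_columns are hypothetical, and the sample is a trimmed copy of the UPDATE example above.

```python
import json

def image_to_row(fields, images):
    """Pair column names from "fields" with values from an image list."""
    # DDL records have no fields; INSERT/DELETE lack one of the two images.
    if fields is None or images is None:
        return None
    row = {}
    for field, image in zip(fields, images):
        value = None if image is None else image["value"]
        # Character values in the examples are wrapped as {"bytes": ...}.
        if isinstance(value, dict) and "bytes" in value:
            value = value["bytes"]
        row[field["name"]] = value
    return row

def primary_key_columns(tags):
    """Decode the JSON string stored in tags["pk_uk_info"]."""
    info = tags.get("pk_uk_info")
    return json.loads(info).get("PRIMARY", []) if info else []

record = {
    "operation": "UPDATE",
    "tags": {"pk_uk_info": '{"PRIMARY":["id"]}'},
    "fields": [{"name": "id", "dataTypeNumber": 8},
               {"name": "bid", "dataTypeNumber": 3},
               {"name": "name", "dataTypeNumber": 15}],
    "beforeImages": [{"precision": 8, "value": "3"},
                     {"precision": 8, "value": "22"},
                     {"charset": "utf8mb4", "value": {"bytes": "xxx"}}],
    "afterImages": [{"precision": 8, "value": "3"},
                    {"precision": 8, "value": "44"},
                    {"charset": "utf8mb4", "value": {"bytes": "xxx"}}],
}

before = image_to_row(record["fields"], record["beforeImages"])
after = image_to_row(record["fields"], record["afterImages"])
keys = primary_key_columns(record["tags"])

print(before)  # {'id': '3', 'bid': '22', 'name': 'xxx'}
print(after)   # {'id': '3', 'bid': '44', 'name': 'xxx'}
print(keys)    # ['id']
```

Values are kept as the strings that the message carries; converting them to numeric types would require interpreting dataTypeNumber, which this sketch does not attempt.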
Format description of data transmission to text protocol
When you synchronize data from OceanBase Database to Kafka, DataHub (BLOB type), or RocketMQ
If the serialization method is Default, Canal, DataWorks (V2.0), SharePlex, or DefaultExtendColumnType, the mapping of OceanBase Database data is as follows.
OceanBase Database in MySQL compatible mode
| Data type | Mapping type | Description |
|---|---|---|
| TINYINT, SMALLINT, MEDIUMINT, INT, INTEGER, YEAR, BOOL, BOOLEAN | Long | An integer with a bit length of less than 64. Normal values, such as 1000, are not in scientific notation. For BOOL/BOOLEAN, true = 1 and false = 0. |
| DECIMAL, NUMERIC | BigDecimal | An exact decimal value or an integer with a bit length of more than 64. For integers, decimal points and decimals are not displayed. For values with decimals, the number of digits is displayed based on the data received from the database. Trailing zeros are not removed, and scientific notation is used. |
| FLOAT, DOUBLE | Double | A floating-point number. The number of significant digits is determined based on whether the source data is of the FLOAT or DOUBLE type. FLOAT has 7 significant digits, and DOUBLE has 16 significant digits. |
| CHAR, VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, ENUM, SET | String | A string. |
| TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB, BINARY, VARBINARY, BIT | Bytes | A byte array, which is displayed in BASE64 by default. Note: For BIT types, the leading zeros in the byte array are removed after incremental data is received, but they are not removed for full data. Therefore, the BASE64 encoding may vary. However, the decoded values are consistent. |
| DATE | Date | A date in the YYYY-MM-DD format. If the time is invalid, the original string is displayed. |
| TIME | Time | A time in the HH:mm:ss[.nnnnnnnnn] format. Fractional seconds are displayed with a maximum of 9 digits, and all non-zero digits are displayed. If the time is invalid, the original string is displayed. |
| DATETIME | DateTime | A date and time, including the time zone. The format is YYYY-MM-DD HH:mm:ss[.nnnnnnnnn] [zoneId]. Fractional seconds are displayed with a maximum of 9 digits, and all non-zero digits are displayed. If the time is invalid, the original string is displayed. |
| TIMESTAMP | Timestamp | A timestamp in the [second-level timestamp][.nnnnnnnnn] format. Fractional seconds are displayed with a maximum of 9 digits, and all non-zero digits are displayed. If the time is invalid, it is displayed in the 0000-00-00 00:00:00 format. |

OceanBase Database in Oracle compatible mode
| Data type | Mapping type | Description |
|---|---|---|
| INTEGER | Long | An integer with a bit length of less than 64. Normal values, such as 1000, are not in scientific notation. |
| NUMBER, FLOAT | BigDecimal | An exact decimal value or an integer with a bit length of more than 64. |
| BINARY_FLOAT, BINARY_DOUBLE | Double | A floating-point number. The number of significant digits is determined based on whether the source data is of the FLOAT or DOUBLE type. FLOAT has 7 significant digits, and DOUBLE has 16 significant digits. |
| VARCHAR2, NVARCHAR2, INTERVAL YEAR TO MONTH, INTERVAL DAY TO SECOND, CLOB, NCLOB, ROWID, UROWID | String | A string. |
| BLOB, BFILE, RAW | Bytes | A byte array. Displayed in BASE64 by default. |
| DATE, TIMESTAMP, TIMESTAMP WITH TIME ZONE, TIMESTAMP WITH LOCAL TIME ZONE | DateTime | A date and time, including the time zone. The format is YYYY-MM-DD HH:mm:ss[.nnnnnnnnn] [zoneId]. Fractional seconds are displayed with a maximum of 9 digits, and all non-zero digits are displayed. If the time is invalid, the original string is displayed. |
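Because TIMESTAMP values arrive as text in the [second-level timestamp][.nnnnnnnnn] format with up to 9 fractional digits, parsing them as a float can lose precision. The following Python sketch splits the text into whole seconds and nanoseconds instead, and uses Decimal for exact decimal values so that trailing zeros survive. The function name parse_timestamp is hypothetical, and the sample values are taken from the examples earlier in this topic.

```python
from datetime import datetime, timezone
from decimal import Decimal

# Marker that the mapping table gives for an invalid TIMESTAMP value.
INVALID_TIMESTAMP = "0000-00-00 00:00:00"

def parse_timestamp(text: str):
    """Split "[seconds][.nnnnnnnnn]" into (seconds, nanoseconds)."""
    if text == INVALID_TIMESTAMP:
        return None
    seconds_text, _, fraction = text.partition(".")
    # Right-pad the fraction to 9 digits so "012345" means 12,345,000 ns.
    nanos = int(fraction.ljust(9, "0")[:9]) if fraction else 0
    return int(seconds_text), nanos

seconds, nanos = parse_timestamp("1606233662.012345")
utc = datetime.fromtimestamp(seconds, tz=timezone.utc)
amount = Decimal("1241.41000")

print(seconds, nanos)  # 1606233662 12345000
print(utc)             # 2020-11-24 16:01:02+00:00
print(amount)          # 1241.41000
```

The resulting UTC time, 2020-11-24 16:01:02, corresponds to 2020-11-25 00:01:02 in a UTC+8 time zone, which matches the DATETIME column in the Default format examples.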
If the serialization method is Debezium, the mapping of OceanBase Database in MySQL compatible mode is as follows.
Notice
When you synchronize data from OceanBase Database in Oracle compatible mode to Kafka, DataHub (BLOB type), or RocketMQ, you cannot select the Debezium serialization method.
| Data type | Mapping type | Description |
|---|---|---|
| BOOLEAN, BOOL | BOOLEAN | The value can be true or false. |
| TINYINT, SMALLINT, MEDIUMINT, INT/INTEGER, BIGINT, YEAR | LONG | An integer in the range of -2^63 to 2^63. |
| BIGINT | STRING | The data is displayed in the string format. |
| FLOAT, DOUBLE | DOUBLE | A floating-point number. |
| DECIMAL, NUMERIC | STRING | The data is displayed in the string format. For values with decimals, the number of digits is displayed based on the data received from the database. Trailing zeros are not removed, and scientific notation is used. |
| BIT, BINARY, VARBINARY, TINYBLOB, BLOB, MEDIUMBLOB, LONGBLOB | BYTES | A byte array encoded in base16. |
| CHAR, VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT, ENUM, SET | STRING | A string. |
| TIMESTAMP | STRING | The format is YYYY-MM-DDTHH:mm:ss[.nnnnnnnnn]Z, and the time zone is UTC. |
| DATE | LONG | The number of days elapsed since January 1, 1970. |
| TIME | LONG | The number of microseconds elapsed since 00:00:00, without time zone information. |
| DATETIME | LONG | The number of milliseconds elapsed since January 1, 1970, 00:00:00, without time zone information. |

If the serialization method is Avro, the mapping of OceanBase Database in MySQL compatible mode is as follows.
Notice
You can select the Avro serialization method only when you synchronize data from OceanBase Database in MySQL compatible mode to Kafka.
| Type name | Mapping type |
|---|---|
| TINYINT, BOOLEAN, SMALLINT, MEDIUMINT, INT, BIGINT, BIT | INTEGER |
| FLOAT, DOUBLE | FLOAT |
| DECIMAL, NUMERIC | DECIMAL |
| VARCHAR, CHAR, TINYTEXT, MEDIUMTEXT, LONGTEXT, TEXT | CHARACTER |
| BINARY, VARBINARY, TINYBLOB, MEDIUMBLOB, LONGBLOB, BLOB | BinaryObject |
| TIMESTAMP | TimestampObject. Note: For TIMESTAMP data, both full and incremental data are converted to timestamps. Invalid timestamps are converted to -9223372022400L. Except for invalid timestamps, you can use the Java Instant.ofEpochSecond(ts, nanos) method to obtain the correct wall time. |
| DATE, TIME, DATETIME, YEAR | DATETIME |
| JSON, ENUM, SET | TextObject |
| GEOMETRY | TextGeometry. Note: The data transmission uses the EWKT format, so the data is mapped to the TextGeometry type. |
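The TIMESTAMP note mentions Java's Instant.ofEpochSecond(ts, nanos). As a rough Python counterpart, the following sketch converts an epoch-second and nanosecond pair to a UTC wall time and treats the documented sentinel value as invalid. The function name wall_time is hypothetical, and how ts and nanos are extracted from a TimestampObject is not covered here.

```python
from datetime import datetime, timedelta, timezone

# Sentinel that the note above gives for invalid timestamps.
INVALID_TIMESTAMP_SECONDS = -9223372022400

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

def wall_time(ts: int, nanos: int = 0):
    """Convert epoch seconds plus nanoseconds to a UTC datetime."""
    if ts == INVALID_TIMESTAMP_SECONDS:
        return None
    # datetime has microsecond resolution, so sub-microsecond digits are dropped.
    return EPOCH + timedelta(seconds=ts, microseconds=nanos // 1000)

print(wall_time(1702369092, 500_000_000))    # 2023-12-12 08:18:12.500000+00:00
print(wall_time(INVALID_TIMESTAMP_SECONDS))  # None
```

Adding a timedelta to a fixed epoch avoids platform-specific limits that datetime.fromtimestamp can have for out-of-range values.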