This topic describes the Avro and Canal JSON message formats supported by data subscription.
Message formats for subscriptions
Avro JSON message format
{
"namespace": "com.xxx.oms.xxx.xxx.avro",
"type": "record",
"name": "AvroRecord",
"fields": [
{
"name": "id",
"type": "long",
"doc": "unique id of this record in the whole stream"
},
{
"name": "version",
"default": 1,
"type": [
"int"
],
"doc": "protocol version"
},
{
"name": "operation",
"default": null,
"type": [
"null",
{
"namespace": "com.xxx.oms.xxx.xxx.avro",
"name": "Operation",
"type": "enum",
"symbols": [
"INSERT",
"UPDATE",
"DELETE",
"DDL",
"BEGIN",
"COMMIT",
"HEARTBEAT"
]
}
]
},
{
"name": "xid",
"default": null,
"type": [
"null",
"string"
],
"doc": "transaction id"
},
{
"name": "txind",
"default": null,
"type": [
"null",
{
"namespace": "com.xxx.oms.xxx.xxx.avro",
"name": "TxindType",
"type": "enum",
"symbols": [
"B",
"M",
"E",
"W"
]
}
],
"doc": "transaction record flag"
},
{
"name": "position",
"default": null,
"type": [
"null",
"string"
],
"doc": "current record position"
},
{
"name": "timestamp",
"default": null,
"type": [
"null",
"long"
],
"doc": "record log timestamp"
},
{
"name": "source",
"default": null,
"type": [
"null",
{
"type": "record",
"name": "Source",
"fields": [
{
"name": "sourceType",
"type": {
"type": "enum",
"name": "SourceType",
"symbols": [
"OB_MYSQL",
"OB_ORACLE"
]
}
},
{
"name": "version",
"type": {
"type": "string"
},
"doc": "source datasource version information"
}
]
}
],
"doc": "source datasource"
},
{
"name": "schemaName",
"default": null,
"type": [
"null",
"string"
],
"doc": "schema name"
},
{
"name": "tableName",
"default": null,
"type": [
"null",
"string"
],
"doc": "table name"
},
{
"name": "fields",
"default": null,
"type": [
"null",
{
"type": "array",
"items": {
"namespace": "com.xxx.oms.xxx.xxx.avro",
"name": "Field",
"type": "record",
"fields": [
{
"name": "name",
"type": "string",
"doc": "column name for table"
},
{
"name": "dataTypeNumber",
"type": "int",
"doc": "data type number of this column in source table"
}
],
"doc": "definition of column"
}
}
]
},
{
"name": "pkIndexes",
"default": null,
"type": [
"null",
{
"type": "array",
"items": "int"
}
],
"doc": "primary keys"
},
{
"name": "ukIndexes",
"default": null,
"type": [
"null",
{
"type": "array",
"items": {
"type": "array",
"items": "int"
}
}
],
"doc": "unique keys"
},
{
"name": "beforeImages",
"default": null,
"doc": "values of a row before modify",
"type": [
"null",
{
"type": "array",
"items": {
"type": "record",
"name": "ColumnValue",
"namespace": "com.xxx.oms.xxx.xxx.avro",
"fields": [
{
"name": "type_info",
"type": {
"type": "enum",
"name": "DataType",
"symbols": [
"NULL",
"INTEGER",
"LONG",
"FLOAT",
"DOUBLE",
"DECIMAL",
"STRING",
"BINARY",
"TIMESTAMP",
"DATE",
"TIME",
"DATETIME",
"BOOLEAN",
"BIT",
"GEOMETRY",
"RAW",
"ENUM",
"SET",
"ARRAY",
"VECTOR",
"SPARSE_VECTOR",
"ROARINGBITMAP",
"MAP"
]
}
},
{
"name": "value",
"type": [
"null",
"boolean",
"int",
"long",
"float",
"double",
"bytes",
"string",
{
"type": "record",
"name": "StringObject",
"fields": [
{
"name": "charsetName",
"type": "string"
},
{
"name": "value",
"type": "string"
}
]
},
{
"type": "record",
"name": "DecimalObject",
"fields": [
{
"name": "precision",
"type": "int"
},
{
"name": "scale",
"type": "int"
},
{
"name": "value",
"type": "string"
}
]
},
{
"type": "record",
"name": "DateObject",
"fields": [
{
"name": "year",
"type": "int"
},
{
"name": "month",
"type": "int"
},
{
"name": "day",
"type": "int"
}
]
},
{
"type": "record",
"name": "TimeObject",
"fields": [
{
"name": "negative",
"type": "boolean",
"default": false
},
{
"name": "hours",
"type": "int"
},
{
"name": "minutes",
"type": "int"
},
{
"name": "seconds",
"type": "int"
},
{
"name": "nanos",
"type": "int"
}
]
},
{
"type": "record",
"name": "DateTimeObject",
"fields": [
{
"name": "year",
"type": "int"
},
{
"name": "month",
"type": "int"
},
{
"name": "day",
"type": "int"
},
{
"name": "hours",
"type": "int"
},
{
"name": "minutes",
"type": "int"
},
{
"name": "seconds",
"type": "int"
},
{
"name": "nanos",
"type": "int"
}
]
},
{
"type": "record",
"name": "TimestampObject",
"fields": [
{
"name": "seconds",
"type": "long"
},
{
"name": "nanos",
"type": "int"
},
{
"name": "timezone",
"type": [
"null",
"string"
],
"default": null
}
]
},
{
"type": "record",
"name": "BitObject",
"fields": [
{
"name": "bit_length",
"type": "int"
},
{
"name": "value",
"type": "string"
}
]
},
{
"type": "record",
"name": "EnumSetValue",
"fields": [
{
"name": "value",
"type": "string"
},
{
"name": "defines",
"type": [
"null",
{
"type": "array",
"items": "string"
}
]
}
]
},
{
"type": "record",
"name": "GeometryValue",
"fields": [
{
"name": "srid",
"type": "int"
},
{
"name": "wkb",
"type": "bytes"
}
]
}
]
}
]
}
}
]
},
{
"name": "afterImages",
"default": null,
"doc": "values of a row after modify",
"type": [
"null",
{
"type": "array",
"items": "com.xxx.oms.xxx.xxx.avro.ColumnValue"
}
]
},
{
"name": "sql",
"default": null,
"type": [
"null",
"string"
],
"doc": "DDL sql if this is a ddl event"
},
{
"name": "tags",
"default": null,
"type": [
"null",
{
"type": "map",
"values": "string"
}
],
"doc": "Additional information about the source"
},
{
"name": "total",
"type": "int",
"default": -1,
"doc": "Not implemented at current version"
},
{
"name": "index",
"default": -1,
"type": "int",
"doc": "Not implemented at current version"
},
{
"name": "beforeImageBytes",
"default": "",
"type": "bytes",
"doc": "bytes value of beforeImage when record is too large to split"
},
{
"name": "afterImageBytes",
"default": "",
"type": "bytes",
"doc": "bytes value of afterImage when record is too large to split"
}
]
}
| Parameter | Description |
|---|---|
| id | A globally unique incrementing ID. |
| version | The protocol version. The current value is 1. |
| operation | The type of the message. Valid values: "INSERT", "UPDATE", "DELETE", "DDL", "BEGIN", "COMMIT", and "HEARTBEAT". |
| xid | The transaction ID. |
| txind | The transaction record flag of the current DML record in the transaction. Valid values: B, M, E, and W. |
| position | The offset of the current record in the incremental log. |
| timestamp | The time when the incremental record was written, a Unix timestamp, in seconds. |
| source | The database type and version of the source database. Currently, only databases of type OB_MYSQL or OB_ORACLE are supported. |
| schemaName | The name of the database or schema. |
| tableName | The name of the table. |
| fields | The definitions of the columns in the table, including column names and column types. |
| pkIndexes | If the table has a primary key, this parameter is included in the DML message. Otherwise, it is not included. |
| ukIndexes | If the table has a unique key, this parameter is included in the DML message. Otherwise, it is not included. |
| beforeImages | The data of the row before the DML operation. For an INSERT message, this array is null. |
| afterImages | The data of the row after the DML operation. For a DELETE message, this array is null. |
| sql | The SQL statement of the DDL operation. |
| tags | Additional information about the source. Currently, this parameter is empty. |
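| total | If the message is sharded, this parameter is the total number of shards. This parameter is not implemented in the current version, and its default value is -1. |
| index | If the message is sharded, this parameter is the index of the current shard. This parameter is not implemented in the current version, and its default value is -1. |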
| beforeImageBytes | If the message is sharded, this parameter is the data of the beforeImages part in the shard. |
| afterImageBytes | If the message is sharded, this parameter is the data of the afterImages part in the shard. |
Canal JSON message format
{
"data": [
{
"id": "111",
"name": "name",
"description": "Big 2-wheel scooter",
"weight": "5.18"
}
],
"database": "inventory",
"es": 1589373560000,
"id": 9,
"isDdl": false,
"mysqlType": {
"id": "INTEGER",
"name": "VARCHAR(255)",
"description": "VARCHAR(512)",
"weight": "FLOAT"
},
"old": [
{
"weight": "5.15"
}
],
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": 4,
"name": 12,
"description": 12,
"weight": 7
},
"table": "products",
"ts": 1589373560798,
"type": "UPDATE"
}
| Parameter | Description |
|---|---|
| data | The rows after the data change, as an array. Each object represents one row and maps field names to values. |
| database | The name of the database where the change occurred. |
| es | Event timestamp in milliseconds, indicating the time when the incremental message was executed. |
| id | The ID of the parsed message, which is an incrementing number. |
| isDdl | Indicates whether the message is a DDL operation. Valid values: true (a DDL operation, such as a schema change) and false (a data change). |
| mysqlType | The MySQL data types of the fields in the table. Keys are field names and values are data types. |
| old | The values of the changed fields before the change. Only the changed fields are included. |
| pkNames | List of primary key field names in the table. |
| sql | For a DDL operation, this parameter contains the SQL statement of the operation. For a DML operation, this parameter is empty. |
| sqlType | The JDBC type codes (as defined in `java.sql.Types`) of the fields in the table. Keys are field names and values are type codes. |
| table | The name of the table. |
| ts | Server timestamp in milliseconds when the message was parsed. |
| type | The type of the change. Valid values include INSERT, UPDATE, DELETE, and DDL. |
Data mapping
MySQL compatible mode of OceanBase Database
The following table describes the mapping between data types supported by the MySQL compatible mode of OceanBase Database and those defined in the Avro protocol.
| Category | Data type in MySQL compatible mode of OceanBase Database | Data type in Avro |
|---|---|---|
| Integer | BOOL/BOOLEAN/TINYINT | INTEGER |
| | SMALLINT | INTEGER |
| | MEDIUMINT | INTEGER |
| | INT/INTEGER | INTEGER/LONG |
| | BIGINT | LONG |
| Fixed-point | DECIMAL | DECIMAL |
| | NUMERIC | DECIMAL |
| Floating-point | FLOAT | FLOAT |
| | DOUBLE | DOUBLE |
| Bit-value | BIT | BIT |
| Datetime | DATETIME | DATETIME |
| | TIMESTAMP | TIMESTAMP |
| | DATE | DATE |
| | TIME | TIME |
| | YEAR | INTEGER |
| Character | CHAR | STRING |
| | VARCHAR | STRING |
| | BINARY | BINARY |
| | VARBINARY | BINARY |
| Large object | TINYBLOB | BINARY |
| | BLOB | BINARY |
| | MEDIUMBLOB | BINARY |
| | LONGBLOB | BINARY |
| Text | TINYTEXT | STRING |
| | TEXT | STRING |
| | MEDIUMTEXT | STRING |
| | LONGTEXT | STRING |
| | STRING | STRING |
| Enum | ENUM | ENUM |
| Set | SET | SET |
| JSON | JSON | STRING |
| Spatial | GEOMETRY | GEOMETRY |
| | POINT | GEOMETRY |
| | LINESTRING | GEOMETRY |
| | POLYGON | GEOMETRY |
| | MULTIPOINT | GEOMETRY |
| | MULTILINESTRING | GEOMETRY |
| | MULTIPOLYGON | GEOMETRY |
| | GEOMETRYCOLLECTION | GEOMETRY |
| Bitmap | ROARINGBITMAP | ROARINGBITMAP |
| Array | ARRAY | ARRAY |
| Mapping | MAP | MAP |
| Vector | VECTOR | VECTOR |
| | SPARSEVECTOR | SPARSE_VECTOR |

Oracle compatible mode of OceanBase Database
The following table describes the mapping between data types supported by the Oracle compatible mode of OceanBase Database and those defined in the Avro protocol.
| Category | Data type in Oracle compatible mode of OceanBase Database | Data type in Avro |
|---|---|---|
| Character | CHAR | STRING |
| | NCHAR | STRING |
| | VARCHAR2 | STRING |
| | VARCHAR | STRING |
| | NVARCHAR2 | STRING |
| Numeric | NUMBER | DECIMAL |
| | FLOAT | FLOAT |
| | BINARY_FLOAT | FLOAT |
| | BINARY_DOUBLE | DOUBLE |
| Datetime | DATE | DATE |
| | TIMESTAMP | TIMESTAMP |
| | TIMESTAMP WITH TIME ZONE | TIMESTAMP |
| | TIMESTAMP WITH LOCAL TIME ZONE | TIMESTAMP |
| Interval | INTERVAL YEAR TO MONTH | STRING |
| | INTERVAL DAY TO SECOND | STRING |
| Raw | RAW | RAW |
| Rowid | ROWID | BINARY |
| | UROWID | BINARY |
| Large object | BLOB | BINARY |
| | CLOB | STRING |
| JSON | JSON | STRING |
| XML | XML | STRING |
| Spatial | SDO_GEOMETRY | GEOMETRY |