Kafka is a widely used high-performance distributed event streaming platform. OceanBase Migration Service (OMS) supports real-time data synchronization between a self-managed Kafka instance and MySQL-compatible and Oracle-compatible tenants of OceanBase Database, extending your message processing capabilities. OMS is therefore widely used in business scenarios such as real-time data warehouse building, data queries, and report distribution. This topic describes how to synchronize data from OceanBase Database to a Kafka instance.
By synchronizing data to message queue products, OMS extends your business into big data scenarios such as monitoring data aggregation, streaming data processing, and online/offline analysis. For more information about the data formats for the two types of tenants, see Data formats.
Prerequisites
You have created a dedicated database user for data synchronization in the source OceanBase database and granted corresponding privileges to the user. For more information, see Create a database user.
Limitations
Only physical tables can be synchronized.
OMS supports Kafka 0.9, 1.0, 2.x, and 3.x.
Notice
When the version of the Kafka instance is 0.9, schema synchronization is not supported.
During data synchronization, if you rename a source table to be synchronized and the new name is beyond the synchronization scope, the data of the source table is not synchronized to the target Kafka instance.
The name of a table to be synchronized, as well as the names of columns in the table, must not contain Chinese characters.
Data source identifiers and user accounts must be globally unique in OMS.
OMS supports the synchronization of only objects whose database name, table name, and column name are ASCII-encoded and do not contain special characters. The special characters are line breaks, spaces, and the following characters:
. | " ' ` ( ) = ; / & \
OMS does not support a standby OceanBase database as the source.
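The following is a hedged, illustrative pre-check (not an OMS API) that rejects object names violating the constraints above; the class and method names are hypothetical:

```java
// Hypothetical helper that validates database/table/column names against the
// documented constraints: printable ASCII only, no spaces or line breaks, and
// none of the listed special characters. This is not an OMS API.
public class NameCheck {
    private static final String FORBIDDEN = ".|\"'`()=;/&\\";

    static boolean isValidObjectName(String name) {
        if (name == null || name.isEmpty()) {
            return false;
        }
        for (char c : name.toCharArray()) {
            // Reject control characters, spaces, and anything outside printable ASCII.
            if (c < '!' || c > '~') {
                return false;
            }
            if (FORBIDDEN.indexOf(c) >= 0) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isValidObjectName("orders_2024")); // true
        System.out.println(isValidObjectName("orders.v2"));   // false: contains '.'
    }
}
```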
Considerations
In a data synchronization task where the source is OceanBase Database and DDL synchronization is enabled, if a RENAME operation is performed on a table in the source database, we recommend that you restart the task to avoid data loss during incremental synchronization.
During incremental synchronization from OceanBase Database V4.x, if the STORED attribute is not specified for a generated column, the column value synchronized to the target database is NULL. As a result, the data of this column received by the downstream system is not as expected.
Take note of the following items when an updated row contains a LOB column:
If the LOB column is updated, do not use the value stored in the LOB column before the UPDATE or DELETE operation. The following data types are stored in LOB columns: JSON, GIS, XML, user-defined type (UDT), and TEXT types such as LONGTEXT and MEDIUMTEXT.
If the LOB column is not updated, the value stored in the LOB column before and after the UPDATE or DELETE operation is NULL.
If the clocks between nodes or between the client and the server are out of synchronization, the latency may be inaccurate during incremental synchronization.
For example, if the clock is earlier than the standard time, the latency can be negative. If the clock is later than the standard time, the latency can be positive.
When data transmission is resumed for a task, some data (transmitted within the last minute) may be duplicated in the Kafka instance. Therefore, deduplication is required in downstream systems.
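A minimal deduplication sketch for a downstream Java consumer, assuming duplicates can be recognized by message content (the broker address, group ID, and topic name are placeholders; in practice, derive the fingerprint from a business key in the payload, such as the primary key plus the commit timestamp):

```java
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

public class DedupConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-host:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "oms-downstream");           // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringDeserializer");

        // Bounded LRU cache of recently seen message fingerprints. OMS may
        // re-deliver roughly the last minute of data after a task resumes.
        Map<String, Boolean> seen = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > 100_000;
            }
        };

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("Topic_mydb_mytable")); // placeholder
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Weak stand-in fingerprint; replace with a real business key.
                    String fingerprint = Integer.toHexString(record.value().hashCode());
                    if (seen.put(fingerprint, Boolean.TRUE) != null) {
                        continue; // duplicate re-delivered after a resume; skip it
                    }
                    process(record);
                }
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.printf("%s-%d@%d%n", record.topic(), record.partition(), record.offset());
    }
}
```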
If you select only Incremental Synchronization when you create a data synchronization task, OMS requires that the local incremental logs in the source database be retained for more than 48 hours.
If you select Full Synchronization and Incremental Synchronization when you create the data synchronization task, OMS requires that the local incremental logs of the source database be retained for at least seven days. Otherwise, the data synchronization task may fail or the data in the source and target databases may be inconsistent because OMS cannot obtain incremental logs.
When you synchronize data from OceanBase Database to a Kafka instance, if a statement that creates a unique index fails to execute in the source database, the Kafka instance still consumes the DDL statements for creating and dropping the unique index. If the downstream DDL statement for unique index creation fails to execute, ignore this exception.
Notice
When you synchronize data from OceanBase Database V2.2.73 or later but earlier than V3.x, we recommend that you use OceanBase Change Data Capture (obcdc) V2.2.73 or later but earlier than V3.x, or obcdc of a version later than V3.2.4.5, to ensure the orderliness of DML row change operations within a transaction.
When you synchronize data from OceanBase Database of a version earlier than V2.2.73, the orderliness of DML row change operations within a transaction cannot be ensured.
Supported DDL operations for synchronization
- CREATE TABLE
  Notice: The created table must be a synchronization object. Executing the DROP TABLE statement on a synchronized table and then executing the CREATE TABLE statement on the same table is also supported.
- ALTER TABLE
- DROP TABLE
- TRUNCATE TABLE
  In delayed deletion, the same transaction contains two identical TRUNCATE TABLE statements. In this case, idempotence is implemented for downstream consumption.
- ALTER TABLE…TRUNCATE PARTITION
- CREATE INDEX
- DROP INDEX
- COMMENT ON TABLE
  Notice: This DDL operation is not supported when you synchronize data from the Oracle-compatible mode of OceanBase Database to Kafka.
- RENAME TABLE
  Notice: The renamed table must be a synchronization object.
Procedure
Create a data synchronization task.

Log in to the OMS console.
In the left-side navigation pane, click Data Synchronization.
On the Data Synchronization page, click Create Synchronization Task in the upper-right corner.
On the Select Source and Target page, configure the parameters.
- Task Name: We recommend that you set it to a combination of digits and letters. It must not contain any spaces and cannot exceed 64 characters in length.
- Source: If you have created an OceanBase data source, which can be a physical data source or an ApsaraDB for OceanBase data source, select it from the drop-down list. Otherwise, click New Data Source in the drop-down list and create one in the dialog box that appears on the right. For more information about the parameters, see Create a physical OceanBase data source or Create an ApsaraDB for OceanBase data source.
- Target: If you have created a Kafka data source, select it from the drop-down list. Otherwise, click New Data Source in the drop-down list and create one in the dialog box that appears on the right. For more information, see Create a Kafka data source.
- Tag (optional): Click the field and select a tag from the drop-down list. You can also click Manage Tags to create, modify, and delete tags. For more information, see Use tags to manage data synchronization tasks.
Click Next. On the Select Synchronization Type page, specify the synchronization types for the current data synchronization task.

The supported synchronization types are Schema Synchronization, Full Synchronization, and Incremental Synchronization. Full Synchronization supports the synchronization of tables without primary keys. Incremental Synchronization supports the DML Synchronization and DDL Synchronization options. The supported DML operations are INSERT, DELETE, and UPDATE. Select the options based on your business needs. For more information, see Configure DDL/DML synchronization.
(Optional) Click Next.
If you have selected Incremental Synchronization without configuring the required parameters for the source OceanBase database, the More About Data Sources dialog box appears to prompt you to configure the parameters. For more information about the parameters, see Create a physical OceanBase data source or Create an ApsaraDB for OceanBase data source.
After you configure the parameters, click Test Connection. After the test succeeds, click OK.
Click Next. On the Select Synchronization Objects page, select the objects to be synchronized in the current data synchronization task.
You can select Specify Objects or Match Rules to specify the synchronization objects. The following procedure describes how to specify synchronization objects by using the Specify Objects option. For information about the procedure for specifying synchronization objects by using the Match Rules option, see the Configure matching rules for data migration/synchronization from a database to a Message Queue instance section in the Configure matching rules topic.
Note
If you have selected DDL Synchronization in the Select Synchronization Type step, we recommend that you select Match Rules to specify synchronization objects. This way, all new objects that meet the specified rules will be synchronized. If you select Specify Objects to specify synchronization objects, new or renamed objects will not be synchronized.
When you synchronize data from OceanBase Database to a Kafka instance, you can synchronize data from multiple tables to multiple topics.
In the Select Synchronization Objects section, select Specify Objects.
In the left-side pane, select the objects to be synchronized.
Click >.

In the Map Object to Topic dialog box, select a mapping method.

If you did not select Schema Synchronization as the synchronization type, you can select only Existing Topics here. If you selected Schema Synchronization when you specified the synchronization type, you can use only one mapping method to create or select a topic.
For example, if you have selected Schema Synchronization and you use both the Create Topic and Select Topic mapping methods, or you rename the topic, a precheck error is returned because the options conflict.
- Create Topic: Enter the name of the new topic in the text box. The topic name contains 3 to 64 characters and can contain only letters, digits, hyphens (-), underscores (_), and periods (.).
- Select Topic: OMS allows you to query Kafka topics. Click Select Topic, and then find and select a topic for synchronization from the Existing Topics drop-down list. You can also enter the name of an existing topic and select it after it appears.
- Batch Generate Topics: The format for generating topics in batches is Topic_${Database Name}_${Table Name}.
If you select Create Topic or Batch Generate Topics, after the schema migration succeeds, you can query the created topics in the Kafka instance. By default, the number of partitions is 3 and the number of partition replicas is 1. These parameters cannot be modified in OMS. If the topics do not meet your business needs, you can create topics in the target Kafka instance as needed, as shown in the sketch below.
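A minimal sketch of creating a replacement topic with a custom partition layout by using Kafka's AdminClient; the broker address, topic name, and partition/replica counts are placeholders:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Properties;

public class CreateTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-host:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            // 6 partitions and replication factor 3 instead of the OMS defaults (3 / 1).
            NewTopic topic = new NewTopic("Topic_mydb_mytable", 6, (short) 3);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```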
Click OK.
Note
OMS automatically filters out unsupported tables. For information about the SQL statements for querying table objects, see SQL statements for querying table objects.
OMS allows you to import objects from text files, change the topics of the objects, set row-based filtering, or remove a single object or all objects. Objects in the target database are listed in the structure of Topic > Database > Table.
Note
When you select Match Rules to specify synchronization objects, object renaming is implemented based on the syntax of the specified matching rules. In the operation area, you can only set filter conditions and select sharding columns and the columns to be synchronized. For more information, see Configure matching rules.

Import objects
- In the list on the right, click Import Objects in the upper-right corner.
- In the dialog box that appears, click OK.
  Notice: This operation will overwrite previous selections. Proceed with caution.
- In the Import Synchronization Objects dialog box, import the objects to be synchronized. You can import CSV files to rename databases/tables and set row filter conditions. For more information, see Download and import the settings of synchronization objects.
- Click Validate.
- After the validation succeeds, click OK.
Change topics
OMS allows you to change the topic for objects in the target database. For more information, see Change the topic.
Configure settings
OMS allows you to configure row-based filtering, select sharding columns, and select columns to be synchronized.
- In the list on the right, move the pointer over the object that you want to set.
- Click Settings.
- In the Settings dialog box, you can perform the following operations:
  - In the Row Filters section, specify a standard SQL WHERE clause to filter data by row. For more information, see Use SQL conditions to filter data.
  - Select the sharding columns that you want to use from the Sharding Columns drop-down list. You can select multiple fields as sharding columns. This parameter is optional.
    Unless otherwise specified, select the primary key as the sharding columns. If the primary key values are not load-balanced, select load-balanced fields with unique identifiers as sharding columns to avoid potential performance issues. Sharding columns can be used for the following purposes:
    - Load balancing: threads used for sending messages can be allocated based on the sharding columns if the target table supports concurrent writes.
    - Orderliness: OMS ensures that messages are received in order if the values of the sharding columns are the same. The orderliness determines the sequence in which DML statements for a column are executed. See the keyed-message sketch after this list.
  - In the Select Columns section, select the columns to be synchronized. For more information, see Column filtering.
- Click OK.
Remove one or all objects
OMS allows you to remove a single object or all objects to be synchronized to the target database during data mapping.
- Remove a single synchronization object: in the list on the right, move the pointer over the object that you want to remove, and click Remove.
- Remove all synchronization objects: in the list on the right, click Remove All in the upper-right corner. In the dialog box that appears, click OK to remove all synchronization objects.
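To illustrate the orderliness guarantee of sharding columns: with Kafka's default partitioner, records that share a key always land in the same partition and are consumed in production order. A minimal keyed-producer sketch (broker address, topic name, and key are placeholders; OMS produces these messages internally, so this is for intuition only):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KeyedOrderingSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-host:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Both records use the same key ("42", standing in for a sharding column
            // value), so they are routed to the same partition and stay ordered.
            producer.send(new ProducerRecord<>("Topic_mydb_mytable", "42", "update #1"));
            producer.send(new ProducerRecord<>("Topic_mydb_mytable", "42", "update #2"));
        }
    }
}
```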
Click Next. On the Synchronization Options page, configure the following parameters.
Full synchronization
The following parameters are displayed only if you have selected Full Synchronization on the Select Synchronization Type page.

- Full Synchronization Rate Limit: You can choose whether to limit the full synchronization rate as needed. If you choose to limit the rate, you must specify the records per second (RPS) and bytes per second (BPS). The RPS specifies the maximum number of data rows synchronized to the target database per second during full synchronization, and the BPS specifies the maximum amount of data in bytes synchronized per second.
  Note: The RPS and BPS values specified here are only for throttling. The actual full synchronization performance is subject to factors such as the settings of the source and target databases and the instance specifications.
- Full Synchronization Resource Configuration: You can select Small, Medium, or Large to use the corresponding default values of Read Concurrency, Write Concurrency, and Memory. You can also customize the resource configuration for full synchronization. By setting the resource configuration for the Full-Import component, you can limit the resource consumption of the task in the full synchronization phase.
  Notice: In the case of custom configurations, the minimum value is 1, and only integers are supported.
Incremental synchronization
The following parameters are displayed only if you have selected Incremental Synchronization on the Select Synchronization Type page.

- Incremental Synchronization Rate Limit: You can choose whether to limit the incremental synchronization rate as needed. If you choose to limit the rate, you must specify the RPS and BPS. The RPS specifies the maximum number of data rows synchronized to the target database per second during incremental synchronization, and the BPS specifies the maximum amount of data in bytes synchronized per second.
  Note: The RPS and BPS values specified here are only for throttling. The actual incremental synchronization performance is subject to factors such as the settings of the source and target databases and the instance specifications.
- Incremental Log Pull Resource Configuration: You can select Small, Medium, or Large to use the corresponding default value of Memory. You can also customize the resource configuration for incremental log pull. By setting the resource configuration for the Store component, you can limit the resource consumption of the task during log pull in the incremental synchronization phase.
  Notice: In the case of custom configurations, the minimum value is 1, and only integers are supported.
- Incremental Data Write Resource Configuration: You can select Small, Medium, or Large to use the corresponding default values of Write Concurrency and Memory. You can also customize the resource configuration for incremental data write. By setting the resource configuration for the Incr-Sync component, you can limit the resource consumption of the task during data writes in the incremental synchronization phase.
  Notice: In the case of custom configurations, the minimum value is 1, and only integers are supported.
- Incremental Record Retention Time: The duration for which parsed incremental files are cached in OMS. A longer retention period results in more disk space occupied by the Store component.
- Incremental Synchronization Start Timestamp:
  - If you have selected Full Synchronization as the synchronization type, the default value of this parameter is the task start time and cannot be modified.
  - If you did not select Full Synchronization as the synchronization type, set this parameter to a point in time, which is the current system time by default. For more information, see Set an incremental synchronization timestamp.
Advanced options

- Serialization Method: The message format for synchronizing data to a Kafka instance. Valid values: Default, Canal, Dataworks (version 2.0 supported), SharePlex, DefaultExtendColumnType, Debezium, DebeziumFlatten, DebeziumSmt, and Avro. For more information, see Data formats.
  Notice:
  - At present, only MySQL-compatible tenants of OceanBase Database support Debezium, DebeziumFlatten, DebeziumSmt, and Avro.
  - If the message format is set to Dataworks, the DDL operations COMMENT ON TABLE and ALTER TABLE…TRUNCATE PARTITION cannot be synchronized.
- Partitioning Rules: The rule for synchronizing data from OceanBase Database to a Kafka topic. Valid values: Hash, Table, and One. For more information about the delivery of DDL statements in different scenarios and examples, see the description below.
  - Hash: OMS uses a hash algorithm to select the partition of a Kafka topic based on the value of the primary key or sharding column.
    Notice: The Hash rule supports only delivering data to all partitions.
  - Table: OMS delivers all data in a table to the same partition and uses the table name as the hash key.
  - One: JSON messages are delivered to one partition of a topic to ensure ordering.
- Business System Identification (optional): Identifies the source business system of the data. This parameter is displayed only when Serialization Method is set to Dataworks. The business system identifier consists of 1 to 20 characters.
The following describes the delivery of a DDL statement in different scenarios.
- Hash
  - DDL statement involving multiple tables (for example, RENAME TABLE): the statement is delivered to all partitions of the topics associated with the involved tables. Assume that the DDL statement involves three tables, A, B, and C, where A is associated with Topic 1, B is associated with Topic 2, and C is not involved in the current task. The statement is delivered to all partitions of Topic 1 and Topic 2.
  - DDL statement involving unknown tables (for example, DROP INDEX): the statement is delivered to all partitions of all topics of the current task. Assume that the DDL statement cannot be identified by OMS and the current task has three topics. The statement is delivered to all partitions of these three topics.
  - DDL statement involving a single table: the statement is delivered to all partitions of the topic associated with the table.
- Table
  - Multiple tables: the statement is delivered to specific partitions of the topics associated with the involved tables. The partitions correspond to the hash values of the names of the involved tables. In the preceding A/B/C example, the statement is delivered to the partitions that correspond to the hash values of the involved table names in Topic 1 and Topic 2.
  - Unknown tables: the statement is delivered to all partitions of all topics of the current task.
  - Single table: the statement is delivered to a partition of the topic associated with the table.
- One
  - Multiple tables: the statement is delivered to a fixed partition of the topics associated with the involved tables. In the preceding example, it is delivered to a fixed partition of Topic 1 and Topic 2.
  - Unknown tables: the statement is delivered to a fixed partition of all topics of the current task.
  - Single table: the statement is delivered to a fixed partition of the topic associated with the table.
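For intuition about the Hash rule, Kafka's own default partitioner maps a key to a partition as murmur2(key) mod the partition count. The following sketch reproduces that computation; OMS's internal hash of the primary key or sharding column may differ:

```java
import org.apache.kafka.common.utils.Utils;

import java.nio.charset.StandardCharsets;

public class HashPartitionSketch {
    // Kafka's DefaultPartitioner scheme: murmur2 hash of the key, modulo the
    // partition count. OMS may use its own hash internally; this is illustrative.
    static int partitionFor(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(bytes)) % numPartitions;
    }

    public static void main(String[] args) {
        // The same sharding-column value always maps to the same partition.
        System.out.println(partitionFor("42", 3));
        System.out.println(partitionFor("42", 3)); // identical result
    }
}
```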
If the parameter settings on the page cannot meet your requirements, you can click Parameter Configuration in the lower part of the page to configure more specific settings. You can also reference an existing task or component template.

Click Precheck.
During the precheck, OMS verifies its connection to the target Kafka instance. If an error is returned during the precheck, you can perform the following operations:
Identify and troubleshoot the problem and then perform the precheck again.
Click Skip in the Actions column of a failed precheck item. In the dialog box that describes the consequences of skipping the item, click OK.
Click Start Task. If you do not need to start the task now, click Save to go to the details page of the task. You can start the task later as needed.
OMS allows you to modify the synchronization objects when the data synchronization task is running. For more information, see View and modify synchronization objects. After the data synchronization task is started, it will be executed based on the selected synchronization types. For more information, see the View synchronization details section in the View details of a data synchronization task topic.
If the data synchronization task encounters an execution exception due to a network failure or slow startup of processes, you can click Recover on the Synchronization Tasks or Details page of the synchronization task.