Message Queue for Apache RocketMQ is a distributed message-oriented middleware built by Alibaba Cloud based on Apache RocketMQ. It features a low latency, high concurrency, and high reliability. The data synchronization feature of OceanBase Migration Service (OMS) Community Edition allows you to synchronize data between an OceanBase physical table and a RocketMQ data source in real time, extending the message processing capability.
Prerequisites
A topic with the partitionally ordered message property is created in your RocketMQ instance.
Limits
Only physical tables can be synchronized.
When data transfer is resumed on a link, some data within the last minute may be duplicate in the RocketMQ instance, and deduplication is required in downstream systems.
The name of a table to be synchronized, as well as the names of columns in the table, must not contain Chinese characters.
Procedure
Create a data synchronization project.
Log on to the OMS Community Edition console.
In the left-side navigation pane, click Data Synchronization.
On the Data Synchronization page, click Create Synchronization Project in the upper-right corner.
On the Select Source and Destination page, specify related parameters.
Parameter Description Project Name It must not exceed 64 characters in length and can contain Chinese characters, digits, and letters. Tag Click the field and select a tag from the drop-down list. You can also click Manage Tags to create, modify, or delete tags. For more information, see Use tags to manage data synchronization projects. Source If you have created an OceanBase data source, select it from the drop-down list. If you have not created a data source, click Add Data Source in the drop-down list and create a data source in the dialog box that appears on the right. For more information, see Add an OceanBase Community Edition data source. Destination If you have created a RocketMQ data source, select it from the drop-down list. If you have not created a data source, click Add Data Source in the drop-down list and create a data source in the dialog box that appears on the right. For more information, see Add a RocketMQ data source. Click Next. On the Select Synchronization Type page, specify Synchronization Type and Configuration for the current data synchronization project.
In the Synchronization Type and Configuration section, Incremental Synchronization supports only DML for Data Change, which includes
Insert,Delete, andUpdate.(Optional) Click Next.
If the source database is an OceanBase database, you must configure the obconfig_url parameter, username, and password for incremental synchronization.
If you have selected Incremental Synchronization without configuring the required parameters for the source database, the More About Data Sources dialog box appears to prompt you to configure the parameters. For more information, see Add an OceanBase Community Edition data source.
After you configure the parameters, click Test Connectivity. After the test succeeds, click Save.
Click Next. On the Select Synchronization Objects page, select a synchronization scope.
When you synchronize data from an OceanBase database to a RocketMQ instance, you can synchronize data from multiple tables to multiple topics.
In the left-side pane, select the objects to be synchronized.
Click > .
In the Map Object to Topic dialog box, search for and select the topics to be synchronized from the Existing Topics drop-down list.
You can also enter the name of an existing topic and select it after it appears.
Click OK .
When you synchronize data from an OceanBase database to a RocketMQ instance, OMS allows you to import objects from text and perform the following operations on the objects in the destination database: change topics, set row filtering conditions, and remove a single object or all objects. Objects in the destination database are listed in the structure of Topic > Database > Table.
Operation Steps Import Objects - In the list on the right, click Import Objects in the upper-right corner.
- In the dialog box that appears, click OK. Notice This operation will overwrite previous selections. Proceed with caution.
- In the Import Synchronization Objects dialog box, import the objects to be synchronized.
You can import CSV files to rename databases/tables and set row filtering conditions. For more information, see Download and import the settings of synchronization objects. - Click Validate.
- After the validation succeeds, click OK.
Change Topic - In the list on the right, hover the pointer over the target object.
- Click Change Topic .
- In the Map Object to Topic dialog box, change the topics to be synchronized.
- Click OK . Notice The selected topics and tables will be merged to the selected topic. Proceed with caution.
Settings You can use the WHEREclause to filter data by row, select sharding columns, and select columns to be synchronized.- In the list on the right, hover the pointer over the target object.
- Click Settings.
- In the Settings dialog box, you can perform the following operations:
- In the Row Filters section, specify a standard SQL
WHEREclause to filter data by row. Only the data meeting theWHEREcondition is synchronized to the destination data source, thereby filtering data by row. If the statement contains a reserved SQL keyword, add an escape character (`). - Select the target sharding columns from the Sharding Columns drop-down list. You can select multiple fields as sharding columns. This parameter is optional. Unless otherwise specified, select the primary keys as sharding columns. If the primary keys are not load-balanced, select fields with unique identifiers and whose load is balanced as sharding columns. Ensure that the selected sharding columns are correct. An incorrect sharding column will cause data synchronization to fail. Sharding columns can be used for the following purposes:
- Load balancing: Threads used for sending messages can be recognized based on the sharding columns if the target table supports concurrent writes.
- Orderliness: OMS Community Edition ensures that messages are received in order if the values of the sharding columns are the same. The orderliness specifies the sequence of executing DML statements for a column.
- In the Select Columns section, select the columns to be synchronized. If you select all or no columns, OMS Community Edition will synchronize all columns.
- In the Row Filters section, specify a standard SQL
- Click OK.
Remove/Remove All OMS Community Edition allows you to remove a single synchronization object or all synchronization objects. - Remove a single synchronization object In the list on the right, hover the pointer over the target object, and click Remove to remove the synchronization object.
- Remove all synchronization objects In the list on the right, click Remove All in the upper-right corner. In the dialog box that appears, click OK to remove all synchronization objects.
Click Next . On the Synchronization Options page, specify the following parameters.
Category Parameter Description Synchronization Settings Incremental Synchronization Start Timestamp The data after the synchronization start timestamp is synchronized. The default value is the current system time. You can select a point in time or enter a timestamp.
Notice:
You can select the current time or a point in time earlier than the current time. This parameter is closely related to the retention period of archived logs. Generally, you can start data synchronization from the current timestamp.Advanced Options Serialization Method The message format for synchronizing data to a RocketMQ instance. Valid values: Default , Canal , Dataworks (version 2.0 supported), SharePlex, and DefaultExtendColumnType. For more information, see Data formats. Advanced Options Enable Intra-Transaction Sequence Specifies whether to maintain ordering within a transaction. If this feature is enabled, OMS Community Edition marks the sequence number for a transaction to be sent to a downstream node.
Notice:
This parameter is valid only for the SharePlex format and is intended for you to obtain the sequence numbers of the DML statements that form a transaction.
For example, if a transaction contains 10 DML statements numbered from 1 to 10, OMS Community Edition will deliver these statements to the destination database in the same order.
If this option is enabled, the system performance may be affected. Choose whether to enable it based on the business characteristics.Advanced Options Partitioning Rules When you synchronize data to a RocketMQ instance, only Hash is supported. Hash indicates that OMS Community Edition uses a hash algorithm to select the message queue (MQ) for RocketMQ based on the value of the primary key or sharding column. Advanced Options Business System Identification (Optional) Identifies the source business system of data. The business system identifier consists of 1 to 20 characters. Destination Enter the name of the producer group A producer group identifies a group of producers, who can write data to multiple topics. Destination Whether to Allow Message Tracing If message tracing is allowed, OMS can trace the time and status of all nodes in the process in which a message is sent by the producer to the RocketMQ server and then consumed by a consumer. Message tracing provides data for troubleshooting in production environments. Click Precheck .
During Precheck, OMS Community Edition detects the connection with the destination data source. If the project fails the precheck, identify the cause, fix the problem, and run the precheck again until the project passes the precheck.
Click Start Project.
If you do not need to start the project now, click Save to go to the details page of the data synchronization project. You can start the project later as needed. For more information, see View details of a data synchronization project.