Message Queue for Apache RocketMQ is a distributed message-oriented middleware built by Alibaba Cloud based on Apache RocketMQ. It features low latency, high concurrency, and high reliability. The data synchronization feature of OceanBase Migration Service (OMS) allows you to synchronize data between an OceanBase physical table and a RocketMQ data source in real time, extending the message processing capability.
For more information about the data formats for the two types of tenants, see Data formats.
Prerequisites
You have created a dedicated database user for data synchronization in the source OceanBase database and granted corresponding privileges to the user. For more information, see Create a database user.
Limitations
OMS supports RocketMQ 4.x and 5.x instances, including the enterprise and community editions.
Only physical tables can be synchronized.
During data synchronization, OMS allows you to drop a table before creating a new one. In other words, you can execute DROP TABLE and then execute CREATE TABLE. In OMS, you cannot create a new table by renaming a table. That is, you cannot execute RENAME TABLE a TO a_tmp.
The name of a table to be synchronized, as well as the names of columns in the table, must not contain Chinese characters.
Data source identifiers and user accounts must be globally unique in OMS.
OMS supports the migration of only objects whose database name, table name, and column name are ASCII-encoded and do not contain special characters. The special characters are spaces, line breaks, and the following characters:
. | " ' ` ( ) = ; / & \
OMS does not support a standby OceanBase database as the source.
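The naming limitation above (ASCII-only names that contain none of the listed special characters) can be checked before objects are selected. The following is a minimal sketch, not part of OMS: the function names are hypothetical, and treating a carriage return as a line break is an assumption.

```python
# Characters listed as special in the limitation above, plus space and line breaks.
# Assumption: "\r" is treated as a line break in addition to "\n".
SPECIAL_CHARACTERS = set('.|"\'`()=;/&\\ \n\r')


def is_supported_name(name: str) -> bool:
    """Return True if the name is ASCII-only and has no listed special character."""
    return name.isascii() and not (set(name) & SPECIAL_CHARACTERS)


def unsupported_names(names):
    """Return the names that violate the limitation, in their original order."""
    return [name for name in names if not is_supported_name(name)]
```

For example, unsupported_names(["orders", "order items", "a.b"]) returns the two names that contain a space or a period, which can then be renamed in the source before the project is created.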
Considerations
During incremental synchronization from OceanBase Database V4.x, if the STORED attribute is not marked for a generated column, the column value synchronized to the destination is NULL. As a result, the data of this column received by the downstream system is not as expected.
Take note of the following items when an updated row contains a LOB column:
If the LOB column is updated, do not use the value stored in the LOB column before the UPDATE or DELETE operation.
The following data types are stored in LOB columns: JSON, GIS, XML, user-defined type (UDT), and TEXT such as LONGTEXT and MEDIUMTEXT.
If the LOB column is not updated, the value stored in the LOB column before and after the UPDATE or DELETE operation is NULL.
When a data synchronization project is resumed, some data (from within the last minute) may be duplicated in the RocketMQ instance, so downstream systems must deduplicate it.
If the clocks between nodes or between the client and the server are out of synchronization, the latency may be inaccurate during incremental synchronization.
For example, if the clock is earlier than the standard time, the latency can be negative. If the clock is later than the standard time, the latency can be positive.
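Because a resumed project may deliver some messages twice, a downstream consumer needs a way to recognize repeats. The following is a minimal sketch of one possible approach, assuming each message yields a stable identity key (for example, a hypothetical combination of table name, primary key, and change position). The actual fields depend on the selected serialization method, and the class, function, and parameter names here are illustrative only.

```python
from collections import OrderedDict


class RecentKeyDeduplicator:
    """Remember the most recent message keys and flag repeats.

    The capacity is an assumption: it should be large enough to hold the keys
    of at least the last minute of traffic for the consumed topic.
    """

    def __init__(self, capacity: int = 100_000):
        if capacity < 1:
            raise ValueError("capacity must be at least 1")
        self._capacity = capacity
        self._seen = OrderedDict()  # key -> None, least recently seen first

    def is_duplicate(self, key) -> bool:
        if key in self._seen:
            self._seen.move_to_end(key)  # refresh so the key stays cached
            return True
        self._seen[key] = None
        if len(self._seen) > self._capacity:
            self._seen.popitem(last=False)  # evict the least recently seen key
        return False


def deduplicate(messages, key_of, dedup):
    """Return the messages whose key has not been seen before, in order."""
    return [m for m in messages if not dedup.is_duplicate(key_of(m))]
```

A bounded cache keeps memory use predictable. If downstream writes are already idempotent (for example, upserts keyed on the primary key), this filter may be unnecessary.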
Procedure
Create a data synchronization project.
Log on to the OMS console.
In the left-side navigation pane, click Data Synchronization.
On the Data Synchronization page, click Create Synchronization Project in the upper-right corner.
On the Select Source and Destination page, configure the parameters.
| Parameter | Description |
| --- | --- |
| Synchronization Project Name | We recommend that you set it to a combination of digits and letters. It must not contain any spaces and cannot exceed 64 characters in length. |
| Tag (Optional) | Click the field and select a target tag from the drop-down list. You can also click Manage Tags to create, modify, and delete tags. For more information, see Manage data synchronization projects by using tags. |
| Source | If you have created an OceanBase data source, which can be a physical data source or an ApsaraDB for OceanBase data source, select it from the drop-down list. If not, click New Data Source in the drop-down list and create one in the dialog box that appears on the right. For more information about the parameters, see Create a physical OceanBase data source or Create a public cloud OceanBase data source. |
| Destination | If you have created a RocketMQ data source, select it from the drop-down list. If not, click New Data Source in the drop-down list to create one in the dialog box on the right side. For more information, see Create a RocketMQ data source. |

Click Next. On the Select Synchronization Type page, select the synchronization type for the current data synchronization project.
The supported synchronization types include Full Synchronization and Incremental Synchronization. The only option for Incremental Synchronization is DML Synchronization. Options for DML Synchronization are Insert, Delete, and Update. You can select the operations based on your business requirements. For more information, see DML filtering.
(Optional) Click Next.
If you have selected Incremental Synchronization without configuring the required parameters for the source OceanBase database, the More About Data Sources dialog box appears to prompt you to configure the parameters. For more information about the parameters, see Create a physical OceanBase data source or Create a public cloud OceanBase data source.
After you configure the parameters, click Test Connection. After the test succeeds, click OK.
Click Next. On the Select Synchronization Objects page, select a synchronization scope.
When you synchronize data from an OceanBase database to a RocketMQ instance, you can select multiple tables to map to multiple topics.
In the left-side pane, select the objects to be synchronized.
Click >.
Click the Existing Topics drop-down list in the Map the Object to the Topic dialog box and select the target topic.
You can also enter the name of an existing topic and select it after it appears.
Click OK.
Note
OMS automatically filters out unsupported tables.
OMS allows you to import objects from text files, change the topics of the objects, set row-based filtering, or remove a single object or all objects. Objects in the destination database are listed in the structure of Topic > Database > Table.
Import objects
- In the list on the right, click Import Objects in the upper-right corner.
- In the dialog box that appears, click OK.
  Notice: This operation will overwrite previous selections. Proceed with caution.
- In the Import Synchronization Objects dialog box, import the objects to be synchronized.
  You can import CSV files to rename databases/tables and set row filtering conditions. For more information, see Download and import the settings of synchronization objects.
- Click Validate.
- After the validation is passed, click OK.

Change topics
OMS allows you to change the topic for objects at the destination. For more information, see Change topics.

Configure settings
OMS allows you to configure row-based filtering, select sharding columns, and select columns to be synchronized.
- In the list on the right, move the pointer over the object that you want to set.
- Click Settings.
- In the Settings dialog box, you can perform the following operations:
  - In the Row Filters section, specify a standard SQL WHERE clause to filter data by row. For more information, see Use SQL conditions to filter data.
  - Select the sharding columns that you want to use from the Sharding Columns drop-down list. You can select multiple fields as sharding columns. This parameter is optional.
    Unless otherwise specified, select the primary key as sharding columns. If the primary keys are not load-balanced, select load-balanced fields with unique identifiers as sharding columns to avoid potential performance issues. Sharding columns can be used for the following purposes:
    - Load balancing: Threads used for sending messages can be recognized based on the sharding columns if the destination table supports concurrent writes.
    - Orderliness: OMS ensures that messages are received in order if the values of the sharding columns are the same. The orderliness specifies the sequence of executing DML statements for a column.
  - In the Select Columns section, select the columns to be synchronized. For more information, see Column filtering.
- Click OK.

Remove one or all objects
During data mapping, OMS allows you to remove one or more selected objects to be migrated or synchronized to the destination.
- Remove a single synchronization object
  In the list on the right of the selection section, hover over the target object, and click Remove. The synchronization object is removed.
- Remove all synchronization objects
  In the list on the right of the selection section, click Remove All in the upper-right corner. In the dialog box that appears, click OK to remove all synchronization objects.
Click Next. On the Synchronization Options page, specify the following parameters.
Full synchronization
The following table describes the full synchronization parameters, which are displayed only if you have selected Full Synchronization on the Select Synchronization Type page.
| Parameter | Description |
| --- | --- |
| Full Synchronization Resource Configuration | You can select Small, Medium, or Large to use the corresponding default values of Read Concurrency, Write Concurrency, and Memory. You can also customize the resource configurations for full synchronization. Through resource configuration for the Full-Import component, you can limit the resource consumption of a project in the full synchronization phase. Notice: In the case of custom configurations, the minimum value is 1 and only integers are supported. |
Incremental synchronization
The following table describes the incremental synchronization parameters, which are displayed only if you have selected Incremental Synchronization on the Select Synchronization Type page.
| Parameter | Description |
| --- | --- |
| Incremental Log Pull Resource Configuration | You can select Small, Medium, or Large to use the corresponding default value of Memory. You can also customize the resource configurations for incremental log pull. Through resource configuration for the Store component, you can limit the resource consumption of a project in log pull in the incremental synchronization phase. Notice: In the case of custom configurations, the minimum value is 1 and only integers are supported. |
| Incremental Data Write Resource Configuration | You can select Small, Medium, or Large to use the corresponding default values of Write Concurrency and Memory. You can also customize the resource configurations for incremental data write. Through resource configuration for the Incr-Sync component, you can limit the resource consumption of a project in data writes in the incremental synchronization phase. Notice: In the case of custom configurations, the minimum value is 1 and only integers are supported. |
| Incremental Record Retention Time | The duration that incremental parsed files are cached in OMS. A longer retention period results in more disk space occupied by the Store component. |
| Incremental Synchronization Start Timestamp | If you have selected Full Synchronization as the synchronization type, the default value of this parameter is the project startup time and cannot be modified. If you do not select Full Synchronization as the synchronization type, set this parameter to a certain point of time, which is the current system time by default. For more information, see Set an incremental synchronization timestamp. |
Advanced options
| Parameter | Description |
| --- | --- |
| Serialization Method | The message format for synchronizing data to a RocketMQ instance. Valid values: Default, Canal, Dataworks (version 2.0 supported), SharePlex, DefaultExtendColumnType, Debezium, DebeziumFlatten, and DebeziumSmt. For more information, see Data formats. Notice: At present, only MySQL tenants of OceanBase Database support Debezium, DebeziumFlatten, and DebeziumSmt. |
| Partitioning Rules | When you synchronize data to a RocketMQ instance, only Hash is supported. Hash indicates that OMS uses a hash algorithm to select the message queue (MQ) for RocketMQ based on the value of the primary key or sharding column. |
| Business System Identification (Optional) | Identifies the source business system of data. The business system identifier consists of 1 to 20 characters. |
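
To illustrate the idea behind the Hash partitioning rule, the following is a minimal sketch of hash-based queue selection. It is not the algorithm that OMS uses: the CRC32 hash, the separator, and the select_queue function are assumptions chosen only to show how rows with the same primary key or sharding column values map to the same queue.

```python
import zlib


def select_queue(sharding_values, queue_count: int) -> int:
    """Map the sharding column values of a row to a queue index.

    Hypothetical helper: rows with equal sharding values always get the same
    index, so their messages stay in one queue and keep their relative order.
    """
    if queue_count < 1:
        raise ValueError("queue_count must be at least 1")
    # Join with a separator so ("ab", "c") and ("a", "bc") produce different keys.
    joined = "\x1f".join(str(value) for value in sharding_values)
    # CRC32 is stable across processes, unlike Python's built-in hash() for str.
    return zlib.crc32(joined.encode("utf-8")) % queue_count
```

With this scheme, two changes to the row whose primary key is 42 both land in queue select_queue((42,), 8), while rows with other keys are spread across the remaining queues.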
If the parameter settings on the page cannot meet your requirements, you can click Parameter Configuration in the lower part of the page to configure more specific settings. You can also reference an existing project or component template.
Click Precheck.
During the precheck, OMS detects the connection with the destination data source. If an error is returned during the precheck, you can perform the following operations:
Identify and troubleshoot the problem and then perform the precheck again.
Click Skip in the Actions column of a failed precheck item. In the dialog box that appears, you can view the prompt for the consequences of the operation and click OK.
Click Start Project. If you do not need to start the project now, click Save to go to the details page of the data synchronization project. You can start the project later as needed.
OMS allows you to modify the synchronization objects when the data synchronization project is running. For more information, see View and modify synchronization objects.
After a data synchronization project is started, it runs based on the selected synchronization type. For more information, see the "View synchronization details" section in the View details of a data synchronization project topic.
If the data synchronization project encounters a running exception due to a network failure or slow startup of processes, you can click Recover on the Synchronization Projects page or on the Details page of the synchronization project.