Message Queue for Apache RocketMQ is a distributed message-oriented middleware built by Alibaba Cloud based on Apache RocketMQ. It features low latency, high concurrency, and high reliability. The data synchronization feature of OceanBase Migration Service (OMS) Community Edition allows you to synchronize data between a physical table in OceanBase Database Community Edition and a RocketMQ data source in real time, extending the message processing capability.
For more information about the data formats for OceanBase Database Community Edition, see Data formats.
Prerequisites
You have created a dedicated database user for data synchronization in the source OceanBase Database Community Edition and granted the required privileges to the user. For more information, see Create a database user.
Limitations
OMS Community Edition supports instances of RocketMQ 4.x or 5.x, including RocketMQ Enterprise Edition and RocketMQ Community Edition.
Only physical tables can be synchronized.
During data synchronization, OMS Community Edition allows you to drop a table before creating a new one. In other words, you can execute
DROP TABLEand then executeCREATE TABLE. In OMS Community Edition, you cannot create a new table by renaming a table. In other words, you cannot executeRENAME TABLE a TO a_tmp.The name of a table to be synchronized, as well as the names of columns in the table, must not contain Chinese characters.
The data source identifiers and user accounts must be globally unique in OMS Community Edition.
OMS Community Edition supports the synchronization of only objects whose database name, table name, and column name are ASCII-encoded without special characters. The special characters are line breaks and
| " ' ` ( ) = ; / &.The source cannot be a standby OceanBase database.
Considerations
To ensure the performance of a data synchronization task, we recommend that you synchronize no more than 1,000 tables at a time.
If the source is of an OceanBase Database Community Edition version in the range of V4.0.0 to V4.3.X, excluding V4.2.5 BP1, and you have selected Incremental Synchronization, you need to configure the
STOREDattribute for generated columns. For more information, see Generated column operations. Otherwise, information about generated columns will not be saved in incremental logs, which may lead to exceptions during incremental data synchronization.Take note of the following items when an updated row contains a LOB column:
If the LOB column is updated, do not use the value stored in the LOB column before the
UPDATEorDELETEoperation.The following data types are stored in LOB columns: JSON, GIS, XML, user-defined type (UDT), and TEXT such as LONGTEXT and MEDIUMTEXT.
If the LOB column is not updated, the value stored in the LOB column before and after the
UPDATEorDELETEoperation is NULL.
When data transfer is resumed for a task, some data (within the last minute) may be duplicated in the RocketMQ instance. Therefore, deduplication is required in downstream systems.
If the clocks between nodes or between the client and the server are out of synchronization, the latency may be inaccurate during incremental synchronization.
For example, if the clock is earlier than the standard time, the latency can be negative. If the clock is later than the standard time, the latency can be positive.
If the
binlog_row_imagevalue is notFULLwhen the application starts, you can set it toFULL. After that, you must restart the application. Otherwise, OceanBase Community Edition will lack log information, which leads to issues with data synchronization. The command for setting the value is as follows:set global binlog_row_image = 'FULL';
Procedure
Create a data synchronization task.
Log in to the console of OMS Community Edition.
In the left navigation bar, click Data Synchronization.
On the Data Synchronization page, click Create Synchronization Task in the upper right corner.
On the Select Source and Target page, configure the parameters.
Parameter Description Task Name We recommend that you set it to a combination of digits and letters. It must not contain any spaces and cannot exceed 64 characters in length. Label Click the text box and select the target label from the drop-down list. You can also click Manage Tags to create, modify, and delete labels. For more information, see Manage data synchronization tasks using labels. Source If you have already created an OceanBase-CE data source, please select it from the drop-down list. If not, click New Data Source in the drop-down list and add it in the dialog box on the right. For parameter details, see Create an OceanBase-CE Data Source. Target If you have created a RocketMQ data source, select it from the drop-down list. Otherwise, click New Data Source in the drop-down list and create one in the dialog box that appears on the right. For more information, see Create a RocketMQ data source. Click Next Step on the Select Synchronization Type page to choose the synchronization type for the current data synchronization task.
The synchronization type supports Full Synchronization and Incremental Synchronization, while Incremental Synchronization only supports Synchronizing DML (including
Insert,Delete, andUpdate). Please select based on your business needs. For more details, see DML Filtering.(Optional) Click Next.
When the source is OceanBase Community Edition, incremental synchronization requires configuring OCP (optional), DRC User Username, and Password.
If you select Incremental Synchronization but the source OceanBase Community Edition is not configured with the corresponding parameters, a dialog box named Data Source Supplementary Information will pop up to remind you to configure it. For details about the parameters, see Create a New OceanBase-CE Data Source.
After the supplement is completed, click Test connectivity. After the connection test is successful, click Save.
Click Next Step to go to the Select Synchronization Object page, and select the synchronization scope.
You can select Specify Objects or Match Rules to specify the synchronization objects. The following procedure describes how to specify synchronization objects by using the Specify Objects option. For information about the procedure for specifying synchronization objects by using the Match Rules option, see Configure matching rules for synchronization objects.
When you synchronize data from OceanBase Database Community Edition to a RocketMQ instance, you can synchronize data from multiple tables to multiple topics.
In the left-side pane, select the objects to be synchronized.
Click >.
Click the Existing Topics drop-down list in the Map Object to Topic dialog box and select the target topic.
You can also enter the name of an existing topic and select it after it appears.
Click OK.
When you synchronize data from OceanBase Database Community Edition to a RocketMQ instance, OMS Community Edition allows you to import objects from text and perform the following operations on the objects in the target database: change topics, set row filtering conditions, and remove a single object or all objects. Objects in the target database are listed in the structure of Topic > Database > Table.
Operation Description Import objects - In the list on the right, click Import Objects in the upper-right corner.
- In the dialog box that appears, click OK.
Notice:
This operation will overwrite previous selections. Proceed with caution. - In the Import Synchronization Objects dialog box, import the objects to be synchronized.
You can import CSV files to rename databases/tables and set row filtering conditions. For more information, see Download and import the settings of synchronization objects. - Click Validate.
- After the validation succeeds, click OK.
Change topics OMS Community Edition allows you to change the topic for objects in the target database. For more information, see Change the topic. Configure settings OMS Community Edition allows you to configure row-based filtering, select sharding columns, and select columns to be synchronized. - In the list on the right, move the pointer over the object that you want to set.
- Click Settings.
- In the Settings dialog box, you can perform the following operations:
- In the Row Filters section, specify a standard SQL
WHEREclause to filter data by row. For more information, see Use SQL conditions to filter data. - Select the sharding columns that you want to use from the Sharding Columns drop-down list. You can select multiple columns as sharding columns. This parameter is optional.
Unless otherwise specified, select the primary key as sharding columns. If the primary key is not load-balanced, select load-balanced columns with unique identifiers as sharding columns to avoid potential performance issues. Sharding columns can be used for the following purposes:- Load balancing: Threads used for sending messages can be recognized based on the sharding columns if the target table supports concurrent writes.
- Orderliness: OMS Community Edition ensures that messages are received in order if the values of the sharding columns are the same. The orderliness specifies the sequence of executing DML statements for a column.
- In the Select Columns section, select the columns to be synchronized. For more information, see Column filtering.
- In the Row Filters section, specify a standard SQL
- Click OK.
Remove one or all objects OMS Community Edition allows you to remove a single object or all objects to be synchronized to the target database during data mapping. - Remove a single synchronization object
In the list on the right, move the pointer over the object that you want to remove, and click Remove to remove the synchronization object. - Remove all synchronization objects
In the list on the right, click Remove All in the upper-right corner. In the dialog box that appears, click OK to remove all synchronization objects.
Click Next. On the Synchronization Options page, configure the following parameters.
To view or modify parameters of the Connector or Incr-Sync component, click Configuration Details in the upper-right corner of the Full synchronization or Incremental Synchronization section. For more information about the parameters, see Component parameters.
Full synchronization
The following parameters are displayed only if you have selected Full Synchronization on the Select Synchronization Type page.
Parameter Description Concurrent Speed Includes Stable, Normal, Fast, and Custom. The performance of full synchronization varies, and the resources required for full synchronization tasks also differ. When you select Custom, you can set Read Concurrency, Write Concurrency, and JVM Memory based on actual conditions. Incremental synchronization
The following parameters are displayed only if you have selected Incremental Synchronization on the Select Synchronization Type page.
Parameter Description Concurrency Speed Valid values: Stable, Normal, Fast, and Custom. The amount of resources to be consumed by an incremental synchronization task depends on the synchronization performance. If you select Custom, you can set Read Concurrency, Write Concurrency, and JVM Memory as needed. Incremental Synchronization Start Timestamp - If you have selected Full Synchronization as the synchronization type, this parameter is not displayed.
- If you did not select Full Synchronization as the synchronization type, set this parameter to a certain point in time, which is the current system time by default. For more information, see Set an incremental synchronization timestamp.
Advanced options
Parameter Description Serialization Method The message format for synchronizing data to a RocketMQ instance. Valid values: Default, Canal, Dataworks (version 2.0 supported), SharePlex, DefaultExtendColumnType, Debezium, DebeziumFlatten, and DebeziumSmt. For more information, see Data formats.
Notice
At present, only MySQL-compatible tenants of OceanBase Database support Debezium, DebeziumFlatten, and DebeziumSmt.Partitioning Rules The rule for synchronizing data from the source database to a RocketMQ instance. At present, it can be set only to Hash.
Hash indicates that OMS Community Edition uses a hash algorithm to select the message queue for RocketMQ based on the value of the primary key or sharding column.Business System Identification (Optional) The identifier that identifies the source business system of data. The business system identifier consists of 1 to 20 characters. Target
Parameter Description Enter the name of the producer group Specifies a producer group that can write data into multiple topics. Whether to Allow Message Tracing Specifies whether to allow message tracing. If message tracing is enabled, you can trace the complete task information, including the node time and status, of a message in its entire lifecycle from transmission to the RocketMQ server by the producer to consumption by consumers. The message tracing feature provides data support for troubleshooting in the production environment.
Click Pre-check.
During the precheck, OMS Community Edition detects the connection with the target. If an error is returned during the precheck, you can perform the following operations:
Identify and troubleshoot the problem and then perform the precheck again.
Click Skip in the Actions column of the failed precheck item. In the dialog box that prompts the consequences of the operation, click OK.
Click Start Task. If you do not need to start the task now, click Save to go to the details page of the task. You can start the task later as needed.
OMS Community Edition allows you to modify the synchronization objects when the data synchronization task is running. For more information, see View and modify synchronization objects. After the data synchronization task is started, it will be executed based on the selected synchronization types. For more information, see the View Synchronization Details section in the View details of a data synchronization task topic.
If the data synchronization task encounters an execution exception due to a network failure or slow startup of processes, you can click Resume on the Synchronization Tasks or Details page of the synchronization task.