Kafka is a widely used, high-performance distributed event streaming platform. OceanBase Migration Service (OMS) supports real-time data synchronization between Oracle and MySQL tenants of OceanBase Database and a self-managed Kafka instance, extending your message processing capabilities. OMS is therefore widely used in business scenarios such as real-time data warehouse building, data queries, and report distribution.
OMS allows you to synchronize data to message queue products, extending the all-around application of your business in big data fields, such as monitoring data aggregation, streaming data processing, and online/offline analysis. For more information about the data formats for the two types of tenants, see Data formats.
Prerequisites
You have created a dedicated database user for data synchronization in the source OceanBase database and granted the required privileges to the user. For more information, see Create a database user.
You have created data sources for the source and destination. For more information, see Create a physical OceanBase data source and Create a Kafka data source.
Limits
Only physical tables can be synchronized.
OMS supports Kafka 0.9, 1.0, 2.x, and 3.2.1.
Notice
If the Kafka version is 0.9, schema synchronization is not supported.
During data synchronization, if you rename a source table to be synchronized and the new name is beyond the synchronization scope, the data of the source table will not be synchronized to the destination Kafka instance.
The name of a table to be synchronized, as well as the names of columns in the table, must not contain Chinese characters.
Data source identifiers, user accounts, and tags must be globally unique in OMS.
OMS supports only the synchronization of objects whose database name or table name is an ASCII string without special characters. The special characters are . | \ " ' ` ( ) = ; / & \n
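The naming rule above can be checked programmatically before you configure a project. The following is an illustrative sketch, not part of OMS: a helper that verifies a database or table name is an ASCII string free of the listed special characters.

```python
# Hypothetical helper (not an OMS API): validates that an object name is
# pure ASCII and contains none of the documented special characters
# . | \ " ' ` ( ) = ; / & or a line break.
SPECIAL_CHARS = set('.|\\"\'`()=;/&\n')

def is_valid_object_name(name: str) -> bool:
    """Return True if the name satisfies the documented naming rule."""
    if not name or not name.isascii():
        return False
    return not any(ch in SPECIAL_CHARS for ch in name)
```

For example, `orders_2024` passes, while `a.b`, `orders;drop`, or a name containing Chinese characters does not.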
Usage notes
If the clocks are not synchronized between the nodes or between the client and a server, the latency of incremental synchronization may be negative.
When data transmission is resumed for a project, some data (transmitted within the last minute) may be duplicated in the Kafka instance. Therefore, data deduplication is required in downstream applications.
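One common way to perform the required downstream deduplication is to keep a bounded window of recently seen record identities and drop repeats. The sketch below is illustrative only; the key function (for example, primary key plus commit timestamp) and the window size are assumptions, not OMS specifics.

```python
from collections import OrderedDict

class RecentDeduplicator:
    """Drops records whose identity key was seen within a bounded window.

    The key is assumed to uniquely identify a change record, e.g. a
    (primary_key, commit_timestamp) tuple; this is a consumer-side sketch,
    not part of OMS.
    """

    def __init__(self, capacity: int = 100_000):
        self.capacity = capacity
        self._seen = OrderedDict()  # insertion-ordered set of record keys

    def is_duplicate(self, key) -> bool:
        """Return True if `key` was already processed; otherwise remember it."""
        if key in self._seen:
            return True
        self._seen[key] = None
        if len(self._seen) > self.capacity:
            self._seen.popitem(last=False)  # evict the oldest key
        return False
```

A consumer would call `is_duplicate(...)` for each received message and skip processing when it returns `True`.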
We recommend that you select no more than 15,000 database objects for a project.
If a table contains LOB fields or more than 500 columns, we recommend that you create a dedicated project for the table and set the JVM parameters of the related components as needed. For example, set the limitator.select.batch.max parameter for the full verification component, the sourceBatchSize parameter for the full import component, and the sourceBatchSize parameter for the incremental synchronization component.
Execute the following statement to query the tables that contain LOB fields:
SELECT DISTINCT(TABLE_NAME) FROM ALL_TAB_COLUMNS WHERE DATA_TYPE IN ('BLOB', 'CLOB', 'NCLOB') AND OWNER = XXX;
When you synchronize data from OceanBase Database to a Kafka instance, if the execution of the statement for creating a unique index fails in the source database, the Kafka instance will consume both the DDL creation and DDL deletion statements. If the execution of the CREATE INDEX DDL statement passed to the downstream application fails, ignore this exception.
Notice
Liboblog V2.2.x does not ensure the execution sequence of DDL/DML statements. This may affect the data quality.
Supported DDL operations for synchronization
- CREATE TABLE
  Notice: The created table must be within the scope of the synchronization objects. You can execute the CREATE TABLE statement on a synchronized table only after you execute the DROP TABLE statement on it.
- ALTER TABLE
- DROP TABLE
- TRUNCATE TABLE
  In delayed deletion, two TRUNCATE TABLE DDL statements exist in the same transaction. In this case, downstream applications must execute these DDL statements in an idempotent manner.
- ALTER TABLE … TRUNCATE PARTITION
- CREATE INDEX
- DROP INDEX
- COMMENT ON TABLE
- RENAME TABLE
  Notice: The new name of the renamed table must match the rules for objects to be synchronized.
Procedure
Create a data synchronization project.
Log on to the OMS console.
In the left-side navigation pane, click Data Synchronization.
On the Data Synchronization page, click Create Synchronization Project in the upper-right corner.
On the Select Source and Destination page, specify the following parameters.
- Synchronization Project Name: We recommend that you set it to a combination of digits and letters. It cannot contain spaces or exceed 64 characters in length.
- Labels: Click the field and select a tag from the drop-down list. You can click Manage Tags to create, modify, and delete tags. For more information, see Manage data synchronization projects by using tags.
- Source: If you have created a physical OceanBase data source, select it from the drop-down list. Otherwise, click Add Data Source in the drop-down list to create one in the dialog box on the right side. For more information, see Create a physical data source of OceanBase Database.
  Notice: The source database cannot be an OceanBase Database V4.0.0 instance.
- Destination: If you have created a Kafka data source, select it from the drop-down list. Otherwise, click Add Data Source in the drop-down list to create one in the dialog box on the right side. For more information, see Create a Kafka data source.

Click Next. On the Select Synchronization Type page, select the synchronization type for the current data synchronization project.
Valid values of Synchronization Type are Schema Synchronization, Full Synchronization, and Incremental Synchronization. Full Synchronization supports the synchronization of tables without primary keys. Options for Incremental Synchronization are DML Synchronization and DDL Synchronization. The supported DML operations are Insert, Delete, and Update. For more information, see DML filtering and Synchronize DDL operations.
(Optional) Click Next.
If the source database is an OceanBase database, you must configure the obconfig_url parameter, username, and password for incremental synchronization.
If you have selected Incremental Synchronization without configuring the required parameters for the source database, the More About Data Sources dialog box appears to prompt you to configure the parameters. For more information, see Create a physical OceanBase data source.
After you configure the parameters, click Test Connectivity. After the test succeeds, click Save.
Click Next. On the Select Synchronization Objects page, select a synchronization scope.
When you synchronize data from an OceanBase database to a Kafka instance, you can synchronize data from multiple tables to multiple topics.
In the left-side pane, select the objects to be synchronized.
Click >.
In the Map Object to Topic dialog box, select a mapping method.
If you did not select Schema Synchronization when you set the synchronization type, you can select only Existing Topics here. If you selected Schema Synchronization, you can use only one mapping method to create or select topics.
For example, if you selected Schema Synchronization and use both the Create Topic and Select Topic mapping methods, or rename a topic, a precheck error is returned because the options conflict.
- Create Topic: Enter the name of the new topic in the text box. The topic name must be 3 to 64 characters in length and can contain only letters, digits, hyphens (-), underscores (_), and periods (.).
- Select Topic: OMS allows you to query Kafka topics. Click Select Topic, and then find and select the topics to be synchronized from the Select Topic drop-down list. You can also enter the name of an existing topic and select it after it appears.
- Batch Generate Topics: Topics are generated in batches in the format Topic_${Database Name}_${Table Name}.

If you select Create Topic or Batch Generate Topics, after schema synchronization succeeds, you can query the created topics on the Kafka side. By default, the number of partitions is 3 and the number of partition replicas is 1. These defaults cannot be modified. If the generated topics do not meet your business needs, you can create topics in the destination as needed.
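The batch-generation pattern and the topic naming rule described above can be combined into a small helper. This is an illustrative sketch, not an OMS API; the function name is hypothetical.

```python
import re

# Documented naming rule for Create Topic: 3-64 characters, restricted to
# letters, digits, hyphens, underscores, and periods.
TOPIC_NAME_RE = re.compile(r'^[A-Za-z0-9._-]{3,64}$')

def batch_topic_name(database: str, table: str) -> str:
    """Build a topic name using the documented batch format
    Topic_${Database Name}_${Table Name}, and validate it against the
    Create Topic naming rule."""
    name = f"Topic_{database}_{table}"
    if not TOPIC_NAME_RE.fullmatch(name):
        raise ValueError(f"generated topic name violates the naming rule: {name!r}")
    return name
```

For example, database `sales` and table `orders` yield `Topic_sales_orders`, while a table name containing a disallowed character raises an error.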
Click OK.
OMS allows you to import objects from text files, change the topics of the objects, set row-based filtering, or remove a single object or all objects. Objects in the destination database are listed in the structure of Topic > Database > Table.
Import Objects:
- In the list on the right, click Import Objects in the upper-right corner.
- In the dialog box that appears, click OK.
  Notice: This operation overwrites your previous selections. Proceed with caution.
- In the Import Synchronization Objects dialog box, import the objects to be synchronized. You can import CSV files to rename databases or tables and set row filtering conditions. For more information, see Download and import the settings of synchronization objects.
- Click Validate.
- After the validation succeeds, click OK.
Change Topic: OMS allows you to change topics. For more information, see Change a topic.
Settings: OMS allows you to configure row-based filtering, select sharding columns, and select the columns to be synchronized.
- In the list on the right, move the pointer over the target object.
- Click Settings.
- In the Settings dialog box, you can perform the following operations:
  - In the Row Filters section, specify a standard SQL WHERE clause to filter data by row. For more information, see Use SQL conditions to filter data.
  - Select the sharding columns that you want to use from the Sharding Columns drop-down list. You can select multiple fields as sharding columns. This parameter is optional. Unless otherwise specified, select the primary keys as sharding columns. If the primary keys are not load-balanced, select fields that have unique identifiers and balanced loads as sharding columns to avoid potential performance issues. Sharding columns serve the following purposes:
    - Load balancing: If the destination table supports concurrent writes, the threads used for sending messages are determined based on the sharding columns.
    - Ordering: If the values of the sharding columns are the same, OMS ensures that messages are received in the order in which the corresponding DML statements were executed.
  - In the Select Columns section, select the columns to be synchronized. For more information, see Column filtering.
- Click OK.
Remove/Remove All: During data mapping, OMS allows you to remove one or more selected objects.
- Remove a single synchronization object: In the list on the right of the selection section, move the pointer over the target object and click Remove.
- Remove all synchronization objects: In the list on the right of the selection section, click Remove All in the upper-right corner. In the dialog box that appears, click OK to remove all synchronization objects.
Click Next. On the Synchronization Options page, specify the following parameters.
- Incremental Synchronization Start Timestamp:
  - If you selected Full Synchronization when you set the synchronization type, the value defaults to the project start time and cannot be modified.
  - If you did not select Full Synchronization, specify a point in time after which the data is to be synchronized. The default value is the current system time. You can select a point in time or enter a timestamp.
  Notice: You can select the current time or an earlier point in time. This parameter is closely related to the retention period of archived logs. Generally, you can start data synchronization from the current timestamp.
- Serialization Method: The message format for synchronizing data to a Kafka instance. Valid values: Default, Canal, Dataworks (version 2.0 supported), SharePlex, DefaultExtendColumnType, and Debezium. For more information, see Data formats.
  Notice:
  - At present, MySQL tenants of OceanBase Database support only Debezium.
  - If you select Dataworks, the COMMENT ON TABLE and ALTER TABLE … TRUNCATE PARTITION DDL operations cannot be synchronized.
- Enable Intra-Transaction Sequence: Specifies whether to maintain the order of statements within a transaction. If this feature is enabled, OMS marks each message of a transaction with a sequence number before sending it to the downstream node.
  Notice: This parameter is valid only for the SharePlex format and is intended for you to obtain the sequence numbers of the DML statements that form a transaction. For example, if a transaction contains 10 DML statements numbered from 1 to 10, OMS delivers the data to the destination in that same order.
- Partitioning Rule: The rule for synchronizing data from an OceanBase database to a Kafka topic. Valid values: Hash, Table, and One.
  - Hash: OMS uses a hash algorithm to select the partition of a Kafka topic based on the value of the primary key or sharding column.
    Notice: The Hash rule supports only delivering data to all partitions.
  - Table: OMS delivers all data in a table to the same partition and uses the table name as the hash key.
  - One: JSON messages are delivered to a single partition of a topic to ensure ordering.
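The behavior of the three partitioning rules can be sketched as follows. This is assumed logic for illustration, not the exact OMS implementation; the function name and the use of MD5 as the hash are assumptions.

```python
import hashlib

def choose_partition(rule: str, table: str, sharding_key: str,
                     num_partitions: int) -> int:
    """Illustrative partition selection for the Hash, Table, and One rules."""
    if rule == "One":
        return 0  # all messages go to a single partition, preserving order
    if rule == "Table":
        key = table          # all rows of a table land in the same partition
    elif rule == "Hash":
        key = sharding_key   # primary key or sharding column value
    else:
        raise ValueError(f"unknown partitioning rule: {rule}")
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions
```

Note how, under Hash, identical sharding-column values always map to the same partition, which is what preserves per-key message ordering; under Table, the table name alone decides the partition.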
- Business System Identification (Optional): Identifies the source business system of the data. The business system identifier consists of 1 to 20 characters.

Click Precheck.
During the precheck, OMS checks connectivity to the destination Kafka instance. If an error is returned during the precheck, you can take either of the following actions:
- Identify and troubleshoot the issue, and then perform the precheck again.
- Click Skip in the Actions column of the failed precheck item. A dialog box appears, describing the impact of skipping this error. If you want to continue, click OK in the dialog box.
Click Start Project. If you do not need to start the project now, click Save to go to the details page of the data synchronization project. You can start the project later as needed.
OMS allows you to modify the synchronization objects when the data synchronization project is running. For more information, see View and modify synchronization objects. After a data synchronization project is started, the synchronization objects will be executed based on the selected synchronization type. For more information, see the "View synchronization details" section in the View details of a data synchronization project topic.
If data access fails due to a network failure or the slow startup of processes, go to the project list or the project details page and click Restore.