Kafka is a widely used, high-performance distributed messaging and stream processing platform. OceanBase Migration Service (OMS) Community Edition supports real-time data synchronization between OceanBase Database Community Edition and a self-managed Kafka instance, extending your message processing capabilities. The data transmission feature is therefore widely used in business scenarios such as real-time data warehouse building, data queries, and report distribution.
OMS Community Edition enables you to synchronize data to message queue products, extending your business applications in big data fields such as data aggregation and monitoring, streaming data processing, and online/offline analysis. For more information about the data formats supported for OceanBase Database Community Edition, see Data formats.
Prerequisites
You have created dedicated database users for data synchronization in the source OceanBase Database Community Edition instance and granted the required privileges to the users. For more information, see Create a database user.
Limitations
Only physical tables can be synchronized.
OMS Community Edition supports Kafka 0.9, 1.0, and 2.x.
Notice
If the Kafka version is 0.9, schema synchronization is not supported.
If you rename a source table during data synchronization and the new name falls outside the synchronization scope, the data of that table is no longer synchronized to the destination Kafka instance.
The name of a table to be synchronized, as well as the names of columns in the table, must not contain Chinese characters.
The data source identifiers and user accounts must be globally unique in OMS Community Edition.
OMS Community Edition supports the synchronization of only objects whose database name, table name, and column name are ASCII-encoded without special characters. The special characters are line breaks and . | " ' ` ( ) = ; / &.
OMS Community Edition does not support a standby OceanBase database as the source.
Considerations
In a data synchronization project where the source is of the OceanBase Database Community Edition type and DDL synchronization is enabled, if a RENAME operation is performed on a table in the source, we recommend that you restart the project to avoid data loss during incremental synchronization.
Consider the following circumstances when an updated row includes a LOB column:
If the LOB column is updated, do not use the values of the LOB column from before the UPDATE or DELETE operation. LOB columns can be used to store the following types of data: JSON, GIS, XML, UDT, LONGTEXT, and MEDIUMTEXT.
If the LOB column is not updated, the values of the LOB column before and after the UPDATE or DELETE operation are NULL.
If the clocks between nodes or between the client and the server are out of synchronization, the latency may be inaccurate during incremental synchronization.
For example, if the clock is earlier than the standard time, the latency can be negative. If the clock is later than the standard time, the latency can be positive.
When data transfer is resumed for a project, some data from approximately the last minute may be duplicated in the Kafka instance, so deduplication is required in downstream systems (see the sketch at the end of this section).
When you synchronize data from OceanBase Database Community Edition to a Kafka instance, if a statement that creates a unique index fails to execute at the source, the Kafka instance still consumes the DDL statements for creating and dropping the unique index. If the downstream DDL statement for creating the unique index fails to execute, ignore this exception.
Notice
Liboblog 2.2.x does not guarantee the order of DDL or DML statements and may cause data quality issues.
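Because a resumed project may re-deliver roughly the last minute of data, downstream consumers should be idempotent. The following Python sketch shows one simple, key-based deduplication approach on the consumer side; the topic name, broker address, and the row_unique_key field are hypothetical placeholders, not values defined by OMS.

```python
# Minimal deduplication sketch for a downstream consumer (illustrative only).
# Assumes kafka-python is installed and that each message value is a JSON
# object containing some unique business key -- the field name "row_unique_key"
# is a placeholder, not an OMS-defined field.
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "topic_demo_db_demo_table",          # hypothetical topic name
    bootstrap_servers="kafka-host:9092", # hypothetical broker address
    group_id="oms-dedup-demo",
    auto_offset_reset="earliest",
)

seen_keys = set()  # in production, use a bounded or persistent store instead

for message in consumer:
    record = json.loads(message.value)
    key = record.get("row_unique_key")   # placeholder unique key
    if key in seen_keys:
        continue                         # skip the duplicate delivered after a resume
    seen_keys.add(key)
    # ... process the record here ...
```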
Supported DDL operations
CREATE TABLE
Notice
The created table must be a synchronization object. To execute the CREATE TABLE statement on a synchronized table, execute the DROP TABLE statement on this table first.
ALTER TABLE
DROP TABLE
TRUNCATE TABLE
In delayed deletion, the same transaction contains two identical TRUNCATE TABLE DDL statements. In this case, idempotence is implemented for downstream consumption.
ALTER TABLE … TRUNCATE PARTITION
CREATE INDEX
DROP INDEX
COMMENT ON TABLE
RENAME TABLE
Notice
The renamed table must be a synchronization object.
Procedure
Create a data synchronization project.
Log on to the console of OMS Community Edition.
In the left-side navigation pane, click Data Synchronization.
On the Data Synchronization page, click Create Synchronization Project in the upper-right corner.
On the Select Source and Destination page, configure the parameters.
Synchronization Project Name: The name of the synchronization project. We recommend that you use a combination of digits and letters. The name cannot contain spaces and cannot exceed 64 characters in length.
Tag (Optional): Click the field and select a target tag from the drop-down list. You can also click Manage Tags to create, modify, and delete tags. For more information, see Manage data synchronization projects by using tags.
Source: If you have created a data source of OceanBase Database Community Edition, select it from the drop-down list. Otherwise, click Add Data Source in the drop-down list to create one in the dialog box on the right side. For more information about the parameters, see Create a data source of OceanBase Database Community Edition.
Destination: If you have created a Kafka data source, select it from the drop-down list. Otherwise, click Add Data Source in the drop-down list to create one in the dialog box on the right side. For more information, see Create a Kafka data source.
Click Next. On the Select Synchronization Type page, select the synchronization type for the current data synchronization project.
Valid values: Schema Synchronization, Full Synchronization, and Incremental Synchronization. Full Synchronization supports the synchronization of tables without primary keys. Incremental Synchronization supports DML Synchronization and DDL Synchronization.
Optional. Click Next.
To synchronize data from OceanBase Database Community Edition, you must specify OCP (Optional), Username, and Password for schema synchronization and incremental synchronization.
If you select Schema Synchronization and Incremental Synchronization without configuring the required parameters for the source database, the More About Data Sources dialog box appears to prompt you to configure the parameters. For more information about the parameters, see Create a data source of OceanBase Database Community Edition.
After you configure the parameters, click Test Connectivity. After the test succeeds, click Save.
Click Next. On the Select Synchronization Objects page, select a synchronization scope.
When you synchronize data from OceanBase Database Community Edition to a Kafka instance, you can synchronize data from multiple tables to multiple topics.
In the left-side pane, select the objects to be synchronized.
Click >.
In the Map Object to Topic dialog box, select a mapping method.
If you did not select Schema Synchronization when you set the synchronization type, you can select only Existing Topics here. If you selected Schema Synchronization, you can use only one mapping method to create or select a topic.
For example, if you selected Schema Synchronization and you use both the Create Topic and Select Topic mapping methods, or rename the topic, a precheck error is returned because the options conflict.
Create Topic: Enter the name of the new topic in the text box. The topic name must be 3 to 64 characters in length and can contain only letters, digits, hyphens (-), underscores (_), and periods (.).
Select Topic: OMS Community Edition allows you to query Kafka topics. Click Select Topic, and then find and select the topics to be synchronized from the Existing Topics drop-down list. You can also enter the name of an existing topic and select it after it appears.
Batch Generate Topics: Topics are generated in batches in the format Topic_${Database Name}_${Table Name}.
If you select Create Topic or Batch Generate Topics, you can query the newly created topics in the Kafka instance after schema synchronization is completed. By default, each Kafka topic has three partitions, and each partition has one replica. These default settings cannot be modified. If the default topics do not meet your business needs, you can create topics in the destination Kafka instance as needed (see the sketch below).
Click OK.
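If the default three partitions with one replica do not fit your workload, you can pre-create topics on the Kafka side before starting the project. The following sketch uses kafka-python's admin client; the broker address, topic name, partition count, and replication factor are example values, not OMS defaults.

```python
# Illustrative sketch: pre-create a destination topic with custom settings
# before starting the synchronization project. All values below are examples.
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers="kafka-host:9092")  # hypothetical broker

topic = NewTopic(
    name="Topic_demo_db_demo_table",  # follows the Topic_${Database Name}_${Table Name} pattern
    num_partitions=6,                 # example: more partitions for higher parallelism
    replication_factor=2,             # example: depends on your cluster size
)
admin.create_topics([topic])
admin.close()
```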
When you synchronize data from OceanBase Database Community Edition to a Kafka instance, OMS Community Edition allows you to import objects from text and perform the following operations on the objects in the destination database: change topics, set row filtering conditions, and remove a single object or all objects. Objects in the destination database are listed in the structure of Topic > Database > Table.
Import Objects
- In the list on the right, click Import Objects in the upper-right corner.
- In the dialog box that appears, click OK.
Notice
This operation will overwrite previous selections. Proceed with caution.
- In the Import Synchronization Objects dialog box, import the objects to be synchronized.
You can import CSV files to rename databases and tables and to set row filtering conditions. For more information, see Download and import the settings of synchronization objects.
- Click Validate.
- After the validation succeeds, click OK.
Change Topic
OMS Community Edition allows you to change topics for objects in the destination database. For more information, see Change topics.
Settings
OMS Community Edition allows you to configure row-based filtering, select sharding columns, and select columns to be synchronized.
- In the list on the right, move the pointer over the object that you want to set.
- Click Settings.
- In the Settings dialog box, you can perform the following operations:
- In the Row Filters section, specify a standard SQL WHERE clause to filter data by row. For more information, see Use SQL conditions to filter data.
- Select the sharding columns that you want to use from the Sharding Columns drop-down list. You can select multiple fields as sharding columns. This parameter is optional. Unless otherwise specified, select the primary keys as sharding columns. If the primary keys are not load-balanced, select load-balanced fields with unique identifiers as sharding columns to avoid potential performance issues. Sharding columns can be used for the following purposes (see the sketch below):
- Load balancing: If the destination table supports concurrent writes, the threads used for sending messages are determined based on the sharding columns.
- Orderliness: OMS Community Edition ensures that messages are received in order if the values of the sharding columns are the same, which preserves the sequence in which the DML statements were executed.
- In the Select Columns section, select the columns to be synchronized. For more information, see Column filtering.
- Click OK.
Remove/Remove All
OMS Community Edition allows you to remove a single object or all objects to be synchronized to the destination database during data mapping.
- Remove a single synchronization object: In the list on the right of the selection area, move the pointer over the target object and click Remove. The synchronization object is removed.
- Remove all synchronization objects: In the list on the right of the selection area, click Remove All in the upper-right corner. In the dialog box that appears, click OK to remove all synchronization objects.
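The orderliness guarantee for equal sharding-column values relies on a general Kafka property: messages produced with the same key are routed to the same partition, and each partition is consumed in order. The following Python sketch (using the kafka-python client) illustrates that general property only; it is not a reproduction of OMS internals, and the broker address, topic, and key values are hypothetical.

```python
# Illustrative only: messages that share a key are routed to the same partition,
# so a consumer sees them in production order. This mirrors why messages stay
# ordered when their sharding-column values are equal.
import json
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="kafka-host:9092",                     # hypothetical broker
    key_serializer=lambda k: k.encode("utf-8"),
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

# Both events use the same sharding-column value ("order-1001") as the key,
# so they are written to the same partition and stay in order.
producer.send("topic_demo_db_orders", key="order-1001", value={"op": "INSERT", "id": 1001})
producer.send("topic_demo_db_orders", key="order-1001", value={"op": "UPDATE", "id": 1001})
producer.flush()
```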
Click Next. On the Synchronization Options page, specify the following parameters.
Incremental Synchronization Start Timestamp:
- If you select Full Synchronization as the synchronization type, the default value of this parameter is the project startup time and cannot be modified.
- If you do not select Full Synchronization as the synchronization type, set this parameter to a specific point in time. The default value is the current system time. You can select a point in time or enter a timestamp.
Notice
You can select the current time or a point in time earlier than the current time. This parameter is closely related to the retention period of archived logs. Generally, you can start data synchronization from the current timestamp.
Serialization Method: The message format for synchronizing data to the destination Kafka instance. Valid values: Default, Canal, Dataworks (V2.0 supported), SharePlex, DefaultExtendColumnType, Debezium, DebeziumFlatten, Maxwell, and DebeziumSmt. For more information, see Data formats.
Notice
- Only MySQL tenants of OceanBase Database support the Debezium, DebeziumFlatten, and DebeziumSmt formats.
- If the message format is set to Dataworks, the DDL operations COMMENT ON TABLE and ALTER TABLE … TRUNCATE PARTITION cannot be synchronized.
Partitioning Rules: The rule for synchronizing data from an OceanBase database to a Kafka topic. The data transmission feature supports Hash, Table, and One (see the sketch below).
- Hash: OMS Community Edition uses a hash algorithm to select the partition of a Kafka topic based on the hash value of the primary key or sharding column.
Notice
The Hash rule supports only delivering data to all partitions.
- Table: OMS Community Edition delivers all data in a table to the same partition and uses the table name as the hash key.
- One: JSON messages are delivered to a single partition of a topic to ensure ordering.
Business System Identification (Optional): Identifies the source business system of the data. The business system identifier consists of 1 to 20 characters.
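To make the three partitioning rules more concrete, the following simplified Python sketch shows how each rule could map a record to a partition. It is an illustration only; the actual hash algorithm used by OMS is not documented here, and the table and key values are hypothetical.

```python
# Simplified illustration of the Hash, Table, and One partitioning rules.
# This is conceptual; it does not reproduce the hash algorithm used by OMS.
import hashlib

def choose_partition(rule, num_partitions, table_name, sharding_value=None):
    def stable_hash(text):
        # Deterministic hash (unlike Python's built-in hash(), which is salted per process).
        return int(hashlib.md5(text.encode("utf-8")).hexdigest(), 16)

    if rule == "Hash":   # hash of the primary key / sharding column value
        return stable_hash(str(sharding_value)) % num_partitions
    if rule == "Table":  # all rows of one table go to the same partition
        return stable_hash(table_name) % num_partitions
    if rule == "One":    # everything goes to a single fixed partition
        return 0
    raise ValueError(f"unknown rule: {rule}")

# Example: with 3 partitions, the Table rule always maps rows of "orders" to
# one partition, while the Hash rule spreads rows across partitions by key.
print(choose_partition("Hash", 3, "orders", sharding_value=1001))
print(choose_partition("Table", 3, "orders"))
```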
Click Precheck.
During the precheck, OMS Community Edition checks the connection to the destination Kafka instance. If an error is returned during the precheck:
You can identify and troubleshoot the problem and then perform the precheck again.
You can also click Skip in the Actions column of the failed precheck item. A dialog box appears, informing you of the impact. If you want to skip this operation, click OK.
Click Start Project. If you do not need to start the project now, click Save to go to the details page of the data synchronization project. You can start the project later as needed.
OMS Community Edition allows you to modify the synchronization objects while a synchronization project is running. For more information, see View and modify synchronization objects. After a data synchronization project is started, the selected objects are synchronized based on the selected synchronization type. For more information, see the "View synchronization details" section in the View details of a data synchronization project topic.
If the data synchronization project encounters a running exception due to a network failure or slow startup of processes, you can click Recover on the Synchronization Projects page or on the Details page of the synchronization project.