This topic describes how to use the data migration service to migrate data from an OceanBase database in MySQL or Oracle compatible mode to a Kafka instance.
Notice
A data migration task remaining in an inactive state for a long time may fail to be resumed depending on the retention period of incremental logs. Inactive states are Failed, Stopped, and Completed. The data migration service automatically releases data migration tasks that remain in an inactive state for more than 7 days to recycle resources. We recommend that you configure alerting for tasks and handle task exceptions in a timely manner.
Background information
Kafka is a widely used high-performance distributed stream computing platform. The data migration service supports real-time data synchronization between an OceanBase database in MySQL or Oracle compatible mode and a Kafka data source. This feature extends the message processing capability and is widely applied to business scenarios such as real-time data warehouse building, data query, and report distribution.
Kafka network connection tasks are classified into cloud instance Kafka and self-managed Kafka network connection tasks. The procedure varies with the type of network connection task.
Cloud instance Kafka network connection task

Self-managed Kafka network connection task

Prerequisites
You have created the source OceanBase instance and tenant. For more information, see Create an instance and Create a tenant.
You have created dedicated database users for data migration in the source OceanBase database in MySQL or Oracle compatible mode and the target Kafka instance, and granted the required privileges to the users. For more information, see User privileges.
Limitations
Only users with the Project Owner, Project Admin, or Data Services Admin role can create data migration tasks.
Limitations on the source database:
Do not perform DDL operations that modify database or table schemas during full migration. Otherwise, the data migration task may be interrupted.
The data migration service supports Kafka 0.9, 1.0, and 2.x.
The data migration service supports only objects whose database names, table names, and column names are ASCII-encoded and do not contain special characters. The special characters are line breaks and . | " ' ` ( ) = ; / &. A name-check sketch is provided after these limitations.
Only physical tables can be migrated.
During data migration, if you rename a source table to be migrated and the new name is beyond the migration scope, the data of the source table will not be migrated to the target Kafka instance.
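A quick pre-check of object names against the naming limitation above can save a failed precheck later. The following is a minimal, illustrative Java sketch and is not part of the data migration service; the character list mirrors the limitation above, and any additional naming rules of your target Kafka topics are not covered.

```java
import java.util.regex.Pattern;

/** Illustrative pre-check for the object naming limitation described above. */
public class ObjectNameCheck {

    // Reject non-ASCII characters (including line breaks) and the listed special characters.
    private static final Pattern UNSUPPORTED =
            Pattern.compile("[^\\x20-\\x7E]|[.|\"'`()=;/&]");

    public static boolean isSupported(String name) {
        return !UNSUPPORTED.matcher(name).find();
    }

    public static void main(String[] args) {
        System.out.println(isSupported("orders_2024")); // true
        System.out.println(isSupported("orders.v2"));   // false: contains '.'
    }
}
```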
Considerations
If the clocks between nodes or between the client and the server are out of synchronization, the latency may be inaccurate during incremental synchronization.
For example, if the clock is earlier than the standard time, the latency can be negative. If the clock is later than the standard time, the latency can be positive.
In a data migration task where the source is an OceanBase database and DDL synchronization is enabled, if a `RENAME` operation is performed on a table in the source, we recommend that you restart the task to avoid data loss during incremental synchronization.
If the version of the source OceanBase database is from V4.0.0 to V4.3.x, excluding V4.2.5 BP1, and you have selected Incremental Synchronization, you need to configure the `STORED` attribute for generated columns. For more information, see Generated column operations. Otherwise, information about generated columns will not be saved in incremental logs, which may lead to exceptions during incremental synchronization.
Take note of the following considerations when an updated row contains a large object (LOB) column:
The following data types are stored in LOB columns: JSON, GIS, XML, user-defined type (UDT), and TEXT types such as LONGTEXT and MEDIUMTEXT.
If the LOB column is updated, do not use the value stored in the LOB column before the UPDATE or DELETE operation.
If the LOB column is not updated, the value stored in the LOB column before and after the UPDATE or DELETE operation is NULL.
When data transmission is resumed for a task, some data (transmitted within the last minute) may be duplicated in the Kafka instance. Therefore, deduplication is required in downstream systems; see the consumer sketch at the end of these considerations.
When you synchronize data from an OceanBase database to a Kafka instance, if the source fails to execute the statement for unique index creation, the Kafka instance consumes the DDL statements for unique index creation and deletion. If the downstream systems fail to execute the DDL statement for unique index creation, ignore this exception.
If you select only Incremental Synchronization when you create the data migration task, the data migration service requires that the local incremental logs in the source database be retained for more than 48 hours.
If you select Full Migration and Incremental Synchronization when you create the data migration task, the data migration service requires that the local incremental logs in the source database be retained for at least 7 days. Otherwise, the data migration task may fail or the data in the source and target databases may be inconsistent because the data migration service cannot obtain incremental logs.
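As noted above, a resumed task can redeliver recently transmitted messages, so downstream consumers should deduplicate. The following is a minimal Java sketch of one possible approach, assuming each message carries a stable business key in its Kafka record key; the bootstrap address, group ID, topic name, and window size are placeholders, and production systems typically persist deduplication state instead of keeping it in memory.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class DedupConsumer {
    // Bounded window of recently seen (key + payload hash) fingerprints.
    private static final int WINDOW_SIZE = 100_000;
    private static final Map<String, Boolean> SEEN =
            new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
                @Override
                protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                    return size() > WINDOW_SIZE;
                }
            };

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-host:9092");           // placeholder endpoint
        props.put("group.id", "ob-migration-downstream");            // placeholder group
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("ob_orders_topic")); // placeholder topic
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
                    String value = record.value() == null ? "" : record.value();
                    String fingerprint = record.key() + ":" + value.hashCode();
                    if (SEEN.put(fingerprint, Boolean.TRUE) != null) {
                        continue; // duplicate redelivered after a task resume; skip it
                    }
                    process(record); // apply the change to the downstream system
                }
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.printf("topic=%s partition=%d offset=%d key=%s%n",
                record.topic(), record.partition(), record.offset(), record.key());
    }
}
```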
Supported source and target instance types
In the following table, the instance types supported for OceanBase Database in the MySQL and Oracle compatible modes are Dedicated (Transactional) and Self-managed Database.
| Cloud vendor | Source | Target |
|---|---|---|
| AWS | OceanBase MySQL Compatible Mode | Cloud Instance Kafka |
| AWS | OceanBase MySQL Compatible Mode | Self-managed Kafka |
| AWS | OceanBase Oracle Compatible Mode | Cloud Instance Kafka |
| AWS | OceanBase Oracle Compatible Mode | Self-managed Kafka |
| Huawei Cloud | OceanBase MySQL Compatible Mode | Cloud Instance Kafka |
| Huawei Cloud | OceanBase MySQL Compatible Mode | Self-managed Kafka |
| Huawei Cloud | OceanBase Oracle Compatible Mode | Cloud Instance Kafka |
| Huawei Cloud | OceanBase Oracle Compatible Mode | Self-managed Kafka |
| Google Cloud | OceanBase MySQL Compatible Mode | Cloud Instance Kafka |
| Google Cloud | OceanBase MySQL Compatible Mode | Self-managed Kafka |
| Google Cloud | OceanBase Oracle Compatible Mode | Cloud Instance Kafka |
| Google Cloud | OceanBase Oracle Compatible Mode | Self-managed Kafka |
| Alibaba Cloud | OceanBase MySQL Compatible Mode | Cloud Instance Kafka |
| Alibaba Cloud | OceanBase MySQL Compatible Mode | Self-managed Kafka |
| Alibaba Cloud | OceanBase Oracle Compatible Mode | Cloud Instance Kafka |
| Alibaba Cloud | OceanBase Oracle Compatible Mode | Self-managed Kafka |
Procedure
Create a data migration task.

Log in to the OceanBase Cloud console.
In the left-side navigation pane, choose Data Services > Migrations.
On the Migrations page, click the Migrate Data tab.
On the Migrate Data tab, click New Task in the upper-right corner.
On the Configure Source & Target page, configure the following parameters.
In the task name field, enter a custom migration task name.
We recommend that you use a combination of Chinese characters, numbers, and English letters. The name must not contain spaces and must be less than 64 characters in length.
Configure the following parameters in the Source Profile section.
If you want to reference an existing data source, click Quick Fill next to Source Profile and select the required data source from the drop-down list. Then, the parameters in the Source Profile section are automatically populated. To save the current configurations as a new data source, click the Save icon in the upper-right corner of the Source Profile section.
You can also select Manage Data Sources in the Quick Fill drop-down list to go to the Data Sources page, where you can view and manage different types of data sources in a unified manner. For more information, see Data source.
| Parameter | Description |
|---|---|
| Cloud Vendor | At present, AWS, Huawei Cloud, Google Cloud, and Alibaba Cloud are supported. |
| Database Type | Select OceanBase MySQL Compatible or OceanBase Oracle Compatible. |
| Instance Type | At present, Dedicated (Transactional) and Self-managed Database are supported. |
| Region | The region of the source database. |
| Instance | The ID or name of the OceanBase Database instance. You can view the ID or name of the target instance on the Instances page. **Note**: When the cloud vendor is Alibaba Cloud, you can also select a cross-account authorized instance of an Alibaba Cloud primary account. For more information, see Alibaba Cloud account authorization. |
| Tenant | The ID or name of the OceanBase Database tenant. You can expand the information about the target instance on the Instances page and view the ID or name of the tenant. |
| Database Account | The username of the OceanBase database user for data migration. |
| Password | The password of the database user. |

When the source instance type is Self-managed Database, if you want to perform schema migration and incremental synchronization, you must configure the parameters in the Advanced Settings section.

If you want to select Schema Migration or Incremental Synchronization on the Select Type & Objects page, toggle on sys Tenant Account and configure the following parameters.
| Parameter | Description |
|---|---|
| sys Account | The name of a user in the sys tenant. The user reads incremental logs of OceanBase Database and obtains database object schemas. You must create the user in the sys tenant of your business cluster. |
| Password | The password of the user. |

If you want to select Incremental Synchronization on the Select Type & Objects page, toggle on OBLogProxy and specify the OBLogProxy connection information.
Notice
Incremental synchronization is supported only when both sys Tenant Account and OBLogProxy are toggled on.
oblogproxy is a proxy service for managing incremental logs of OceanBase Database. It provides incremental log subscription and management capabilities as services so that applications can access and manage real-time incremental logs of OceanBase Database, and it meets the need for incremental log subscription in network isolation scenarios. The value is in the format of `oblogproxy IP address:oblogproxy port number`.
Configure the following parameters in the Target Profile section.
If you want to reference an existing data source, click Quick Fill next to Target Profile and select the required data source from the drop-down list. Then, the parameters in the Target Profile section are automatically populated. To save the current configurations as a new data source, click the Save icon in the upper-right corner of the Target Profile section.
You can also select Manage Data Sources in the Quick Fill drop-down list to go to the Data Sources page, where you can view and manage different types of data sources in a unified manner. For more information, see Manage data sources.
| Parameter | Description |
|---|---|
| Cloud Vendor | At present, AWS, Huawei Cloud, Google Cloud, and Alibaba Cloud are supported. You can select the same cloud vendor as the source. Data migration across different cloud vendors is also supported. **Note**: By default, cross-cloud vendor data migration is disabled. To use this feature, contact OceanBase Cloud Technical Support. |
| Database Type | Select Kafka. |
| Instance Type | Cloud Instance Kafka and Self-managed Kafka are supported. |
| Region | The region of the target instance. |
| Connection Type | Endpoint and Public IP are supported. <br>- If you select Endpoint as the connection type, you must add the authorized account ID displayed on the page to the allowlist of your endpoint service so that the endpoint of this account can connect to your endpoint service. For more information, see Connect via private network. <br>- If you set Cloud Vendor to AWS and select Acceptance required for Require acceptance for endpoint when you create your endpoint service, you are prompted to go to the Amazon Virtual Private Cloud (VPC) console and select Accept endpoint connection request to accept the connection request when the data migration service connects to PrivateLink for the first time. <br>- If you set Cloud Vendor to Google Cloud, add an authorized project to Published Services. After that, no manual authorization is required during a data source connection test. <br>- If you select Public IP as the connection type, you must first add the data source IP address displayed on the page to the allowlist of the Kafka instance to ensure connectivity. For more information, see Connect via public network. <br>**Note**: The data source IP address to be added to the allowlist is displayed only after you select the regions of the source and target. |
| Connection Details | - When Connection Type is set to Endpoint, enter the name of the endpoint service and the Kafka endpoint. <br>- When Connection Type is set to Public IP, enter the Kafka endpoint. |
If security authentication settings are not configured for the Kafka data source, data migration tasks with this data source as the source or target may fail to connect to this data source. Configure security authentication settings as follows:
Click Upload File and upload an SSL certificate suffixed with `.jks`.
Choose whether to enable Simple Authentication and Security Layer (SASL) based on business needs. If you enable SASL, configure the following parameters.
| Parameter | Description |
|---|---|
| Authentication Method | PLAIN, SCRAM-SHA-256, and SCRAM-SHA-512 are supported. <br>- PLAIN authentication is simple and does not support dynamic changes of users. In this authentication method, usernames and passwords are configured in plaintext, resulting in low security. <br>- Salted Challenge Response Authentication Mechanism (SCRAM) authenticates users by username and password. SCRAM-SHA-256 can be used together with Transport Layer Security (TLS) for security authentication. In this authentication method, users can be dynamically changed and user data is stored in ZooKeeper. Before a broker is started, a user for communication between the broker and ZooKeeper must be created. However, usernames and passwords are still configured in plaintext. <br>- SCRAM-SHA-512 can be used together with TLS for security authentication. |
| Username | The username of the user used for data migration. |
| Password | The password of the user used for data migration. |
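For reference, the following minimal Java sketch shows how a downstream Kafka consumer might connect to the same Kafka instance with SASL_SSL and SCRAM-SHA-256, using a `.jks` truststore like the one uploaded above. The bootstrap address, truststore path, credentials, and topic name are placeholders, not values provided by the data migration service.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class SaslSslConsumerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "kafka-host:9093");                  // placeholder endpoint
        props.put("group.id", "ob-migration-check");                        // placeholder group
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        // TLS settings: point to the same .jks truststore used for the data source.
        props.put("security.protocol", "SASL_SSL");
        props.put("ssl.truststore.location", "/path/to/client.truststore.jks"); // placeholder
        props.put("ssl.truststore.password", "truststore-password");            // placeholder

        // SASL/SCRAM settings matching the Authentication Method configured above.
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"migration_user\" password=\"migration_password\";");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("ob_orders_topic"));    // placeholder topic
            for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(5))) {
                System.out.printf("offset=%d key=%s%n", record.offset(), record.key());
            }
        }
    }
}
```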
Click Test and Continue.
In the Select Type & Objects step, configure the following parameters.
In the Migration Type section, select the migration types for the migration task.
| Migration type | Description |
|---|---|
| Full Migration | After the full migration task begins, the data migration service migrates the existing data in the source database tables to the corresponding tables in the target database. |
| Incremental Synchronization | After the incremental synchronization task begins, the data migration service synchronizes the changes (inserts, updates, and deletes) in the source database to the corresponding tables in the target database. Incremental Synchronization includes DML Synchronization and DDL Synchronization, which you can select as needed. For more information about DDL synchronization, see Custom DML/DDL configurations. |

In the Select Migration Objects section, select a method for specifying migration objects.
You can select Specify Objects or Match by Rule.
In the Select Migration Scope section, select migration objects.
If you select Specify Objects, the data migration service supports the Database Object and Entire Database options. Database Object allows you to select tables and views from one or more databases as the migration objects. Entire Database allows you to select an entire database for migration. If you select the Database Object option to migrate specific objects in a database, you cannot migrate the entire database. Conversely, if you select the Entire Database option to migrate an entire database, you cannot select individual objects from that database.
When you migrate data from an OceanBase database to a Kafka instance, you can migrate data from multiple tables to multiple topics.
In the left-side pane, select the objects to be migrated.
Click >.
Click the Existing Topics drop-down list in the Map Objects to Topic dialog box and select the target topic.
Click OK.
The data migration service allows you to import objects by using text, change topics and set row filters for target objects, as well as remove a single object or all objects.
Note
Take note of the following considerations if you select Entire Database:
The right-side pane displays only the database name and does not list all objects in the database.
If you have selected Incremental Synchronization-DDL Synchronization, newly created tables in the source database can also be synchronized to the target database.
| Operation | Description |
|---|---|
| Import objects | In the list on the right, click Import in the upper-right corner. For more information, see Import migration objects. |
| Change topics | The data migration service allows you to change the topic for target objects. For more information, see Change a topic. |
| Configure settings | The data migration service allows you to use a `WHERE` clause to set row filters and to select sharding columns and the columns to synchronize. In the Settings dialog box, you can perform the following operations: <br>- In the Row Filters section, specify a `WHERE` condition for row-based filtering. For more information, see Use SQL conditions to filter data. <br>- Select the sharding columns that you want to use from the Sharding Columns drop-down list. You can select multiple fields as sharding columns. This parameter is optional. Unless otherwise specified, select the primary key as the sharding columns. If the primary key is not load-balanced, select load-balanced columns with unique identifiers as the sharding columns to avoid potential performance issues. Sharding columns serve two purposes: load balancing, because threads used for sending messages can be determined based on the sharding columns if the target table supports concurrent writes; and orderliness, because the data migration service ensures that messages with the same sharding column values are received in order, which determines the sequence in which DML statements are executed for a column. <br>- In the Select Columns section, select the columns to be synchronized. For more information, see Column filtering. |
| Remove one or all objects | You can remove one or all migration objects during data mapping. <br>- To remove a single migration object, click the Remove icon next to the target object in the list on the right. <br>- To remove all migration objects, click Clear All in the upper-right corner of the list on the right. In the dialog box that appears, click OK. |
If you select Match by Rule, see Configure database-to-database matching rules for more information.
Click Next. On the Migration Options page, configure the following parameters.
Full migration
The following parameters are displayed only if you have selected Full Migration in the Select Type & Objects step.
| Parameter | Description |
|---|---|
| Read Concurrency | The concurrency for reading data from the source during full migration. The maximum value is 512. A high read concurrency may incur excessive stress on the source, affecting the business. |
| Write Concurrency | The concurrency for writing data to the target during full migration. The maximum value is 512. A high write concurrency may incur excessive stress on the target, affecting the business. |
| Rate Limiting for Full Migration | You can choose whether to limit the full migration rate as needed. If you choose to limit the full migration rate, you must specify the records per second (RPS) and bytes per second (BPS). The RPS specifies the maximum number of data rows migrated to the target database per second during full migration, and the BPS specifies the maximum amount of data in bytes migrated to the target database per second during full migration. **Note**: The RPS and BPS values specified here are only for throttling. The actual full migration performance is subject to factors such as the settings of the source and target databases and the instance specifications. |
Incremental synchronization
The following parameters are displayed only if you have selected Incremental Synchronization in the Select Type & Objects step.
| Parameter | Description |
|---|---|
| Write Concurrency | The concurrency for writing data to the target during incremental synchronization. The maximum value is 512. A high write concurrency may incur excessive stress on the target, affecting the business. |
| Rate Limiting for Incremental Migration | You can choose whether to limit the incremental synchronization rate as needed. If you choose to limit the incremental synchronization rate, you must specify the RPS and BPS. The RPS specifies the maximum number of data rows synchronized to the target database per second during incremental synchronization, and the BPS specifies the maximum amount of data in bytes synchronized to the target database per second during incremental synchronization. **Note**: The RPS and BPS values specified here are only for throttling. The actual incremental synchronization performance is subject to factors such as the settings of the source and target databases and the instance specifications. |
| Incremental Synchronization Start Timestamp | - If you have selected Full Migration as the migration type, this parameter is not displayed. <br>- If you have selected Incremental Synchronization but not Full Migration, specify the timestamp after which the data is to be synchronized. The default value is the current system time. For more information, see Set an incremental synchronization timestamp. |
Advanced options
| Parameter | Description |
|---|---|
| Serialization Method | Controls the message format for migrating data to the Kafka instance. Valid values: Default, Canal, DataWorks (version 2.0 supported), SharePlex, DefaultExtendColumnType, Debezium, DebeziumFlatten, DebeziumSmt, and Avro. For more information, see Data formats. **Note**: <br>- At present, only the MySQL compatible mode of OceanBase Database supports Debezium, DebeziumFlatten, DebeziumSmt, and Avro. <br>- If the parameter is set to DataWorks, the DDL operations `COMMENT ON TABLE` and `ALTER TABLE…TRUNCATE PARTITION` cannot be synchronized. |
| Partitioning Rule | The rule for synchronizing data from an OceanBase database to a Kafka topic. Valid values: Hash, Table, and One. For more information about the delivery of DDL statements in different scenarios and examples, see the table below. <br>- Hash indicates that the data migration service uses a hash algorithm to select the partition of a Kafka topic based on the hash value of the primary key or sharding column. <br>- Table indicates that the data migration service delivers all data in a table to the same partition and uses the table name as the hash key. <br>- One indicates that JSON messages are delivered to one partition of a topic to ensure ordering. |
| Business System Identification (Optional) | The identifier of the source business system of the data. This parameter is displayed only when Serialization Method is set to DataWorks. The business system identifier consists of 1 to 20 characters. |

The following table describes the delivery of a DDL statement in different scenarios.
| Partitioning rule | DDL statement involving multiple tables (example: `RENAME TABLE`) | DDL statement involving unknown tables (example: `DROP INDEX`) | DDL statement involving a single table |
|---|---|---|---|
| Hash | The DDL statement is delivered to all partitions of the topics associated with the involved tables. For example, assume that the DDL statement involves three tables: A, B, and C. If A is associated with Topic 1, B is associated with Topic 2, and C is not involved in the current task, the DDL statement is delivered to all partitions of Topic 1 and Topic 2. | The DDL statement is delivered to all partitions of all topics of the current task. For example, if the DDL statement cannot be identified by the data migration service and the current task has three topics, the DDL statement is delivered to all partitions of these three topics. | The DDL statement is delivered to all partitions of the topic associated with the table. |
| Table | The DDL statement is delivered to specific partitions of the topics associated with the involved tables. The partitions correspond to the hash values of the names of the involved tables. For example, assume that the DDL statement involves three tables: A, B, and C. If A is associated with Topic 1, B is associated with Topic 2, and C is not involved in the current task, the DDL statement is delivered to the partitions corresponding to the hash values of the involved table names in Topic 1 and Topic 2. | The DDL statement is delivered to all partitions of all topics of the current task. For example, if the DDL statement cannot be identified by the data migration service and the current task has three topics, the DDL statement is delivered to all partitions of these three topics. | The DDL statement is delivered to a partition of the topic associated with the table. |
| One | The DDL statement is delivered to a fixed partition of the topics associated with the involved tables. For example, assume that the DDL statement involves three tables: A, B, and C. If A is associated with Topic 1, B is associated with Topic 2, and C is not involved in the current task, the DDL statement is delivered to a fixed partition of Topic 1 and Topic 2. | The DDL statement is delivered to a fixed partition of all topics of the current task. For example, if the DDL statement cannot be identified by the data migration service and the current task has three topics, the DDL statement is delivered to a fixed partition of these three topics. | The DDL statement is delivered to a fixed partition of the topic associated with the table. |
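To make the Hash rule concrete, the sketch below illustrates the general idea of hash-based partition selection: records with the same primary-key or sharding-column value always land in the same partition, which preserves per-key ordering while spreading load across partitions. The hash function shown (Java's `String.hashCode`) is only illustrative; it is not necessarily the algorithm used internally by the data migration service.

```java
public class HashPartitionDemo {
    /** Pick a partition from a key, such as a primary-key or sharding-column value. */
    static int selectPartition(String shardingKey, int numPartitions) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(shardingKey.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 6; // number of partitions of the target Kafka topic (placeholder)
        for (String key : new String[] {"order-1001", "order-1002", "order-1001"}) {
            // Identical keys always map to the same partition, so their changes stay ordered.
            System.out.printf("key=%s -> partition %d%n", key, selectPartition(key, partitions));
        }
    }
}
```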
Click Next. Then, the system performs a precheck on the data migration task.
During the precheck, the data migration service detects the connection with the target Kafka instance. If an error is returned during the precheck, you can perform the following operations:
Identify and troubleshoot the problem and then perform the precheck again.
Click Skip in the Actions column of a failed precheck item. In the dialog box that describes the consequences of skipping the item, click OK.
After the precheck succeeds, click Purchase to purchase a data migration instance.
Then, start the data migration task. For more information, see Purchase a data migration instance. If you do not need to purchase a data migration instance now, click Save to go to the details page of the data migration task. You can purchase a data migration instance later as needed.
The data migration service allows you to modify the migration objects while the task is running. For more information, see View and modify migration objects.
After the data migration task is started, it is executed based on the selected migration types. For more information, see the "View migration details" section in View details of a data migration task.