When you use OceanBase Migration Service (OMS) to migrate or synchronize data, data is replicated from the source table to the target table in two phases: full data replication and incremental data replication. In both phases, data is obtained from the source table and synchronized to the target table. In OMS, the Full-Import component performs full data replication, and the Incr-Sync component performs incremental data replication.
The Full-Import and Incr-Sync components work as follows:
Processes of the Full-Import and Incr-Sync components are classified into three modules: Source, Coordinator, and Sink.
Source extracts data from the source table and sends it to Coordinator.
Coordinator performs extract-transform-load (ETL) operations on data and then sends the processed data to Sink.
Sink writes the data to the target table.
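A minimal Java sketch of this three-module pipeline is shown below. The interface and class names are illustrative assumptions, not the actual OMS API.

```java
import java.util.List;

// Illustrative sketch of the Source -> Coordinator -> Sink pipeline;
// all names here are assumptions that only mirror the described roles.
class Record { /* placeholder for the record model described below */ }

interface Source {
    List<Record> read();              // extract a batch of records from the source table
}

interface Sink {
    void write(List<Record> records); // write records to the target table
}

class Coordinator {
    private final Source source;
    private final Sink sink;

    Coordinator(Source source, Sink sink) {
        this.source = source;
        this.sink = sink;
    }

    // Pull records from Source, apply ETL operations, and push them to Sink.
    void run() {
        List<Record> batch;
        while (!(batch = source.read()).isEmpty()) {
            sink.write(transform(batch));
        }
    }

    private List<Record> transform(List<Record> batch) {
        // Placeholder for the ETL operations performed by Coordinator.
        return batch;
    }
}
```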
The Full-Import and Incr-Sync components use records as a unified model for data flow. In full data reading, a record represents a row of data in the source table. In incremental data reading, a record represents a change to a row in the source table. When an SQL statement executed on the source table generates N data changes (that is, N rows are affected), Source reads N records.
A record contains the following content:
Schema information, including:
Database name and table name
Column information, including the column name, column type, and precision
Primary key, unique key, and index
Data information, including values of each column
Operation type, such as INSERT, UPDATE, DELETE, or DDL operation
Change event, which is contained only in incremental data
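Based on the fields listed above, a record can be pictured roughly as the following Java class. This is a simplified illustration; the field and type names are assumptions, not the actual OMS data model.

```java
import java.util.List;
import java.util.Map;

// Simplified illustration of the record model described above.
class Record {
    // Operation type carried by the record.
    enum OpType { INSERT, UPDATE, DELETE, DDL }

    // Schema information.
    String database;
    String table;
    List<ColumnMeta> columns;        // column name, type, and precision
    List<String> primaryKey;         // primary key columns
    List<List<String>> uniqueKeys;   // unique keys and indexes

    // Data information: value of each column, keyed by column name.
    Map<String, Object> values;

    // Operation type: INSERT, UPDATE, DELETE, or a DDL operation.
    OpType opType;

    // Change event metadata, present only for incremental records
    // (for example, the log position or event timestamp).
    String changeEvent;

    static class ColumnMeta {
        String name;
        String type;
        Integer precision;
    }
}
```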
The following sections describe the working mechanism of the Full-Import and Incr-Sync components by answering the following questions:
How does OMS read and write data between different source and target tables?
How does OMS ensure data consistency between the source and target tables during data replication?
How does OMS ensure the efficiency of full data replication and incremental data replication?
Full migration
In both full data replication and incremental data replication, data sent by Coordinator is written to the target table. However, how data is read differs greatly between the two phases. Therefore, this section describes the full data read, incremental data read, and data write processes in detail.
OMS reads full data in the following four steps:
Determines, based on the configuration information (which may include matching rules), the source database tables from which data is to be migrated.
Queries metadata of the database tables, including their primary keys or non-null unique keys and column information.
Analyzes characteristics of the database tables and selects an appropriate data sharding method.
Reads data shard by shard and sends the data.
Data read by JDBC Source
JDBC Source reads data for full migration in the following three processes:
Metadata query: JDBC Source queries metadata of the source database tables from which data is to be synchronized, including the schema, index, and partition information about the tables. For example, it reads statistics tables from the information_schema database in MySQL.
Sharding: Assume that a table has two index columns: c1 and c2. JDBC Source sorts data by the index columns to obtain a primary key data sequence [(c1_start, c2_start), (c1_end, c2_end)]. JDBC Source then splits the sequence into smaller ones based on a specific size: (null, (c1_start_1, c2_start_1)], ((c1_start_1, c2_start_1), (c1_start_2, c2_start_2)], ..., ((c1_start_n, c2_start_n), (c1_end, c2_end)], and (c1_end, c2_end]. Each sequence is a shard.
Data query: JDBC Source generates an SQL query statement based on the shard ((c1_start, c2_start), (c1_end, c2_end)], and then generates a RecordBatch and sends it to Coordinator of the Full-Import component based on the threshold of memory usage. The process of generating the RecordBatch converts database-specific data types to unified Java data types.
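As a rough illustration of the data query step, the following Java sketch reads one shard ((c1_start, c2_start), (c1_end, c2_end)] and converts each row into Java values. The SQL, column names, and method names are assumptions; the row-value comparison syntax works on MySQL, and other databases may require an expanded form.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of reading one shard of a table with index columns c1 and c2.
class ShardReader {
    List<Map<String, Object>> readShard(Connection conn, String table,
                                        Object[] lower, Object[] upper) throws Exception {
        String sql = "SELECT * FROM " + table
                   + " WHERE (c1, c2) > (?, ?) AND (c1, c2) <= (?, ?)"
                   + " ORDER BY c1, c2";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setObject(1, lower[0]);
            ps.setObject(2, lower[1]);
            ps.setObject(3, upper[0]);
            ps.setObject(4, upper[1]);
            List<Map<String, Object>> batch = new ArrayList<>();
            try (ResultSet rs = ps.executeQuery()) {
                ResultSetMetaData meta = rs.getMetaData();
                while (rs.next()) {
                    // Convert each row into a column-name -> Java-value map,
                    // so database-specific types become unified Java types.
                    Map<String, Object> row = new HashMap<>();
                    for (int i = 1; i <= meta.getColumnCount(); i++) {
                        row.put(meta.getColumnLabel(i), rs.getObject(i));
                    }
                    batch.add(row);
                }
            }
            return batch;
        }
    }
}
```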
Data write by JDBC Sink
Full data write is simpler than incremental data write. JDBC Sink writes full and incremental data in the following steps:
RecordBatch reorganization: When JDBC Sink writes data to partitions, it may aggregate messages for the same partition.
Target schema cache: In data migration between heterogeneous databases, JDBC Sink matches the types of record values against the target schema in an n * m type mapping, and converts Java data types to data types that can be written to the target database.
Data write: JDBC Sink builds a batch statement from the RecordBatch and writes the data to the target table as transactions.
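The data write step can be pictured as the following hedged Java sketch, which builds a batch statement from a RecordBatch and writes it within one transaction. The class and method names are assumptions, and the actual OMS implementation is more elaborate.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;

// Hypothetical sketch of batched, transactional writes in a JDBC sink.
class BatchWriter {
    // Each element of rows is the ordered list of column values for one record.
    void writeBatch(Connection conn, String insertSql, List<Object[]> rows) throws Exception {
        boolean oldAutoCommit = conn.getAutoCommit();
        conn.setAutoCommit(false);                      // write the batch as one transaction
        try (PreparedStatement ps = conn.prepareStatement(insertSql)) {
            for (Object[] row : rows) {
                for (int i = 0; i < row.length; i++) {
                    ps.setObject(i + 1, row[i]);        // Java types -> target column types
                }
                ps.addBatch();
            }
            ps.executeBatch();
            conn.commit();
        } catch (Exception e) {
            conn.rollback();                            // keep the target consistent on failure
            throw e;
        } finally {
            conn.setAutoCommit(oldAutoCommit);
        }
    }
}
```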
Resumable transmission in full migration
After Sink successfully writes data, Coordinator notifies Source, and Source maintains the smallest sharding point based on the write success information. It can be deduced from the concept of sharding that sequences are generated in the same order when the same snapshot is used. Based on the smallest sharding point, the Full-Import component can identify a specific data sequence from which sharding is to start.
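The bookkeeping of the smallest sharding point can be pictured as the following minimal sketch; all names are illustrative assumptions.

```java
import java.util.TreeSet;

// Illustrative tracker for the smallest sharding point that has not yet been
// confirmed as written. Because shards are generated in the same order for the
// same snapshot, sharding can safely restart from this point after a failure.
class ShardProgressTracker {
    private final TreeSet<Long> inFlight = new TreeSet<>();
    private long nextShardToDispatch = 0;

    // Source records each shard it hands to Coordinator.
    synchronized long dispatchNextShard() {
        long idx = nextShardToDispatch++;
        inFlight.add(idx);
        return idx;
    }

    // Coordinator notifies the tracker once Sink confirms the write.
    synchronized void onShardCompleted(long shardIndex) {
        inFlight.remove(shardIndex);
    }

    // Smallest shard whose write is not yet confirmed.
    synchronized long resumePoint() {
        return inFlight.isEmpty() ? nextShardToDispatch : inFlight.first();
    }
}
```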
Incremental synchronization
OMS relies on the Store component for incremental synchronization. The Store component captures, stores, and sends incremental messages as a source. The Store component consists of two parts:
Store that runs on the server to capture and store logs
Store-Client that interacts with Store. In OMS, Store-Client is the primary source of incremental messages at the database level and is defined as StoreSource.
The following sections also describe how JDBC Sink writes not only full data in batches but also DDL changes and DML changes such as INSERT, UPDATE, and DELETE.
Data read by StoreSource
StoreSource receives messages from Store and converts them to a Coordinator-adaptable RecordBatch. The conversion involves the following operations:
Organize transactional messages: StoreSource organizes the change messages between begin and commit into a RecordBatch and sends it to Coordinator. The RecordBatch also optimizes the processing in JDBC Sink.
Improve parsing: StoreSource uses concurrent parsing and schema caching to increase the parsing rate and reduce the memory usage of records while retaining the order of changes.
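A simplified sketch of how the change messages between begin and commit could be grouped into one batch per transaction is shown below; the message and class names are placeholders, not the actual StoreSource classes.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative grouping of log messages into per-transaction batches.
class TransactionAssembler {
    enum MsgType { BEGIN, DML, COMMIT }

    static class Message {
        MsgType type;
        Object payload;      // parsed change for DML messages
    }

    private List<Object> current;        // changes of the currently open transaction

    // Returns a completed batch when a commit is seen, otherwise null.
    List<Object> accept(Message msg) {
        switch (msg.type) {
            case BEGIN:
                current = new ArrayList<>();
                return null;
            case DML:
                current.add(msg.payload);
                return null;
            case COMMIT:
                List<Object> batch = current;  // one RecordBatch per transaction
                current = null;
                return batch;
            default:
                return null;
        }
    }
}
```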
Transaction optimization in JDBC Sink by transaction conflict matrix
Incremental changes are expected to be concurrently written to the database without damaging the original transaction. During incremental changes, a row in the database may be changed multiple times by the INSERT, UPDATE, and DELETE operations, resulting in multiple change messages in a short time. In this case, changes of the same row must be ordered to avoid data quality issues.
The transaction conflict matrix is used to solve such issues. Messages in the memory of the Incr-Sync component are organized into chained lists keyed by primary key or unique key indexes. Because primary keys or unique keys may be changed alternately, the chains may cross in some cases, forming a directed acyclic graph (DAG). In the DAG, a transaction with an indegree of 0 is an executable request without pre-dependencies.
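The following condensed Java sketch shows the idea: changes are chained per primary or unique key, a transaction depends on every earlier unfinished transaction that touched one of its keys, and a transaction with an indegree of 0 becomes executable. All names are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Illustrative conflict tracking for concurrent, order-preserving writes.
class ConflictGraph {
    static class Txn {
        final Set<String> keys;                       // primary/unique key values touched
        final Set<Txn> pending = new HashSet<>();     // unfinished predecessors (indegree)
        final Set<Txn> successors = new HashSet<>();  // transactions waiting on this one
        Txn(Set<String> keys) { this.keys = keys; }
    }

    // Tail of the per-key chain: the last unfinished transaction for each key.
    private final Map<String, Txn> tail = new HashMap<>();
    private final List<Txn> ready = new ArrayList<>();

    void add(Txn txn) {
        for (String key : txn.keys) {
            Txn prev = tail.put(key, txn);
            if (prev != null && prev != txn) {
                txn.pending.add(prev);                // edge prev -> txn in the DAG
                prev.successors.add(txn);
            }
        }
        if (txn.pending.isEmpty()) {
            ready.add(txn);                           // indegree 0: executable immediately
        }
    }

    // Called after a transaction has been written; successors may become ready.
    void complete(Txn txn) {
        for (Txn next : txn.successors) {
            if (next.pending.remove(txn) && next.pending.isEmpty()) {
                ready.add(next);
            }
        }
        for (String key : txn.keys) {
            tail.remove(key, txn);                    // drop only if txn is still the tail
        }
    }

    List<Txn> takeReady() {
        List<Txn> out = new ArrayList<>(ready);
        ready.clear();
        return out;
    }
}
```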
Data write by KafkaSink
JDBC writes data in blocking mode by executing SQL statements, whereas Kafka provides a non-blocking API that returns write results through callbacks. The Sink module of the Incr-Sync component therefore also supports an asynchronous implementation. The procedure for writing messages to Kafka is as follows:
Conflict resolution: Incremental changes to be written to Kafka must also be ordered, similar to the requirement addressed by the transaction conflict matrix. Generally, only the order of individual changes is retained, and the requirement for transactional writes is not enforced. The conflict resolution approach is therefore slightly different: by default, hash bucketing is performed based on primary keys, and records with the same hash value are stored in the same bucket and form a new RecordBatch that is delivered based on time and quantity thresholds. Only one RecordBatch can be delivered at a time. This is similar to the batch delivery mechanism of KafkaProducer.
Serialization: A written record usually needs to be serialized into the JSON format or other protocol formats before it is delivered to Kafka.
Write: KafkaProducer is called and a callback function is registered to notify Coordinator that the write is successful.
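The steps above can be pictured with the standard KafkaProducer API as follows; the hashing, serialization, and callback wiring are simplified, and the class and method names are assumptions.

```java
import java.nio.charset.StandardCharsets;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Illustrative asynchronous write path of a Kafka sink.
class KafkaSinkSketch {
    private final KafkaProducer<byte[], byte[]> producer;
    private final String topic;

    KafkaSinkSketch(KafkaProducer<byte[], byte[]> producer, String topic) {
        this.producer = producer;
        this.topic = topic;
    }

    // Records with the same primary-key hash land in the same partition,
    // so their relative order is preserved.
    void write(String primaryKey, String jsonPayload, Runnable ackToCoordinator) {
        byte[] key = primaryKey.getBytes(StandardCharsets.UTF_8);     // bucketing by key hash
        byte[] value = jsonPayload.getBytes(StandardCharsets.UTF_8);  // serialized record
        producer.send(new ProducerRecord<>(topic, key, value), (metadata, exception) -> {
            if (exception == null) {
                ackToCoordinator.run();   // notify Coordinator that the write succeeded
            }
            // On failure, the record would be retried or the task marked as failed.
        });
    }
}
```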
Resumable transmission in incremental migration
When the Store component outputs messages, it also outputs the event time and checkpoint that are associated. Generally, the checkpoint is obtained from logical computing based on the event time. The Incr-Sync component maintains the smallest checkpoint at which messages are written within Coordinator and periodically stores checkpoints. When an exception occurs, the system can restore the write progress based on a stored checkpoint.
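A minimal sketch of this checkpoint bookkeeping is shown below; the names and the persistence target are assumptions.

```java
import java.util.TreeMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative checkpoint bookkeeping inside Coordinator: the smallest
// checkpoint among messages not yet written is tracked and persisted
// periodically so that write progress can be restored after an exception.
class CheckpointTracker {
    // checkpoint -> number of in-flight records carrying that checkpoint
    private final TreeMap<Long, Integer> inFlight = new TreeMap<>();
    private volatile long safeCheckpoint = 0;   // restart point after a failure

    synchronized void onDispatched(long checkpoint) {
        inFlight.merge(checkpoint, 1, Integer::sum);
    }

    synchronized void onWritten(long checkpoint) {
        inFlight.computeIfPresent(checkpoint, (k, v) -> v > 1 ? v - 1 : null);
        // All messages with a checkpoint below the smallest in-flight one
        // have been written, so the task can resume from that point.
        safeCheckpoint = inFlight.isEmpty() ? checkpoint : inFlight.firstKey();
    }

    // Periodically store the checkpoint so the write progress can be restored.
    void startPeriodicStore(ScheduledExecutorService scheduler) {
        scheduler.scheduleAtFixedRate(() -> persist(safeCheckpoint), 10, 10, TimeUnit.SECONDS);
    }

    private void persist(long checkpoint) {
        // Placeholder: write the checkpoint to persistent metadata storage.
        System.out.println("store checkpoint " + checkpoint);
    }
}
```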