DataX is an open-source version of Alibaba Cloud DataWorks. It is an offline data synchronization tool widely used by Alibaba Group. It efficiently synchronizes data between heterogeneous data sources such as MySQL, Oracle, SQL Server, PostgreSQL, Hadoop Distributed File System (HDFS), Hive, ADS, HBase, Table Store (OTS), MaxCompute (formerly known as ODPS), Distributed Relational Database Service (DRDS), and OceanBase Database.
If you use OceanBase Database Enterprise Edition, you can request the internal version of DataX (RPM package) from Technical Support of OceanBase Database. If you use OceanBase Database Community Edition, you can download the source code from the open source website of DataX and then compile the code. During compilation, remove unused database plug-ins from the pom.xml file. Otherwise, the compiled package will be very large.
Framework design

DataX is an offline data synchronization framework that is designed based on the framework + plug-in architecture. Data source reads and writes are abstracted as the reader and writer plug-ins and are integrated into the entire framework.
The reader plug-in is a data collection module that collects data from a data source and sends the data to the framework.
The writer plug-in is a data write module that retrieves data from the framework and writes the data to the destination.
The framework builds a data transmission channel to connect the reader and the writer and processes core technical issues such as caching, throttling, concurrency control, and data conversion.
DataX migrates data through tasks. Each task migrates only one table and has a configuration file in JSON format. The configuration file contains two sections: reader and writer. reader and writer respectively correspond to the database read and write plug-ins supported by DataX. For example, when you migrate table data from an OceanBase database to an Oracle database, the oceanbasev10reader plug-in of OceanBase Database and the oraclewriter plug-in of Oracle Database are respectively used to read data from the OceanBase database and write the data to the Oracle database. The following sections describe the oceanbasev10reader and oraclewriter plug-ins.
oceanbasev10reader plug-in
Oceanbasev10Writer uses the DataX framework to obtain the protocol data generated by the reader and then generates an insert statement. If a primary key or unique key conflict occurs when data is written, you can update all fields in the table by using the replace mode for a MySQL tenant of OceanBase Database, and only the insert mode for an Oracle tenant of OceanBase Database.
The oceanbasev10reader plug-in reads data from OceanBase Database. The oceanbasev10reader plug-in connects to a remote OceanBase database from a Java client (MySQL JDBC or OBClient) by using OceanBase Database Proxy (ODP), and executes the corresponding SQL statements to select data from the OceanBase database.
oceanbasev10reader connects to a remote OceanBase database from a Java client (MySQL JDBC or OBClient) by using ODP, generates a query statement based on the configured information, and sends the query statement to the remote OceanBase database. The remote OceanBase Database assembles the execution result of the SQL statement into an abstract dataset by using the custom data types of DataX and passes the dataset to the downstream writer for processing.
oraclewriter plug-in
The oraclewriter plug-in writes data to the destination table in the primary Oracle database. oraclewriter connects to a remote Oracle database through JDBC and executes the corresponding INSERT INTO statement to write the data to the remote Oracle database. The data is committed to the remote Oracle database in batches.
oraclewriter uses the DataX framework to obtain the protocol data generated by the reader and generates the corresponding INSERT INTO statement based on the configured information to insert the data to the destination table in the primary Oracle database.
For more information about the features and parameters, see oraclewriter plug-in.
DataX configuration file
Example:
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "hello, world-DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 2
}
}
}
}
Notice
DataX migrates only the data of a table. Therefore, you must create the schema of the table in the destination database in advance.
Place the JSON configuration file in the job directory of DataX or in a custom path. Run the following command:
$bin/datax.py job/stream2stream.json
Output:
<.....>
2021-08-26 11:06:09.217 [job-0] INFO JobContainer - PerfTrace not enable!
2021-08-26 11:06:09.218 [job-0] INFO StandAloneJobContainerCommunicator - Total 20 records, 380 bytes | Speed 38B/s, 2 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2021-08-26 11:06:09.223 [job-0] INFO JobContainer -
Task start time : 2021-08-26 11:05:59
Task end time : 2021-08-26 11:06:09
Time consumption : 10s
Average task traffic : 38 B/s
Record writing speed : 2rec/s
Total number of read records : 20
Total read and write failures : 0
After DataX executes a task, it generates a simple task report that covers the preceding average traffic, write speed, and total number of read/write failures.
You can specify the speed and error record limit in the job parameter settings of DataX.
"setting": {
"speed": {
"channel": 10
},
"errorLimit": {
"record": 10,
"percentage": 0.1
}
}
Parameters:
errorLimit: the limit on the number of error records. When this limit is exceeded, the task is terminated.channel: the concurrency. Technically, a higher concurrency value indicates higher migration performance. In actual operations, you must also consider the read pressure on the source database, network transmission performance, and write performance of the destination database.
Prepare the environment
Download the .tar package from http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz.
Decompress the installation package:
tar zxvf datax.tar.gz
cd datax
The directories are as follows:
$tree -L 1 --filelimit 30
.
├── bin
├── conf
├── job
├── lib
├── log
├── log_perf
├── plugin
├── script
└── tmp
The following table describes the directories in the installation package.
| Directory name | Description |
|---|---|
| bin | The directory where the executable file is located. The datax.py file in this directory is the startup script of DataX tasks. |
| conf | The directory where log files are located. The DataX configuration files unrelated to tasks are stored in this directory. |
| lib | The directory where the libraries required for running are located. The global .jar files required for the running of DataX are stored in this directory. |
| job | The directory where the task configuration file for verifying DataX installation is located. |
| log | The directory where log files are located. The running logs of DataX tasks are stored in this directory. By default, when DataX runs, standard logs are generated and written to the log directory. |
| plugin | The directory where the plug-in files are located. The data source plug-ins supported by DataX are stored in this directory. |
Use DataX to migrate data from OceanBase Database to an Oracle database
When you migrate data from an OceanBase database to an Oracle database, if the source and destination databases cannot concurrently connect to the DataX server, you can export the data as CSV files and then import the files to the destination database. If the source and destination databases can concurrently connect to the DataX server, you can use DataX to directly migrate data from the source to the destination.
Example: Migrate the data of the test.t1 table in an OceanBase database to the test.t1 table in an Oracle database.
Content of the myjob.json configuration file:
{
"job": {
"setting": {
"speed": {
"channel": 4
},
"errorLimit": {
"record": 0,
"percentage": 0.1
}
},
"content": [
{
"reader": {
"name": "oceanbasev10reader",
"parameter": {
"username": "******",
"password": "******",
"column": ["*"],
"connection": [
{
"table": ["t1"],
"jdbcUrl": ["jdbc:oceanbase://172.30.xxx.xxx:2883/test"]
}
]
}
},
"writer": {
"name": "oraclewriter",
"parameter": {
"obWriteMode": "insert",
"column": ["*"],
"preSql": ["truncate table t1"],
"connection": [
{
"jdbcUrl": "jdbc:oracle:thin:@172.30.xxx.xxx:1521:test",
"table": ["TEST.T1"]
}
],
"username": "******",
"password":"******",
"writerThreadCount":10,
"batchSize": 1000,
"memstoreThreshold": "0.9"
}
}
}
]
}
}
| Parameter | Description |
|---|---|
| name | The name of the reader or writer database plug-in for connecting to the database. The reader plug-in of MySQL is mysqlreader, and the writer plug-in of OceanBase Database is oceanbasev10writer. For more information about the reader and writer plug-ins, see DataX data source guide. |
| jdbcUrl | The JDBC URL of the database to which you want to connect. The value is a JSON array and multiple URLs can be entered for a database. You need to enter at least one JDBC URL in the JSON array. The value must be entered in compliance with the MySQL official format. You can also specify a configuration property in the URL. For more information, see Configuration Properties in the MySQL documentation. Notice []) but the JDBC URL of the reader must be enclosed with square brackets ([]). |
| username | The username for logging on to the database. |
| password | The password of the specified username required to log on to the database. |
| table | The table to be synchronized. The value is a JSON array and multiple tables can be specified at the same time. When you specify multiple tables, make sure that they use the same schema structure. The MySQL Reader does not verify whether the specified tables belong to the same logic table. Notice The table string must be included in the connection section of the code. |
| column | The set of names of columns to be synchronized in the configured table. The values are specified in a JSON array. We recommend that you do not set the column parameter to ['*'], because this configuration changes when the schema changes. We recommend that you specify specific column names. Column pruning is supported. You can export only the specified columns. Column reordering is supported. You can export columns without following the column order in the table schema. You can specify constants in the MySQL SQL format: ["id", "`table`", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3" , "true"]. Note
|
| where | The filter condition. The MySQL reader assembles the specified column, table, and WHERE clause into an SQL statement. Then, the MySQL reader extracts data based on this SQL statement. To synchronize data of the current day, you can specify the WHERE clause asgmt_create > $bizdate.Notice You cannot set the WHERE clause to limit 10, because limit is not a valid WHERE clause of an SQL statement. A WHERE clause allows you to orderly synchronize the incremental business data. If you do not specify the WHERE clause or do not specify the key or value of the WHERE clause, DataX performs full synchronization. |
After the job configuration file is configured, execute this job.
python datax.py ../job/myjob.json
More information
For more information about DataX, see DataX.