DataX is an open-source version of DataWorks developed by Alibaba Cloud and is an offline data synchronization tool widely used in Alibaba Group. DataX efficiently synchronizes data between heterogeneous data sources such as MySQL, Oracle, SQL Server, PostgreSQL, Hadoop Distributed File System (HDFS), Hive, ADS, HBase, Table Store (OTS), MaxCompute (formerly known as ODPS), Hologres, Distributed Relational Database Service (DRDS), and OceanBase Database.
If you use OceanBase Database Enterprise Edition, you can obtain the RPM package of the internal version of DataX from OceanBase Technical Support. If you use OceanBase Database Community Edition, you can download the source code from the DataX open-source website and compile the code. During compilation, you can delete database plug-ins that you do not need from the pom.xml file to control the size of the compiled package.
Framework design

DataX is an offline data synchronization framework that is designed based on the framework + plug-in architecture. Data source reads and writes are abstracted as the reader and writer plug-ins and are integrated into the entire framework.
The reader plug-in is a data collection module that collects data from a data source and sends the data to the framework.
The writer plug-in is a data write module that retrieves data from the framework and writes the data to the destination.
The framework builds a data transmission channel to connect the reader and the writer and processes core technical issues such as caching, throttling, concurrency control, and data conversion.
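The division of labor described above can be illustrated with a minimal sketch. It is not DataX code: `run_job`, `sample_reader`, and the queue-based channel are hypothetical names chosen for illustration, and a bounded queue stands in for the caching and throttling that the framework provides.

```python
import queue
import threading

_END = object()  # sentinel that marks the end of the record stream


def run_job(read_records, write_record, capacity=4):
    """Connect a reader and a writer through a bounded in-memory channel."""
    # A bounded queue acts as the channel: when it is full, the reader blocks,
    # so a slow writer naturally throttles the reader.
    channel = queue.Queue(maxsize=capacity)

    def reader():
        for record in read_records():
            channel.put(record)
        channel.put(_END)

    worker = threading.Thread(target=reader)
    worker.start()

    written = 0
    while True:
        record = channel.get()
        if record is _END:
            break
        write_record(record)
        written += 1
    worker.join()
    return written


def sample_reader():
    # Hypothetical source: three (id, name) rows.
    yield from [(1, "a"), (2, "b"), (3, "c")]


sink = []
count = run_job(sample_reader, sink.append)
print(count, sink)
```

The reader and the writer never call each other directly. Swapping either side only requires a different `read_records` or `write_record` callable, which is the idea behind the plug-in architecture.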
DataX migrates data in the form of tasks. Each task processes only one table and has a configuration file in JSON format. The configuration file contains two parts, reader and writer, which correspond to the read and write plug-ins supported by DataX. For example, when you migrate table data from a MySQL database to OceanBase Database by using exported CSV files, the data must be read from the files and then written to OceanBase Database. In this case, the txtfilereader plug-in and the oceanbasev10writer plug-in of OceanBase Database are used. The following sections describe the txtfilereader and oceanbasev10writer plug-ins.
txtfilereader
The txtfilereader plug-in reads data from the local file system. Internally, txtfilereader obtains data from local files, converts it into records that comply with the DataX transmission protocol, and then passes the records to the writer.
For more information about the features and parameters, see txtfilereader plug-in.
oceanbasev10writer
The oceanbasev10writer plug-in writes data to the destination table in OceanBase Database. Internally, oceanbasev10writer uses a Java client, such as MySQL Connector/J or OceanBase Client (OBClient), to connect remotely to OceanBase Database through an OBProxy, and executes the corresponding INSERT statements to write data to OceanBase Database. OceanBase Database then commits the data in batches.
oceanbasev10writer uses the DataX framework to obtain the protocol data generated by the reader and generates corresponding INSERT statements. If a primary key or unique key conflict occurs during data writing, you can use the replace mode to update all fields in the table for a MySQL tenant in OceanBase Database. You can use only the INSERT method for an Oracle tenant in OceanBase Database. For performance reasons, data is written in batches. A write request is initiated only when the number of rows to be written reaches a predefined threshold.
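The batching behavior can be sketched as follows. This is an illustration only, not the plug-in's implementation: `BatchBuffer` and `build_statement` are hypothetical helpers, and the sketch assumes that rows left over at the end of the task are flushed even if the threshold was not reached.

```python
class BatchBuffer:
    """Collect rows and hand them to a callback once a threshold is reached."""

    def __init__(self, on_flush, batch_size=1000):
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")
        self._on_flush = on_flush
        self._batch_size = batch_size
        self._rows = []

    def add(self, row):
        self._rows.append(row)
        if len(self._rows) >= self._batch_size:
            self.flush()

    def flush(self):
        # Assumption: remaining rows are written when the task ends.
        if self._rows:
            self._on_flush(self._rows)
            self._rows = []


def build_statement(table, columns, row_count, mode="insert"):
    """Build a multi-row INSERT or REPLACE statement with ? placeholders."""
    verb = {"insert": "INSERT", "replace": "REPLACE"}[mode]
    one_row = "(" + ", ".join(["?"] * len(columns)) + ")"
    return "{} INTO {} ({}) VALUES {}".format(
        verb, table, ", ".join(columns), ", ".join([one_row] * row_count)
    )


sent = []
buf = BatchBuffer(
    lambda rows: sent.append(build_statement("t1", ["id", "name"], len(rows), "replace")),
    batch_size=2,
)
for row in [(1, "a"), (2, "b"), (3, "c")]:
    buf.add(row)
buf.flush()
print(sent)
```

With a batch size of 2 and three rows, the callback fires twice: once when the threshold is reached and once for the remaining row.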
DataX configuration file
Example:
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "sliceRecordCount": 10,
                        "column": [
                            {
                                "type": "long",
                                "value": "10"
                            },
                            {
                                "type": "string",
                                "value": "hello, DataX"
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "UTF-8",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 2
            }
        }
    }
}
Notice
DataX migrates only table data. You need to create the corresponding table schema in the destination database in advance.
Put the JSON configuration file in the job directory of DataX or in a custom path. Sample command:
$ bin/datax.py job/stream2stream.json
Output:
<.....>
2021-08-26 11:06:09.217 [job-0] INFO JobContainer - PerfTrace not enable!
2021-08-26 11:06:09.218 [job-0] INFO StandAloneJobContainerCommunicator - Total 20 records, 380 bytes | Speed 38B/s, 2 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2021-08-26 11:06:09.223 [job-0] INFO JobContainer -
Task start time : 2021-08-26 11:05:59
Task end time : 2021-08-26 11:06:09
Time consumption : 10s
Average task traffic : 38B/s
Record writing speed : 2rec/s
Total number of read records : 20
Total read and write failures : 0
After a DataX task is executed, a simple task report is returned, providing information such as the average output traffic, write speed, and total number of read and write failures.
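If you want to check the result of a task in a script, you can extract the counters from the summary line. The following sketch assumes that the line has exactly the format shown in the sample output above. The format may differ between DataX versions, and `parse_summary` is a hypothetical helper, not part of DataX.

```python
import re

# Pattern derived from the sample log line; treat it as an assumption.
SUMMARY = re.compile(
    r"Total (?P<records>\d+) records, (?P<bytes>\d+) bytes \| "
    r"Speed (?P<speed>\S+), (?P<rps>\d+) records/s \| "
    r"Error (?P<errors>\d+) records"
)


def parse_summary(line):
    """Return the record, byte, and error counts, or None if the line does not match."""
    match = SUMMARY.search(line)
    if match is None:
        return None
    return {
        "records": int(match["records"]),
        "bytes": int(match["bytes"]),
        "errors": int(match["errors"]),
    }


line = (
    "2021-08-26 11:06:09.218 [job-0] INFO StandAloneJobContainerCommunicator - "
    "Total 20 records, 380 bytes | Speed 38B/s, 2 records/s | "
    "Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | "
    "All Task WaitReaderTime 0.000s | Percentage 100.00%"
)
print(parse_summary(line))
```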
You can use the setting parameter in the job configuration file to specify the speed parameter and the maximum number of erroneous records allowed.
"setting": {
    "speed": {
        "channel": 10
    },
    "errorLimit": {
        "record": 10,
        "percentage": 0.1
    }
}
Parameters:
errorLimit specifies the maximum number of erroneous records allowed for the task. If this limit is exceeded, the task is interrupted and exits.
channel specifies the concurrency. Theoretically, the migration performance increases with the concurrency. However, in practice, you need to consider the read pressure on the source side, network transmission performance, and write performance on the destination side.
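The effect of errorLimit can be sketched as a simple check. The exact semantics are assumptions here: the sketch treats percentage as a fraction of the total records (0.1 meaning 10%), treats both limits as exclusive upper bounds, and uses the hypothetical function name `exceeds_error_limit`. Verify the behavior against your DataX version before relying on it.

```python
def exceeds_error_limit(error_records, total_records, record_limit=None, percentage_limit=None):
    """Return True if either configured error limit is exceeded."""
    # Absolute limit: more erroneous records than allowed.
    if record_limit is not None and error_records > record_limit:
        return True
    # Relative limit: share of erroneous records above the allowed fraction.
    if percentage_limit is not None and total_records > 0:
        if error_records / total_records > percentage_limit:
            return True
    return False


# Settings from the fragment above: record = 10, percentage = 0.1
print(exceeds_error_limit(10, 1000, record_limit=10, percentage_limit=0.1))
print(exceeds_error_limit(11, 1000, record_limit=10, percentage_limit=0.1))
```

Under these assumptions, 10 erroneous records out of 1,000 stay within both limits, whereas 11 exceed the record limit.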
Prepare the environment
Download the .tar package from http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz.
Decompress the package:
tar zxvf datax.tar.gz
cd datax
$ tree -L 1 --filelimit 30
.
├── bin
├── conf
├── job
├── lib
├── log
├── log_perf
├── plugin
├── script
└── tmp
Directories in the installation file:
| Directory | Description |
|---|---|
| bin | The directory of the executable files. The datax.py file in this directory is the startup script of DataX tasks. |
| conf | The log file configuration directory. This directory stores DataX configuration files that are not related to tasks. |
| lib | The directory of runtime dependent packages. This directory stores the global .jar files for running DataX. |
| job | This directory contains a task configuration file for testing and verifying the installation of DataX. |
| log | The log directory. This directory stores the running logs of DataX tasks. During DataX runtime, logs are output to the standard output and written to the log directory by default. |
| plugin | The plug-in directory. This directory stores various data source plug-ins supported by DataX. |
Examples
Copy, to the destination DataX server, the CSV files exported from the source, and then import the CSV files to the destination OceanBase database.
The myjob.json configuration file is as follows:
{
    "job": {
        "setting": {
            "speed": {
                "channel": 4
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "txtfilereader",
                    "parameter": {
                        "path": ["/tmp/tpcc/bmsql_oorder"],
                        "fileName": "bmsql_oorder",
                        "encoding": "UTF-8",
                        "column": ["*"],
                        "dateFormat": "yyyy-MM-dd hh:mm:ss",
                        "nullFormat": "\\N",
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
                    "name": "oceanbasev10writer",
                    "parameter": {
                        "obWriteMode": "insert",
                        "column": ["*"],
                        "preSql": [
                            "truncate table bmsql_oorder"
                        ],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:oceanbase://127.0.0.1:2883/tpcc?",
                                "table": [
                                    "bmsql_oorder"
                                ]
                            }
                        ],
                        "username": "tpcc",
                        "password": "********",
                        "writerThreadCount": 10,
                        "batchSize": 1000,
                        "memstoreThreshold": "0.9"
                    }
                }
            }
        ]
    }
}
| Parameter | Description |
|---|---|
| name | The name of the database plug-in corresponding to the reader or writer that connects to the database. The reader plug-in of MySQL Database is mysqlreader, and the writer plug-in of OceanBase Database is oceanbasev10writer. For more information about the reader and writer plug-ins, see DataX Data Source Guide. |
| jdbcUrl | The JDBC URL of the database to which you want to connect. The value is a JSON array and multiple URLs can be entered for a database. You need to enter at least one JDBC URL in the JSON array. The value must be entered in compliance with the MySQL official format. You can also specify a configuration property in the URL. For more information, see Configuration Properties in the MySQL documentation. Notice You do not need to use [] to enclose the connection string of the JDBC URL of the writer. However, you must use [] to enclose the connection string of the JDBC URL of the reader. |
| username | The username for logging on to the database. |
| password | The password of the specified username required to log on to the database. |
| table | The table to be synchronized. The value is a JSON array and multiple tables can be specified at the same time. When you specify multiple tables, make sure that they use the same schema structure. The MySQL Reader does not verify whether the specified tables belong to the same logic table. Notice The table string must be included in the connection section of the code. |
| column | The set of names of columns to be synchronized in the configured table. The values are specified in a JSON array. We recommend that you do not set column to ["*"], because the set of synchronized columns then changes with the table schema. We recommend that you specify the column names instead. Column pruning is supported. You can export only the specified columns. Column reordering is supported. You can export columns without following the column order in the table schema. You can specify constants in the MySQL SQL format: ["id", "`table`", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3", "true"]. |
| where | The filter condition. The MySQL reader assembles the specified column, table, and WHERE clause into an SQL statement. Then, the MySQL reader extracts data based on this SQL statement. To synchronize data of the current day, you can specify the WHERE clause as gmt_create > $bizdate. Notice You cannot set the WHERE clause to limit 10, because limit is not a valid WHERE clause of an SQL statement. A WHERE clause allows you to synchronize incremental business data in an orderly manner. If you do not specify the WHERE clause or do not specify the key or value of the WHERE clause, DataX performs full synchronization. |
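The way a reader combines column, table, and where into one query can be sketched as follows. `build_query` is a hypothetical helper written for illustration, not the MySQL reader's actual code, and it performs no escaping or validation.

```python
def build_query(columns, table, where=None):
    """Assemble a SELECT statement from the column, table, and where settings."""
    sql = "SELECT {} FROM {}".format(", ".join(columns), table)
    # An empty or missing where setting means full synchronization.
    if where and where.strip():
        sql += " WHERE " + where.strip()
    return sql


print(build_query(["id", "'bazhen.csy'"], "orders", "gmt_create > '2021-08-26'"))
print(build_query(["id"], "orders"))
print(build_query(["id"], "orders", "limit 10"))
```

The last call shows why limit 10 is rejected as a filter: the assembled statement ends in WHERE limit 10, which is not valid SQL.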
After you configure the job file, execute the job.
python datax.py ../job/myjob.json
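Because each task processes only one table, migrating several tables requires one job file per table. The following sketch generates such job definitions from a template modeled on the myjob.json example. `build_job`, the second table name, and the chosen parameter subset are assumptions for illustration. Adjust them to your environment and to the parameters that your plug-in versions support.

```python
import json


def build_job(table, data_dir, jdbc_url, username, password, channel=4):
    """Return a DataX job dict that loads one exported table into OceanBase Database."""
    reader = {
        "name": "txtfilereader",
        "parameter": {
            "path": ["{}/{}".format(data_dir, table)],
            "encoding": "UTF-8",
            "column": ["*"],
            "nullFormat": "\\N",
            "fieldDelimiter": ",",
        },
    }
    writer = {
        "name": "oceanbasev10writer",
        "parameter": {
            "obWriteMode": "insert",
            "column": ["*"],
            "preSql": ["truncate table {}".format(table)],
            "connection": [{"jdbcUrl": jdbc_url, "table": [table]}],
            "username": username,
            "password": password,
            "batchSize": 1000,
        },
    }
    return {
        "job": {
            "setting": {"speed": {"channel": channel}, "errorLimit": {"record": 0}},
            "content": [{"reader": reader, "writer": writer}],
        }
    }


tables = ["bmsql_oorder", "bmsql_order_line"]  # hypothetical table list
jobs = {
    t: build_job(t, "/tmp/tpcc", "jdbc:oceanbase://127.0.0.1:2883/tpcc?", "tpcc", "********")
    for t in tables
}
print(json.dumps(jobs["bmsql_oorder"], indent=4))
```

Each generated dictionary can be written to its own file in the job directory and then executed with datax.py in the same way as myjob.json.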
More information
For more information about DataX, see DataX.