DataX is the open-source version of the Data Integration service of Alibaba Cloud DataWorks. It is an offline data synchronization tool that is widely used in Alibaba Group. DataX efficiently synchronizes data between heterogeneous data sources such as MySQL, Oracle, SQL Server, PostgreSQL, Hadoop Distributed File System (HDFS), Hive, ADS, HBase, Tablestore (OTS), MaxCompute (formerly known as ODPS), Distributed Relational Database Service (DRDS), and OceanBase Database.
If you use OceanBase Database Enterprise Edition, you can obtain the RPM package of the internal version of DataX from OceanBase Technical Support. If you use OceanBase Database Community Edition, you can download the source code from the DataX page on GitHub and compile the code. During compilation, you can delete database plug-ins that you do not need from the pom.xml file to control the size of the compiled package.
Framework design
DataX is an offline data synchronization framework that is designed based on the framework + plug-in architecture. Data source reads and writes are abstracted as the reader and writer plug-ins and are integrated into the entire framework.
The reader plug-in is a data collection module that collects data from a data source and sends the data to the framework.
The writer plug-in is a data write module that retrieves data from the framework and writes the data to the target.
The framework builds a data transmission channel to connect the reader and the writer and processes core technical issues such as caching, throttling, concurrency control, and data conversion.
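The division of work among the reader, the framework, and the writer can be illustrated with a toy model. The following Python sketch is not DataX code: the function names `reader`, `writer`, and `run_job` are hypothetical, and a bounded in-memory queue stands in for the data transmission channel that the framework builds.

```python
import queue
import threading

_DONE = object()  # sentinel that marks the end of the data stream


def reader(source_rows, channel):
    """Reader side: collect rows from the source and send them to the framework."""
    for row in source_rows:
        channel.put(row)  # blocks while the channel is full
    channel.put(_DONE)


def writer(channel, target):
    """Writer side: retrieve rows from the framework and write them to the target."""
    while True:
        row = channel.get()
        if row is _DONE:
            break
        target.append(row)


def run_job(source_rows, capacity=2):
    """Framework side: build the channel and connect one reader to one writer."""
    channel = queue.Queue(maxsize=capacity)  # bounded buffer: caching plus back-pressure
    target = []
    t_reader = threading.Thread(target=reader, args=(source_rows, channel))
    t_writer = threading.Thread(target=writer, args=(channel, target))
    t_reader.start()
    t_writer.start()
    t_reader.join()
    t_writer.join()
    return target


rows = [(1, "a"), (2, "b"), (3, "c")]
copied = run_job(rows)
```

Because the queue is bounded, a slow writer automatically slows down the reader, which is a simplified view of the throttling role that the framework plays.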
DataX migrates data through jobs. Each job processes only one table and has a configuration file in the JSON format. The configuration file contains two parts: reader and writer. These parts correspond to the database read and write plug-ins supported by DataX. For example, when you migrate table data from OceanBase Database to an Oracle database, the oceanbasev10reader plug-in reads data from OceanBase Database and the oraclewriter plug-in writes the data to the Oracle database. The following sections describe the oceanbasev10reader and oraclewriter plug-ins.
oceanbasev10reader plug-in
Write-side behavior belongs to the counterpart plug-in, oceanbasev10writer, which uses the DataX framework to obtain the protocol data generated by the reader and then generates an INSERT statement. If a primary key or unique key conflict occurs during data writes, a MySQL tenant of OceanBase Database can use the replace mode to update all fields in the table, whereas an Oracle tenant of OceanBase Database supports only the insert mode.
The oceanbasev10reader plug-in reads data from OceanBase Database. oceanbasev10reader connects to a remote OceanBase database from a Java client such as MySQL Connector/J or OceanBase Command-Line Client (OBClient) by using OceanBase Database Proxy (ODP), and executes the corresponding SQL statements to select data from the remote OceanBase database.
To be specific, oceanbasev10reader connects to a remote OceanBase database from a Java client such as MySQL Connector/J or OBClient by using ODP, generates a query statement based on the configured information, and sends the query statement to the remote OceanBase database. The plug-in then assembles the execution result of the SQL statement into an abstract dataset by using the custom data types of DataX and passes the dataset to the downstream writer for processing.
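As a rough illustration of how a query statement can be generated from configured information, the following Python sketch assembles a SELECT statement from a column list, a table name, and an optional filter. The helper `build_query` and the sample column names and filter are hypothetical and do not reproduce the SQL that the plug-in actually generates.

```python
def build_query(columns, table, where=None):
    """Assemble a SELECT statement from job-style settings (hypothetical helper)."""
    sql = "SELECT {} FROM {}".format(", ".join(columns), table)
    if where:
        sql += " WHERE {}".format(where)
    return sql


# Assumed sample settings, not taken from a real job.
full_scan = build_query(["id", "name"], "t1")
filtered = build_query(["id", "name"], "t1", where="id > 100")
```

Here `full_scan` selects every row of the table, while `filtered` adds the condition as a WHERE clause.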
oraclewriter plug-in
The oraclewriter plug-in writes data to the target table in the primary Oracle database. To be specific, oraclewriter connects to a remote Oracle database by using JDBC and executes the corresponding INSERT INTO ... statement to write the data to the remote Oracle database. The data is committed in the Oracle database in batches.
oraclewriter uses the DataX framework to obtain the protocol data generated by the reader and generates the corresponding INSERT INTO... statement based on the configured information to insert the data into the target table in the primary Oracle database.
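The two ideas above, generating an INSERT INTO statement from the configured table and columns and committing data in batches, can be sketched in Python. This is only an illustration under assumptions: `build_insert` and `batches` are hypothetical helpers, the column names are invented, and the `?` placeholder style is chosen for readability rather than taken from the plug-in.

```python
from itertools import islice


def build_insert(table, columns):
    """Build a parameterized INSERT statement for the configured table and columns."""
    placeholders = ", ".join("?" for _ in columns)
    return "INSERT INTO {} ({}) VALUES ({})".format(
        table, ", ".join(columns), placeholders
    )


def batches(records, batch_size):
    """Yield lists of at most batch_size records; each list would be one commit."""
    iterator = iter(records)
    while True:
        chunk = list(islice(iterator, batch_size))
        if not chunk:
            return
        yield chunk


statement = build_insert("TEST.T1", ["ID", "NAME"])  # hypothetical columns
batch_sizes = [len(chunk) for chunk in batches(range(2500), 1000)]
```

With a batch size of 1000, 2500 records are split into three batches, and the last batch holds the remaining 500 records.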
DataX configuration file
Here is a sample configuration file:
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "streamreader",
                    "parameter": {
                        "sliceRecordCount": 10,
                        "column": [
                            {
                                "type": "long",
                                "value": "10"
                            },
                            {
                                "type": "string",
                                "value": "hello, world-DataX"
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "UTF-8",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 2
            }
        }
    }
}
Notice
DataX migrates only table data. You must create the corresponding table schema in the target database in advance.
Place the JSON configuration file in the job directory of DataX or in a custom path. Here is a sample command:
$ bin/datax.py job/stream2stream.json
The output is as follows:
<.....>
2021-08-26 11:06:09.217 [job-0] INFO JobContainer - PerfTrace not enable!
2021-08-26 11:06:09.218 [job-0] INFO StandAloneJobContainerCommunicator - Total 20 records, 380 bytes | Speed 38B/s, 2 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2021-08-26 11:06:09.223 [job-0] INFO JobContainer -
Job start time: 2021-08-26 11:05:59
Job end time: 2021-08-26 11:06:09
Time consumption: 10s
Average job traffic: 38 B/s
Record writing speed: 2 rec/s
Total number of read records: 20
Total read and write failures: 0
After a DataX job is executed, a simple report is returned, providing information such as the average output traffic, write speed, and total number of read and write failures.
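If you want to check the result of a job in a script, the statistics line in the output can be parsed. The following Python sketch is written against the sample log line shown above only. The helper `parse_summary` is hypothetical, and the pattern may need to be adjusted if your DataX version prints the line differently.

```python
import re

# Pattern derived from the sample statistics line in this document.
SUMMARY_PATTERN = re.compile(
    r"Total (\d+) records, (\d+) bytes \| "
    r"Speed (\S+), (\d+) records/s \| "
    r"Error (\d+) records, (\d+) bytes"
)


def parse_summary(line):
    """Return the job statistics as a dict, or None if the line does not match."""
    match = SUMMARY_PATTERN.search(line)
    if match is None:
        return None
    total_records, total_bytes, speed, rate, err_records, err_bytes = match.groups()
    return {
        "total_records": int(total_records),
        "total_bytes": int(total_bytes),
        "speed": speed,
        "records_per_second": int(rate),
        "error_records": int(err_records),
        "error_bytes": int(err_bytes),
    }


sample = (
    "2021-08-26 11:06:09.218 [job-0] INFO StandAloneJobContainerCommunicator - "
    "Total 20 records, 380 bytes | Speed 38B/s, 2 records/s | "
    "Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | "
    "All Task WaitReaderTime 0.000s | Percentage 100.00%"
)
stats = parse_summary(sample)
```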
You can define the speed and erroneous record tolerance in the setting section of the configuration file for each DataX job.
"setting": {
    "speed": {
        "channel": 10
    },
    "errorLimit": {
        "record": 10,
        "percentage": 0.1
    }
}
The parameters are described as follows:
errorLimit specifies the maximum number of erroneous records allowed for the job: record is the maximum count and percentage is the maximum ratio. If this limit is exceeded, the job is interrupted and exits.
channel specifies the number of concurrent channels used by the job. Theoretically, a higher concurrency means higher migration performance. However, you must consider the read stress of the source database, network transmission performance, and write performance of the target database.
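The effect of an error limit can be sketched as a simple check. The following Python function is an illustration under assumptions, not the logic that DataX implements. In particular, it assumes that the job stops as soon as either configured limit is exceeded, and how DataX combines the record and percentage settings when both are present is not covered here. The name `exceeds_error_limit` is hypothetical.

```python
def exceeds_error_limit(error_count, total_count,
                        record_limit=None, percentage_limit=None):
    """Return True if the erroneous records exceed a configured limit.

    Assumption: either limit alone is enough to stop the job.
    """
    if record_limit is not None and error_count > record_limit:
        return True
    if percentage_limit is not None and total_count > 0:
        if error_count / total_count > percentage_limit:
            return True
    return False


# 11 bad records out of 1000 exceed a record limit of 10.
stop_by_count = exceeds_error_limit(11, 1000, record_limit=10)
# 150 bad records out of 1000 (15%) exceed a ratio limit of 0.1.
stop_by_ratio = exceeds_error_limit(150, 1000, percentage_limit=0.1)
```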
Prepare the environment
Download the .tar package from http://datax-opensource.oss-cn-hangzhou.aliyuncs.com/datax.tar.gz.
Decompress the installation package:
tar zxvf datax.tar.gz
cd datax
The directories are as follows:
$ tree -L 1 --filelimit 30
.
├── bin
├── conf
├── job
├── lib
├── log
├── log_perf
├── plugin
├── script
└── tmp
The following table describes some important directories in the installation package.
| Directory | Description |
|---|---|
| bin | The directory where the executable files are located. The datax.py file in this directory is the startup script of DataX jobs. |
| conf | The directory where the configuration files are located. This directory stores DataX configuration files that are not related to jobs. |
| lib | The directory where the libraries that DataX depends on are located. This directory stores the global .jar files for running DataX. |
| job | This directory contains a job configuration file for testing and verifying the installation of DataX. |
| log | The log directory. This directory stores the running logs of DataX jobs. During DataX runtime, logs are printed to the standard output and written to the log directory by default. |
| plugin | The plug-in directory. This directory stores various data source plug-ins supported by DataX. |
Examples
When you migrate data from OceanBase Database to an Oracle database, if the source and target databases cannot connect to the DataX server at the same time, you can export the data as CSV files and then import the CSV files to the target database. If the source and target databases can connect to the DataX server at the same time, you can use DataX to migrate data directly from the source database to the target database.
Example: Migrate the data of the test.t1 table in OceanBase Database to the test.t1 table in an Oracle database.
Here is a sample myjob.json configuration file:
{
    "job": {
        "setting": {
            "speed": {
                "channel": 4
            },
            "errorLimit": {
                "record": 0,
                "percentage": 0.1
            }
        },
        "content": [
            {
                "reader": {
                    "name": "oceanbasev10reader",
                    "parameter": {
                        "username": "******",
                        "password": "******",
                        "column": ["*"],
                        "connection": [
                            {
                                "table": ["t1"],
                                "jdbcUrl": ["jdbc:oceanbase://172.30.xxx.xxx:2883/test"]
                            }
                        ]
                    }
                },
                "writer": {
                    "name": "oraclewriter",
                    "parameter": {
                        "obWriteMode": "insert",
                        "column": ["*"],
                        "preSql": ["truncate table t1"],
                        "connection": [
                            {
                                "jdbcUrl": "jdbc:oracle:thin:@172.30.xxx.xxx:1521:test",
                                "table": ["TEST.T1"]
                            }
                        ],
                        "username": "******",
                        "password": "******",
                        "writerThreadCount": 10,
                        "batchSize": 1000,
                        "memstoreThreshold": "0.9"
                    }
                }
            }
        ]
    }
}
The following table describes the parameters.
| Parameter | Description |
|---|---|
| name | The name of the database plug-in corresponding to the reader or writer that connects to the database. The reader plug-in of the MySQL database is mysqlreader, and the writer plug-in of OceanBase Database is oceanbasev10writer. For more information about the reader and writer plug-ins, see DataX. |
| jdbcUrl | The JDBC URL of the database to which you want to connect. The value is a JSON array and multiple URLs can be entered for a database. You must enter at least one JDBC URL in the JSON array. The value must be entered in compliance with the MySQL official format. You can also specify a configuration property in the URL. For more information, see Configuration Properties in MySQL documentation. |
| username | The username of the account for logging in to the database. |
| password | The password of the account for logging in to the database. |
| table | The table to be migrated. The value is a JSON array and multiple tables can be specified at the same time. When you specify multiple tables, make sure that they use the same schema structure. mysqlreader does not verify whether the specified tables belong to the same logical table. |
| column | The set of names of columns to be migrated in the configured table. The values are specified in a JSON array. We recommend that you specify the column names instead of setting this parameter to ["*"], because the result of ["*"] changes with the table schema. Column pruning is supported: you can export only the specified columns. Column reordering is supported: you can export columns without following the column order in the schema. You can specify constants in the SQL syntax format of MySQL, for example, ["id", "`table`", "1", "'bazhen.csy'", "null", "to_char(a + 1)", "2.3", "true"]. |
| where | The filter condition. mysqlreader assembles the specified columns, tables, and where clause into an SQL statement. Then, mysqlreader extracts data based on this SQL statement. To synchronize data of the current day, you can set the condition of the where clause to gmt_create > $bizdate. |
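If you maintain many job files, you can generate them from a script instead of editing JSON by hand. The following Python sketch builds a job configuration with the same layout as the sample myjob.json file, but with an explicit column list and a where condition. The helper `build_job`, the column names, and the filter are hypothetical, and whether a given plug-in accepts each parameter must be checked against the documentation of that plug-in.

```python
import json


def build_job(reader_name, reader_param, writer_name, writer_param,
              channel=4, error_records=0):
    """Return a DataX-style job configuration as a Python dict (hypothetical helper)."""
    return {
        "job": {
            "setting": {
                "speed": {"channel": channel},
                "errorLimit": {"record": error_records},
            },
            "content": [
                {
                    "reader": {"name": reader_name, "parameter": reader_param},
                    "writer": {"name": writer_name, "parameter": writer_param},
                }
            ],
        }
    }


columns = ["id", "name"]  # assumed column names, listed explicitly instead of "*"

reader_param = {
    "username": "******",
    "password": "******",
    "column": columns,
    "where": "id > 100",  # assumed filter condition
    "connection": [
        {"table": ["t1"], "jdbcUrl": ["jdbc:oceanbase://172.30.xxx.xxx:2883/test"]}
    ],
}
writer_param = {
    "username": "******",
    "password": "******",
    "column": columns,
    "connection": [
        {"jdbcUrl": "jdbc:oracle:thin:@172.30.xxx.xxx:1521:test", "table": ["TEST.T1"]}
    ],
}

job = build_job("oceanbasev10reader", reader_param, "oraclewriter", writer_param)
job_text = json.dumps(job, indent=4)  # write this text to a file such as myjob.json
```

Listing the columns once and reusing the list for both plug-ins keeps the reader and writer column order consistent.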
After you configure the job file, execute the job. The syntax of the command is as follows:
python datax.py ../job/myjob.json
More information
For more information about DataX, see DataX.