About DataX
As an open-source version of Alibaba Cloud DataWorks, DataX is an offline data synchronization tool widely used in Alibaba Group. DataX efficiently synchronizes data between heterogeneous data sources such as MySQL, Oracle, SQL Server, PostgreSQL, Hadoop Distributed File System (HDFS), Hive, ADS, HBase, Table Store (formerly known as OTS), MaxCompute (formerly known as ODPS), Hologres, Distributed Relational Database Service (DRDS), and OceanBase Database.
For OceanBase Enterprise Edition, you can obtain the DataX internal version in RPM package from OceanBase Technical Support. For OceanBase Community Edition, you can download the source code from the DataX open source website and compile the tool by yourself. When you compile the configuration file, remove unwanted database plug-ins from pom.xml. Otherwise, the compiled package will be very large.
DataX configuration file
DataX migrates data by using jobs. Each job processes only one table and has a configuration file in the JSON format. The configuration file contains the reader and writer sections. reader and writer are database plug-ins supported by DataX and can be used in any combination.
The latest version of DataX also provides a web-based management interface.
Sample configuration file:
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"sliceRecordCount": 10,
"column": [
{
"type": "long",
"value": "10"
},
{
"type": "string",
"value": "Hello, Hello, World-DataX"
}
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"encoding": "UTF-8",
"print": true
}
}
}
],
"setting": {
"speed": {
"channel": 2
}
}
}
}
Place the JSON configuration file under the job directory of DataX, or in a custom path. Run the following command to execute the file:
$bin/datax.py job/stream2stream.json
Output information:
<.....>
2021-08-26 11:06:09.217 [job-0] INFO JobContainer - PerfTrace not enable!
2021-08-26 11:06:09.218 [job-0] INFO StandAloneJobContainerCommunicator - Total 20 records, 380 bytes | Speed 38B/s, 2 records/s | Error 0 records, 0 bytes | All Task WaitWriterTime 0.000s | All Task WaitReaderTime 0.000s | Percentage 100.00%
2021-08-26 11:06:09.223 [job-0] INFO JobContainer -
Job start time : 2021-08-26 11:05:59
Job end time : 2021-08-26 11:06:09
Total time consumed by the job : 10s
Average job traffic : 38B/s
Record writing speed : 2rec/s
Total number of records read : 20
Total read and write failures : 0
After a DataX job is executed, a simple job report is returned, which displays the average traffic, write speed, and total number of read and write failures.
In DataX, you can specify the speed and error logging tolerance in setting of a job``.
"setting": {
"speed": {
"channel": 10
},
"errorLimit": {
"record": 10,
"percentage": 0.1
}
}
Parameters:
We do not recommend that you use the
speedparameter to limit the migration speed in bytes, because this parameter has a bug``.errorLimitspecifies the maximum number of error records allowed. If this limit is exceeded, the job is dropped and exited.channelspecifies degree of parallelism (DOP). Theoretically, the migration performance improves as the DOP increases. However, in actual practice, you must consider the read pressure at the source end, network transmission performance, and write performance at the target end.
The following describes the read and write plug-ins for common data sources such as MySQL Database, Oracle Database, CSV files, and OceanBase Database.
TxtFileReader plug-in
The TxtFileReader plug-in reads data from local file systems. In the underlying implementation, TxtFileReader obtains data from a local file, converts the data based on the DataX transport protocol, and passes the converted data to the writer.
The content of the local file is a logical two-dimensional table, for example, text information in the CSV format.
TxtFileReader has some feature restrictions and parameters. For more information, see https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md.
The following is a configuration example of TxtFileReader.
"reader": {
"name": "txtfilereader",
"parameter": {
"path": ["/tmp/tpcc/bmsql_oorder"],
"fileName": "bmsql_oorder",
"encoding": "UTF-8",
"column": ["*"],
"dateFormat": "yyyy-MM-dd hh:mm:ss" ,
"nullFormat": "\\N" ,
"fieldDelimiter": ","
}
}
Parameters
pathspecifies the path of the local file system.fileNamespecifies the prefix of the generated file. The complete file name is very long and contains a random string to avoid duplication.You can specify
*forcolumn, so that all field values are used as strings. Although this setting facilitates operation, it may incur risks. You can use it to test common data types.nullFormatspecifies the records that support null values. The default value isnull, which will cause problems during data writes to an Oracle database. When you export a file, we recommend that you set this parameter to"\\N", which indicates a null sum.fieldDelimiterspecifies the column delimiter for the CSV file. Use the same column delimiter specified for file export. If the exported column content contains a column delimiter, it is enclosed within a pair of double quotation marks ("). You can also use commas (,). However, we recommend that you use a less commonly used character, such as|or^.
MysqlReader plug-in
The MysqlReader plug-in reads data from MySQL. In underlying implementation, MysqlReader connects to a remote MySQL database through Java Database Connectivity (JDBC) and executes the corresponding SQL statements to select data from the MySQL database.
Unlike other relational databases, MysqlReader does not support FetchSize.
In terms of implementation principles, MysqlReader connects to a remote MySQL database through a JDBC connector, generates a query statement based on the information that you configured, and sends it to the remote MySQL database. Then, MysqlReader assembles the returned SQL execution result into an abstract dataset by using the data types defined in DataX, and transfers the abstract dataset to the downstream Writer for processing.
For more information about the features and parameters, see https://github.com/alibaba/DataX/blob/master/mysqlreader/doc/mysqlreader.md.
The following is a configuration example of MysqlReader.
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "tpcc",
"password": "********",
"column": [
"*"
],
"connection": [
{
"table": [
"bmsql_oorder"
],
"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/tpccdb?useUnicode=true&characterEncoding=utf8"]
}
]
}
}
Parameters
If the primary key of the table is a single-column primary key, such as
id, you can add the configuration"splitPk": "db_id",underparameter. If you add the configuration to the end, you must remove the trailing comma (,).columnspecifies the columns to read. We recommend that you specify specific columns. You can use functions to perform logical conversion on columns. If you use*, always make sure that the columns correspond to the columns written on the destination side.
oceanbasev10writer plug-in
The oceanbasev10writer plug-in writes data to the destination table on the primary OceanBase database. In underlying implementation, oceanbaseV10Writer uses a Java client, which may be an underlying MySQL JDBC or OBClient, to connect to a remote OceanBase database through an OBProxy, and executes an INSERT SQL statement to write data to the OceanBase database. Then, oceanbaseV10Writer submits data to the OceanBase database in batches.
Implementation principle
Oceanbasev10Writer uses the DataX framework to obtain the protocol data generated by Reader and generates INSERT statements. For MySQL tenants, you can use REPLACE to update all fields in the table when primary key or unique key conflicts occur. For Oracle tenants, only INSERT operations are supported. For performance reasons, data is written in batch mode. A write request is initiated only when the number of rows reaches a predefined threshold.
The following is a configuration example of oceanbasev10writer.
"writer": {
"name": "oceanbasev10writer",
"parameter": {
"obWriteMode": "insert",
"column": [
"*"
],
"preSql": [
"truncate table bmsql_oorder"
],
"connection": [
{
"jdbcUrl": "||_dsc_ob10_dsc_||obdemo:oboracle||_dsc_ob10_dsc_||jdbc:oceanbase://127.0.0.1:2883/tpcc?useLocalSessionState=true&allowBatch=true&allowMultiQueries=true&rewriteBatchedStatements=true",
"table": [
"bmsql_oorder"
]
}
],
"username": "tpcc",
"password":"********",
"batchSize": 512,
"writerThreadCount":10,
"memstoreThreshold": "0.9"
}
}
Parameters
The settings in the foregoing example are intended for reference only and can be modified. For more information, visit the DataX open source project at https://github.com/alibaba/DataX/tree/master/oceanbasev10writer.
| Parameter | Required | Default value | Description |
|---|---|---|---|
jdbcUrl |
Yes | None | The connection information of the destination database, which includes the endpoints and ports of the OceanBase cluster, tenant, and OBProxy, as well as the database name. If you use a domain name, an error may be reported. We recommend that you use the ip parameter. |
table |
Yes | None | The name of the destination table. You can write data to one or more tables. If you specify multiple tables, ensure that the schemas of all tables are consistent. Generally, the database name is not included in the table name. |
column |
Yes | No | The fields to be written to the destination table. Separate the fields with commas (,), for example, "column": ["id","name","age"]. The column parameter must be specified. |
preSql |
No | None | The standard statement specified here is executed before data is written to the destination table. If the SQL statement contains the name of the table that you want to operate on, use @table. When the SQL statement is executed, the variable is replaced based on the actual table name. For example, if you want to write data to the 100 homogeneous table shards whose names are datax_00, datax01, ..., datax_98, and datax_99 on the destination side, and you want to delete data from the tables before the import of new data, you can use "preSql":\["delete from @table"\]. In this way, data will be deleted from the tables specified by delete from before data is written to each table.
Note |
batchSize |
No | 1000 | The number of records submitted at a time. A proper value of this parameter can greatly reduce the number of network interactions between DataX and OceanBase Database and increase the overall throughput. However, if you set an excessively large value for this parameter, a process out-of-memory (OOM) error may occur on DataX. |
memstoreThreshold |
No | 0 | The MemStore usage of the OceanBase tenant. When this threshold is reached, data import is suspended and then resumes after the memory is released. This prevents tenant memory overflow. |
username |
Yes | None | The username for accessing the oceanbase1.0 database. Do not set this parameter to the OceanBase cluster name or tenant name. |
password |
Yes | None | The password for accessing the oceanbase1.0 database. |
writerThreadCount |
No | 1 | The number of threads for data writing in each channel. |
Export MySQL data to a CSV file
Export MySQL data to a CSV file.
The following example shows the content of the configuration file:
$cat job/bmsql_oorder_mysql2csv.json
{
"job": {
"setting": {
"speed": {
"channel": 4
},
"errorLimit": {
"record": 0,
"percentage": 0.1
}
},
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "tpcc",
"password": "********",
"column": [
"*"
],
"connection": [
{
"table": [
"bmsql_oorder"
],
"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/tpccdb?useUnicode=true&characterEncoding=utf8"]
}
]
}
},
"writer": {
"name": "txtfilewriter",
"parameter": {
"path": "/tmp/tpcc/bmsql_oorder",
"fileName": "bmsql_oorder",
"encoding": "UTF-8",
"writeMode": "truncate",
"dateFormat": "yyyy-MM-dd hh:mm:ss" ,
"nullFormat": "\\N" ,
"fileFormat": "csv" ,
"fieldDelimiter": ","
}
}
}
]
}
}
Import a CSV file to OceanBase Database
Copy, to the DataX server on the destination side, the CSV file exported from the source side, and import the file to the OceanBase database on the destination side.
The following example shows the content of the configuration file:
$cat job/bmsql_oorder_csv2ob.json
{
"job": {
"setting": {
"speed": {
"channel": 4
},
"errorLimit": {
"record": 0,
"percentage": 0.1
}
},
"content": [
{
"reader": {
"name": "txtfilereader",
"parameter": {
"path": ["/tmp/tpcc/bmsql_oorder"],
"fileName": "bmsql_oorder",
"encoding": "UTF-8",
"column": ["*"],
"dateFormat": "yyyy-MM-dd hh:mm:ss" ,
"nullFormat": "\\N" ,
"fieldDelimiter": ","
}
},
"writer": {
"name": "oceanbasev10writer",
"parameter": {
"obWriteMode": "insert",
"column": [
"*"
],
"preSql": [
"truncate table bmsql_oorder"
],
"connection": [
{
"jdbcUrl": "||_dsc_ob10_dsc_||obdemo:oboracle||_dsc_ob10_dsc_||jdbc:oceanbase://127.0.0.1:2883/tpcc?useLocalSessionState=true&allowBatch=true&allowMultiQueries=true&rewriteBatchedStatements=true",
"table": [
"bmsql_oorder"
]
}
],
"username": "tpcc",
"password":"********",
"writerThreadCount":10,
"batchSize": 1000,
"memstoreThreshold": "0.9"
}
}
}
]
}
}