Apache Airflow is an open-source platform for developing, scheduling, and monitoring batch processing workflows. Workflows in Airflow are defined in Python code, and a web interface lets you manage and monitor their state.
Prerequisites
- Apache Airflow is installed. For more information, see the official Apache Airflow website.
- OceanBase Database is installed and a MySQL-compatible tenant is created.
- Ensure that the OBServer node to be connected is network-reachable from the host where Airflow is installed.
Obtain the database connection string
Contact the deployment personnel or administrator of OceanBase Database to obtain the database connection string. For example:
```shell
obclient -h$host -P$port -u$user_name -p$password -D$database_name
```
Parameter description:
- $host: the IP address for connecting to OceanBase Database. For connection through OceanBase Database Proxy (ODP), use the IP address of an ODP. For direct connection, use the IP address of the OBServer node.
- $port: the port for connecting to OceanBase Database. For connection through ODP, the default value is 2883, which can be customized when ODP is deployed. For direct connection, the default value is 2881, which can be customized when OceanBase Database is deployed.
- $database_name: the name of the database to be accessed.
- $user_name: the tenant account. For connection through ODP, the format can be username@tenant name#cluster name or cluster name:tenant name:username. For direct connection, the format is username@tenant name.
- $password: the password of the account.
Notice
The user for connecting to the tenant must have the CREATE, INSERT, DROP, and SELECT privileges on the database.
Here is an example:
```shell
obclient -hxxx.xxx.xxx.xxx -P2881 -utest_user001@mysql001 -p****** -Dtest
```
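Each flag in the obclient command maps directly to a field in the Airflow connection form described below. As a rough illustration, the following sketch splits an obclient command into those fields. The `parse_obclient_command` helper is hypothetical, written here for illustration; it is not part of obclient or Airflow.

```python
import re

def parse_obclient_command(cmd):
    """Hypothetical helper: map obclient flags (-h, -P, -u, -p, -D)
    to the fields of an Airflow connection."""
    # Collect each single-letter flag and the value glued to it.
    flags = dict(re.findall(r'-([hPupD])(\S+)', cmd))
    return {
        'host': flags.get('h'),
        'port': int(flags.get('P', 2881)),  # direct-connection default
        'login': flags.get('u'),
        'password': flags.get('p'),
        'schema': flags.get('D'),
    }

cmd = "obclient -hxxx.xxx.xxx.xxx -P2881 -utest_user001@mysql001 -p****** -Dtest"
print(parse_obclient_command(cmd))
```

The account name keeps its full `username@tenant name` form; it goes into the Login field unchanged.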
Add an OceanBase data source in Airflow
1. Open the Airflow web UI.
2. Navigate to Admin > Connections.
3. Click "+" to add a new connection.
4. Specify the following fields:
| Field | Description |
| --- | --- |
| Connection ID | ob (can be any identifier). |
| Connection Type | MySQL. |
| Host | The IP address for connecting to OceanBase Database, for example, xxx.xxx.xxx.xxx. This field can be obtained from the -h parameter in the connection string. |
| Schema | The name of the database to be accessed, obtained from the -D parameter in the connection string. |
| Login | The account name, for example, test_user001@mysql001, obtained from the -u parameter in the connection string. |
| Password | The account password, obtained from the -p parameter in the connection string. |
| Port | The port for connecting to OceanBase Database, obtained from the -P parameter in the connection string. For direct connection, the default value is 2881. For connection through ODP, the default value is 2883. |

After the connection is created, you can reference the ob connection ID in Airflow tasks to access OceanBase Database.
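Besides the web UI, Airflow can also create a connection from the command line with `airflow connections add --conn-uri`. The URI embeds the same fields, and because an OceanBase account name such as test_user001@mysql001 contains an `@`, the login must be percent-encoded so it is not mistaken for the host separator. A minimal sketch, using the example values from above (`build_conn_uri` is an illustrative helper, not an Airflow API):

```python
from urllib.parse import quote

def build_conn_uri(login, password, host, port, schema):
    """Build a mysql:// connection URI. The login is percent-encoded
    because OceanBase account names contain '@' (username@tenant)."""
    return "mysql://{}:{}@{}:{}/{}".format(
        quote(login, safe=''), quote(password, safe=''), host, port, schema
    )

uri = build_conn_uri('test_user001@mysql001', '******', 'xxx.xxx.xxx.xxx', 2881, 'test')
print(uri)
```

The resulting URI could then be passed as `airflow connections add ob --conn-uri "<uri>"`.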
Airflow task example
After you add OceanBase Database to Airflow, you can use the following steps to have Airflow read data from OceanBase Database and print the results.
Create a query.py file in the dags folder under the Airflow installation directory, and add the following content:
```python
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'airflow',
    'retries': 0,
}

def fetch_and_print_data():
    # Reference the 'ob' connection created earlier.
    hook = MySqlHook(mysql_conn_id='ob')
    sql = "SELECT * FROM person LIMIT 1;"
    connection = hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(sql)
    rows = cursor.fetchall()
    for row in rows:
        print(row)

with DAG(
    dag_id='sql_query',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=days_ago(1),
    catchup=False,
) as dag:
    run_and_print = PythonOperator(
        task_id='run_and_print',
        python_callable=fetch_and_print_data,
    )

    run_and_print
```

Run the `airflow tasks test sql_query run_and_print` command to print the first row of the person table.