Apache Airflow is an open-source platform for developing, scheduling, and monitoring batch-oriented workflows. Workflows in Airflow are defined in Python code, and the web interface lets you manage and monitor their status.
Prerequisites
You have been granted the necessary roles for your login account in the required projects (Project Admin, Instance Admin, or Data Read/Write). For account permission details, refer to Manage members.
You have installed Apache Airflow. For detailed instructions, refer to the Apache Airflow official website.
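If Airflow is not installed yet, a minimal setup on the machine that will run the DAGs can look like the sketch below. The MySQL provider package is needed later in this guide because it supplies the MySQL connection type and MySqlHook; for reproducible installs, follow the constraints mechanism described in the official documentation.

```bash
# Minimal sketch: install Airflow together with the MySQL provider,
# which supplies the MySQL connection type and MySqlHook used below.
pip install apache-airflow apache-airflow-providers-mysql
```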
Step 1: Obtain an OceanBase Cloud connection string
Log in to the OceanBase Cloud console. On the Instances page, expand the target instance and select Connect > Get Connection String under the target tenant.
In the pop-up window, click Connect with Public IP.
In the Connect with Public IP window, complete the following settings to generate the connection string:
- Under 1. Add an IP address to the allowlist, click Add and add the egress IP address(es) from which you will connect to the allowlist.
- (Optional) Under 2. Download the CA certificate to connect securely to the tenant, download the CA certificate and complete the verification.
- Under 3. Connect to your instance, click the drop-down list for Database and Account to create a database and an account for the connection. Select MySQL CLI as the connection method.
Notice
Please keep your password in a secure place after creating your account.
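For reference, the generated MySQL CLI connection string takes roughly the following shape. All values below are placeholders (the port in particular is an assumption and may differ for your tenant); copy the actual string from the console:

```bash
# Placeholder values -- copy the actual connection string from the console.
mysql -h t5******.aws-ap-southeast-1.oceanbase.cloud -P 3306 -u test -p -D test
```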
Step 2: Add an OceanBase Cloud data source in Airflow
Open the Airflow Web UI.
Navigate to Admin > Connections.
Click the + icon to add a new connection.
Fill in the following fields:
| Configuration Item | Description |
| --- | --- |
| Connection Id | `ob` (can be any identifier). |
| Connection Type | MySQL |
| Host | Taken from the `-h` parameter in the connection string: the hostname of the OceanBase Cloud database, for example, `t5******.aws-ap-southeast-1.oceanbase.cloud`. |
| Schema | Taken from the `-D` parameter in the connection string: the name of the database to be accessed. |
| Login | Taken from the `-u` parameter in the connection string: the account name, for example, `test`. |
| Password | Taken from the `-p` parameter in the connection string: the account password. |
| Port | Taken from the `-P` parameter in the connection string: the connection port of the OceanBase Cloud database. |

Once the connection is created, you can access the OceanBase database in Airflow tasks by referencing the Connection Id `ob`.
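Alternatively, the same connection can be created with the Airflow CLI instead of the web UI. The host, schema, credentials, and port below are placeholders corresponding to the table above; substitute the values from your own connection string:

```bash
# Create the same connection from the command line.
# Host, schema, login, password, and port are placeholders.
airflow connections add 'ob' \
    --conn-type 'mysql' \
    --conn-host 't5******.aws-ap-southeast-1.oceanbase.cloud' \
    --conn-schema 'test' \
    --conn-login 'test' \
    --conn-password '******' \
    --conn-port '3306'
```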
Example Airflow task
After adding the OceanBase Cloud connection to Airflow, you can use the following code to read data from the OceanBase database and print it.
In the `dags` folder of the Airflow installation directory, create a new file named `query.py` with the following content:

```python
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.mysql.hooks.mysql import MySqlHook
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'airflow',
    'retries': 0,
}

def fetch_and_print_data():
    # Reuse the Airflow connection created in Step 2 (Connection Id: ob).
    hook = MySqlHook(mysql_conn_id='ob')
    sql = "SELECT * FROM person LIMIT 1;"
    connection = hook.get_conn()
    cursor = connection.cursor()
    cursor.execute(sql)
    rows = cursor.fetchall()
    for row in rows:
        print(row)

with DAG(
    dag_id='sql_query',
    default_args=default_args,
    schedule_interval='@daily',
    start_date=days_ago(1),
    catchup=False,
) as dag:
    run_and_print = PythonOperator(
        task_id='run_and_print',
        python_callable=fetch_and_print_data,
    )
```
Run `airflow tasks test sql_query run_and_print` to print the first record from the `person` table.
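As a side note, MySqlHook inherits the generic DB-API helper methods, so the manual cursor handling in `fetch_and_print_data` can be shortened. A minimal sketch, assuming the same `ob` connection and `person` table:

```python
def fetch_and_print_data():
    # get_records() opens the connection, executes the query,
    # and returns all result rows as a list of tuples.
    hook = MySqlHook(mysql_conn_id='ob')
    for row in hook.get_records("SELECT * FROM person LIMIT 1;"):
        print(row)
```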