Scheduling Python script in Airflow
Note: This post is over 6 years old. The information may be outdated.
To schedule a Python script or Python function in Airflow, we use PythonOperator.
Note: For Airflow 2.0+, consider using the TaskFlow API for a more modern and cleaner approach to writing Python tasks.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def my_python_function():
# your code goes here
print('Hello')
def my_python_function_with_context(**context):
# For more detail about "context" object,
# please refer to https://blog.duyet.net/2019/08/airflow-context.html
ds = context['ds']
print(f'Dag run at {ds}')
dag = DAG('dag_id')
PythonOperator(dag=dag,
task_id='my_python_function',
python_callable=my_python_function)
PythonOperator(dag=dag,
task_id='my_python_function_with_context',
provide_context=True,
python_callable=my_python_function_with_context)
Passing in arguments
Use the op_args and op_kwargs arguments to pass additional arguments to the Python callable.
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def my_python_function(ds, lucky_number, **kwargs):
print(f'Dag run at {ds}')
print(f'Random number is {lucky_number}')
dag = DAG('dag_id')
run_this = PythonOperator(dag=dag,
task_id='my_python_function',
provide_context=True,
python_callable=my_python_function,
op_kwargs={
'lucky_number': 99
})
References
Related Posts
Airflow - "context" dictionary
Biến `context` trong airflow là biến hay sử dụng trong Airflow (`PythonOperator` with a callable function), nhưng mình rất hay quên, note lại đây để dễ dàng tra cứu.
Airflow 2.0 - Taskflow API
Chú trọng vào việc đơn giản hóa và rõ ràng cách viết Airflow DAG, cách trao đổi thông tin giữa các tasks, Airflow 2.0 ra mắt Taskflow API cho phép viết đơn giản và gọn gàng hơn so với cách truyền thống, đặc biệt vào các pipelines sử dụng PythonOperators.
Airflow DAG Serialization
In order to make Airflow Webserver stateless, Airflow >=1.10.7 supports DAG Serialization and DB Persistence. Note - This guide is for Airflow 1.x; for Airflow 2.x, DAG serialization is enabled by default.
Airflow - một số ghi chép
Một số ghi chép, tips & tricks của mình trong quá trình sử dụng Apache Airflow.