Airflow - một số ghi chép
Note: This post is over 7 years old. The information may be outdated.
Một số ghi chép, tips & tricks của mình trong quá trình sử dụng Apache Airflow.
Note (2025): Bài viết này viết cho Airflow 1.x. Một số API và best practices có thể đã thay đổi trong Airflow 2.x, nhưng các khái niệm cơ bản vẫn áp dụng được.
-
Viết các functions (tasks) luôn cho mọi kết quả giống nhau với các input giống nhau (stateless).
- Tránh sử dụng global variables, random values, hardware timers.
-
Một số tính năng nên biết
depends_on_pastsử dụng khi viết DAGs để chắc chắn mọi task instance trước đó đều success.LatestOnlyOperatorđể skip một số bước phía sau nếu một số task bị trễ.BranchPythonOperatorcho phép rẽ nhánh workflow tùy vào điều kiện được định nghĩa.
-
Sử dụng
airflow test <dag-id> <task-id> ...để test task instance trên local khi code. -
Sử dụng Docker Compose để thiết lập môi trường local cho dễ.
-
Để test DAG với scheduler, hãy set
schedule_interval=@once, chạy thử, để chạy lại thì chỉ cần clear DagRuns trên UI hoặc bằng lệnhairflow clear -
Khi DAG đã được chạy, airflow chứa các task instance trong DB. Nếu bạn thay đổi
start_datehoặc interval, scheduler có thể sẽ gặp lỗi. Nên đổi têndag_idnếu muốn thay đổistart_datehoặc interval. -
Sử dụng Bitshift thay vì
set_upstream()andset_downstream()để code dễ nhìn hơn, ví dụop1 >> op2 # tương đương: op1.set_downstream(op2) op1 >> op2 >> op3 << op4 # tương đương: # op1.set_downstream(op2) # op2.set_downstream(op3) # op3.set_upstream(op4) op1 >> [op2, op3] >> op4 # tương đương # op1 >> op2 # op1 >> op3 # op2 >> op4 # op3 >> op4 # hoặc tương đương # op1.set_downstream([op2, op3]) # op2.set_downstream(op4) # op3.set_downstream(op4) -
Sử dụng
Variablesđể lưu trữ params của DAGs (Admin->Variables)from airflow.models import Variable foo = Variable.get("foo") bar = Variable.get("bar", deserialize_json=True) baz = Variable.get("baz", default_var=None)hoặc sử dụng variable trong jinja template:
echo {{ var.value.<variable_name> }} -
Sử dụng default arguments để tránh lặp lại các tham số
default_args = { 'owner': 'airflow', 'depends_on_past': False, 'params': { 'foo': 'baz' } } with DAG(dag_id='airflow', default_args=default_args): op1 = BigQueryOperator(task_id='query_1', sql='SELECT 1') op2 = BigQueryOperator(task_id='query_2', sql='SELECT 2') op1 >> op2 -
Lưu password, token trong
Connectionsfrom airflow.hooks.base_hook import BaseHook aws_token = BaseHook.get_connection('aws_token').password -
Có thể generate DAG một cách tự động, ví dụ
def create_dag(id): dag = DAG(f'dag_job_{id}', default_args) op1 = BigQueryOperator(task_id='query_1', sql='SELECT 1', dag=dag) ... return dag for i in range(100): globals()[f'dag_job_{id}'] = create_dag(id)
Related Posts
Cài đặt Apache Airflow với Docker Compose
Trong bài này mình sẽ hướng dẫn cách thiết lập môi trường develop Apache Airflow dưới local bằng Docker Compose.
Gửi Slack Alerts trên Airflow
Slack là một công cụ khá phổ biến trong các Team, slack giúp tập hợp mọi thông tin về Slack (như Jira alert, ETL pipelines, CI/CD status, deployments, ...) một cách thống nhất và dễ dàng theo dõi. Bài viết này mình hướng dẫn gửi mọi báo lỗi của Airflow đến Slack.
Airflow - "context" dictionary
Biến `context` trong airflow là biến hay sử dụng trong Airflow (`PythonOperator` with a callable function), nhưng mình rất hay quên, note lại đây để dễ dàng tra cứu.
Airflow 2.0 - Taskflow API
Chú trọng vào việc đơn giản hóa và rõ ràng cách viết Airflow DAG, cách trao đổi thông tin giữa các tasks, Airflow 2.0 ra mắt Taskflow API cho phép viết đơn giản và gọn gàng hơn so với cách truyền thống, đặc biệt vào các pipelines sử dụng PythonOperators.