LogoDuyệtSr. Data Engineer
HomeAboutPhotosInsightsCV

Footer

Logo

Resources

  • Rust Tiếng Việt
  • /archives
  • /series
  • /tags
  • Status

me@duyet.net

  • About
  • LinkedIn
  • Resume
  • Projects

© 2026 duyet.net | Sr. Data Engineer | 2026-02-27

Airflow - một số ghi chép

Note: This post is over 7 years old. The information may be outdated.

Một số ghi chép, tips & tricks của mình trong quá trình sử dụng Apache Airflow.

Note (2025): Bài viết này viết cho Airflow 1.x. Một số API và best practices có thể đã thay đổi trong Airflow 2.x, nhưng các khái niệm cơ bản vẫn áp dụng được.

  • Viết các functions (tasks) luôn cho mọi kết quả giống nhau với các input giống nhau (stateless).

    • Tránh sử dụng global variables, random values, hardware timers.
  • Một số tính năng nên biết

    • depends_on_past sử dụng khi viết DAGs để chắc chắn mọi task instance trước đó đều success.
    • LatestOnlyOperator để skip một số bước phía sau nếu một số task bị trễ.
    • BranchPythonOperator cho phép rẽ nhánh workflow tùy vào điều kiện được định nghĩa.
  • Sử dụng airflow test <dag-id> <task-id> ... để test task instance trên local khi code.

  • Sử dụng Docker Compose để thiết lập môi trường local cho dễ.

  • Để test DAG với scheduler, hãy set schedule_interval=@once, chạy thử, để chạy lại thì chỉ cần clear DagRuns trên UI hoặc bằng lệnh airflow clear

  • Khi DAG đã được chạy, airflow chứa các task instance trong DB. Nếu bạn thay đổi start_date hoặc interval, scheduler có thể sẽ gặp lỗi. Nên đổi tên dag_id nếu muốn thay đổi start_date hoặc interval.

  • Sử dụng Bitshift thay vì set_upstream() and set_downstream() để code dễ nhìn hơn, ví dụ

    op1 >> op2
    # tương đương: op1.set_downstream(op2)
    
    op1 >> op2 >> op3 << op4
    # tương đương:
    #    op1.set_downstream(op2)
    #    op2.set_downstream(op3)
    #    op3.set_upstream(op4)
    
    op1 >> [op2, op3] >> op4
    # tương đương
    #    op1 >> op2
    #    op1 >> op3
    #    op2 >> op4
    #    op3 >> op4
    # hoặc tương đương
    #    op1.set_downstream([op2, op3])
    #    op2.set_downstream(op4)
    #    op3.set_downstream(op4)
    
  • Sử dụng Variables để lưu trữ params của DAGs (Admin -> Variables)

    from airflow.models import Variable
    foo = Variable.get("foo")
    bar = Variable.get("bar", deserialize_json=True)
    baz = Variable.get("baz", default_var=None)
    

    hoặc sử dụng variable trong jinja template:

    echo {{ var.value.<variable_name> }}
    
  • Sử dụng Slack để nhận thông báo lỗi

  • Sử dụng default arguments để tránh lặp lại các tham số

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'params': { 'foo': 'baz' }
    }
    
    with DAG(dag_id='airflow', default_args=default_args):
        op1 = BigQueryOperator(task_id='query_1', sql='SELECT 1')
        op2 = BigQueryOperator(task_id='query_2', sql='SELECT 2')
        op1 >> op2
    
  • Lưu password, token trong Connections

    from airflow.hooks.base_hook import BaseHook
    aws_token = BaseHook.get_connection('aws_token').password
    
  • Có thể generate DAG một cách tự động, ví dụ

    def create_dag(id):
        dag = DAG(f'dag_job_{id}', default_args)
        op1 = BigQueryOperator(task_id='query_1', sql='SELECT 1', dag=dag)
        ...
        return dag
    
    for i in range(100):
        globals()[f'dag_job_{id}'] = create_dag(id)
    
Aug 27, 2019·7 years ago
|Data Engineering|
Apache AirflowDataData Engineering
|Edit|

Related Posts

Cài đặt Apache Airflow với Docker Compose

Trong bài này mình sẽ hướng dẫn cách thiết lập môi trường develop Apache Airflow dưới local bằng Docker Compose.

Aug 26, 2019·7 years ago
Read more

Gửi Slack Alerts trên Airflow

Slack là một công cụ khá phổ biến trong các Team, slack giúp tập hợp mọi thông tin về Slack (như Jira alert, ETL pipelines, CI/CD status, deployments, ...) một cách thống nhất và dễ dàng theo dõi. Bài viết này mình hướng dẫn gửi mọi báo lỗi của Airflow đến Slack.

Aug 20, 2019·7 years ago
Read more

Airflow - "context" dictionary

Biến `context` trong airflow là biến hay sử dụng trong Airflow (`PythonOperator` with a callable function), nhưng mình rất hay quên, note lại đây để dễ dàng tra cứu.

Aug 9, 2019·7 years ago
Read more

Airflow 2.0 - Taskflow API

Chú trọng vào việc đơn giản hóa và rõ ràng cách viết Airflow DAG, cách trao đổi thông tin giữa các tasks, Airflow 2.0 ra mắt Taskflow API cho phép viết đơn giản và gọn gàng hơn so với cách truyền thống, đặc biệt vào các pipelines sử dụng PythonOperators.

Dec 26, 2020·5 years ago
Read more