Airflow - một số ghi chép
Note: This post is over 7 years old. The information may be outdated.
Một số ghi chép, tips & tricks của mình trong quá trình sử dụng Apache Airflow.
- 
    Viết các functions (tasks) luôn cho mọi kết quả giống nhau với các input giống nhau (stateless). - Tránh sử dụng global variables, random values, hardware timers.
 
- 
    Một số tính năng nên biết - depends_on_pastsử dụng khi viết DAGs để chắc chắn mọi task instance trước đó đều success.
- LatestOnlyOperatorđể skip một số bước phía sau nếu một số task bị trễ.
- BranchPythonOperatorcho phép rẽ nhánh workflow tùy vào điều kiện được định nghĩa.
 
- 
    Sử dụng airflow test <dag-id> <task-id> ...để test task instance trên local khi code.
- 
    Sử dụng Docker Compose để thiết lập môi trường local cho dễ. 
- 
    Để test DAG với scheduler, hãy set schedule_interval=@once, chạy thử, để chạy lại thì chỉ cần clear DagRuns trên UI hoặc bằng lệnhairflow clear
- 
    Khi DAG đã được chạy, airflow chứa các task instance trong DB. Nếu bạn thay đổi start_datehoặc interval, scheduler có thể sẽ gặp lỗi. Nên đổi têndag_idnếu muốn thay đổistart_datehoặc interval.
- 
    Sử dụng Bitshift thay vì set_upstream()andset_downstream()để code dễ nhìn hơn, ví dụop1 >> op2 # tương đương: op1.set_downstream(op2) op1 >> op2 >> op3 << op4 # tương đương: # op1.set_downstream(op2) # op2.set_downstream(op3) # op3.set_upstream(op4) op1 >> [op2, op3] >> op4 # tương đương # op1 >> op2 # op1 >> op3 # op2 >> op4 # op3 >> op4 # hoặc tương đương # op1.set_downstream([op2, op3]) # op2.set_downstream(op4) # op3.set_downstream(op4)
- 
    Sử dụng Variablesđể lưu trữ params của DAGs (Admin->Variables)from airflow.models import Variable foo = Variable.get("foo") bar = Variable.get("bar", deserialize_json=True) baz = Variable.get("baz", default_var=None)hoặc sử dụng variable trong jinja template: echo {{ var.value.<variable_name> }}
- 
    Sử dụng default arguments để tránh lặp lại các tham số default_args = { 'owner': 'airflow', 'depends_on_past': False, 'params': { 'foo': 'baz' } } with DAG(dag_id='airflow', default_args=default_args): op1 = BigQueryOperator(task_id='query_1', sql='SELECT 1') op2 = BigQueryOperator(task_id='query_2', sql='SELECT 2') op1 >> op2
- 
    Lưu password, token trong Connectionsfrom airflow.hooks.base_hook import BaseHook aws_token = BaseHook.get_connection('aws_token').password
- 
    Có thể generate DAG một cách tự động, ví dụ def create_dag(id): dag = DAG(f'dag_job_{id}', default_args) op1 = BigQueryOperator(task_id='query_1', sql='SELECT 1', dag=dag) ... return dag for i in range(100): globals()[f'dag_job_{id}'] = create_dag(id)