Gửi Slack Alerts trên Airflow
Note: This post is over 7 years old. The information may be outdated.
Slack là một công cụ khá phổ biến trong các Team, slack giúp tập hợp mọi thông tin về Slack (như Jira alert, ETL pipelines, CI/CD status, deployments, ...) một cách thống nhất và dễ dàng theo dõi. Bài viết này mình hướng dẫn gửi mọi báo lỗi của Airflow đến Slack.
Note (2025): Bài viết này viết cho Airflow 1.x. Trong Airflow 2.x, import path đã thay đổi:
airflow.contrib.operators.slack_webhook_operator→airflow.providers.slack.operators.slack_webhook. Khái niệm và cách setup vẫn tương tự.
1. Slack Incoming Webhooks và Airflow Connection
Truy cập Slack App Directory tìm Incoming Webhooks: https://<workspace>.slack.com/apps/A0F7XDUAZ-incoming-webhooks
Ở mục Post to Channel chọn Channel, sau đó bấm Add Incoming Webhooks integration
Sau đó bạn sẽ nhận được 1 URL có dạng: https://hooks.slack.com/services/T00000000/B0000000/hssA66nupi72KAFy9ttv5fr2
Vào Airflow > Admin > Connections để thêm một connection mới
- Conn Id:
Slack - Conn Type:
HTTP - Host:
https://hooks.slack.com/services - Password:
/T00000000/B0000000/hssA66nupi72KAFy9ttv5fr2
2. Slack alert Utils
Tạo file utils chứa function alert, ví dụ: /dags/utils/slack_alert.py
from airflow.hooks.base_hook import BaseHook
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
SLACK_CONN_ID = 'slack'
def task_fail_slack_alert(context):
slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
slack_msg = """
Task Failed.
*DAG*: {dag_id}
*Task*: {task}
*Dag*: {dag}
*Execution Time*: {exec_date}
*Log Url*: {log_url}
""".format(
dag_id=context.get('dag').dag_id,
task=context.get('task_instance').task_id,
dag=context.get('task_instance').dag_id,
ti=context.get('task_instance'),
exec_date=context.get('execution_date'),
log_url=context.get('task_instance').log_url,
)
failed_alert = SlackWebhookOperator(
task_id='slack_alert',
http_conn_id=SLACK_CONN_ID,
webhook_token=slack_webhook_token,
message=slack_msg,
username='airflow')
return failed_alert.execute(context=context)
3. Config Slack alert cho từng DAG
Với mỗi DAG muốn alert, ta thêm thuộc tính on_failure_callback cho mỗi DAG. Ví dụ như dưới dây:
example_dag.py
from airflow import DAG
...
from utils.slack_alert import task_fail_slack_alert
default_args = {
**params['default_args'],
'owner': DAG_OWNER,
'on_failure_callback': task_fail_slack_alert,
...
}
dag = DAG('dag_id', default_args=default_args)
...
Kết quả:
Tham khảo
- https://medium.com/datareply/integrating-slack-alerts-in-airflow-c9dcd155105
- airflow.operators.slack_operator: https://airflow.apache.org/_modules/airflow/operators/slack_operator.html
Chúc các bạn thành công.
Related Posts
Airflow - một số ghi chép
Một số ghi chép, tips & tricks của mình trong quá trình sử dụng Apache Airflow.
Cài đặt Apache Airflow với Docker Compose
Trong bài này mình sẽ hướng dẫn cách thiết lập môi trường develop Apache Airflow dưới local bằng Docker Compose.
Airflow - "context" dictionary
Biến `context` trong airflow là biến hay sử dụng trong Airflow (`PythonOperator` with a callable function), nhưng mình rất hay quên, note lại đây để dễ dàng tra cứu.
Airflow 2.0 - Taskflow API
Chú trọng vào việc đơn giản hóa và rõ ràng cách viết Airflow DAG, cách trao đổi thông tin giữa các tasks, Airflow 2.0 ra mắt Taskflow API cho phép viết đơn giản và gọn gàng hơn so với cách truyền thống, đặc biệt vào các pipelines sử dụng PythonOperators.