April 13, 2026 · 1 min read

Apache Airflow — Programmatic Workflow Orchestration Platform

Apache Airflow is the industry-standard platform for authoring, scheduling, and monitoring data workflows. Define DAGs in Python to orchestrate ETL pipelines, ML training, data processing, and any complex workflow with dependencies.

Script Depot · Community
Quick Start

Try it first, then decide whether to dig deeper.

The commands below tell both users and agents what to copy first, what to install, and where everything lands.

# Install Airflow with pip
pip install apache-airflow

# Initialize the metadata database and create an admin user
# (on Airflow 2.7+, `airflow db migrate` replaces the deprecated `db init`)
airflow db init
airflow users create --username admin --password admin \
  --firstname Admin --lastname User --role Admin --email admin@example.com

# Start the web server and scheduler
airflow webserver --port 8080 &
airflow scheduler &

# Access UI at http://localhost:8080

# Or use Docker Compose (recommended)
curl -LfO https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml
docker compose up -d

Introduction

Apache Airflow is the most widely used workflow orchestration platform. Originally created at Airbnb in 2014, it lets you define complex workflows as Directed Acyclic Graphs (DAGs) in Python. Each task in a DAG can run SQL queries, call APIs, execute Python functions, trigger Spark jobs, or interact with any system.

With over 45,000 GitHub stars and as an Apache Top-Level Project, Airflow is used by thousands of companies including Airbnb, Google, Netflix, Slack, PayPal, and Lyft for data engineering, ML pipelines, and business process automation.

What Airflow Does

Airflow schedules and monitors workflows defined as Python code. A DAG defines tasks and their dependencies. The scheduler determines when tasks should run, the executor distributes work to workers, and the web UI provides visibility into pipeline status, logs, and history.
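At its core, the scheduler's job is to release each task only after all of its upstream tasks have succeeded. The sketch below illustrates that dependency-resolution idea with a plain topological sort (Kahn's algorithm); the `deps` map and `runnable_order` function are illustrative stand-ins, not Airflow internals.

```python
from collections import deque

# Hypothetical dependency map: task -> set of upstream tasks it waits on.
deps = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "notify": {"load"},
}

def runnable_order(deps):
    """Return tasks in an order where every task runs after its upstreams.

    A simplified, single-pass stand-in for how a scheduler resolves a DAG.
    """
    pending = {t: set(u) for t, u in deps.items()}
    ready = deque(t for t, u in pending.items() if not u)  # no upstreams
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        # Unblock any task that was only waiting on the one we just ran.
        for t, upstream in pending.items():
            if task in upstream:
                upstream.discard(task)
                if not upstream and t not in order and t not in ready:
                    ready.append(t)
    if len(order) != len(deps):
        raise ValueError("cycle detected -- not a valid DAG")
    return order

print(runnable_order(deps))  # ['extract', 'transform', 'load', 'notify']
```

A cycle (task A waits on B, B waits on A) makes the graph unschedulable, which is exactly why Airflow requires DAGs to be acyclic.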

Architecture Overview

 [DAG Files (Python)]
    dags/ directory
          |
     [Scheduler]
 Parses DAGs, creates task
 instances, manages scheduling
 and retries
          |
      [Executor]
 +---------+----------+
 |         |          |
[Local]  [Celery]  [Kubernetes]
Single   Redis/    One K8s pod
process  RabbitMQ  per task
         workers
          |
    [Metadata DB]
  PostgreSQL / MySQL
  Task state, history
          |
      [Web UI]
  DAG visualization
  Task logs, Gantt charts
  Trigger, retry, clear
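One scheduler responsibility worth making concrete is retry handling: a failed task instance is re-queued until its `retries` budget (from `default_args`) is exhausted. This is a conceptual sketch of those semantics, not Airflow's actual state machine; `run_with_retries` and `flaky_extract` are invented names.

```python
def run_with_retries(task_fn, retries=3):
    """Try a task once, then retry up to `retries` more times.

    Mirrors Airflow's `retries` semantics: 1 + retries total attempts.
    Returns (final_state, attempts_used).
    """
    for attempt in range(1, retries + 2):
        try:
            task_fn()
            return "success", attempt
        except Exception:
            if attempt > retries:
                return "failed", attempt
            # The real scheduler would mark the task "up_for_retry" and
            # wait `retry_delay` before re-queuing it here.

# Demo: a task that fails twice, then succeeds on the third attempt.
attempts = {"count": 0}
def flaky_extract():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise RuntimeError("transient API error")

print(run_with_retries(flaky_extract, retries=3))  # ('success', 3)
```

Each attempt and its final state are recorded in the metadata DB, which is what the web UI reads to show per-try logs and history.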

Self-Hosting & Configuration

# dags/etl_pipeline.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta

default_args = {
    "owner": "data-team",
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="daily_etl",
    default_args=default_args,
    schedule="@daily",
    start_date=datetime(2024, 1, 1),
    catchup=False,
    tags=["etl", "production"],
) as dag:

    def extract_data(**context):
        import requests
        # Fetch raw records from the source API; a timeout avoids hung tasks
        data = requests.get("https://api.example.com/data", timeout=30).json()
        context["ti"].xcom_push(key="raw_data", value=data)

    def transform_data(**context):
        raw = context["ti"].xcom_pull(key="raw_data")
        # Apply a 10% uplift to each record's amount
        transformed = [{"id": r["id"], "value": r["amount"] * 1.1} for r in raw]
        context["ti"].xcom_push(key="transformed", value=transformed)

    extract = PythonOperator(task_id="extract", python_callable=extract_data)
    transform = PythonOperator(task_id="transform", python_callable=transform_data)
    load = PostgresOperator(
        task_id="load",
        postgres_conn_id="my_postgres",
        sql="INSERT INTO results SELECT * FROM staging;",
    )

    extract >> transform >> load
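Because the callables above only touch `context["ti"]`, they can be exercised outside a running Airflow instance with a minimal XCom stand-in. The `FakeTI` class below is a testing sketch invented for illustration, not an Airflow API.

```python
# Minimal stand-in for a TaskInstance so DAG callables can be unit-tested
# without a scheduler or metadata DB (illustrative only, not Airflow's TI).
class FakeTI:
    def __init__(self):
        self.store = {}

    def xcom_push(self, key, value):
        self.store[key] = value

    def xcom_pull(self, key):
        return self.store[key]

# Same shape as the transform callable in the DAG above.
def transform_data(**context):
    raw = context["ti"].xcom_pull(key="raw_data")
    transformed = [{"id": r["id"], "value": r["amount"] * 1.1} for r in raw]
    context["ti"].xcom_push(key="transformed", value=transformed)

ti = FakeTI()
ti.xcom_push("raw_data", [{"id": 1, "amount": 100}])
transform_data(ti=ti)
print(ti.xcom_pull("transformed"))
```

Real XComs are serialized into the metadata DB, so they suit small payloads (IDs, paths, row counts); large datasets should move through external storage instead.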

Key Features

  • Python DAGs — define workflows as code with full Python flexibility
  • Rich UI — web dashboard with DAG visualization, logs, and monitoring
  • Scheduling — cron-based and data-aware scheduling
  • Extensibility — 1,000+ provider packages for any integration
  • Backfill — re-run historical task instances for data reprocessing
  • Dynamic DAGs — generate DAGs programmatically based on config
  • XCom — pass data between tasks within a DAG
  • SLA & Alerts — email and Slack alerts on failures or SLA misses
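The dynamic-DAG pattern from the list above is usually a factory function driven by config. The sketch below shows the shape of that pattern with plain dicts; in a real `dags/` file, `build_dag` would return a `DAG` object registered in the module's globals, and the `SOURCES` config is invented for illustration.

```python
# Hypothetical per-source config; in practice this might come from YAML
# or a database.
SOURCES = {
    "orders": {"schedule": "@hourly", "table": "staging.orders"},
    "users":  {"schedule": "@daily",  "table": "staging.users"},
}

def build_dag(name, cfg):
    """Factory producing one DAG spec per configured source.

    Returns a plain dict here; real Airflow code would construct a DAG
    object with these values instead.
    """
    return {
        "dag_id": f"sync_{name}",
        "schedule": cfg["schedule"],
        "tasks": ["extract", "load"],
        "target": cfg["table"],
    }

dags = {name: build_dag(name, cfg) for name, cfg in SOURCES.items()}
print(sorted(d["dag_id"] for d in dags.values()))  # ['sync_orders', 'sync_users']
```

Adding a new pipeline then means adding one config entry, not copying a DAG file.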

Comparison with Similar Tools

Feature          Airflow           Prefect      Dagster         Temporal        n8n
Language         Python            Python       Python          Any (SDK)       Visual/JS
DAG Definition   Python files      Decorators   Assets + ops    Workflows       Drag-and-drop
UI               Good              Good         Excellent       Basic           Excellent
Data Awareness   Basic             Good         Excellent       No              No
Scaling          Celery/K8s        Agents       K8s             Workers         Workers
Learning Curve   Moderate          Low          Moderate        Moderate        Very Low
Best For         Data Engineering  Modern ETL   Data Platform   Microservices   Automation
GitHub Stars     45K               22K          12K             20K             65K

FAQ

Q: Airflow vs Prefect — which should I choose? A: Airflow for mature, battle-tested orchestration with the largest community. Prefect for a more modern, Pythonic API with better local development experience. Airflow has more integrations and enterprise adoption.

Q: Can Airflow handle real-time streaming? A: No. Airflow is designed for batch workflows. For streaming, use Kafka, Flink, or Spark Streaming. Use Airflow to orchestrate batch jobs that process data from streams.

Q: How do I scale Airflow? A: Use CeleryExecutor with Redis/RabbitMQ for horizontal scaling across workers. For Kubernetes environments, use KubernetesExecutor to run each task as a K8s pod.

Q: Is there a managed Airflow service? A: Yes. Google Cloud Composer, AWS MWAA (Managed Workflows for Apache Airflow), and Astronomer provide managed Airflow with auto-scaling and maintenance.

