Airflow Avancé

Maîtriser les Internals d'Apache Airflow : De l'Exécution au Debugging Avancé

Plongez au cœur de la machine Airflow pour comprendre les mécanismes d'exécution, optimiser les performances et déboguer les problèmes complexes en production. Un cours pour les ingénieurs de données qui veulent passer de l'utilisation à la maîtrise complète.

Preparetoi.academy 30 min

1. Architecture Interne et Cycle de Vie d'une Task

Définition

Le cycle de vie d'une task dans Airflow représente l'ensemble des états et transitions qu'une instance de tâche traverse, du moment où elle est créée jusqu'à son achèvement. Ce cycle est géré par le scheduler, l'executor et le metadata database, constituant une orchestration sophistiquée au-delà de ce que le simple code DAG expose.

Analogie

Imaginez une chaîne de montage automobile : chaque voiture (task) passe par différentes stations (états) : inspection (scheduled), préparation (queued), assemblage (running), contrôle qualité (success/failed). Le contremaître (scheduler) vérifie les dépendances, l'équipe d'ouvriers (executor) réalise le travail, et un registre (metadata DB) enregistre chaque étape.

Architecture des États

État Description Transition Possible Durée Typique
no_status État initial avant planification → scheduled 0-5ms
scheduled Dépendances satisfaites, prête → queued ou skipped Jusqu'à 5s
queued En attente d'un worker disponible → running ou failed 100ms-2min
running Exécution en cours sur worker → success/failed/upstream_failed Variable
success Complétée avec succès → (parent task event) 0ms
failed Erreur détectée → retry ou failed 0ms
upstream_failed Dépendance parent échouée → (terminal) 0ms
skipped Branche non exécutée → (terminal) 0ms

Mécanisme Interne d'Exécution

Le scheduler scrute la base de données toutes les 1-5 secondes (configurable via scheduler_heartbeat_sec). Pour chaque DAG, il évalue les dépendances topologiques, calcule les intervalles de temps, et crée des TaskInstance si les conditions sont remplies. L'executor reçoit alors les tâches du pool, les alloue aux workers, et collecte les résultats.

DAG Parse → Serialization → Scheduler Check → Dependency Resolution → 
Executor Queue → Worker Allocation → Task Execution → Result Collection → 
Database UpdateTrigger Downstream

Astuce d'Optimisation

Pour déboguer le cycle de vie, explorez airflow tasks list-dag-runs <dag_id> et la table task_instance en base. Vérifiez notamment queued_dttm, start_date, end_date, et duration pour identifier les goulots.

⚠️ Attention Critique

Le scheduler est single-threaded par défaut. Si vous avez 10 000 DAGs avec 50 tasks chacun, le scheduler ne peut pas scanner tous les DAGs en quelques secondes. Cela crée un backlog scheduling où les tâches restent en scheduled pendant des minutes. Utilisez dag_dir_list_interval et partitionnez les DAGs sur plusieurs schedulers en High Availability.


2. Pool, Queue et Executor : Gestion Avancée des Ressources

Définition

Les pools et queues sont les mécanismes de contrôle de concurrence dans Airflow. Un pool limite le nombre de tasks simultanées par type de ressource (DB, GPU, API), tandis qu'une queue routage les tasks vers des workers spécifiques. L'executor détermine comment les tasks sont distribuées (local, celery, kubernetes, etc.).

Analogie

Pensez à un parc d'attractions : les pools sont les attractions elles-mêmes (chaque montagne russe peut accueillir 30 personnes max), les queues sont les files d'attente (premium vs standard), et l'executor est le système de gestion des files (FastPass vs billetterie traditionnelle).

Architecture Pool-Queue-Executor

Composant Rôle Configuration Impact
Pool Limite concurrent_limit par ressource max_active_tasks Throttling horizontal
Queue Routage des tasks aux workers queue attribute Isolation des workloads
Executor Stratégie d'allocation Type (Celery, K8s, Local) Scalabilité verticale
Priority Weight Ordre d'exécution dans queue priority_weight DAG/Task Scheduling fairness

Cas d'Usage Avancés

Scénario 1 : API Rate-Limited
Une tâche appelle une API avec limite 100 requêtes/minute. Sans pool, 50 tasks lancées simultanément = rejet.

pool="api_calls_pool"
pool_slots=5  # Max 5 requêtes simultanées

Scénario 2 : Ressources GPU
Seulement 2 GPUs disponibles sur le cluster, mais 50 tasks nécessitent GPU.

pool="gpu_pool"
pool_slots=2
queue="gpu_queue"  # Worker spécifique avec GPU

Scénario 3 : DAG Multi-Étapes avec Concurrence Variable
Étape 1 (extraction) : 20 tasks parallèles, Étape 2 (fusion) : 1 task, Étape 3 (ML) : 4 tasks

Dynamique pool allocation par étape via XCom ou Variables

Métriques de Performance Internes

L'executor maintient des statistiques :

  • task_queue_depth : Nombre de tasks en attente
  • worker_utilization : % CPU/RAM utilisés
  • heartbeat_latency : Délai entre worker et scheduler
  • task_duration_percentiles : P50, P95, P99 des durées

Astuce : Debugging Pool Contention

SELECT pool, COUNT(*), SUM(CASE WHEN state='queued' THEN 1 ELSE 0 END) as queued_count
FROM task_instance
WHERE dag_id='my_dag'
GROUP BY pool;

⚠️ Attention : Race Condition en Pool

Plusieurs schedulers peuvent assigner le même pool_slot à deux tasks différentes (edge case dans HA). Utilisez airflow pools import avec versioning strict et monitorez pool_full_events.


3. Le Metadata Database : Schéma, Requêtes et Optimisation

Définition

La métabase de données Airflow (par défaut PostgreSQL/MySQL) stocke tous les états, configurations, logs et exécutions. C'est la source de vérité du système, mais aussi son point faible si elle devient lente. Une single query peut ralentir le scheduler de 50%.

Analogie

La métabase est le cerveau d'Airflow : elle enregistre les souvenirs (states), les plans (DAG definitions), les réflexes (triggers). Si le cerveau est congestionné, tout ralentit, y compris la respiration (scheduler heartbeat).

Schéma Critique et Indexation

Table Lignes (10k DAGs) Requêtes/sec Index Critique
dag_run 300 000+ 50-100 dag_id, execution_date
task_instance 15 000 000+ 200-500 dag_id, task_id, state, (dag_id, state)
task_reschedule 100 000+ 10-20 task_id, execution_date
dag 10 000 5-10 dag_id (PK)
log 50 000 000+ 100-200 dag_id, task_id, execution_date

Queries Coûteuses et Optimisation

Query 1 : Scheduler Parsing DAG

-- LENT : Full table scan
SELECT * FROM task_instance 
WHERE dag_id='my_dag' AND execution_date > NOW() - INTERVAL '7 days';

-- OPTIMISÉ : Index + partition
CREATE INDEX idx_ti_dag_exec ON task_instance(dag_id, execution_date DESC);
SELECT * FROM task_instance 
WHERE dag_id='my_dag' AND execution_date > NOW() - INTERVAL '7 days' 
LIMIT 1000;

Query 2 : Audit des Logs

-- CATASTROPHIQUE : 50M rows scan
SELECT * FROM log WHERE created_at > NOW() - INTERVAL '30 days';

-- BON : Indexation sur created_at + partition by month
-- Puis archiver les anciens logs vers S3

Stratégies de Performance

  1. Partitioning : Diviser task_instance par execution_date (monthly)
  2. Vacuum & Analyze : Exécuter régulièrement (PostgreSQL)
  3. Connection Pooling : SQLAlchemy pool_size=10-20, max_overflow=20
  4. Query Timeouts : Ajouter statement_timeout=30s en PostgreSQL
  5. Archive Old Data : Nettoyer les runs > 1 an dans table d'audit

Astuce : Monitorer la Métabase

-- Nombre de task_instances par état (snapshot)
SELECT state, COUNT(*) FROM task_instance GROUP BY state;

-- Requêtes lentes (PostgreSQL)
SELECT query, calls, mean_time FROM pg_stat_statements 
WHERE query LIKE '%task_instance%' ORDER BY mean_time DESC LIMIT 5;

⚠️ Attention : Deadlock sous Charge

Plusieurs schedulers updatent simultanément task_instance. Si deux transactions modifient le même DAG run concurremment, deadlock = retry indéfini. Utilisez isolation_level = 'READ_COMMITTED' et NOWAIT locks en SQL.


4. Debugging Avancé : Logs, XCom et Instrumentation

Définition

Le debugging en Airflow ne se limite pas à print() : il faut tracer l'exécution à travers scheduler → executor → worker, capturer XCom (échange de données inter-tasks), et analyser les logs structurés pour identifier les anomalies subtiles.

Analogie

Déboguer Airflow ressemble à résoudre un crime en réseau : vous avez des témoins (logs) dispersés sur 10 serveurs, des indices (XCom), et une timeline (DAG execution). Il faut reconstituer l'histoire complète sans point central.

Architecture Logging Multi-Niveaux

Niveau Émetteur Format Stockage Volume
Scheduler Scheduler process JSON/Text Fichier + DB 10MB/jour
Executor Executor process JSON/Text Fichier + DB 50MB/jour
Worker Task execution Structured logs Fichier + S3 1-10GB/jour
API/UI HTTP requests JSON DB (limited) 5MB/jour
Airflow internal Core modules DEBUG/INFO File 100MB/jour

Stratégies de Debugging Avancé

1. Capture de XCom pour Debug

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def task_a():
    return {"model_id": 42, "accuracy": 0.95}

def task_b(ti):
    result = ti.xcom_pull(task_ids='task_a')
    print(f"Reçu XCom: {result}")

with DAG('xcom_dag', start_date=datetime(2024,1,1)) as dag:
    t_a = PythonOperator(task_id='task_a', python_callable=task_a)
    t_b = PythonOperator(task_id='task_b', python_callable=task_b, 
                         provide_context=True)
    t_a >> t_b

2. Inspection des Task Instances en Base

SELECT 
    ti.dag_id, ti.task_id, ti.execution_date, ti.state,
    ti.try_number, ti.start_date, ti.end_date,
    EXTRACT(EPOCH FROM (ti.end_date - ti.start_date)) as duration_sec,
    ti.log_filename
FROM task_instance ti
WHERE ti.dag_id = 'my_dag' 
  AND ti.execution_date = '2024-01-15 00:00:00'
ORDER BY ti.task_id;

3. Log Streaming depuis Worker Distant

# Récupérer logs en temps réel
airflow tasks log my_dag task_a 2024-01-15T00:00:00Z

# Ou accéder directement si logs stockés en S3
aws s3 cat s3://airflow-logs/my_dag/task_a/2024-01-15T00:00:00Z.log | tail -100

4. Instrumentation Custom avec Prometheus

from prometheus_client import Counter, Histogram
import time

task_duration = Histogram('task_duration_seconds', 'Task execution time', 
                          labelnames=['dag_id', 'task_id'])
task_errors = Counter('task_errors_total', 'Task failures', 
                      labelnames=['dag_id', 'task_id'])

def monitored_task():
    with task_duration.labels(dag_id='my_dag', task_id='task_a').time():
        try:
            # Exécution
            pass
        except Exception as e:
            task_errors.labels(dag_id='my_dag', task_id='task_a').inc()
            raise

Astuce : Repérer les Tasks Stuck

from airflow.models import TaskInstance
from datetime import datetime, timedelta

# Tasks en 'running' depuis > 2 heures = stuck
stuck_tasks = session.query(TaskInstance).filter(
    TaskInstance.state == 'running',
    TaskInstance.start_date < datetime.utcnow() - timedelta(hours=2)
).all()

for ti in stuck_tasks:
    print(f"STUCK: {ti.dag_id}.{ti.task_id} depuis {ti.start_date}")

⚠️ Attention : Overflow de Logs

Les logs s'accumulent rapidement (1GB/jour = 365GB/an). Sans rotation/archivage :

  • Performance UI dégradée (-50%)
  • Métabase complétée à 80%
  • Recherche de logs impossible

Utilisez log_retention_days=30 et archivez en S3 via Celery task dédiée.


5. Performance Tuning et Optimisation en Production

Définition

Optimiser Airflow en production signifie ajuster les paramètres du scheduler, executor, pools, et base de données pour atteindre les SLA (Service Level Agreements) : latence de scheduling < 5s, throughput > 1000 tasks/min, reliability > 99.95%.

Analogie

Tuner Airflow, c'est comme affiner un moteur de course : changer l'essence (executor), les pistons (task size), l'échappement (logging), les pneus (worker capacity). Chaque ajustement affecte la performance globale.

Paramètres Critiques à Tuner

Paramètre Défaut Production Impact Trade-off
scheduler_heartbeat_sec 5 1-2 Latence scheduling CPU scheduler +20%
dag_dir_list_interval 300 30-60 Parsing DAGs I/O disque +30%
parallelism 32 128-512 Concurrence globale RAM worker +100%
max_active_runs_per_dag 16 32-64 DAG runs simultanées Métabase stress
dag_concurrency 16 32-64 Tasks par DAG Isolation DAGs ↓
pool_slots (default) 128 256-512 Tasks sans pool Wait time ↓
worker_refresh_batch_size 1 0 (infinite) Recycling workers Memory leaks ↓
task_logs_folder cleanup Manual Automated Disk space Performance -5%

Profiling Scheduler Bottleneck

# Script de profiling du scheduler
import cProfile
import pstats
from airflow.jobs.scheduler_job import SchedulerJob

profiler = cProfile.Profile()
scheduler = SchedulerJob()

profiler.enable()
scheduler.run()
profiler.disable()

stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(20)  # Top 20 functions

Résultats Typiques :

  • 40% temps : _critical_section_dag_iteration (boucle principale)
  • 25% temps : evaluate_dependencies (DAG dependencies)
  • 20% temps : Requêtes métabase (task_instance SELECT)
  • 15% temps : Serialization/Deserialization DAGs

Checklist d'Optimisation Production

  1. Métabase

    • Indexes sur (dag_id, execution_date) en task_instance
    • Connection pooling SQLAlchemy pool_size=20
    • Partition task_instance par mois
    • Archivage logs > 30 jours
    • VACUUM ANALYZE PostgreSQL hebdo
  2. Scheduler

    • High Availability (2+ schedulers, Redis Distributed Lock)
    • scheduler_heartbeat_sec = 1-2
    • dag_dir_list_interval = 30-60
    • Monitoring: task_queue_depth, scheduling_latency
  3. Executor

    • CeleryExecutor si > 5k tasks/jour
    • KubernetesExecutor si besoin dynamic scaling
    • LocalExecutor max 100 tasks (dev/test only)
  4. Workers

    • Déployer sur multi-nodes (1 worker per machine)
    • worker_refresh_batch_size = 0 (graceful shutdown)
    • Monitoring CPU/RAM par worker (seuil 80%)
  5. DAGs

    • Max 1000 tasks par DAG
    • Max 50 DAGs parsés simultanément
    • Utiliser SubDAGs / Dynamic DAGs pour huge graphs
    • Caching DAG parsing (airflow 2.3+)

Astuce : Benchmarking Comparatif

# Avant optimisation
time airflow dags list
# Output: 15.3s

# Après optimisation (indexes + pool tuning)
time airflow dags list
# Output: 2.1s

# Gain: 87% ✅

⚠️ Attention : Over-Tuning Risks

  • Réduire scheduler_heartbeat_sec à 0.5 = CPU scheduler 100%, starving workers
  • Augmenter parallelism à 2000 = Métabase timeout, scheduler crash
  • Éliminer logs = Impossible debugging en production
  • Partitioning table sans index = Perf identique/pire

Le sweet spot : monitoring continu + ajustements incrementaux (10% à la fois).

Accédez à des centaines d'examens QCM — Découvrir les offres Premium