Maîtriser les Internals d'Apache Airflow : De l'Exécution au Debugging Avancé
Plongez au cœur de la machine Airflow pour comprendre les mécanismes d'exécution, optimiser les performances et déboguer les problèmes complexes en production. Un cours pour les ingénieurs de données qui veulent passer de l'utilisation à la maîtrise complète.
1. Architecture Interne et Cycle de Vie d'une Task
Définition
Le cycle de vie d'une task dans Airflow représente l'ensemble des états et transitions qu'une instance de tâche traverse, du moment où elle est créée jusqu'à son achèvement. Ce cycle est géré par le scheduler, l'executor et le metadata database, constituant une orchestration sophistiquée au-delà de ce que le simple code DAG expose.
Analogie
Imaginez une chaîne de montage automobile : chaque voiture (task) passe par différentes stations (états) : inspection (scheduled), préparation (queued), assemblage (running), contrôle qualité (success/failed). Le contremaître (scheduler) vérifie les dépendances, l'équipe d'ouvriers (executor) réalise le travail, et un registre (metadata DB) enregistre chaque étape.
Architecture des États
| État | Description | Transition Possible | Durée Typique |
|---|---|---|---|
| no_status | État initial avant planification | → scheduled | 0-5ms |
| scheduled | Dépendances satisfaites, prête | → queued ou skipped | Jusqu'à 5s |
| queued | En attente d'un worker disponible | → running ou failed | 100ms-2min |
| running | Exécution en cours sur worker | → success/failed/upstream_failed | Variable |
| success | Complétée avec succès | → (parent task event) | 0ms |
| failed | Erreur détectée | → retry ou failed | 0ms |
| upstream_failed | Dépendance parent échouée | → (terminal) | 0ms |
| skipped | Branche non exécutée | → (terminal) | 0ms |
Mécanisme Interne d'Exécution
Le scheduler scrute la base de données toutes les 1-5 secondes (configurable via scheduler_heartbeat_sec). Pour chaque DAG, il évalue les dépendances topologiques, calcule les intervalles de temps, et crée des TaskInstance si les conditions sont remplies. L'executor reçoit alors les tâches du pool, les alloue aux workers, et collecte les résultats.
DAG Parse → Serialization → Scheduler Check → Dependency Resolution →
Executor Queue → Worker Allocation → Task Execution → Result Collection →
Database Update → Trigger Downstream
Astuce d'Optimisation
Pour déboguer le cycle de vie, explorez airflow tasks list-dag-runs <dag_id> et la table task_instance en base. Vérifiez notamment queued_dttm, start_date, end_date, et duration pour identifier les goulots.
⚠️ Attention Critique
Le scheduler est single-threaded par défaut. Si vous avez 10 000 DAGs avec 50 tasks chacun, le scheduler ne peut pas scanner tous les DAGs en quelques secondes. Cela crée un backlog scheduling où les tâches restent en scheduled pendant des minutes. Utilisez dag_dir_list_interval et partitionnez les DAGs sur plusieurs schedulers en High Availability.
2. Pool, Queue et Executor : Gestion Avancée des Ressources
Définition
Les pools et queues sont les mécanismes de contrôle de concurrence dans Airflow. Un pool limite le nombre de tasks simultanées par type de ressource (DB, GPU, API), tandis qu'une queue routage les tasks vers des workers spécifiques. L'executor détermine comment les tasks sont distribuées (local, celery, kubernetes, etc.).
Analogie
Pensez à un parc d'attractions : les pools sont les attractions elles-mêmes (chaque montagne russe peut accueillir 30 personnes max), les queues sont les files d'attente (premium vs standard), et l'executor est le système de gestion des files (FastPass vs billetterie traditionnelle).
Architecture Pool-Queue-Executor
| Composant | Rôle | Configuration | Impact |
|---|---|---|---|
| Pool | Limite concurrent_limit par ressource | max_active_tasks |
Throttling horizontal |
| Queue | Routage des tasks aux workers | queue attribute |
Isolation des workloads |
| Executor | Stratégie d'allocation | Type (Celery, K8s, Local) | Scalabilité verticale |
| Priority Weight | Ordre d'exécution dans queue | priority_weight DAG/Task |
Scheduling fairness |
Cas d'Usage Avancés
Scénario 1 : API Rate-Limited
Une tâche appelle une API avec limite 100 requêtes/minute. Sans pool, 50 tasks lancées simultanément = rejet.
pool="api_calls_pool"
pool_slots=5 # Max 5 requêtes simultanées
Scénario 2 : Ressources GPU
Seulement 2 GPUs disponibles sur le cluster, mais 50 tasks nécessitent GPU.
pool="gpu_pool"
pool_slots=2
queue="gpu_queue" # Worker spécifique avec GPU
Scénario 3 : DAG Multi-Étapes avec Concurrence Variable
Étape 1 (extraction) : 20 tasks parallèles, Étape 2 (fusion) : 1 task, Étape 3 (ML) : 4 tasks
Dynamique pool allocation par étape via XCom ou Variables
Métriques de Performance Internes
L'executor maintient des statistiques :
task_queue_depth: Nombre de tasks en attenteworker_utilization: % CPU/RAM utilisésheartbeat_latency: Délai entre worker et schedulertask_duration_percentiles: P50, P95, P99 des durées
Astuce : Debugging Pool Contention
SELECT pool, COUNT(*), SUM(CASE WHEN state='queued' THEN 1 ELSE 0 END) as queued_count
FROM task_instance
WHERE dag_id='my_dag'
GROUP BY pool;
⚠️ Attention : Race Condition en Pool
Plusieurs schedulers peuvent assigner le même pool_slot à deux tasks différentes (edge case dans HA). Utilisez airflow pools import avec versioning strict et monitorez pool_full_events.
3. Le Metadata Database : Schéma, Requêtes et Optimisation
Définition
La métabase de données Airflow (par défaut PostgreSQL/MySQL) stocke tous les états, configurations, logs et exécutions. C'est la source de vérité du système, mais aussi son point faible si elle devient lente. Une single query peut ralentir le scheduler de 50%.
Analogie
La métabase est le cerveau d'Airflow : elle enregistre les souvenirs (states), les plans (DAG definitions), les réflexes (triggers). Si le cerveau est congestionné, tout ralentit, y compris la respiration (scheduler heartbeat).
Schéma Critique et Indexation
| Table | Lignes (10k DAGs) | Requêtes/sec | Index Critique |
|---|---|---|---|
| dag_run | 300 000+ | 50-100 | dag_id, execution_date |
| task_instance | 15 000 000+ | 200-500 | dag_id, task_id, state, (dag_id, state) |
| task_reschedule | 100 000+ | 10-20 | task_id, execution_date |
| dag | 10 000 | 5-10 | dag_id (PK) |
| log | 50 000 000+ | 100-200 | dag_id, task_id, execution_date |
Queries Coûteuses et Optimisation
Query 1 : Scheduler Parsing DAG
-- LENT : Full table scan
SELECT * FROM task_instance
WHERE dag_id='my_dag' AND execution_date > NOW() - INTERVAL '7 days';
-- OPTIMISÉ : Index + partition
CREATE INDEX idx_ti_dag_exec ON task_instance(dag_id, execution_date DESC);
SELECT * FROM task_instance
WHERE dag_id='my_dag' AND execution_date > NOW() - INTERVAL '7 days'
LIMIT 1000;
Query 2 : Audit des Logs
-- CATASTROPHIQUE : 50M rows scan
SELECT * FROM log WHERE created_at > NOW() - INTERVAL '30 days';
-- BON : Indexation sur created_at + partition by month
-- Puis archiver les anciens logs vers S3
Stratégies de Performance
- Partitioning : Diviser task_instance par execution_date (monthly)
- Vacuum & Analyze : Exécuter régulièrement (PostgreSQL)
- Connection Pooling : SQLAlchemy pool_size=10-20, max_overflow=20
- Query Timeouts : Ajouter
statement_timeout=30sen PostgreSQL - Archive Old Data : Nettoyer les runs > 1 an dans table d'audit
Astuce : Monitorer la Métabase
-- Nombre de task_instances par état (snapshot)
SELECT state, COUNT(*) FROM task_instance GROUP BY state;
-- Requêtes lentes (PostgreSQL)
SELECT query, calls, mean_time FROM pg_stat_statements
WHERE query LIKE '%task_instance%' ORDER BY mean_time DESC LIMIT 5;
⚠️ Attention : Deadlock sous Charge
Plusieurs schedulers updatent simultanément task_instance. Si deux transactions modifient le même DAG run concurremment, deadlock = retry indéfini. Utilisez isolation_level = 'READ_COMMITTED' et NOWAIT locks en SQL.
4. Debugging Avancé : Logs, XCom et Instrumentation
Définition
Le debugging en Airflow ne se limite pas à print() : il faut tracer l'exécution à travers scheduler → executor → worker, capturer XCom (échange de données inter-tasks), et analyser les logs structurés pour identifier les anomalies subtiles.
Analogie
Déboguer Airflow ressemble à résoudre un crime en réseau : vous avez des témoins (logs) dispersés sur 10 serveurs, des indices (XCom), et une timeline (DAG execution). Il faut reconstituer l'histoire complète sans point central.
Architecture Logging Multi-Niveaux
| Niveau | Émetteur | Format | Stockage | Volume |
|---|---|---|---|---|
| Scheduler | Scheduler process | JSON/Text | Fichier + DB | 10MB/jour |
| Executor | Executor process | JSON/Text | Fichier + DB | 50MB/jour |
| Worker | Task execution | Structured logs | Fichier + S3 | 1-10GB/jour |
| API/UI | HTTP requests | JSON | DB (limited) | 5MB/jour |
| Airflow internal | Core modules | DEBUG/INFO | File | 100MB/jour |
Stratégies de Debugging Avancé
1. Capture de XCom pour Debug
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def task_a():
return {"model_id": 42, "accuracy": 0.95}
def task_b(ti):
result = ti.xcom_pull(task_ids='task_a')
print(f"Reçu XCom: {result}")
with DAG('xcom_dag', start_date=datetime(2024,1,1)) as dag:
t_a = PythonOperator(task_id='task_a', python_callable=task_a)
t_b = PythonOperator(task_id='task_b', python_callable=task_b,
provide_context=True)
t_a >> t_b
2. Inspection des Task Instances en Base
SELECT
ti.dag_id, ti.task_id, ti.execution_date, ti.state,
ti.try_number, ti.start_date, ti.end_date,
EXTRACT(EPOCH FROM (ti.end_date - ti.start_date)) as duration_sec,
ti.log_filename
FROM task_instance ti
WHERE ti.dag_id = 'my_dag'
AND ti.execution_date = '2024-01-15 00:00:00'
ORDER BY ti.task_id;
3. Log Streaming depuis Worker Distant
# Récupérer logs en temps réel
airflow tasks log my_dag task_a 2024-01-15T00:00:00Z
# Ou accéder directement si logs stockés en S3
aws s3 cat s3://airflow-logs/my_dag/task_a/2024-01-15T00:00:00Z.log | tail -100
4. Instrumentation Custom avec Prometheus
from prometheus_client import Counter, Histogram
import time
task_duration = Histogram('task_duration_seconds', 'Task execution time',
labelnames=['dag_id', 'task_id'])
task_errors = Counter('task_errors_total', 'Task failures',
labelnames=['dag_id', 'task_id'])
def monitored_task():
with task_duration.labels(dag_id='my_dag', task_id='task_a').time():
try:
# Exécution
pass
except Exception as e:
task_errors.labels(dag_id='my_dag', task_id='task_a').inc()
raise
Astuce : Repérer les Tasks Stuck
from airflow.models import TaskInstance
from datetime import datetime, timedelta
# Tasks en 'running' depuis > 2 heures = stuck
stuck_tasks = session.query(TaskInstance).filter(
TaskInstance.state == 'running',
TaskInstance.start_date < datetime.utcnow() - timedelta(hours=2)
).all()
for ti in stuck_tasks:
print(f"STUCK: {ti.dag_id}.{ti.task_id} depuis {ti.start_date}")
⚠️ Attention : Overflow de Logs
Les logs s'accumulent rapidement (1GB/jour = 365GB/an). Sans rotation/archivage :
- Performance UI dégradée (-50%)
- Métabase complétée à 80%
- Recherche de logs impossible
Utilisez log_retention_days=30 et archivez en S3 via Celery task dédiée.
5. Performance Tuning et Optimisation en Production
Définition
Optimiser Airflow en production signifie ajuster les paramètres du scheduler, executor, pools, et base de données pour atteindre les SLA (Service Level Agreements) : latence de scheduling < 5s, throughput > 1000 tasks/min, reliability > 99.95%.
Analogie
Tuner Airflow, c'est comme affiner un moteur de course : changer l'essence (executor), les pistons (task size), l'échappement (logging), les pneus (worker capacity). Chaque ajustement affecte la performance globale.
Paramètres Critiques à Tuner
| Paramètre | Défaut | Production | Impact | Trade-off |
|---|---|---|---|---|
| scheduler_heartbeat_sec | 5 | 1-2 | Latence scheduling | CPU scheduler +20% |
| dag_dir_list_interval | 300 | 30-60 | Parsing DAGs | I/O disque +30% |
| parallelism | 32 | 128-512 | Concurrence globale | RAM worker +100% |
| max_active_runs_per_dag | 16 | 32-64 | DAG runs simultanées | Métabase stress |
| dag_concurrency | 16 | 32-64 | Tasks par DAG | Isolation DAGs ↓ |
| pool_slots (default) | 128 | 256-512 | Tasks sans pool | Wait time ↓ |
| worker_refresh_batch_size | 1 | 0 (infinite) | Recycling workers | Memory leaks ↓ |
| task_logs_folder cleanup | Manual | Automated | Disk space | Performance -5% |
Profiling Scheduler Bottleneck
# Script de profiling du scheduler
import cProfile
import pstats
from airflow.jobs.scheduler_job import SchedulerJob
profiler = cProfile.Profile()
scheduler = SchedulerJob()
profiler.enable()
scheduler.run()
profiler.disable()
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(20) # Top 20 functions
Résultats Typiques :
- 40% temps :
_critical_section_dag_iteration(boucle principale) - 25% temps :
evaluate_dependencies(DAG dependencies) - 20% temps : Requêtes métabase (task_instance SELECT)
- 15% temps : Serialization/Deserialization DAGs
Checklist d'Optimisation Production
Métabase
- Indexes sur (dag_id, execution_date) en task_instance
- Connection pooling SQLAlchemy pool_size=20
- Partition task_instance par mois
- Archivage logs > 30 jours
- VACUUM ANALYZE PostgreSQL hebdo
Scheduler
- High Availability (2+ schedulers, Redis Distributed Lock)
- scheduler_heartbeat_sec = 1-2
- dag_dir_list_interval = 30-60
- Monitoring: task_queue_depth, scheduling_latency
Executor
- CeleryExecutor si > 5k tasks/jour
- KubernetesExecutor si besoin dynamic scaling
- LocalExecutor max 100 tasks (dev/test only)
Workers
- Déployer sur multi-nodes (1 worker per machine)
- worker_refresh_batch_size = 0 (graceful shutdown)
- Monitoring CPU/RAM par worker (seuil 80%)
DAGs
- Max 1000 tasks par DAG
- Max 50 DAGs parsés simultanément
- Utiliser SubDAGs / Dynamic DAGs pour huge graphs
- Caching DAG parsing (airflow 2.3+)
Astuce : Benchmarking Comparatif
# Avant optimisation
time airflow dags list
# Output: 15.3s
# Après optimisation (indexes + pool tuning)
time airflow dags list
# Output: 2.1s
# Gain: 87% ✅
⚠️ Attention : Over-Tuning Risks
- Réduire
scheduler_heartbeat_secà 0.5 = CPU scheduler 100%, starving workers - Augmenter
parallelismà 2000 = Métabase timeout, scheduler crash - Éliminer logs = Impossible debugging en production
- Partitioning table sans index = Perf identique/pire
Le sweet spot : monitoring continu + ajustements incrementaux (10% à la fois).