🚀 Airflow Monitoring Agent
📋 Descripción
Este proyecto implementa un asistente inteligente para monitorear y gestionar flujos de trabajo (DAGs) en Apache Airflow utilizando modelos de lenguaje avanzados. El agente permite consultar en lenguaje natural el estado de los DAGs, aplicar filtros por diversos criterios y obtener información detallada sobre las ejecuciones programadas.
✨ Características principales
- Consultas en lenguaje natural: Interactúa con tus DAGs de Airflow usando preguntas en español (o cualquier idioma).
- Filtrado inteligente: Busca DAGs por ID, propietario o estado (pausado/activo).
- Información detallada: Obtén datos completos sobre los próximos intervalos de ejecución y el historial de cada DAG.
- Soporte multi-entorno: Configuración para diferentes entornos (desarrollo, integración, preproducción, producción).
- Autenticación con Google Cloud: Integración con servicios de GCP para acceso seguro.
- Paginación automática: Gestión eficiente de grandes colecciones de DAGs.
🛠️ Tecnologías utilizadas
- Python: Lenguaje base de implementación
- Pydantic-AI: Framework para la creación de agentes IA
- Google Gemini 2.0: Modelo de IA para procesamiento de lenguaje natural
- Apache Airflow API: Para acceder a la información de los DAGs
- HTTPX: Cliente HTTP asíncrono para Python
- Asyncio: Para operaciones asíncronas
- Colorlog: Mejora la visualización de logs con colores
📦 Requisitos previos
- Python 3.9+
- Acceso a una instancia de Apache Airflow (preferiblemente en Google Cloud Composer)
- Cuenta de Google Cloud con permisos adecuados
- Paquetes Python especificados en
requirements.txt
🚀 Instalación
- Clona este repositorio:
git clone https://github.com/tu-usuario/airflow-monitoring-agent.git cd airflow-monitoring-agent
- Instala las dependencias (usando uv o pip):
uv install -r requirements.txt
- Configura tus credenciales de GCP si es necesario
💻 Uso
El agente se ejecuta desde la línea de comandos, especificando el entorno y si se requiere autenticación:
uv run agent.py <entorno> <auth>
Donde:
<entorno>
puede ser: dev, itg, pre, pro<auth>
puede ser: si, no
Ejemplo:
uv run agent.py dev no
Una vez iniciado, el agente te pedirá tu consulta. Algunos ejemplos:
- “Muéstrame todos los DAGs pausados”
- “¿Cuál es el estado del DAG data_processing_dag?”
- “Busca los DAGs que pertenecen al usuario analytics_team”
- “¿Cuándo será la próxima ejecución del DAG etl_workflow?”
🏗️ Arquitectura
Componentes principales
- Modelos de datos (Pydantic):
DAGStatus
: Modelo para la información de un DAG individualDAGStatusList
: Contenedor para múltiples estados de DAGs
- Agente IA:
- Configurado con Gemini 2.0
- Sistema de prompts diseñado para entender consultas sobre Airflow
- Herramientas para interactuar con la API de Airflow
- Herramientas del agente:
list_dags
: Recupera todos los DAGs disponiblesget_dag_status
: Obtiene información detallada filtrando por varios criterios
- Utilidades:
- Autenticación con Google Cloud
- Manejo de tokens de acceso
- Configuración de logging mejorado
🔍 Detalles de implementación
Sistema de agente
El agente utiliza un enfoque basado en herramientas que permite al modelo de IA decidir qué acciones tomar en función de la consulta del usuario. El flujo típico es:
- El usuario hace una consulta en lenguaje natural
- El modelo analiza la consulta y decide qué información necesita
- Primero obtiene una lista de todos los DAGs disponibles
- Determina qué filtros aplicar según la consulta
- Ejecuta
get_dag_status
con los filtros apropiados - Formatea la respuesta para el usuario
Manejo de paginación
Una característica clave es el manejo eficiente de grandes conjuntos de DAGs mediante paginación:
while True:
params = {
'limit': limit,
'offset': offset
}
response = await client.get(base_uri, headers=_headers, params=params)
# Procesamiento...
if len(all_dags) >= total_entries or not current_dags:
break
offset += limit
Filtrado inteligente
El agente implementa un sistema de filtrado flexible:
if dag_id:
filtered_dags = [dag for dag in filtered_dags if dag['dag_id'] == dag_id]
if owner:
filtered_dags = [dag for dag in filtered_dags if owner in dag.get('owners', [])]
if is_paused is not None:
filtered_dags = [dag for dag in filtered_dags if dag.get('is_paused') == is_paused]
📊 Ejemplos de salida
Cuando el agente encuentra DAGs que coinciden con tu consulta, presentará información detallada como:
Found DAGs:
-------------------
DAGStatus(
dag_id='data_processing_workflow',
dag_display_name='Data Processing Pipeline',
is_paused=False,
next_dag_run_data_interval_start='2023-11-12T00:00:00+00:00',
next_dag_run_data_interval_end='2023-11-13T00:00:00+00:00',
last_dag_run_id='scheduled__2023-11-11T00:00:00+00:00',
last_dag_run_state='success',
total_dag_runs=124
)
🔮 Mejoras futuras
- Interfaz web: Desarrollar una interfaz de usuario web para interactuar con el agente
- Acciones de control: Permitir acciones como pausar/reanudar DAGs, disparar ejecuciones
- Alertas proactivas: Notificaciones sobre fallos o comportamientos anómalos
- Análisis histórico: Estadísticas y tendencias de rendimiento
- Integración con chat: Conectar con plataformas como Slack o Discord
📄 Licencia
Este proyecto está licenciado bajo MIT License - ver el archivo LICENSE para más detalles.
👥 Contribuciones
Las contribuciones son bienvenidas. Por favor, abre un issue o pull request para sugerencias, mejoras o correcciones.
⭐ Si encuentras útil este proyecto, ¡no dudes en darle una estrella en GitHub! ⭐