Apache Airflow是一个开源的工作流管理平台,用于编排和调度ETL(Extract, Transform, Load)任务。它通过DAG(Directed Acyclic Graphs,有向无环图)定义任务流,支持任务依赖关系、重试机制、并行执行等功能。
安装Python和pip
确保系统已经安装了Python和pip。
安装Airflow
使用pip安装Airflow及其依赖项:
export AIRFLOW_HOME=~/airflow
pip install apache-airflow
初始化数据库
初始化Airflow数据库:
airflow db init
创建用户
创建Airflow用户,用于访问Web UI:
airflow users create \
--username admin \
--firstname Admin \
--lastname User \
--role Admin \
--email admin@example.com
启动Airflow
启动Airflow的调度器和Web服务器:
airflow scheduler
airflow webserver --port 8080
访问Airflow Web UI
打开浏览器,访问http://localhost:8080
,进入Airflow的图形化界面。
在Airflow中,DAG(有向无环图)用于定义任务和任务依赖关系。
创建DAG文件
在$AIRFLOW_HOME/dags
目录下创建DAG文件,例如etl_dag.py
:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
def extract():
print("Extracting data...")
def transform():
print("Transforming data...")
def load():
print("Loading data...")
dag = DAG(
'etl_dag',
default_args=default_args,
description='A simple ETL DAG',
schedule_interval=timedelta(days=1),
)
t1 = PythonOperator(
task_id='extract',
python_callable=extract,
dag=dag,
)
t2 = PythonOperator(
task_id='transform',
python_callable=transform,
dag=dag,
)
t3 = PythonOperator(
task_id='load',
python_callable=load,
dag=dag,
)
t1 >> t2 >> t3
配置任务
在DAG文件中定义任务的Python函数,并使用PythonOperator
进行包装。通过设置任务依赖关系,定义任务的执行顺序。
配置调度
在DAG定义中,通过设置schedule_interval
参数配置任务的调度周期。例如:
dag = DAG(
'etl_dag',
default_args=default_args,
description='A simple ETL DAG',
schedule_interval='@daily',
)
启动调度器
确保Airflow调度器正在运行,调度器会根据配置的调度周期自动触发任务执行:
airflow scheduler
安装数据库驱动
安装MySQL或PostgreSQL的驱动程序,例如:
pip install apache-airflow[mysql]
配置数据库连接
编辑$AIRFLOW_HOME/airflow.cfg
文件,配置数据库连接字符串:
[core]
sql_alchemy_conn = mysql://user:password@localhost/airflow
初始化数据库
初始化新的数据库:
airflow db init
创建插件目录
在$AIRFLOW_HOME/plugins
目录下创建自定义插件目录,例如my_plugins
。
编写插件
在插件目录下创建插件文件,例如my_plugin.py
:
from airflow.plugins_manager import AirflowPlugin
class MyCustomPlugin(AirflowPlugin):
name = "my_custom_plugin"
使用插件
在DAG文件中导入并使用自定义插件。
配置邮件通知
在$AIRFLOW_HOME/airflow.cfg
文件中配置邮件通知参数:
[smtp]
smtp_host = smtp.example.com
smtp_starttls = True
smtp_ssl = False
smtp_user = user@example.com
smtp_password = password
smtp_port = 587
smtp_mail_from = airflow@example.com
设置任务通知
在DAG定义中,配置任务的邮件通知参数:
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'email': ['alerts@example.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
通过掌握Airflow的基本概念、安装与配置方法,以及如何编写和调度ETL任务,你可以构建灵活、可扩展的工作流管理平台。Airflow的可编程性和丰富的功能,使其适用于各种复杂的任务调度和数据处理场景。