Apache Airflow的安装与配置,编写和调度ETL任务

person ~~情~非~    watch_later 2024-07-20 17:31:33
visibility 329    class 大数据,Airflow    bookmark 专栏

Apache Airflow的安装与配置,编写和调度ETL任务

什么是Apache Airflow

Apache Airflow是一个开源的工作流管理平台,用于编排和调度ETL(Extract, Transform, Load)任务。它通过DAG(Directed Acyclic Graphs,有向无环图)定义任务流,支持任务依赖关系、重试机制、并行执行等功能。

Airflow的关键特性

  1. 可编程性:使用Python编写任务和工作流。
  2. 可扩展性:支持插件扩展,易于集成各种数据源和目标。
  3. 调度和监控:提供丰富的调度和监控功能,支持任务重试、通知等。
  4. 图形化界面:提供Web UI,用于管理和监控工作流。

Airflow的安装与配置

环境准备

  • 操作系统:支持多种操作系统,包括Windows、macOS和Linux。
  • Python环境:建议使用Python 3.6或以上版本。
  • 依赖项:需要安装并配置数据库(默认使用SQLite,建议使用MySQL或PostgreSQL)。

安装Airflow

  1. 安装Python和pip
    确保系统已经安装了Python和pip。

  2. 安装Airflow
    使用pip安装Airflow及其依赖项:

    export AIRFLOW_HOME=~/airflow
    pip install apache-airflow
    
  3. 初始化数据库
    初始化Airflow数据库:

    airflow db init
    
  4. 创建用户
    创建Airflow用户,用于访问Web UI:

    airflow users create \
        --username admin \
        --firstname Admin \
        --lastname User \
        --role Admin \
        --email admin@example.com
    
  5. 启动Airflow
    启动Airflow的调度器和Web服务器:

    airflow scheduler
    airflow webserver --port 8080
    
  6. 访问Airflow Web UI
    打开浏览器,访问http://localhost:8080,进入Airflow的图形化界面。

编写和调度ETL任务

创建DAG

在Airflow中,DAG(有向无环图)用于定义任务和任务依赖关系。

  1. 创建DAG文件
    $AIRFLOW_HOME/dags目录下创建DAG文件,例如etl_dag.py

    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    from datetime import datetime, timedelta
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2023, 1, 1),
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }
    
    def extract():
        print("Extracting data...")
    
    def transform():
        print("Transforming data...")
    
    def load():
        print("Loading data...")
    
    dag = DAG(
        'etl_dag',
        default_args=default_args,
        description='A simple ETL DAG',
        schedule_interval=timedelta(days=1),
    )
    
    t1 = PythonOperator(
        task_id='extract',
        python_callable=extract,
        dag=dag,
    )
    
    t2 = PythonOperator(
        task_id='transform',
        python_callable=transform,
        dag=dag,
    )
    
    t3 = PythonOperator(
        task_id='load',
        python_callable=load,
        dag=dag,
    )
    
    t1 >> t2 >> t3
    
  2. 配置任务
    在DAG文件中定义任务的Python函数,并使用PythonOperator进行包装。通过设置任务依赖关系,定义任务的执行顺序。

调度DAG

  1. 配置调度
    在DAG定义中,通过设置schedule_interval参数配置任务的调度周期。例如:

    dag = DAG(
        'etl_dag',
        default_args=default_args,
        description='A simple ETL DAG',
        schedule_interval='@daily',
    )
    
  2. 启动调度器
    确保Airflow调度器正在运行,调度器会根据配置的调度周期自动触发任务执行:

    airflow scheduler
    

高级操作

使用数据库存储任务状态

  1. 安装数据库驱动
    安装MySQL或PostgreSQL的驱动程序,例如:

    pip install apache-airflow[mysql]
    
  2. 配置数据库连接
    编辑$AIRFLOW_HOME/airflow.cfg文件,配置数据库连接字符串:

    [core]
    sql_alchemy_conn = mysql://user:password@localhost/airflow
    
  3. 初始化数据库
    初始化新的数据库:

    airflow db init
    

使用自定义插件

  1. 创建插件目录
    $AIRFLOW_HOME/plugins目录下创建自定义插件目录,例如my_plugins

  2. 编写插件
    在插件目录下创建插件文件,例如my_plugin.py

    from airflow.plugins_manager import AirflowPlugin
    
    class MyCustomPlugin(AirflowPlugin):
        name = "my_custom_plugin"
    
  3. 使用插件
    在DAG文件中导入并使用自定义插件。

配置通知和警报

  1. 配置邮件通知
    $AIRFLOW_HOME/airflow.cfg文件中配置邮件通知参数:

    [smtp]
    smtp_host = smtp.example.com
    smtp_starttls = True
    smtp_ssl = False
    smtp_user = user@example.com
    smtp_password = password
    smtp_port = 587
    smtp_mail_from = airflow@example.com
    
  2. 设置任务通知
    在DAG定义中,配置任务的邮件通知参数:

    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2023, 1, 1),
        'email': ['alerts@example.com'],
        'email_on_failure': True,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }
    

总结

通过掌握Airflow的基本概念、安装与配置方法,以及如何编写和调度ETL任务,你可以构建灵活、可扩展的工作流管理平台。Airflow的可编程性和丰富的功能,使其适用于各种复杂的任务调度和数据处理场景。

评论区
评论列表
menu