工业大数据之AirFlow的架构组件(32)

2023年 8月 17日 52.4k 0

  • 开发一个Python程序,程序文件中需要包含以下几个部分

  • 注意:该文件的运行不支持utf8编码,不能写中文

  • step1:导包

    # 必选:导入airflow的DAG工作流
    from airflow import DAG
    # 必选:导入具体的TaskOperator类型
    from airflow.operators.bash import BashOperator
    # 可选:导入定时工具的包
    from airflow.utils.dates import days_ago
    

    image-20211015103936196

  • step2:定义DAG及配置

    # 当前工作流的基础配置
    default_args = {
        # 当前工作流的所有者
        'owner': 'airflow',
        # 当前工作流的邮件接受者邮箱
        'email': ['airflow@example.com'],
        # 工作流失败是否发送邮件告警
        'email_on_failure': True,
        # 工作流重试是否发送邮件告警
        'email_on_retry': True,
        # 重试次数
        'retries': 2,
        # 重试间隔时间
        'retry_delay': timedelta(minutes=1),
    }
    
    # 定义当前工作流的DAG对象
    dagName = DAG(
        # 当前工作流的名称,唯一id
        'airflow_name',
        # 使用的参数配置
        default_args=default_args,
        # 当前工作流的描述
        description='first airflow task DAG',
        # 当前工作流的调度周期:定时调度【可选】
        schedule_interval=timedelta(days=1),
        # 工作流开始调度的时间
        start_date=days_ago(1),
        # 当前工作流属于哪个组
        tags=['itcast_bash'],
    )
    
    • 构建一个DAG工作流的实例和配置
  • step3:定义Tasks

    • Task类型:airflow.apache.org/docs/apache…

    • 常用

      • BashOperator - executes a bash command
        • 执行Linux命令
      • PythonOperator - calls an arbitrary Python function
        • 执行Python代码
      • EmailOperator - sends an email
        • 发送邮件的
    • 其他

      • MySqlOperator
      • PostgresOperator
      • MsSqlOperator
      • OracleOperator
      • JdbcOperator
      • DockerOperator
      • HiveOperator
      • PrestoToMySqlOperator
      • ……
    • BashOperator:定义一个Shell命令的Task

      # 导入BashOperator
      from airflow.operators.bash import BashOperator
      # 定义一个Task的对象
      t1 = BashOperator(
      	# 指定唯一的Task的名称
          task_id='first_bashoperator_task',
      	# 指定具体要执行的Linux命令
          bash_command='echo "hello airflow"',
      	# 指定属于哪个DAG对象
          dag=dagName
      )
      
    • PythonOperator:定义一个Python代码的Task

      # 导入PythonOperator
      from airflow.operators.python import PythonOperator
      
      # 定义需要执行的代码逻辑
      def sayHello():
          print("this is a programe")
      
      #定义一个Task对象
      t2 = PythonOperator(
          # 指定唯一的Task的名称
          task_id='first_pyoperator_task',
          # 指定调用哪个Python函数
          python_callable=sayHello,
          # 指定属于哪个DAG对象
          dag=dagName
      )
      

相关文章

服务器端口转发,带你了解服务器端口转发
服务器开放端口,服务器开放端口的步骤
产品推荐:7月受欢迎AI容器镜像来了,有Qwen系列大模型镜像
如何使用 WinGet 下载 Microsoft Store 应用
百度搜索:蓝易云 – 熟悉ubuntu apt-get命令详解
百度搜索:蓝易云 – 域名解析成功但ping不通解决方案

发布评论