-
开发一个Python程序,程序文件中需要包含以下几个部分
-
注意:该文件的运行不支持utf8编码,不能写中文
-
step1:导包
# 必选:导入airflow的DAG工作流 from airflow import DAG # 必选:导入具体的TaskOperator类型 from airflow.operators.bash import BashOperator # 可选:导入定时工具的包 from airflow.utils.dates import days_ago
-
step2:定义DAG及配置
# 当前工作流的基础配置 default_args = { # 当前工作流的所有者 'owner': 'airflow', # 当前工作流的邮件接受者邮箱 'email': ['airflow@example.com'], # 工作流失败是否发送邮件告警 'email_on_failure': True, # 工作流重试是否发送邮件告警 'email_on_retry': True, # 重试次数 'retries': 2, # 重试间隔时间 'retry_delay': timedelta(minutes=1), } # 定义当前工作流的DAG对象 dagName = DAG( # 当前工作流的名称,唯一id 'airflow_name', # 使用的参数配置 default_args=default_args, # 当前工作流的描述 description='first airflow task DAG', # 当前工作流的调度周期:定时调度【可选】 schedule_interval=timedelta(days=1), # 工作流开始调度的时间 start_date=days_ago(1), # 当前工作流属于哪个组 tags=['itcast_bash'], )
- 构建一个DAG工作流的实例和配置
-
step3:定义Tasks
-
Task类型:airflow.apache.org/docs/apache…
-
常用
BashOperator
- executes a bash command- 执行Linux命令
PythonOperator
- calls an arbitrary Python function- 执行Python代码
EmailOperator
- sends an email- 发送邮件的
-
其他
MySqlOperator
PostgresOperator
MsSqlOperator
OracleOperator
JdbcOperator
DockerOperator
HiveOperator
PrestoToMySqlOperator
- ……
-
BashOperator:定义一个Shell命令的Task
# 导入BashOperator from airflow.operators.bash import BashOperator # 定义一个Task的对象 t1 = BashOperator( # 指定唯一的Task的名称 task_id='first_bashoperator_task', # 指定具体要执行的Linux命令 bash_command='echo "hello airflow"', # 指定属于哪个DAG对象 dag=dagName )
-
PythonOperator:定义一个Python代码的Task
# 导入PythonOperator from airflow.operators.python import PythonOperator # 定义需要执行的代码逻辑 def sayHello(): print("this is a programe") #定义一个Task对象 t2 = PythonOperator( # 指定唯一的Task的名称 task_id='first_pyoperator_task', # 指定调用哪个Python函数 python_callable=sayHello, # 指定属于哪个DAG对象 dag=dagName )
-