airflow使用oracle

2023年 8月 5日 35.7k 0

Airflow是一个用于调度、监控和编写多步任务的平台,因为最近在工作中需要处理oracle相关任务,所以在此分享如何在Airflow中使用Oracle数据库。

首先需要安装Oracle驱动程序包cx_Oracle,在Airflow的DAG中可以直接使用PythonOperator调用Oracle进行数据读取、数据清洗、数据加载等操作。假设我们有一个需求是从一个Oracle表中读取数据,清洗后再写入另一个Oracle表中,那么数据清洗的Python代码可以如下所示:

import cx_Oracle
def cleaning_data():
# 连接源Oracle数据库
connection_src = cx_Oracle.connect("user/password@database_src")
cursor_src = connection_src.cursor()
# 查询数据
cursor_src.execute("SELECT * FROM TABLE_SRC")
# 清理数据,并写入目标Oracle数据库
connection_target = cx_Oracle.connect("user/password@database_target")
cursor_target = connection_target.cursor()
for row in cursor_src:
# 数据清洗
row_cleaned = row[0] + ' cleaned'
# 写入目标数据库
cursor_target.execute("INSERT INTO TABLE_TARGET VALUES (:1)", (row_cleaned,))
connection_target.commit()
# 关闭数据库连接
cursor_src.close()
cursor_target.close()
connection_src.close()
connection_target.close()

然后在DAG中可以使用PythonOperator调用该Python函数,如下所示:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta
dag = DAG(
'oracle_dag',
description='Oracle DAG',
schedule_interval=timedelta(days=1),
catchup=False,
start_date=datetime(2021, 1, 1),
)
cleaning_data_task = PythonOperator(
task_id='cleaning_data_task',
python_callable=cleaning_data,
dag=dag)

需要注意的是,在执行该DAG之前,需要确认Python环境中是否已经安装了cx_Oracle驱动程序包。

此外,在Airflow中可以使用OracleHook连接Oracle数据库,该Hook提供了一系列的方法来执行SQL查询、数据加载、数据清洗等操作。例如,我们可以通过OracleHook进行数据查询,代码如下所示:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.oracle_operator import OracleOperator
from airflow.providers.oracle.hooks.oracle import OracleHook
from datetime import datetime, timedelta
dag = DAG(
'oracle_dag',
description='Oracle DAG',
schedule_interval=timedelta(days=1),
catchup=False,
start_date=datetime(2021, 1, 1),
)
oracle_hook = OracleHook(oracle_conn_id="oracle_connection")
query_sql = "SELECT * FROM TABLE_NAME"
oracle_task = OracleOperator(
task_id='oracle_task',
oracle_conn_id='oracle_connection',
sql=query_sql,
dag=dag)
def print_query_data(**kwargs):
ti = kwargs['ti']
data = ti.xcom_pull(task_ids='oracle_task')
print(data)
query_data_task = PythonOperator(
task_id='query_data_task',
python_callable=print_query_data,
provide_context=True,
dag=dag)
oracle_task >>query_data_task

在上述代码中,首先通过OracleHook连接Oracle数据库,然后使用OracleOperator执行SQL查询,之后使用PythonOperator打印查询得到的数据。需要注意的是,Airflow默认使用XCom缓存操作的结果,因此在打印查询结果时,需要从上一个任务(即OracleOperator)中获取结果,而不是重新查询一遍。

总之,在Airflow中使用Oracle数据库可以通过不同的方式实现,包括直接使用cx_Oracle进行操作,以及使用OracleHook连接Oracle数据库进行SQL查询、数据加载、数据清洗等操作。只要在DAG中正确设置任务的依赖关系和参数,就可以轻松实现数据处理流程。

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论