基于PySpark SQL的媒体浏览日志ETL作业

2023年 11月 27日 96.3k 0

pyspark除了官方的文档,网上的教程资料一直很少,但基于调度平台下,使用pyspark编写代码非常高效,程序本身是提交到spark集群中,性能上也是毫无问题的,在本文中,我们将深入探讨基于Spark的媒体浏览日志ETL(提取、转换、加载)流水线的详细实现,在展示如何使用PySpark SQL处理大规模的媒体浏览日志数据,包括IP地址转换、数据清洗、时间维度补充、码表关联等关键步骤。

一、环境配置

首先,我们需要创建一个SparkSession并导入必要的库和设置默认参数,包括与IP-to-Location数据库的交互以及其他相关的配置。

如果pyspark仅仅是本地运行而不是提交集群时,可以使用findspark库,它能够帮助我们快速初始化Spark环境。在开始之前,确保您已经成功安装了findspark库,并已经下载并解压了Spark二进制文件。将Spark的安装路径和Python解释器路径指定为变量。

import findspark

# 指定Spark的安装路径
spark_home = "/usr/local/spark"

# 指定用于Spark的Python解释器路径
python_path = "/home/hadoop/.conda/envs/sparkbox/bin/python3.6"

# 使用findspark.init方法初始化Spark环境
findspark.init(spark_home, python_path)

findspark.init方法将帮助设置PYSPARK_PYTHON和SPARK_HOME环境变量,确保正确的Spark库和配置文件被加载。其简化了Spark环境的初始化过程,避免手动配置环境变量。

二、数据处理

接下来,我们定义了一个NewsEtl类,用于执行数据处理和转换的各个步骤。这包括从HDFS中读取媒体浏览日志数据,进行IP地址转、换,清洗数据,添加时间维度,补充码表信息等。

在spark_function中,我们详细说明了数据处理的逻辑。这包括读取媒体浏览日志数据、进行IP地址转换、添加时间维度、补充码表信息、数据清洗和最终写入HDFS等步骤。

1.数据读取

首先,我们使用PySpark的read方法从HDFS中读取媒体浏览日志数据。我们指定了数据的schema,以确保正确地解析每一列。

df = spark.read.schema(schema).parquet(
"hdfs://xxx:8020/user/hive/warehouse/xxx.db/ods_media_browse_log").filter(
"dt in ({})".format(",".join(["'{}'".format(partition) for partition in latest_partitions])))

2.IP地址转换

接下来,我们通过iptranslate函数将IP地址转换为地理位置信息。这使用了XdbSearcher类,该类负责读取xdb文件并执行IP地址的二分查找。

# 根据IP地址获取地点信息
from_ip_get_place_udf = udf(action.iptranslate, struct_schema)
df = df.withColumn('country', from_ip_get_place_udf(col('ip'), lit('country')))
df = df.withColumn("place", from_ip_get_place_udf(col('ip')))
df = df.withColumn("country", df["place"]["country"])
df = df.withColumn("city", df["place"]["city"])
df = df.withColumn("province", df["place"]["province"])
df = df.drop('place')

3.时间维度添加

我们生成当前时间的时间戳,并添加各种时间格式的列,包括年、季度、月、日、小时等。

# 生成当前时间的时间戳
df = df.withColumn("current_timestamp", from_unixtime(df["operation_time"] / 1000))
# 添加各种时间格式的列
df = df.withColumn("year", date_format("current_timestamp", "yyyy"))
df = df.withColumn("quarter", date_format("current_timestamp", "yyyy-MM"))
df = df.withColumn("month", date_format("current_timestamp", "yyyy-MM"))
df = df.withColumn("day", date_format("current_timestamp", "dd"))
df = df.withColumn("hour_time", date_format("current_timestamp", "yyyy-MM-dd HH"))
df = df.withColumn("dt", date_format("current_timestamp", "yyyy-MM-dd"))
df = df.withColumn("hour", date_format("current_timestamp", "HH"))
df = df.drop('current_timestamp')

4.数据清洗

最后,我们对数据进行清洗,包括将空值替换为默认值、字符串去除空格、数据类型转换等。

# 数据清洗
newdf = newdf.withColumn("media_type", when(col("media_type").isNull(), 0).otherwise(col("media_type")))
newdf = newdf.withColumn("news_type", when(col("news_type").isNull(), 99).otherwise(col("news_type")))
newdf = newdf.withColumn("original_type", when(col("original_type").isNull(), 99).otherwise(col("original_type")))
# ...

5.最终写入HDFS

最终,我们将处理后的数据写入HDFS,采用分区方式存储,以便更高效地管理和查询。

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
newdf.write.partitionBy("dt", "hour").mode("overwrite").option('user', 'hive').parquet(
"hdfs://xxxx:8020/user/hive/warehouse/xxx.db/dwd_media_browse_log")

通过这一系列步骤,我们完成了对媒体浏览日志数据的全面处理,包括数据转换、地理位置信息的添加、时间维度的补充和数据清洗等关键步骤。

三、结论

通过详细的实现步骤,深入解析了基于Spark的媒体浏览日志ETL任务的构建过程。这个任务可以根据具体需求进行调整和扩展,为大规模数据处理任务提供了一种高效而灵活的解决方案。

相关文章

Oracle如何使用授予和撤销权限的语法和示例
Awesome Project: 探索 MatrixOrigin 云原生分布式数据库
下载丨66页PDF,云和恩墨技术通讯(2024年7月刊)
社区版oceanbase安装
Oracle 导出CSV工具-sqluldr2
ETL数据集成丨快速将MySQL数据迁移至Doris数据库

发布评论