Pyspark上手使用

前面复习了一下Spark一些的架构内容,这次复习一下上手Pyspark做一些数据处理。

上手之前看了一下文档,发现在Spark2.0之后,RDD的概念被Dataset取代了,Dataset也是一个分布式数据集,只不过做了一些底层的优化。

简单Demo

首先上手一个简单的Pyspark的Demo,对数据进行一些处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("CSV Loader").getOrCreate()
df = spark.read.csv("train.csv", header=True, inferSchema=True)

line_id_0 = df.filter(df.id==0)

# 需要show才能看到
line_id_0.show()

# 计算AveRooms字段的单词数,并显示
# select用于选择,size用于计算长度,split用于分割
df.select(size(split(df.AveRooms, " ")).alias("num_words")).show()

# 缓存
log_data = df.cache()
log_data.count()

log_data.select("id","AveRooms").show(5)

# 选择AveRooms列,并将其转换为一个列表
nums = log_data.select("AveRooms").rdd.flatMap(lambda x: x).collect()
print(nums)

流程解析

分析一下上面的流程:

  • 首先需要实例化一个SparkSession对象,这是Spark所有功能的入口。例如:
    • spark = SparkSession.builder.appName("CSV Loader").getOrCreate()
  • 随后,需要从一个数据源创建Spark中的DataFrame。例如:
    • df = spark.read.csv("train.csv", header=True, inferSchema=True)
    • df = spark.read.json("examples/src/main/resources/people.json")
  • 在获取到DataFrame之后,能够进行一些基础的操作,例如查看Schema,展示数据,groupBy等操作。例如:
    • df.printSchema()
    • df.select("name").show()
    • df.select(df['name'], df['age'] + 1).show()
    • df.filter(df['age'] > 21).show()
    • df.groupBy("age").count().show()
  • 此外,能够直接执行一些SQL语句。例如:
    • sqlDF = spark.sql("SELECT * FROM people")
  • 为了加速读取数据,能够对一些需要长期执行的数据进行缓存,这种方式能够将内存更合理的利用,例如:
    • log_data = df.cache()

Pyspark涵盖了多个接口,建议直接查看源码。路径是examples/src/main/python

2024/6/26 于苏州