EdmondFrank's 时光足迹

この先は暗い夜道だけかもしれない それでも信じて進むんだ。星がその道を少しでも照らしてくれるのを。
或许前路永夜,即便如此我也要前进,因为星光即使微弱也会我为照亮前途。
——《四月は君の嘘》

Python在Spark上的机器学习(二)之数据操作



Python在Spark上的机器学习(二)之数据操作

PySpark

PySpark 是 Spark 为 Python 开发者提供的 API。
Spark是用Scala语言写成的,Scala把要编译的东西编译为Java虚拟机(JVM)的字节码(bytecode)。Spark的开源社区开发了一个叫PySpark的工具库。它允许使用者用Python处理RDD。这多亏了一个叫Py4J的库,它让Python可以使用JVM的对象(比如RDD)。
Pyspark Internals这篇wiki里介绍了pyspark的实现机制,大体是下面这张图就可以表示:

enter image description here

Resilient Distributed Datasets

说到Spark上的数据模式,一定不能少的就是Spark中的核心:RDD了。与许多专有的大数据处理平台不同,Spark建立在统一抽象的RDD之上,使得它可以以基本一致的方式应对不同的大数据处理场景,包括MapReduce,Streaming,SQL,Machine Learning以及Graph等。这即Matei Zaharia所谓的“设计一个通用的编程抽象(Unified Programming Abstraction)。这正是Spark让人着迷的地方。

RDD 具体是什么呢?
RDD,全称Resilient Distributed Datasets,又称弹性分布式数据集。是一个可容错的、并行的数据结构,可以让用户显示地将数据储存到磁盘和内存当中,并能控制数据的分区。

RDD本质上是一个内存数据集,在访问RDD时,指针只会指向与操作相关的部分。例如存在一个面向列的数据结构,其中一个实现为Int的数组,另一个实现为Float的数组。如果只需要访问Int字段,RDD的指针可以只访问Int数组,避免了对整个数据结构的扫描。

RDD将操作分为两类:transformation与action。无论执行了多少次transformation操作,RDD都不会真正执行运算,只有当action操作被执行时,运算才会触发。而在RDD的内部实现机制中,底层接口则是基于迭代器的,从而使得数据访问变得更高效,也避免了大量中间结果对内存的消耗。

使用Pyspark

在按照系列的上一个教程搭建好环境后,在终端中直接输入pyspark就可以运行Python与Spark的交互式的shell了。

那么,下面我们就以一些简单的例子来使用pyspark。

创建RDD

1
2
3
4
data = sc.parallelize(
    [('Amber', 22), ('Alfred', 23), ('Skye',4), ('Albert', 12),
     ('Amber', 9)])
data

输出:

ParallelCollectionRDD[3] at parallelize at PythonRDD.scala:475

RDD对象转换成Python对象

1
2
data_heterogenous = sc.parallelize([('Ferrari', 'fast'), {'Porsche': 100000}, ['Spain','visited', 4504]]).collect()
data_heterogenous

输出:

[(‘Ferrari’, ‘fast’), {‘Porsche’: 100000}, [‘Spain’, ‘visited’, 4504]]

读取文件及统计词频

首先word.txt文件内容如下:

The dynamic lifestyle
people lead nowadays
causes many reactions
in our bodies and
the one that is the
most frequent of all
is the headache. However so good

1
2
3
4
5
6
7
8
9
from operator import add
lines = sc.textFile('word.txt')
counts = lines.flatMap(lambda x: x.split(' '))\
.map(lambda x : (x, 1))\
.reduceByKey(add)
output = counts.collect()
print(output)
for (word, count) in output:
    print("%s: %i" %(word, count))

输出:

[(”, 4), (‘good’, 1), (‘in’, 1), (‘is’, 2), (‘However’, 1), (‘of’, 1), (‘causes’, 1), (‘lifestyle’, 1), (‘The’, 1), (‘headache.’, 1), (‘reactions’, 1), (‘most’, 1), (‘frequent’, 1), (‘that’, 1), (‘all’, 1), (‘our’, 1), (‘dynamic’, 1), (‘nowadays’, 1), (‘so’, 1), (‘the’, 3), (‘people’, 1), (‘bodies’, 1), (‘many’, 1), (‘one’, 1), (‘and’, 1), (‘lead’, 1)]

DataFrame

DataFrameDataFrame是Spark推荐的统一结构化数据接口,是一个不可变的分布式数据集合,它结构与关系数据库中的表类似。

类似于Python Pandas DataFrame或R DataFrame,它能够让用户轻松处理结构化数据。

DataFrame还允许用户通过Spark SQL数据库或者采用一些函数式的方法查询及操作结构数据,下面我们就通过一些例子来了解和使用DataFrame。

创建DataFrames

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
stringJSONRDD = sc.parallelize(("""
{ "id": "123",
"name": "Katie",
"age": 19,
"eyeColor": "brown"
}""",
"""{
"id": "234",
"name": "Michael",
"age": 22,
"eyeColor": "green"
}""",
"""{
"id": "345",
"name": "Simone",
"age": 23,
"eyeColor": "blue"
}""")
)
swimmersJSON = spark.read.json(stringJSONRDD)
swimmersJSON.show()
swimmersJSON.createOrReplaceTempView("swimmersJSON") #这里创建了临时表

输出:

+—+——–+—+——-+
|age|eyeColor| id| name|
+—+——–+—+——-+
| 19| brown|123| Katie|
| 22| green|234|Michael|
| 23| blue|345| Simone|
+—+——–+—+——-+

DataFrame的简单内容及类型查询

1
2
spark.sql("select * from swimmersJSON").collect()
swimmersJSON.printSchema() #显示表数据的类型

输出:

[Row(age=19, eyeColor=’brown’, id=’123’, name=’Katie’),
Row(age=22, eyeColor=’green’, id=’234’, name=’Michael’),
Row(age=23, eyeColor=’blue’, id=’345’, name=’Simone’)]
root
|– age: long (nullable = true)
|– eyeColor: string (nullable = true)
|– id: string (nullable = true)
|– name: string (nullable = true)

指定数据储存及处理的类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# Import types
from pyspark.sql.types import *
# Generate comma delimited data
stringCSVRDD = sc.parallelize([
(123, 'Katie', 19, 'brown'),
(234, 'Michael', 22, 'green'),
(345, 'Simone', 23, 'blue')
])
# Specify schema
schema = StructType([
StructField("id", LongType(), True),
StructField("name", StringType(), True),
StructField("age", LongType(), True),
StructField("eyeColor", StringType(), True)
])
# Apply the schema to the RDD and Create DataFrame
swimmers = spark.createDataFrame(stringCSVRDD, schema)
# Creates a temporary view using the DataFrame
swimmers.createOrReplaceTempView("swimmers")
swimmers.printSchema()

输出:

root
|– id: long (nullable = true)
|– name: string (nullable = true)
|– age: long (nullable = true)
|– eyeColor: string (nullable = true)

明明可以自动匹配储存类型,为什么我们还要手动指定类型呢?
因为,在自动匹配类型的情况下,有时会将ID,Age等我们未来将要用来计算的数据以String的方式存储,这样就不利于我们对这些数据进行加减等运算,所以手动指定储存类型还是很有必要的。

使用SQL语句查询及操作数据

1
2
3
4
spark.sql("select count(1) from swimmers").show()
spark.sql("select id, age from swimmers where age = 22").show()
spark.sql(
"select name, eyeColor from swimmers where eyeColor like 'b%'").show()

输出:

+——–+
|count(1)|
+——–+
| 3|
+——–+

+—+—+
| id|age|
+—+—+
|234| 22|
+—+—+

+——+——–+
| name|eyeColor|
+——+——–+
| Katie| brown|
|Simone| blue|
+——+——–+

小结

在这篇文章中我们可以看出,通过Pyspark结合RDD与DataFrames让我们可以用Python用上Spark平台上的分布式优势,也能够进一步加速和优化我们平时的数据操作。通过Spark导出的抽象层的API我们无需学过过于复杂和繁多的语法就能操作RDD上的数据。这篇文章的内容主要是为了后面用在用Python在Spark进行数据建模和机器学习所铺路,但受限于文章篇幅,还有十分多的函数和API无提及。所以有兴趣的读者可以阅读下Spark DataFrame的官方文档深入了解一下。