EdmondFrank's 时光足迹

この先は暗い夜道だけかもしれない それでも信じて進むんだ。星がその道を少しでも照らしてくれるのを。
或许前路永夜,即便如此我也要前进,因为星光即使微弱也会我为照亮前途。
——《四月は君の嘘》

Python在Spark上的机器学习之机器学习实战(下)



Python在Spark上的机器学习之机器学习实战(下)

MLlib 的使用(续)

我们在上篇讲到了:数据相关性分析和特征选取,但是我们在上篇中所提及的方法基本都是针对标准的数值型的数据特征;那么,我们下篇就继续将分类变量的统计检验分析,以及最后的建模过程讲述完整。

 统计校验

在通过特征变量的相关系数选择特征时,对于一般的分类变量而言,我们无法计算它们之间的相关系数,但是我们可以通过对它们进行卡方校验来检测它们的分布之间是否存在较大的差异。

卡方检验:是用途非常广的一种假设检验方法,它在分类资料统计推断中的应用,包括:两个样本率或两个构成比比较的卡方检验;多个样本率或多个构成比比较的卡方检验以及分类资料的相关分析等。

卡方检验就是统计样本的实际观测值与理论推断值之间的偏离程度,实际观测值与理论推断值之间的偏离程度就决定卡方值的大小,卡方值越大,越不符合;卡方值越小,偏差越小,越趋于符合,若两个值完全相等时,卡方值就为0,表明理论值完全符合。

而在PySpark中你可以用 .chiSqTest() 方法来轻松实现卡方检验。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import pyspark.mllib.linalg as ln
for cat in categorical_cols[1:]:
  agg = births_transformed \
      .groupby('INFANT_ALIVE_AT_REPORT') \
      .pivot(cat) \
      .count()
  agg_rdd = agg \
      .rdd \
      .map(lambda row: (row[1:])) \
      .flatMap(lambda row:
              [0 if e == None else e for e in row]) \
      .collect()
  row_length = len(agg.collect()[0]) - 1
  agg = ln.Matrices.dense(row_length, 2, agg_rdd)
  test = st.Statistics.chiSqTest(agg)
  print(cat, round(test.pValue, 4))

我们遍历所有的分类变量并以 infant_alive_ at_report进行分类统计。下一步,我们需要将其转化成RDD,所以我们要先利用pyspark.mllib.linalg模将它们转换成一个矩阵。
当我们成功将其转换成矩阵的形式之后,我们就可以用.chiSqTest()来校验我们的结果。

最后结果显示如下:

chisqtest.png

从结果我们可以看出,所有分类变量对理论值的预测都是有意义的,因此,我们在构建最后的预测模型的时候都要考虑上这些分类型特征变量。

创建最后的待训练数据集

经过一轮的数据分析和特征变量筛选之后,最终到了我们最终的建模阶段了。首先我们将筛选出来以DataFrame数据结构模型表达的数据转换成以LabeledPoints形式表示的RDD。

LabeledPoint 是 MLlib 中的一种数据结构,它包含了两个属性值:label(标识),features(特征)一般用作机器学习模型的训练。

其中,label就是我们目标的分类的标识而features就是我们用于分类的特征,
通常是一个Numpy 数组,列表,psyspark.mllib.linalg.SparseVector,pyspark.mllib,linalg.DenseVector或者是scipy.sparse的形式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import pyspark.mllib.feature as ft
import pyspark.mllib.regression as reg
hashing = ft.HashingTF(7)
births_hashed = births_transformed \
  .rdd \
  .map(lambda row: [
      list(hashing.transform(row[1]).toArray())
          if col == 'BIRTH_PLACE'
          else row[i]
      for i, col
      in enumerate(features_to_keep)]) \
  .map(lambda row: [[e] if type(e) == int else e
          for e in row]) \
  .map(lambda row: [item for sublist in row
          for item in sublist]) \
  .map(lambda row: reg.LabeledPoint(
      row[0],
      ln.Vectors.dense(row[1:]))
      )

划分训练集和测试集

形如sklearn.model_selection.train_test_split随机划分训练集和测试集的模块一般,在PySpark中RDDs也有一个便利的.randomSplit(…)方法用于随机划分训练集和测试集。

在本例中可以这样使用

births_train, births_test = births_hashed.randomSplit([0.6, 0.4])

没错,仅仅需要上面这样一行的代码,我们就可以将我们的待训练数据按照随机60%,40%来划分好我们的训练集和测试集了。

开始建模

在一切准备就绪之后,我们就可以开始通过我们上面的训练数据集来建模了。在这里我们来尝试建立两个模型:一个线性的Logistic回归模型,一个非线性的随机森林模型。然后,在初次建模的时候,我们先采用筛选出来的全部特征来建模,然后我们再通过ChiSqSelector(…)方法来归纳出最能代表全部整体的四个主成分。

Logistic 回归模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from pyspark.mllib.classification \
import LogisticRegressionWithLBFGS
LR_Model = LogisticRegressionWithLBFGS \
.train(births_train, iterations=10)

LR_results = (
births_test.map(lambda row: row.label) \
.zip(LR_Model \
.predict(births_test\
.map(lambda row: row.features)))
).map(lambda row: (row[0], row[1] * 1.0))

import pyspark.mllib.evaluation as ev
LR_evaluation = ev.BinaryClassificationMetrics(LR_results)
print('Area under PR: {0:.2f}' \
.format(LR_evaluation.areaUnderPR))
print('Area under ROC: {0:.2f}' \
.format(LR_evaluation.areaUnderROC))
LR_evaluation.unpersist()

从上面的建模过程可以看出,使用PySpark训练一个模型也是非常简单的。我们只需要调用.train(…)方法,并传入之前处理好的LabeledPoints数据即可。不过需要注意的一点是我们要提前指定一个较小训练的迭代次数以免训练时间过长。

同时,在上面的代码中,我们在训练完一个模型之后使用MLlib中为我们提供的评估分类和回归准确度的.BinaryClassificationMetrics(…)方法来分析我们最后预测的结果。

最后,结果图示如下:

logistic_roc.png

通过PR,ROC的结果,我们可以看出,这个模型还是可接受的。

选取出最具代表性的分类特征

通常来说,一个采取更少的特征的简单模型,往往会比一个复杂的模型,在分类问题上更具有代表性和可解释性。而在MLlib中,则可以通过.Chi-Square selector来提取出模型中最具代表性的一些分类特征变量来简化我们的模型。

1
2
3
4
5
6
7
8
9
10
11
12
13
selector = ft.ChiSqSelector(4).fit(births_train)
topFeatures_train = (
  births_train.map(lambda row: row.label) \
  .zip(selector \
      .transform(births_train \
          .map(lambda row: row.features)))
  ).map(lambda row: reg.LabeledPoint(row[0], row[1]))
topFeatures_test = (
  births_test.map(lambda row: row.label) \
  .zip(selector \
      .transform(births_test \
          .map(lambda row: row.features)))
).map(lambda row: reg.LabeledPoint(row[0], row[1]))

随机森林模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from pyspark.mllib.tree import RandomForest
RF_model = RandomForest \
.trainClassifier(data=topFeatures_train,
numClasses=2,
categoricalFeaturesInfo={},
numTrees=6,
featureSubsetStrategy='all',
seed=666)

RF_results = (
topFeatures_test.map(lambda row: row.label) \
.zip(RF_model \
.predict(topFeatures_test \
.map(lambda row: row.features)))
)
RF_evaluation = ev.BinaryClassificationMetrics(RF_results)
print('Area under PR: {0:.2f}' \
.format(RF_evaluation.areaUnderPR))

print('Area under ROC: {0:.2f}' \
.format(RF_evaluation.areaUnderROC))
model_evaluation.unpersist()

随机森林模型(Random forest 后面简称RF)在训练上总体与Logistic类似,不同的参数是RF在训练前需要指定类别总数:numClasses,树的棵数:numTrees(这两个参数的意义大家可以参照下随机森林模型的百科介绍

注:在随机森林模型的创建中,我们采用的是上面提取出来的最具代表性的有效特征,这就意味着模型用到的特征是比之前的Logistic要少的。

最后,结果图示如下:

rf_roc.png

通过结果我们可以看出,随机森林模型,在采用比之前更少的特征下的建模的最终预测效果是由于之前的Logistic回归模型的。

下面我们同样使用代表性特征来重建一次Logistic回归模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
LR_Model_2 = LogisticRegressionWithLBFGS \
.train(topFeatures_train, iterations=10)
LR_results_2 = (
topFeatures_test.map(lambda row: row.label) \
.zip(LR_Model_2 \
.predict(topFeatures_test \
.map(lambda row: row.features)))
).map(lambda row: (row[0], row[1] * 1.0))
LR_evaluation_2 = ev.BinaryClassificationMetrics(LR_results_2)
print('Area under PR: {0:.2f}' \
.format(LR_evaluation_2.areaUnderPR))
print('Area under ROC: {0:.2f}' \
.format(LR_evaluation_2.areaUnderROC))
LR_evaluation_2.unpersist()

最终结果:
logistic_lbfgs.png

通过结果,我们可以看出,虽然没有达到RF模型的准确度,但是与采用了全特征的Logistic回归模型处于同一水平。所以,我们在可选的情况下,通常采用更少的特征来构建更为简化和有效的模型。

小结

到这里,Python在Spark上的机器学习的实战案例也结束了,欢迎大家继续关注我的博客。

Python在Spark上的机器学习之机器学习实战(上)



Python在Spark上的机器学习之机器学习实战(上)

MLlib 的使用

在上面的章节之中,我们已经讲过了如何利用PySpark进行数据操作和分析了。那么在这篇文章中,我们就真正利用PySpark结合MLlib来建立一个分类模型。

MLlib:即Machine Learning Library,MLlib 是Spark对常用的机器学习算法的实现库,同时包括相关的测试和数据生成器。MLlib 目前支持四种常见的机器学习问题:二元分类,回归,聚类以及协同过滤,同时也包括一个底层的梯度下降优化基础算法。

载入和转化数据

首先,我们在建立一个DataFrame之前,我们先针对性的指定下DataFrame中数据类型,方便我们数据后期的分析与计算。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import pyspark.sql.types as typ
labels = [
('INFANT_ALIVE_AT_REPORT', typ.StringType()),
('BIRTH_YEAR', typ.IntegerType()),
('BIRTH_MONTH', typ.IntegerType()),
('BIRTH_PLACE', typ.StringType()),
('MOTHER_AGE_YEARS', typ.IntegerType()),
('MOTHER_RACE_6CODE', typ.StringType()),
('MOTHER_EDUCATION', typ.StringType()),
('FATHER_COMBINED_AGE', typ.IntegerType()),
('FATHER_EDUCATION', typ.StringType()),
('MONTH_PRECARE_RECODE', typ.StringType()),
...
('INFANT_BREASTFED', typ.StringType())
]
schema = typ.StructType([
typ.StructField(e[0], e[1], False) for e in labels
])

下一步,我们通过 .read.csv() 方法来载入数据,这个方法除了能够载入原数据之外还可以载入GZipped压缩后的csv数据。其实header参数设为 True 代表数据文件的第一行是数据的元信息(即为列表的说明字段)。

births = spark.read.csv('births_train.csv.gz',
header=True,
schema=schema)

由于在我们的数据集中有大量的分类变量都是字符串,所以我们首先要想办法将这一类变量转换成数字的形式。

# 转换'INFANT_ALIVE_AT_REPORT'
recode_dictionary = {
    'YNU': {
        'Y': 1,
        'N': 0,
        'U': 0
            }
}

在这里总的来说,我们的目的就是一个二分类问题,即预测婴儿的存活情况,也就是“存活 1 ”或“死亡 0 ”。因为,要做到一种未雨绸缪的效果,我们要先去除所有与婴儿有关的特征信息,仅仅是通过婴儿父母的基本信息以及婴儿的出生地来预测一下婴儿出生后存活的概率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
selected_features = [
'INFANT_ALIVE_AT_REPORT',
'BIRTH_PLACE',
'MOTHER_AGE_YEARS',
'FATHER_COMBINED_AGE',
'CIG_BEFORE',
'CIG_1_TRI',
'CIG_2_TRI',
'CIG_3_TRI',
'MOTHER_HEIGHT_IN',
'MOTHER_PRE_WEIGHT',
'MOTHER_DELIVERY_WEIGHT',
'MOTHER_WEIGHT_GAIN',
'DIABETES_PRE',
'DIABETES_GEST',
'HYP_TENS_PRE',
'HYP_TENS_GEST',
'PREV_BIRTH_PRETERM'
]
births_trimmed = births.select(selected_features)

在这个数据集中,大量的变量特征值都是Yes/No/Unknown值,我们将Yes编码成1,另外的其他值编码成0。

而在代表怀孕妈妈的吸烟数量的这个特征值的编码上,我们采用这样的规则。0:代表妈妈在怀孕期间没有抽过烟;而1-97:代表妈妈在怀孕期间真实的抽烟次数,而98:则代表孕期抽烟次数高达98次及以上;但99:意味着妈妈的孕期抽烟情况未知。

1
2
3
4
5
6
7
8
9
import pyspark.sql.functions as func
def recode(col, key):
  return recode_dictionary[key][col]
def correct_cig(feat):
  return func \
      .when(func.col(feat) != 99,
      func.col(feat))\
      .otherwise(0)
  rec_integer = func.udf(recode,typ.IntegerType())

由于Spark的机制问题,我们无法直接将DataFrame来用recode函数进行处理,所以我们首先要先它转换成Spark能够理解的UDF。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
births_transformed = births_trimmed \
.withColumn('CIG_BEFORE', correct_cig('CIG_BEFORE'))\
.withColumn('CIG_1_TRI', correct_cig('CIG_1_TRI'))\
.withColumn('CIG_2_TRI', correct_cig('CIG_2_TRI'))\
.withColumn('CIG_3_TRI', correct_cig('CIG_3_TRI'))
cols = [(col.name, col.dataType) for col in births_trimmed.schema]
YNU_cols = []

for i, s in enumerate(cols):
if s[1] == typ.StringType():
dis = births.select(s[0]) \
.distinct() \
.rdd \
.map(lambda row: row[0]) \
  .collect()
if 'Y' in dis:
  YNU_cols.append(s[0])

最后,为了一次性转换所有的 YNU_cols 数据,我们用以下的方法:

exprs_YNU = [
    rec_integer(x,
    func.lit('YNU')).alias(x)
    if x in YNU_cols
    else x
    for x in births_transformed.columns
]
births_transformed = births_transformed.select(exprs_YNU)

让我们来检查一下转换的结果吧:

births_transformed.select(YNU_cols[-5:]).show(5)

translate_res.png

数据预分析

为了建立一个良好的统计模型,我们首先需要了解清楚数据的组成分布以及背后的含义。

下面我们可以通过Spark提供的一些函数来对数据进行描述性分析。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import pyspark.mllib.stat as st
import numpy as np
numeric_cols = ['MOTHER_AGE_YEARS','FATHER_COMBINED_AGE',
'CIG_BEFORE','CIG_1_TRI','CIG_2_TRI','CIG_3_TRI',
'MOTHER_HEIGHT_IN','MOTHER_PRE_WEIGHT',
'MOTHER_DELIVERY_WEIGHT','MOTHER_WEIGHT_GAIN'
]
numeric_rdd = births_transformed\
  .select(numeric_cols)\
  .rdd \
  .map(lambda row: [e for e in row])
mllib_stats = st.Statistics.colStats(numeric_rdd)
for col, m, v in zip(numeric_cols,
  mllib_stats.mean(),
  mllib_stats.variance()):
  print('{0}: \t{1:.2f} \t {2:.2f}'.format(col, m, np.sqrt(v)))

statics.png

根据输出的统计结果我们可以看出:在婴儿父母的年龄对比上,妈妈是明显比爸爸年轻的。妈妈的平均年龄在28岁左右,而爸爸的平均年龄确是44岁。

对于大部分的分类变量,我们也可以一一的来统计下他们的各个数值出现的频数:

1
2
3
4
5
6
7
8
9
10
11
12
13
categorical_cols = [e for e in births_transformed.columns
if e not in numeric_cols]
categorical_rdd = births_transformed\
  .select(categorical_cols)\
  .rdd \
  .map(lambda row: [e for e in row])
for i, col in enumerate(categorical_cols):
  agg = categorical_rdd \
      .groupBy(lambda row: row[i]) \
      .map(lambda row: (row[0], len(row[1])))
  print(col, sorted(agg.collect(),
key=lambda el: el[1],
reverse=True))

frequence.png

根据这次的结果,我们又可以看出大部分的婴儿都是在医院出现的(医院的出生地代号BIRTH_PLACE=1)

相关系数

相关性的分析有利于我们发现特征变量中的多重共线性的情况,而多重共线性则是影响我们模型的鲁棒性的关键因素之一。

1
2
3
4
5
6
7
8
9
10
corrs = st.Statistics.corr(numeric_rdd)
for i, el in enumerate(corrs > 0.5):
  correlated = [
      (numeric_cols[j], corrs[i][j])
      for j, e in enumerate(el)
      if e == 1.0 and j != i]
  if len(correlated) > 0:
      for e in correlated:
          print('{0}-to-{1}: {2:.2f}' \
          .format(numeric_cols[i], e[0], e[1]))

上面的代码会替我们计算特征变量之间的相关系数矩阵,并输出相关系数高于0.5的特征。

cor_matrix.png

根据上图输出的结果,我们又可以看出 CIG_XXX 系列的特征都有些非常高的相关性,所以在这个系列的特征之中保留一个即可。 在这里我只保留 CIG_1_TRI这个特征。同理在WEIGHT系列中我只保留MOTHER_PRE_WEIGHT这个特征。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
features_to_keep = [
'INFANT_ALIVE_AT_REPORT',
'BIRTH_PLACE',
'MOTHER_AGE_YEARS',
'FATHER_COMBINED_AGE',
'CIG_1_TRI',
'MOTHER_HEIGHT_IN',
'MOTHER_PRE_WEIGHT',
'DIABETES_PRE',
'DIABETES_GEST',
'HYP_TENS_PRE',
'HYP_TENS_GEST',
'PREV_BIRTH_PRETERM'
]
births_transformed = births_transformed.select([e for e in features_
to_keep])

小结

在这一篇实战的文章中,我讲解了:

  • 数据的载入和转换
  • 数据的描述性分析
  • 数据相关性分析

限于时间和篇幅,我打算将有关分类变量的统计检验分析,以及最后的特征选取和建模放在下一篇文章之中,欢迎大家继续阅读我的下一篇文件。

深入理解Spark 编程模型



深入理解Spark 编程模型

Spark的编程模型

Spark的应用程序主要由两部分组成:

  • Driver
  • Executor

除此之外,在Spark的编程模型的构成还包括许多其他的成分,如:SparkContext,这是Spark的应用程序的入口,负责调度各个运算资源,协调各个Worker节点上的Executor;

Dirver program则负责运行Spark应用的main()函数并创建SparkContext,通常情况下,我们用SparkContext来指代Driver program。

Executor:这是Spark应用中运行在Work Node上的一个进程,该进程负责运行Task,并且负责将数据存在内存和磁盘上,每个应用都会申请自己的Executors来负责调度和处理。

其次,在Spark编程模型中还有以下重要的概念,需要了解:

  • Application:Spark的应用程序,包含一个Driver program 和 若干个Executor
  • Cluster Manager:在集群上获取资源的外部服务
  • Work Node:集群中任何可以运行Application代码的节点,其中运行着一个或多个Executor进程。
  • Job:可以被拆分成Task的并行计算的工作单元,一般由Spark Action触发的一次执行作业
  • Stage:每个Job会被拆分成很多个Task,而每组任务就被称作Stage(相当于一个TaskSet)
  • Task:运行在Executor上的工作单元
  • RDD:弹性分布式数据集的简称,是Spark的最核心的模块和类之一

Hadoop数据集

Spark可以将任何Hadoop所支持的存储资源转化成RDD,例如:本地文件,HDFS,Cassandra,HBase等。同时,Spark不仅支持文本文件和SequenceFiles还兼容任何Hadoop InputFormat的格式。

textFile()方法

使用textFile()可以将本地文件或HDFS文件转化成RDD

读取整个文件目录

textFile(“file:///hfds/directory”)

读取文本或压缩文件(可以自动执行解压缩并加载文件数据)

textFile(“file:///hfds/directory/data.gz”)

使用通配符进行读取

textFile(“file:///hfds/data/*.csv”)

对于其他格式数据的读取有以下的方法:

  • wholeTextFiles():读取目录里的小文件,返回由(用户名,内容)结构构成的键值对
  • sequenceFileK,V:可以将SequenceFile转换成RDD
  • SparkContext.hadoopRDD:可以将其他任何Hadoop输入类型转换成RDD使用

RDD

RDD,弹性分布数据集,是Spark最核心的东西,他表示已被分区,不可变的并能够被并行操作的数据集合,不同的数据集格式对应着不同的RDD的实现。RDD的前提是其必须是可序列化的,同时RDD可以cache到内存之中

特点

  1. 只能通过转换操作(如map/filter/groupBy/join等)来从规定数据源(稳定存储的数据或其他RDD)中创建RDD
  2. 状态不可变,即不能修改
  3. 容错性强,由于RDD中的元素会根据key来分区,并保存在多个节点上,还原时只会重新计算丢失的分区的数据,不会影响整个系统的使用
  4. RDD中会保存他的继承信息,即关于它是如何从其他RDD中生成的信息
  5. 被重用的RDD会缓存在内存中,或溢出至磁盘作持久化存储
  6. Spark会延迟计算RDD,这样RDD就能够转换管道化(pipeline)
  7. 有丰富的动作(action)如:count/reduce/collect/save等支持
  8. 惰性求值,即执行了多少次transformation操作,RDD都不会真正执行运算,而只有action操作执行时,运算才会触发

RDD的元数据

每个RDD都包含了5部分的信息,他们包括数据分区的集合,能根据本地性快速访问数据的偏好位置(最佳位置),依赖关系,计算方法(函数),分区策略。

示例:

rdd_meta.png

RDD的操作

RDD中的操作主要分为两大类:

  • 转换(transformation):现有的RDD通过转换来生成一个新的RDD,转换是延迟执行(惰性求值)的。
  • 动作(actions):在RDD上执行动作后,就会运行计算,然后返回结果给驱动程序或者写入文件系统,从而触发Job。

常用transformation:

rdd_transformation.png

常用actions:

rdd_actions.png

持久化

缓存的操作
使用persistcache方法可以将任意RDD缓存在内存或磁盘文件中,缓存不仅可以加速RDD的读取速度同时兼备了容错性,可以通过构建他的transformation自动重构。

缓存是Spark最重要的一个功能,就是在不同操作间,持久化(或缓存)一个数据集在内存中。当你持久化一个RDD,每一个结点都将把它的计算分块结果保存在内存中,并在对此数据集(或者衍生出的数据集)进行的其它动作中重用。这将使得后续的动作(actions)变得更加迅速(通常快10倍)。所以缓存是用Spark构建迭代算法的关键。

如果你需要删除被持久化的RDD,可以用unpersistRDD()来完成该工作。

此外,每一个RDD都可以用不同的保存级别进行保存,从而允许你持久化数据集在硬盘,或者在内存作为序列化的Java对象(节省空间),甚至于跨结点复制。这些等级选择,是通过将一个org.apache.spark.storage.StorageLevel对象传递给persist()方法进行确定。

cache()方法是使用默认存储级别的快捷方法,也就是StorageLevel.MEMORY_ONLY(将反序列化的对象存入内存)。

StorageLevel有五个属性,分别是:

  • useDisk_是否使用磁盘
  • useMemory_是否使用内存
  • useOffHeap_是否使用堆外内存如:Tachyon
  • deserialized_是否进行反序列化
  • replication_备份数目。

存储级别的选择
Spark的不同存储级别,旨在满足内存使用和CPU效率权衡上的不同需求。我们建议通过以下的步骤来进行选择:

  • 如果你的RDDs可以很好的与默认的存储级别(MEMORY_ONLY)契合,就不需要做任何修改了。这已经是CPU使用效率最高的选项,它使得RDDs的操作尽可能的快。
  • 如果不行,试着使用MEMORY_ONLY_SER并且选择一个快速序列化的库使得对象在有比较高的空间使用率的情况下,依然可以较快被访问。
  • 尽可能不要存储到硬盘上,除非计算数据集的函数,计算量特别大,或者它们过滤
    了大量的数据。否则,重新计算一个分区的速度,和与从硬盘中读取基本差不多快。

Python在Spark上的机器学习(四)之可视化工具的介绍与PySpark的结合使用示例



Python在Spark上的机器学习(四)之可视化工具的介绍与PySpark的结合使用示例

前言

在Python和Java的生态圈中,有许多可用的可视化库,但是在这篇文章中,我们主要来介绍一下matplotlib 和 Bokeh的使用。

首先,这两个库都是Anaconda预装的。如果你是通过Anaconda来搭建的Python的科学计算环境的话,直接就可以通过import导入来使用这两个库了。

但是如果还没安装和配置好环境的朋友,可以自行参考MatplotlibBokeh的官方站点的教程来下载配置环境。

注:这一类对各系统平台支持良好的库,一般安装流程也就无非两条pip命令,如:
python -mpip install -U pip
python -mpip install -U matplotlib
pip install bokeh

conda install bokeh
所以各位读者也没有必要担心配置麻烦。

有关matplotlib和bokeh的介绍

Matplotlib

Matplotlib是一个Python 2D绘图库,可以跨平台生成各种通用格式和适用于交互式环境的高质量图表。 Matplotlib可直接用于Python脚本,IPython shell,Jupyter以及Web应用程序服务器之中。
Matplotlib简化了许多繁琐的绘图操作,使得原本简单的图表在绘制上更加简单,而复杂的图表绘制也更容易上手。只需几行代码即可生成许多好看的图表。如,直方图、功率谱、条形图、错误图,散点图等。

官方绘图预览:

enter image description here
enter image description here

enter image description here
enter image description here

Bokeh

Bokeh (Bokeh.js) 是一个 Python 交互式可视化库,支持现代化 Web 浏览器,提供非常完美的展示功能。Bokeh 的目标是使用 D3.js 样式提供优雅,简洁新颖的图形化风格,同时提供大型数据集的高性能交互功能。Boken 可以快速的创建交互式的绘图,仪表盘和数据应用。

鉴于Bokeh强调的更多是一种交互式的绘图体验,在这里我就不貼静态图了,不过下面我会附上一些官方demo的例子,让大家感受下Bokeh的强大之处。

趋势走向图

散点图

地域分布图

箱型图

结合PySpark进行可视化分析

模块加载

以下实验均在Jupyter环境下进行
matplotlib

%matplotlib inline
import matplotlib.pyplot as plt
plt.style.use('ggplot')

bokeh

import bokeh.charts as chrt
from bokeh.io import output_notebook
output_notebook()

频率分布分析

频率分布图是最为简单有效的观察数据的分布情况的方法之一。

读取数据

本文用到的数据文件依旧是上文所提及的信用欺诈检测的数据集,具体下载地址:这里

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import pyspark.sql.types as typ
fraud = sc.textFile('/home/ef/Desktop/learningPySpark-master/ccFraud.csv')
header = fraud.first()
fraud = fraud \
.filter(lambda row: row != header) \
.map(lambda row: [int(elem) for elem in row.split(',')])
fields = [
*[
typ.StructField(h[1:-1], typ.IntegerType(), True)
for h in header.split(',')
]
]
schema = typ.StructType(fields)
fraud_df = spark.createDataFrame(fraud, schema)
hists = fraud_df.select('balance').rdd.flatMap(
lambda row: row
).histogram(20)

fraud_df.printSchema()

输出:
root
|– custID: integer (nullable = true)
|– gender: integer (nullable = true)
|– state: integer (nullable = true)
|– cardholder: integer (nullable = true)
|– balance: integer (nullable = true)
|– numTrans: integer (nullable = true)
|– numIntlTrans: integer (nullable = true)
|– creditLine: integer (nullable = true)
|– fraudRisk: integer (nullable = true)

绘制频率分布直方图

matplotlib

data = {
'bins': hists[0][:-1],
'freq': hists[1]
}
plt.bar(data['bins'], data['freq'], width=2000)
plt.title('Histogram of \'balance\'')
plt.show()

输出:
mat_hist.png

bokeh

data = {
'bins': hists[0][:-1],
'freq': hists[1]
}
b_hist = chrt.Bar(
data,
values='freq', label='bins',
title='Histogram of \'balance\'')
chrt.show(b_hist)

输出:
bokeh_hist.png