SparkML源码分析

[TOC]

RDD Basic

Transformation

rdd的操作分为transform和action

transform不会触发任务执行,action会

Transformation Meaning
map(func) Return a new distributed dataset formed by passing each element of the source through a function func.
filter(func) Return a new dataset formed by selecting those elements of the source on which funcreturns true.
flatMap(func) Similar to map, but each input item can be mapped to 0 or more output items (so funcshould return a Seq rather than a single item).
mapPartitions(func) Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
mapPartitionsWithIndex(func) Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator<T>) => Iterator<U> when running on an RDD of type T.
sample(withReplacement, fraction, seed) Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
union(otherDataset) Return a new dataset that contains the union of the elements in the source dataset and the argument.
intersection(otherDataset) Return a new RDD that contains the intersection of elements in the source dataset and the argument.
distinct([numPartitions])) Return a new dataset that contains the distinct elements of the source dataset.
groupByKey([numPartitions]) When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numPartitions argument to set a different number of tasks.
reduceByKey(func, [numPartitions]) When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
aggregateByKey(zeroValue)(seqOp, combOp, [numPartitions]) When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numPartitions]) When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
join(otherDataset, [numPartitions]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
cogroup(otherDataset, [numPartitions]) When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.
cartesian(otherDataset) When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
pipe(command, [envVars]) Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process's stdin and lines output to its stdout are returned as an RDD of strings.
coalesce(numPartitions) Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions) Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
repartitionAndSortWithinPartitions(partitioner) Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.

Action

Action Meaning
reduce(func) Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
collect() Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
count() Return the number of elements in the dataset.
first() Return the first element of the dataset (similar to take(1)).
take(n) Return an array with the first n elements of the dataset.
takeSample(withReplacement, num, [seed]) Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
takeOrdered(n, [ordering]) Return the first n elements of the RDD using either their natural order or a custom comparator.
saveAsTextFile(path) Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
saveAsSequenceFile(path) (Java and Scala) Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that implement Hadoop's Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
saveAsObjectFile(path) (Java and Scala) Write the elements of the dataset in a simple format using Java serialization, which can then be loaded usingSparkContext.objectFile().
countByKey() Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
foreach(func) Run a function func on each element of the dataset. This is usually done for side effects such as updating an Accumulator or interacting with external storage systems. Note: modifying variables other than Accumulators outside of the foreach() may result in undefined behavior. See Understanding closures for more details.

shuffle

shuffle就是洗牌的意思,意味着重新分配数据,我们看官方的解释:

Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.

从reduceByKey来理解shuffle

thus, to organize all the data for a single reduceByKey reduce task to execute, Spark needs to perform an all-to-all operation. It must read from all partitions to find all the values for all keys, and then bring together values across partitions to compute the final result for each key - this is called the shuffle.

什么操作会触发shuffle

Operations which can cause a shuffle include repartition operations like repartition and coalesce, ‘ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.

shuffle操作非常昂贵:

The Shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O.

Internally, results from individual map tasks are kept in memory until they can’t fit. Then, these are sorted based on the target partition and written to a single file. On the reduce side, tasks read the relevant sorted blocks.

When data does not fit in memory Spark will spill these tables to disk, incurring the additional overhead of disk I/O and increased garbage collection.

shuffle当内存不够时会使用磁盘

Shuffle also generates a large number of intermediate files on disk. As of Spark 1.3, these files are preserved until the corresponding RDDs are no longer used and are garbage collected. This is done so the shuffle files don’t need to be re-created if the lineage is re-computed. Garbage collection may happen only after a long period of time, if the application retains references to these RDDs or if GC does not kick in frequently. This means that long-running Spark jobs may consume a large amount of disk space. The temporary storage directory is specified by thespark.local.dir configuration parameter when configuring the Spark context.

broadcast

Broadcast variables allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.

Accumulators

Accumulators are variables that are only “added” to through an associative and commutative operation and can therefore be efficiently supported in parallel.

Spark Streaming

先看官方简介

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.

Data can be ingested from many sources like Kafka, Flume, Kinesis, or TCP sockets, and can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards.

其实spark streaming可以看成是间隔很小的批处理

虽然spark streaming是近似实时,但是throughput更大

Latex

$$ Ent(D) = - \sum_{i=1}^{|Y|} p_{k}\log_{2}p_{k}$$

决策树

如何划分属性

在决策树算法中,如何选择最优划分属性是最关键的一步。一般而言,随着划分过程的不断进行,我们希望决策树的分支节点所包含的样本尽可能属于同一类别,即节点的“纯度(purity)”越来越高。 有几种度量样本集合纯度的指标。在MLlib中,信息熵和基尼指数用于决策树分类,方差用于决策树回归。

信息熵

信息熵是度量样本集合纯度最常用的一种指标,假设当前样本集合D中第k类样本所占的比例为p_k,则D的信息熵定义为:

$$ Ent(D) = - \sum_{i=1}^{|Y|} p_{k}\log_{2}p_{k}$$

Ent(D)的值越小,则D的纯度越高

Gini基尼系数

$$ Gini(D) = 1- \sum_{i=1}^{|Y|} p_{k}^2$$

直观来说,Gini(D)反映了从数据集D中随机取样两个样本,其类别标记不一致的概率。因此,Gini(D)越小,则数据集D的纯度越高。

方差

$$ Var(D) = 1/N \sum_{i=1}^{N} (y_{i} - 1/N\sum_{i=1}^{N}y_{i})$$

这个也是sparkMLlib使用的方式

##信息增益

假设切分大小为N的数据集D为两个数据集D_leftD_right,那么信息增益可以表示为如下的形式

$$IG(D,s) = impurity(D) - \frac{N_{left}}{N} impurity(D) -\frac{N_{right}}{N} impurity(D) $$

imformation gain

一般情况下,信息增益越大,则意味着使用属性a来进行划分所获得的纯度提升越大。因此我们可以用信息增益来进行决策树的划分属性选择。

决策树的缺点:

1.对那些各类别数据量不一致的数据,在决策树种,

信息增益的结果偏向那些具有更多数值的特征

2.容易过拟合

3.忽略了数据集中属性之间的相关性

随机森林

1 bagging

Bagging采用自助采样法(bootstrap sampling)采样数据。给定包含m个样本的数据集,我们先随机取出一个样本放入采样集中,再把该样本放回初始数据集,使得下次采样时,样本仍可能被选中, 这样,经过m次随机采样操作,我们得到包含m个样本的采样集。

按照此方式,我们可以采样出T个含m个训练样本的采样集,然后基于每个采样集训练出一个基本学习器,再将这些基本学习器进行结合。这就是Bagging的一般流程。在对预测输出进行结合时,Bagging通常使用简单投票法, 对回归问题使用简单平均法。若分类预测时,出现两个类收到同样票数的情形,则最简单的做法是随机选择一个,也可以进一步考察学习器投票的置信度来确定最终胜者。

Bagging的算法描述如下图所示。

输入: 训练集 D={(x1,y1),...} 基础学习算法 $$ B$$ 训练次数T 过程:

  1. for t=1,2,...., T do

  2. $$h_{t} = B(D, D_{bs})$$

  3. End for

    输出:$$H_{x} = argmax {y}\in{Y} \sum_{t=1}^{T}I(h_{t}(x) = y)$$

2 随机森林

随机森林是Bagging的一个扩展变体。随机森林在以决策树为基学习器构建Bagging集成的基础上,进一步在决策树的训练过程中引入了随机属性选择。具体来讲,传统决策树在选择划分属性时, 在当前节点的属性集合(假设有d个属性)中选择一个最优属性;而在随机森林中,对基决策树的每个节点,先从该节点的属性集合中随机选择一个包含k个属性的子集,然后再从这个子集中选择一个最优属性用于划分。 这里的参数k控制了随机性的引入程度。若令k=d,则基决策树的构建与传统决策树相同;若令k=1,则是随机选择一个属性用于划分。在MLlib中,有两种选择用于分类,即k=log2(d)k=sqrt(d); 一种选择用于回归,即k=1/3d

可以看出,随机森林对Bagging只做了小改动,但是与Bagging中基学习器的“多样性”仅仅通过样本扰动(通过对初始训练集采样)而来不同,随机森林中基学习器的多样性不仅来自样本扰动,还来自属性扰动。 这使得最终集成的泛化性能可通过个体学习器之间差异度的增加而进一步提升。

3 随机森林在分布式环境下的优化策略

  • 切分点抽样统计
  • 特征装箱(Binning
  • 逐层训练(level-wise training

梯度提升树

1 boosting

Boosting是一类将弱学习器提升为强学习器的算法。这类算法的工作机制类似:先从初始训练集中训练出一个基学习器,再根据基学习器的表现对训练样本分布进行调整,使得先前基学习器做错的训练样本在后续受到更多关注。 然后基于调整后的样本分布来训练下一个基学习器;如此重复进行,直至基学习器的数目达到事先指定的值T,最终将这T个基学习器进行加权结合。

Boost算法是在算法开始时,为每一个样本赋上一个相等的权重值,也就是说,最开始的时候,大家都是一样重要的。 在每一次训练中得到的模型,会使得数据点的估计有所差异,所以在每一步结束后,我们需要对权重值进行处理,而处理的方式就是通过增加错分点的权重,这样使得某些点如果老是被分错,那么就会被“严重关注”,也就被赋上一个很高的权重。 然后等进行了N次迭代,将会得到N个简单的基分类器(basic learner),最后将它们组合起来,可以对它们进行加权(错误率越大的基分类器权重值越小,错误率越小的基分类器权重值越大)、或者让它们进行投票等得到一个最终的模型。

梯度提升(gradient boosting)属于Boost算法的一种,也可以说是Boost算法的一种改进,它与传统的Boost有着很大的区别,它的每一次计算都是为了减少上一次的残差(residual),而为了减少这些残差,可以在残差减少的梯度(Gradient)方向上建立一个新模型。所以说,在Gradient Boost中,每个新模型的建立是为了使得先前模型残差往梯度方向减少, 与传统的Boost算法对正确、错误的样本进行加权有着极大的区别。

梯度提升算法的核心在于,每棵树是从先前所有树的残差中来学习。利用的是当前模型中损失函数的负梯度值作为提升树算法中的残差的近似值,进而拟合一棵回归(分类)树。

##2 梯度提升

gbdt2

3 随机梯度提升

有文献证明,注入随机性到上述的过程中可以提高函数估计的性能。受到Breiman的影响,将随机性作为一个考虑的因素。在每次迭代中,随机的在训练集中抽取一个子样本集,然后在后续的操作中用这个子样本集代替全体样本。

线性模型

1 数学描述

许多标准的机器学习算法可以归结为凸优化问题。例如,找到凸函数f的一个极小值的任务,这个凸函数依赖于可变向量w(在spark源码中,一般表示为weights)。 形式上,我们可以将其当作一个凸优化问题${min}_{w}f(w)$。它的目标函数可以表示为如下公式 (1)

​ $$f(W) = \lambda R(W) + \frac{1}{n} \sum_{i=1}^{n}L(w; x_{i}, y{i})$$

在上式中,向量x表示训练数据集,y表示它相应的标签,也是我们想预测的值。如果L(w;x,y)可以表示为${w}^{T}x​$和y的函数, 我们称这个方法为线性的。spark.mllib中的几种分类算法和回归算法可以归为这一类。

目标函数f包含两部分:正则化(regularizer),用于控制模型的复杂度;损失函数,用于度量模型的误差。损失函数L(w;.)是一个典型的基于w的凸函数。固定的正则化参数gamma定义了两种目标的权衡(trade-off), 这两个目标分别是最小化损失(训练误差)以及最小化模型复杂度(为了避免过拟合)。

Spark ML整体架构

1
2
3
4
5
6
7
8
9
10
11
MLlib standardizes APIs for machine learning algorithms to make it easier to combine multiple algorithms into a single pipeline, or workflow. This section covers the key concepts introduced by the Pipelines API, where the pipeline concept is mostly inspired by the scikit-learn project.

DataFrame: This ML API uses DataFrame from Spark SQL as an ML dataset, which can hold a variety of data types. E.g., a DataFrame could have different columns storing text, feature vectors, true labels, and predictions.

Transformer: A Transformer is an algorithm which can transform one DataFrame into another DataFrame. E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with predictions.

Estimator: An Estimator is an algorithm which can be fit on a DataFrame to produce a Transformer. E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model.

Pipeline: A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.

Parameter: All Transformers and Estimators now share a common API for specifying parameters.

sparkML基本上是按照上述介绍搭起来的,最基本的是PipelineStage 是Pipeline中的一步。

主要是estimator 和 transformer.

数据类型是dataFrame

Estimator

estimator就一个method 就是fit

<pre class="mermaid">graph LR; ProbabilisticClassifier --> Classifier; Classifier --> Predictor; Predictor --> Estimator;</pre>

ProbabilisticClassifier

1
2
3
class DecisionTreeClassifier @Since("1.4.0") (
@Since("1.4.0") override val uid: String)
extends ProbabilisticClassifier[Vector, DecisionTreeClassifier, DecisionTreeClassificationModel]

Transformer

transformer就是把一个df转成另一个df

Logistic Regression

spark的LR支持二分类和多分类,默认是二分类

看看LR的train都干了什么

首先LR支持样本有权重,只要df里有weight就行

1
2
3
4
5
6
7
val instances: RDD[Instance] =
dataset.select(col($(labelCol)), w, col($(featuresCol))).rdd.map {
case Row(label: Double, weight: Double, features: Vector) =>
Instance(label, weight, features)
}

if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)

这一步是把df里的label, weight, features都挑出来,如果没有weight 就默认是1

instances.persist 是确定数据的储存级别,默认就是MEMORY_AND_DISK

我们常用的rdd.cache() 本质上是 persist(StorageLevel.MEMORY_ONLY) 即把rdd的数据加载到内存

关于spark对内存的管理,可以看

https://www.ibm.com/developerworks/cn/analytics/library/ba-cn-apache-spark-memory-management/index.html

https://www.jianshu.com/u/5b15278387a0

中间用的是Breeze的LBFGS优化器

1
new BreezeOWLQN[Int, BDV[Double]]($(maxIter), 10, regParamL1Fun, $(tol))

最后组装model

1
2
val model = copyValues(new LogisticRegressionModel(uid, coefficientMatrix, interceptVector,
numClasses, isMultinomial))

TreeAggregate

treeAggregate也是spark rdd一个非常重要的操作,是数据聚合操作

可以看这篇

https://blog.csdn.net/u011724402/article/details/79057450

1
与aggregate不同的是treeAggregate多了depth的参数,其他参数含义相同。aggregate在执行完SeqOp后会将计算结果拿到driver端使用CombOp遍历一次SeqOp计算的结果,最终得到聚合结果。而treeAggregate不会一次就Comb得到最终结果,SeqOp得到的结果也许很大,直接拉到driver可能会OutOfMemory,因此它会先把分区的结果做局部聚合(reduceByKey),如果分区数过多时会做分区合并,之后再把结果拿到driver端做reduce

GradientDescent

所有在spark上实现并行的ML的算法都可以参考梯度下降的做法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
while (!converged && i <= numIterations) {
// 把权重分发给executor, 每轮迭代都要发一次!?这就是spark的瓶颈
val bcWeights = data.context.broadcast(weights)
// Sample a subset (fraction miniBatchFraction) of the total data
// compute and sum up the subgradients on this subset (this is one map-reduce)


val (gradientSum, lossSum, miniBatchSize) = data.sample(false, miniBatchFraction, 42 + i)
.treeAggregate((BDV.zeros[Double](n), 0.0, 0L))(
seqOp = (c, v) => {
// c: (grad, loss, count), v: (label, features)
val l = gradient.compute(v._2, v._1, bcWeights.value, Vectors.fromBreeze(c._1))
(c._1, c._2 + l, c._3 + 1)
},
combOp = (c1, c2) => {
// c: (grad, loss, count)
(c1._1 += c2._1, c1._2 + c2._2, c1._3 + c2._3)
})
bcWeights.destroy(blocking = false)

if (miniBatchSize > 0) {
/**
* lossSum is computed using the weights from the previous iteration
* and regVal is the regularization value computed in the previous iteration as well.
*/
stochasticLossHistory += lossSum / miniBatchSize + regVal
val update = updater.compute(
weights, Vectors.fromBreeze(gradientSum / miniBatchSize.toDouble),
stepSize, i, regParam)
weights = update._1
regVal = update._2

previousWeights = currentWeights
currentWeights = Some(weights)
if (previousWeights != None && currentWeights != None) {
converged = isConverged(previousWeights.get,
currentWeights.get, convergenceTol)
}
} else {
logWarning(s"Iteration ($i/$numIterations). The size of sampled batch is zero")
}
i += 1
}

RDD sampling

RDD提供了sampling的接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* Return a sampled subset of this RDD.
*
* @param withReplacement can elements be sampled multiple times (replaced when sampled out)
* @param fraction expected size of the sample as a fraction of this RDD's size
* without replacement: probability that each element is chosen; fraction must be [0, 1]
* with replacement: expected number of times each element is chosen; fraction must be greater
* than or equal to 0
* @param seed seed for the random number generator
*
* @note This is NOT guaranteed to provide exactly the fraction of the count
* of the given [[RDD]].
*/
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T] = {
require(fraction >= 0,
s"Fraction must be nonnegative, but got ${fraction}")

withScope {
require(fraction >= 0.0, "Negative fraction value: " + fraction)
if (withReplacement) {
new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed)
} else {
new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed)
}
}
}