Spark⼤数据分析-MLlib:线性回归实例
⽬录
现在使⽤MLlib的API实现下载房屋数据集,准备数据,拟合线性回归模型,使⽤模型预测⽰例⽬标值的过程。
分析和准备数据
从在线存储库下载房屋数据集(housing.data)(GitHub),并使⽤以下代码加载数据:
belchval housingLines = sc.textFile("first-edition/ch07/housing.data",6)
职务英文val housingVals = housingLines.map(x => Vectors.den(x.split(",").map(_.trim().toDouble)))
为housingLines RDD使⽤了6个分区,但是可以根据集群环境选择其他值。现在已将数据解析并作为Vector对象使⽤。
分析数据分布
要了解数据,可以先对它进⾏汇总,可以从相应的RowMatrix对象获取该值:
val housingMat = new RowMatrix(housingVals)
val housingStats = puteColumnSummaryStatistics()
housingStats.min
或者可以使⽤Statistics对象达到⽬的:
import org.apache.spark.mllib.stat.Statistics
val lStats(housingVals)
然后可以使⽤获取的MultivariateStatisticalSummary对象来检查矩阵每列中的平均值(mean)、最⼤值(max)和最⼩值(min),使⽤normL1和normL2⽅法获取每个列的L1范数和L2范数,使⽤variance⽅法获得每列的⽅差。
⽅差:是数据离散程度的度量,等于所有值与它们的平均值的平⽅差的均值。
标准差:是⽅差的⼆次⽅根。
协⽅差:是衡量两个变量相关的程度。
分析列余弦相似性
val housingColSims = lumnSimilarities()
//UTILITY METHOD FOR PRETTY-PRINTING MATRICES
def printMat(mat:BM[Double])={
print(" ")
睡衣女侠for(j <-ls-1)
print("%-10d".format(j));
println
for(i <-ws-1){
print("%-6d".format(i));
for(j <-ls-1)
print(" %+9.3f".format(mat(i, j)));
println
}
}
printMat(toBreezeD(housingColSims))
/* SHOULD GIVE:
0 1 2 3 4 5 6 7 8 9 10 11 12 13
0 +0,000 +0,004 +0,527 +0,052 +0,459 +0,363 +0,482 +0,169 +0,675 +0,563 +0,416 +0,288 +0,544 +0,224
1 +0,000 +0,000 +0,12
2 +0,078 +0,334 +0,467 +0,211 +0,67
3 +0,135 +0,297 +0,39
4 +0,464 +0,200 +0,528
2 +0,000 +0,000 +0,000 +0,256 +0,915 +0,824 +0,916 +0,565 +0,840 +0,931 +0,869 +0,779 +0,897 +0,693
3 +0,000 +0,000 +0,000 +0,000 +0,275 +0,271 +0,275 +0,18
4 +0,190 +0,230 +0,248 +0,266 +0,204 +0,307
4 +0,000 +0,000 +0,000 +0,000 +0,000 +0,966 +0,962 +0,780 +0,808 +0,957 +0,977 +0,929 +0,912 +0,873
5 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,909 +0,880 +0,719 +0,90
6 +0,982 +0,966 +0,832 +0,949
6 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,672 +0,801 +0,929 +0,930 +0,871 +0,918 +0,803
7 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,485 +0,710 +0,856 +0,882 +0,644 +0,856
8 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,917 +0,771 +0,642 +0,806 +0,588
9 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,939 +0,854 +0,907 +0,789
10 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,957 +0,887 +0,897
11 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,799 +0,928
12 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,670
13 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000 +0,000
housingColSims 是包含上三⾓矩阵(上三⾓矩阵包含仅在其对⾓线之上的数据)的分布式CoordinateMatrix,其第i⾏和第j列的值给出了HousingMat矩阵中第i列和第j列之间的相似度度量。hou
singColSims 矩阵中值的范围是-1到1,值为-1表⽰两列具有完全相反的⽅向,值为0表⽰它们正交,值为1表⽰它们⽅向相同。
计算协⽅差矩阵
⽤于检查输⼊集的不同列之间的相似性。可以使⽤RowMatrix对象来计算
val housingCovar = puteCovariance()
printMat(toBreezeM(housingCovar))
0 1 2 3 4 5 6 7 8 9 10 11 12 13
0 +73,987 -40,216 +23,992 -0,122 +0,420 -1,325 +85,405 -6,877 +46,848 +844,822 +5,399 -302,382 +27,986 -30,719
1 -40,216 +543,937 -85,413 -0,253 -1,396 +5,113 -373,90
2 +32,629 -63,349 -1236,454 -19,777 +373,721 -68,78
3 +77,315
2 +23,992 -85,41
3 +47,06
4 +0,110 +0,607 -1,888 +124,514 -10,228 +35,550 +833,360 +5,692 -223,580 +29,580 -30,521
3 -0,122 -0,253 +0,110 +0,065 +0,003 +0,016 +0,619 -0,053 -0,016 -1,523 -0,067 +1,131 -0,098 +0,409
4 +0,420 -1,396 +0,607 +0,003 +0,013 -0,02
5 +2,38
6 -0,188 +0,61
7 +13,046 +0,047 -4,021 +0,489 -0,455
5 -1,325 +5,113 -1,888 +0,01
6 -0,025 +0,494 -4,752 +0,304 -1,284 -34,583 -0,541 +8,215 -3,080 +4,493
6 +85,405 -373,902 +124,514 +0,619 +2,386 -4,752 +792,358 -44,329 +111,771 +2402,690 +15,93
7 -702,940 +121,07
8 -97,589
7 -6,877 +32,629 -10,228 -0,053 -0,188 +0,304 -44,329 +4,434 -9,068 -189,665 -1,060 +56,040 -7,473 +4,840
8 +46,848 -63,349 +35,550 -0,016 +0,617 -1,284 +111,771 -9,068 +75,816 +1335,757 +8,761 -353,276 +30,385 -30,561
9 +844,822 -1236,454 +833,360 -1,523 +13,046 -34,583 +2402,690 -189,665 +1335,757 +28404,759 +168,153 -6797,911 +654,715 -726,256
10 +5,399 -19,777 +5,692 -0,067 +0,047 -0,541 +15,937 -1,060 +8,761 +168,153 +4,687 -35,060 +5,783 -10,111
11 -302,382 +373,721 -223,580 +1,131 -4,021 +8,215 -702,940 +56,040 -353,276 -6797,911 -35,060 +8334,752 -238,668 +279,990
12 +27,986 -68,783 +29,580 -0,098 +0,489 -3,080 +121,078 -7,473 +30,385 +654,715 +5,783 -238,668 +50,995 -48,448
13 -30,719 +77,315 -30,521 +0,409 -0,455 +4,493 -97,589 +4,840 -30,561 -726,256 -10,111 +279,990 -48,448 +84,587
⽅差-协⽅差矩阵的对⾓线上的值是每列的⽅差,其他位置上的值代表两个匹配列的协⽅差。如果两列的协⽅差为0,则它们不存在线性关系。若为负值,为负相关,正值则为正相关。
转换为LabeledPoint
检查完数据之后,可以对数据进⾏线性回归了,⾸先需要将数据集中的每⼀个⽰例放在⼀个称为LabeledPoint的结构中,这个结构在Spark 的机器学习中⽐较常⽤,它包含标签Label和特征Vector。创建LabeledPoint时需要将特征和标签分开
import org.apache.ssion.LabeledPoint
val housingData = housingVals.map(x =>{
val a = x.toArray;
LabeledPoint(a(a.length-1), Vectors.den(a.slice(0, a.length-1)))
})
拆分数据
接着将数据拆分成训练集和验证集
val ts = housingData.randomSplit(Array(0.8,0.2))
val housingTrain =ts(0)
val housingValid =ts(1)
特征缩放和均值归⼀化
检查数据的分布时,列之间的数据跨度有很⼤的差异,⽐如第⼀列中的数据0.00632 ~ 88.9762,第五列的数据0.385 ~ 0.871,这样的数据可能会让损失函数在向最低点收敛时遇到困难,或者是其结果会被某⼀维度的特征异常地影响。
特征缩放意味着将数据范围缩放到可⽐较的⼤⼩。归⼀化意味着数据平均值⼤致为0。做法是⾸先⽤
数据来创建⼀个缩放器。
import org.apache.spark.mllib.feature.StandardScaler
val scaler = new StandardScaler(true,true).fit(housingTrain.map(x => x.features))
然后将这个缩放器应⽤到数据上。
val trainScaled = housingTrain.map(x =>LabeledPoint(x.label, ansform(x.features)))
val validScaled = housingValid.map(x =>LabeledPoint(x.label, ansform(x.features)))
拟合和使⽤线性回归模型
Spark中的线性回归模型由org.apache.ssion包中的LinearRegressionModel类实现。当使⽤数据拟合了⼀个模型对象后,可以使⽤其对各个Vector⽰例进⾏预测,⽅法是predict。
import org.apache.ssion.LinearRegressionWithSGD
val alg = new LinearRegressionWithSGD()
alg.tIntercept(true)
alg.optimizer.tNumIterations(200)
trainScaled.cache()
blunt的反义词
validScaled.cache()
val model = alg.run(trainScaled)
预测⽬标值
模型训练完毕后,就可以使⽤它来预测验证集中的数据,验证集的数据是有label的,所以预测出来的label可以与已有的label⼀起使⽤,进⾏⽐较。
val validPredicts = validScaled.map(x =>(model.predict(x.features), x.label))
通过检查可以发现,⼀些预测与原始标签很近,有的则差很多。为了量化模型的有效性,可以计算均⽅根误差。
val RMSE = math.sqrt(validPredicts.map{ca(p,l)=> math.pow(p-l,2)}.mean())
评估模型的性能
Spark提供了RegressionMetrics类来对回归模型的性能进⾏评估,它返回⼏个有⽤的评估指标。
import org.apache.spark.mllib.evaluation.RegressionMetrics
val validMetrics = new RegressionMetrics(validPredicts)
除了上⾯这些,还有meanAbsoluteError,r2,explaineddVariance。
解释模型参数
cat power模型训练之后的权重集可以告诉我们单个维度对⽬标变量的影响。如果特定的权重接近于0,则相应的维度对label的影响就不会很显著。(这要在数据已经进⾏过缩放的前提上)
println(Array.map(x => x.abs).zipWithIndex.sortBy(_._1).mkString(", "))
加载和保存模型
Spark提供了⼀种将模型保存到⽂件系统的⽅法(Parquet),并且在有需要的时候加载它。
model.save(sc,"hdfs:///path/to/saved/model")
import org.apache.ssion.LinearRegressionModel
val model = LinearRegressionModel.load(sc,"hdfs:///path/to/saved/model")
调整算法
公式中的参数r是步长参数,有助于稳定梯度下降算法。此外还有迭代次数,如果迭代次数太⼤,则模型拟合会花费太多时间,如果它太⼩,则算法可能达不到最⼩值。
找到正确的步长和迭代次数
找到这两个参数的⽅法是多长尝试⼏个组合,并找到最好的结果。
import org.apache.spark.rdd.RDD
def iterateLRwSGD(iterNums:Array[Int], stepSizes:Array[Double], train:RDD[LabeledPoint], test:RDD[LabeledPoint])={
for(numIter <- iterNums; step <- stepSizes)
{
val alg = new LinearRegressionWithSGD()
alg.tIntercept(true).optimizer.tNumIterations(numIter).tStepSize(step)
val model = alg.run(train)
val rescaledPredicts = train.map(x =>(model.predict(x.features), x.label))
val validPredicts = test.map(x =>(model.predict(x.features), x.label))
val meanSquared = math.sqrt(rescaledPredicts.map({ca(p,l)=> math.pow(p-l,2)}).mean())
val meanSquaredValid = math.sqrt(validPredicts.map({ca(p,l)=> math.pow(p-l,2)}).mean())
println("%d, %5.3f -> %.4f, %.4f".format(numIter, step, meanSquared, meanSquaredValid))
//Uncomment if you wish to e weghts and intercept values:
/
/println("%d, %4.2f -> %.4f, %.4f (%s, %f)".format(numIter, step, meanSquared, meanSquaredValid, model.weights, model.intercept))
}
}
iterateLRwSGD(Array(200,400,600),Array(0.05,0.1,0.5,1,1.5,2,3), trainScaled, validScaled)
上⾯的代码返回训练和验证集的RMSE:
// Our results:
// 200, 0.050 -> 7.5420, 7.4786
// 200, 0.100 -> 5.0437, 5.0910
// 200, 0.500 -> 4.6920, 4.7814
// 200, 1.000 -> 4.6777, 4.7756
// 200, 1.500 -> 4.6751, 4.7761
// 200, 2.000 -> 4.6746, 4.7771
// 200, 3.000 -> 108738480856.3940, 122956877593.1419
// 400, 0.050 -> 5.8161, 5.8254
// 400, 0.100 -> 4.8069, 4.8689
// 400, 0.500 -> 4.6826, 4.7772
// 400, 1.000 -> 4.6753, 4.7760
// 400, 1.500 -> 4.6746, 4.7774
// 400, 2.000 -> 4.6745, 4.7780
// 400, 3.000 -> 25240554554.3096, 30621674955.1730
// 600, 0.050 -> 5.2510, 5.2877
// 600, 0.100 -> 4.7667, 4.8332
/
/ 600, 0.500 -> 4.6792, 4.7759
// 600, 1.000 -> 4.6748, 4.7767
// 600, 1.500 -> 4.6745, 4.7779
// 600, 2.000 -> 4.6745, 4.7783
// 600, 3.000 -> 4977766834.6285, 6036973314.0450
通过上⾯的结果可以得到步长为1,使⽤200次会更适合。
添加⾼阶多项式
通常情况下,数据不遵循简单的线性公式,可能是某种曲线,曲线通常可以使⽤⾼阶多项式来描述。
Spark不提供训练⾮线性回归模型的⽅法,但是可以通过将现有特征相乘所获得的附加特征来扩展数据集:
def addHighPols(v:Vector): Vector =
edition
{
Vectors.Array.flatMap(x =>Array(x, x*x)))
}
val housingHP = housingData.map(v =>LabeledPoint(v.label,addHighPols(v.features)))
查看housingHP RDD,扩展了附加特征,总共是26个特征
低碳英语作文
housingHP.first().features.size
acpi接着拆分数据集,缩放数据:
val tsHP = housingHP.randomSplit(Array(0.8,0.2))
val housingHPTrain =tsHP(0)
val housingHPValid =tsHP(1)
val scalerHP = new StandardScaler(true,true).fit(housingHPTrain.map(x => x.features))
val trainHPScaled = housingHPTrain.map(x =>LabeledPoint(x.label, ansform(x.features)))
val validHPScaled = housingHPValid.map(x =>LabeledPoint(x.label, ansform(x.features)))
trainHPScaled.cache()
validHPScaled.cache()
估计最佳步长和迭代次数:
iterateLRwSGD(Array(200,400),Array(0.4,0.5,0.6,0.7,0.9,1.0,1.1,1.2,1.3,1.5), trainHPScaled, validHPScaled)
// Our results:
// 200, 0.400 -> 4.5423, 4.2002
// 200, 0.500 -> 4.4632, 4.1532
// 200, 0.600 -> 4.3946, 4.1150
// 200, 0.700 -> 4.3349, 4.0841
// 200, 0.900 -> 4.2366, 4.0392
// 200, 1.000 -> 4.1961, 4.0233
// 200, 1.100 -> 4.1605, 4.0108
// 200, 1.200 -> 4.1843, 4.0157
// 200, 1.300 -> 165.8268, 186.6295
// 200, 1.500 -> 182020974.1549, 186781045.5643
// 400, 0.400 -> 4.4117, 4.1243
// 400, 0.500 -> 4.3254, 4.0795
// 400, 0.600 -> 4.2540, 4.0466
// 400, 0.700 -> 4.1947, 4.0228
// 400, 0.900 -> 4.1032, 3.9947lawless
// 400, 1.000 -> 4.0678, 3.9876
/
/ 400, 1.100 -> 4.0378, 3.9836
// 400, 1.200 -> 4.0407, 3.9863
// 400, 1.300 -> 106.0047, 121.4576
// 400, 1.500 -> 162153976.4283, 163000519.6179
从结果可以看出,RMSE在1.3的位置会出现暴涨,在1.1的位置获得最佳结果,在400次迭代中获得最佳RMSE为3.9836。当使⽤更⼤的迭代次数时:
iterateLRwSGD(Array(200,400,800,1000,3000,6000),Array(1.1), trainHPScaled, validHPScaled)