机器学习及flinkML算法学习
机器学习及flinkML算法
机器学习概念
机器学习算法根据训练数据(training data)使得表⽰算法⾏为的数学⽬标最⼤化,并以此来进⾏预测或者做出决定。机器学习分为分类、回归、聚类等,每种都有不⼀样的⽬标。
应⽤场景和处理流程
所有的算法都需要定义每个数据点的特征(feature)集->输⼊;
正确的定义特征才是机器学习中最有挑战的部分。
⼤多数算法都是专为数据特征(就是⼀个代表各个特征值的数字向量)定义的,因此提取特征并转化为特征向量是机器学习过程中重要的⼀步。
输⼊数据分为“训练集”和“测试集”,并且只使⽤前者进⾏训练,这样就可以⽤后者来检验模型是否过度拟合了训练数据。
机器学习流⽔线会训练出多个不同版本的模型,然后分别对其进⾏评估。Ml提供⼏个算法进⾏模型评估。
分类算法
基于已经被标注的其他数据点作为例⼦来识别⼀个数据点属于⼏个类别中的哪⼀种;⽐如判断⼀封邮件是否为垃圾邮件。
垃圾邮件分类做法:
1. HashingTF
⽂本数据构建词频特征向量
2. LogisticRegressionWithSGD
使⽤随机梯度下降法实现逻辑回归。
监督学习
SVM使⽤通信⾼效的分布式双坐标上升(CoCoA)
多元线性回归
优化框架
L-BFGS
Generalized Linear Models
Multiple linear regression
LASSO, Ridge regression
Multi-class Logistic regression
Random forests
Support Vector Machines
Decision trees
⽆监督学习
k-最近邻居关联
Unsupervid learning
Clustering
K-means clustering
Principal Components Analysis
Recommendation
ALS
Text analytics
LDA
数据预处理
多项式特征
标准尺度
MinMax Scaler
降维处理
模型选择和性能评估
多种评分功能模型评估
模型选择和评估的交叉验证
flinkML⽬前⽀持的算法
监督学习
马桶水箱维修基于通信效率的SVM分布式双坐标提升(CoCoA)
多元线性回归
优化框架
⾮监督学习
kNN(K最邻近算法)算法
数据预处理
多项式特征意识近义词
标准定标器
极⼤极⼩标量
推荐算法
交替最⼩⼆乘法(ALS)
离群值选择
随机离群点选择
公⽤事业公司
距离度量
交叉验证
分类代码开始
添加FlinkML依赖到 l.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-ml_2.11</artifactId>
<version>1.8.0</version>
</dependency>
1. 导⼊数据
格式化数据 ⽐如 LibSVN ,监督学习问题⼀般使⽤LabeledVector类来表⽰(Label,Features)
什么食物去湿气效果好
下载UCI ML仓库数据集,该数据集“包含了⼀项关于乳腺癌⼿术患者存活率的研究的案例”,数据格式如下所⽰:
30,64,1,1
30,62,3,1
30,65,0,2
婴儿睡眠时间表31,59,2,1
前三列代表特征,最后⼀列代表分类,the 4th column indicates whether the patient survived 5 years or longer (label 1), or died within 5 years (label 2)
import org.apache.flink.api.scala._
val env = ExecutionEnvironment
奖励性绩效val survival = adCsvFile[(String, String, String, String)]("/path/to/haberman.data")
将数据转换为DataSet[LabeledVector],⽤FlinkML分类算法,第四列是分类标签,构建LabeledVector
元素如下
import org.apache.LabeledVector
import org.apache.flink.ml.math.DenVector
val survivalLV = survival
.map{tuple =>
val list = List
val numList = list.map(_.asInstanceOf[String].toDouble)
LabeledVector(numList(3), DenVector(numList.take(3).toArray))
}
prade然后我们可以⽤这些数据来训练学习者。不过,我们将使⽤另⼀个数据集来举例说明如何构建学习者;如何导⼊其他数据集格式,举例如下:LibSVM ⽂件
ML数据集的⼀种常见格式是LibSVM格式。FlinkML通过MLUtils对象提供的readLibSVM函数,提供了使⽤LibSVM格式加载数据集的实⽤程序。您还可以使⽤writeLibSVM函数以LibSVM格式保存数据集。
让我们导⼊svmguide1数据集。你可以在这⾥下载训练集和测试集。这是⼀个天⽂粒⼦⼆分类数据集,由Hsu等⼈在他们的实⽤⽀持向量机(SVM)指南中使⽤。它包含4个数值特征,以及类标签。
导⼊数据集合如下:
import org.apache.flink.ml.MLUtils
val astroTrainLibSVM: DataSet[LabeledVector] = adLibSVM(env, "/path/to/svmguide1")
val astroTestLibSVM: DataSet[LabeledVector] = adLibSVM(env, "/path/to/svmguide1.t")
2. 数据分类
导⼊训练和测试数据集后,为分类做好准备。由于Flink SVM只⽀持阈值为+1.0和-1.0的⼆进制值,加载LibSVM数据集后需要进⾏转换,因为它使⽤了1和0进⾏标记。
转换可以⽤⼀个简单的归⼀化映射函数:
import org.apache.flink.ml.math.Vector
def normalizer : LabeledVector => LabeledVector = {
lv => LabeledVector(if (lv.label > 0.0) 1.0 el -1.0, lv.vector)
}
val astroTrain: DataSet[LabeledVector] = astroTrainLibSVM.map(normalizer)住宿申请书
val astroTest: DataSet[(Vector, Double)] = astroTestLibSVM.map(normalizer).map(x => (x.vector, x.label))
转换了数据集后,开始训练⼀个预测器,如线性SVM分类器。我们可以为分类器设置⼀些参数。在这⾥,我们设置Blocks参数,它被使⽤的底层CoCoA算法⽤于分割输⼊。正则化参数决定了l正则化的应⽤数量,⽤于避免过拟合。步骤⼤⼩决定权重向量更新到下⼀个权重向量
2
值的贡献。此参数设置初始步长。
import org.apache.flink.ml.classification.SVM
val svm = SVM()
.Parallelism)
.tIterations(100)
.tRegularization(0.001)
.tStepsize(0.1)
.tSeed(42)
svm.fit(astroTrain)
现在我们可以对测试集进⾏预测,并使⽤evaluate函数创建(真值、预测)对。
val evaluationPairs: DataSet[(Double, Double)] = svm.evaluate(astroTest)
接下来,我们将了解如何预处理数据,并使⽤FlinkML的ML管道功能。
3. 数据预处理和管道(pipelines)
当使⽤SVM分类时,通常增益的预处理步骤是将输⼊特征缩放到[0,1]范围,以避免极值特征占主导地位。FlinkML有许多转换器(如⽤于预处理数据的MinMaxScaler),其中⼀个关键特性是能够将转换器和预测器链接在⼀起。这允许我们运⾏相同的转换管道,并以⼀种直接且类型安全的⽅式对⽕车和测试数据进⾏预测。您可以在pipeline⽂档中阅读更多关于FlinkML管道系统的信息。
⾸先为数据集中的特征创建⼀个归⼀化转换器,并将其链接到⼀个新的SVM分类器。
import org.apache.flink.ml.preprocessing.MinMaxScaler
val scaler = MinMaxScaler()
val scaledSVM = scaler.chainPredictor(svm)
现在,我们可以使⽤新创建的管道对测试集进⾏预测。
⾸先,我们再次调⽤fit,来训练定标器和SVM分类器。然后将测试集的数据⾃动缩放,然后将其传递给SVM进⾏预测。
scaledSVM.fit(astroTrain)
val evaluationPairsScaled: DataSet[(Double, Double)] = scaledSVM.evaluate(astroTest)
按⽐例的输⼊应该能给我们更好的预测性能。
FlinkML算法⼆-交替最⼩⼆乘法(Alternating Least Squares)
交替最⼩⼆乘法(ALS)算法将⼀个给定的R矩阵因式分解为 U 和 V 两个因⼦,例如 R≈U V。 未知的⾏的维度被⽤作算法的参数,叫做潜在因⼦。 由于矩阵因式分解可以⽤在推荐系统的场景,U和V矩阵可以分别称为⽤户和商品矩阵。 ⽤户矩阵的第i列⽤u表⽰,商品矩阵的第i列⽤v表⽰。 R 矩阵称为评价矩阵可以⽤(R)=r 表⽰。 为了找到⽤户和商品矩阵,如下问题得到了解决:
argminU,V∑{i,j∣ri,j≠0}(ri,j−uTivj)2+λ(∑inui∥ui∥2+∑jnvj∥vj∥2)
λ作为正则化因⼦,n作为⽤户i评过分的商品数量, n作为商品j被评分的次数。 这个因式分解⽅案避免了称作加权λ因式分解的过拟合。
通过修复U 和 V矩阵,我们获得可以直接解析的⼆次形式。 问题的解决办法是保证总消耗函数的单调递减。通过对U 或 V矩阵的这⼀步操作,我们逐步的改进了矩阵的因式分解。
R 矩阵作为 (i,j,r) 元组的疏松表⽰。i 为⾏索引,j 为列索引,r 为 (i,j) 位置上的矩阵值。
操作
ALS 是⼀个预测模型(Predictor)。 因此,它⽀持拟合(fit)与预测(predict)两种操作。
五年级上册数学书答案拟合
ALS⽤于评价矩阵的疏松表⽰过程的训练:
fit: DataSet[(Int, Int, Double)] => Unit
预测
ALS会对每个元组⾏列的所有索引进⾏评分预测:
predict: DataSet[(Int, Int)] => DataSet[(Int, Int, Double)]
参数
ALS的实现可以通过下⾯的参数进⾏控制:
参数描述
NumFactors底层模型中使⽤的潜在因⼦数⽬。等价于计算⽤户和商品向量的维度。 (默认值:10)
Lambda因式分解的因⼦。 该值⽤于避免过拟合或者由于强⽣成导致的低性能。 (默认值:1)
Iterations最⼤迭代次数。 (默认值:10)
Blocks 设定⽤户和商品矩阵被分组后的块数量。块越少,发送的冗余数据越少。然⽽,块越⼤意味着堆中需要存储的更新消息越⼤。如果由于OOM导致算法失败,试着降低块的数量。 (默认值:None)
Seed⽤于算法⽣成初始矩阵的随机种⼦。 (默认值:0)
TemporaryPath 导致结果被⽴即存储到临时⽬录的路径。如果该值被设定,算法会被分为两个预处理阶段,ALS迭代和计算最后ALS半阶段的处理中阶段。预处理阶段计算给定评分矩阵的OutBlockInformation和InBlockInformation。每步的结果存储在特定的⽬录。通过将算法分为更多⼩的步骤,Flink不会在多个算⼦中分割可⽤的内存。这让系统可以处理更⼤的独⽴消息并提升总性能。 (默认值:None)
例⼦
T
i i i,j i,j
argmin
u i v j