数据基础---spark中的数据类型
mllib中的数据类型
本⽂是对的翻译整理
1、数据类型
Local vector(本地向量)
Labeled point(带标签数据点)
Local matrix(本地矩阵)
Distrubuted matrix(分布式矩阵):RowMatrix、IndexedRowMatrix、CoordinateMatrix、BlockMatrix
MLlib⽀持存储在单个机器上的本地的向量和矩阵,以及⼀个或多个RDD组成的分布式矩阵。本地向量和本地矩阵是⽤作公共接⼝的简单数据模型,底层的线性代数运算由提供。在监督学习中的样本被称为带标签数据点。
1.1、Local vector
本地向量有两种类型,密集型和稀疏型。密集型向量由⼀系列的双精度数构成,⽽稀疏型向量由索引和值构成,其中索引是从0开始的整型数(不同语⾔中向量或数组的索引起始值并不⼀样,⽐如在R语⾔中索引是从1开始)。⽐如向量(1.0,0.0,3.0),它的密集型表⽰⽅法为[1.0,0.0,3.0],⽽稀疏型表⽰⽅法为(3,[0,2],[1.0,3.0]),其中3表⽰向量的维度是3维,0和2代表⾮0维度的索引值(稀疏本来的意思是有⼤量0元素,因⽽只要知道总共有⼏个元素,哪些是⾮0的就可以⽅便的进⾏后续的计算,降低存储空间和cpu的消耗),1.0和3.0是对应维度上的幅度值。
MLlib中本地向量的来源:
密集型向量:
Numpy.array():
**python’s list:**⽐如[1,2,3]
在Spark中⾃动将Numpy.array()和python⾥的list()视为密集型向量
稀疏型向量:
**MLlib’s 稀疏向量:**在pyspark.mllib.linalg下的Vectors包内
scipy.spar.csc_matrix():
在Spark中⾃动将以上两种数据类型视为稀疏型向量
在Spark中np.array()⽐list()产⽣的密集型向量效率要⾼,⽽mllib⾃带的Vectors也要⽐scipy产⽣的稀疏向量的计算效率要⾼。
下⾯是⼀个例⼦:不会说话
import numpy as np
import scipy.spar as sps
from pyspark.mllib.linalg import Vectors
dv1=np.array([1.0,0.0,3.0])#密集型
dv2=[1.0,0.0,3.0]#密集型
sv1=Vectors.spar(3,[0,2],[1.0,3.0])
sv2=sps.csc_matrix((np.array([1.0,3.0]),np.array([0,2]),shape=(3,1))
1.2、Labeled point
带标签数据点带有类别标签,是本地向量数据类型,即可以是密集型也可以是稀疏型。带标签数据点在Mllib中是⽤双精度的,即可以⽤来分类也可以⽤来回归建模,是有监督学习⽅法中的数据源。在⼆分类模型中,标签是0,1;在多类别数据中,标签从0开始递增,如
0,1,2……
from pyspark.mllib.linalg import SparVector
from ssion import LabeledPoint
#建⽴密度型标签数据点
pos=LabeledPoint(1.0,[1.0,0.0,3.0])
neg=LabeledPoint(0.0,[3,[0,2],[1.0,3.0]])
或者参考
由LabeledPoint作为元素构成的rdd如果要查看,不能⽤top()这类函数,但collect()可以,当然collect()
要注意数据量的问题。
稀疏数据
在平常的应⽤中稀疏数据很常⽤,mllib⽀持读取⼀种称为LIBSVM的数据格式,它是和的默认格式,它是存储在本地txt⽂本中的数据格式,标准结构如下:
label index1:value1 index2:value2 ……
排在最前⾯的是标签数据,后⾯是具体数据点,每个数据点由索引和值构成,索引是从1开始升序排列的,在mllib中将LIBSVM数据读⼊之后,索引会⾃动调整为从0开始。由于有索引,我们只需要存储⾮0数据(还没实践验证),这⼤概就是这类数据被称为稀疏数据的原因。可以在⽹上下到这样的⽂件,,我截取开头的4⾏数据如下:
0 128:51 129:159 130:253 131:159 132:50 155:48 156:238 157:252 158:252 159:252 160:237 182:54 183:227 184:253 185:252 186:239 187:233 188:25 2 189:57 190:6 208:10 209:60 210:224 211:252 212:253 213:252 214:202 215:84 216:252 217:253 218:122 236:163 237:252 238:252 239:252 240:253 2 41:252 242:252 243:96 244:189 245:253 246:167 263:51 264:238 265:253 266:253 267:190 268:114 269:253 270:228 271:47 272:79 273:255 274:168 2 90:48 291:238 292:25
2 293:252 294:179 295:12 296:75 297:121 298:21 301:253 302:243 303:50 317:38 318:165 319:253 320:233 321:208 322:84 329:2 53 330:252 331:165 344:7 345:178 346:252 347:240 348:71 349:19 350:28 357:253 358:252 359:195 372:57 373:252 374:252 375:63 385:253 386:252 3 87:195 400:198 401:253 402:190 413:255 414:253 415:196 427:76 428:246 429:252 430:112 441:253 442:252 443:148 455:85 456:252 457:230 458:25 467:7 468:135 469:253 470:186 471:12 483:85 484:252 485:223 494:7 495:131 496:252 497:225 498:71 511:85 512:252 513:145 521:48 522:165 523:25 2 524:173 539:86 540:253 541:225 548:114 549:238 550:253 551:162 567:85 568:252 569:249 570:146 571:48 572:29 573:85 574:178 575:225 576:253 577:223 578:167 579:56 595:85 596:252 597:252 598:252 599:229 600:215 601:252 602:252 603:252 604:196 605:130 623:28 624:199 625:252 626:252 627:253 628:252 629:252 630:233 631:145 652:25 653:128 654:252 655:253 656:252 657:141 658:37
1 159:124 160:253 161:255 162:63 186:96 187:244 188:251 189:253 190:6
2 214:127 215:251 216:251 217:25
3 218:62 241:68 242:236 243:251 244:211 245:31 246:8 268:60 269:228 270:251 271:251 272:9
4 296:15
5 297:253 298:253 299:189 323:20 324:253 325:251 326:235 327:6
6 350:32 351:205 352:2 53 353:251 354:126 378:104 379:251 380:253 381:184 382:15 405:80 406:240 407:251 408:193 409:23 432:32 433:253 434:253 435:253 436:159 460:1 51 461:251 462:251 463:251 464:39 487:48 488:221 489:251 490:251 491:172 515:234 516:251 517:251 518:196 519:12 543:253 544:251 545:251 546: 89 570:159 571:255 572:253 573:253 574:31 597:48 598:228 599:253 600:24
7 601:140 602:
8 625:64 626:251 627:253 628:220 653:64 654:251 655:253 656:220 681:24 682:193 683:253 684:220
1 125:145 126:255 127:211 128:31 152:3
2 153:237 154:25
3 155:252 156:71 180:11 181:175 182:253 183:252 184:71 209:14
4 210:253 211:252 212:71 2 36:16 237:191 238:253 239:252 240:71 264:26 265:221 266:253 267:252 268:124 269:31 293:12
5 294:253 295:252 296:252 297:108 322:253 323:252 3 24:252 325:108 350:255 351:253 352:253 353:108 378:253 379:252 380:252 381:108 406:253 407:252 408:252 409:108 434:253 435:252 436:252 437:1 08 462:255 463:253 464:253 465:170 490:253 491:252 492:252 493:252 494:42 518:149 519:252 520:252 521:252 522:144 546:109 547:252 548:252 54 9:252 550:144 575:218 576:253 577:253 578:255 579:35 603:175 604:252 605:252 606:253 607:35 631:73 632:252 633:252 634:253 635:35 659:31 660: 211 661:252 662:253 663:35
1 153:5 154:63 155:197 181:20 182:254 183:230 184:24 209:20 210:254 211:254 212:48 237:20 238:254 239:255 240:48 265:20 266:254 267:254 268:5 7 293:20 294:254 295:254 296:108 321:16 322:239 323:254 324:143 350:178 351:254 352:143 378:178 379:254 380:143 406:178 407:254 408:16
孔子拜师歇后语下一句
2 434: 178 435:254 436:240 462:11放暑假时间
3 463:25
4 464:240 490:83 491:254 492:24
5 493:31 518:79 519:254 520:24
6 521:38 547:214 548:254 549:150 575:144 576: 241 577:8 603:144 604:240 605:2 631:144 632:254 633:82 659:230 660:24
7 661:40 687:16
8 688:20
9 689:31
我们可以把数据加载到本地节点的内存中。
from pyspark.mllib.utils import MLUtils
examples=MLUtils.loadLibSVMFile(sc,data/mllib/sample_)
1.3、Local Matrix
本地矩阵也是存储在本地机器上的数据格式。像其他场合的矩阵⼀样,都有⾏、列索引和对应的元素值,索引是整型、值是以双精度型。mllib中的矩阵都是按列排列的(⽐如将⼆维的矩阵⽤⼀维的数组来表⽰的时候,第⼀列的元素放在最前⾯,第⼆列的元素次之,⽽不是第⼀⾏的元素排在最前⾯然后
第⼆⾏)。mllib中的矩阵也有密集型和稀疏型,密集型矩阵的表⽰由⾏数、列数、其对应元素值按列依次排列的⼀个数组组成,稀疏型矩阵采⽤Compresd Spar Column(CSC)格式,关于稀疏型数据的表⽰⽅法可。CSC格式数据是将列索引进⾏压缩的格式,表⽰⽅法:⾏数、列数、列索引的偏移值和⾮0元素个数组成的数组、⾏索引组成的数组、⾮0元素按列排序组成的数组。其他的都好理解,关键是利⽤列索引的偏移进⾏列索引压缩,下⾯先看⼀个例⼦:
from pyspark.mllib.linalg import Matrix, Matrices
#创建⼀个密集型矩阵[[1.0 4.0),[2.0 5.0],[3.0 6.0]],注意这⾥是按传统的以⾏为⼀组来表⽰的,以mllib中不同
dm2=Matrices.den(3,2,[1,2,3,4,5,6])
print(dm2)
#创建⼀个稀疏型矩阵[[9.0 0.0),[0.0 8.0],[0.0 6.0]]
sm2=Matrices.spar(3,2,[0,1,3],[0,1,2],[9,8,6])
print(sm2)
我们可以分析⼀下上例中稀疏型矩阵的列索引偏移值和⾮0元素个数[0,1,3],0代表第1列⾥第1个⾮0元素在总的⾮0元素中的相对索引值,1代表第⼆列元素第1个⾮0元素在总的⾮0元素⾥的相对索引值,1-0=1说明第1列的元素个数为1,这样我们结合⾏索引就可以把第1列的全部元素定位出来;同样的⽅法可以确定其他列的元素位置;3表⽰所有⾮0元素的个数,减掉倒数第⼆个偏移量这⾥是1,恰恰就是最后1列的元素个数,同样结合对应的⾏索引就可以定位最后⼀列元素的位置。我们也可以虚拟出⼀列,把3当成第4列第1个元素的相对索引值,这样就可以把各列元素个数的计算统⼀起来。总结起来就是,通过各列第1个⾮0元素在总⾮0元素中的相对索引值可以确定各列的元素个数,结合⾏索引就可完全确定所有元素的位置。从这⾥也可以看到,CSC是⼀种⽆损压缩的格式。
1.4 分布式矩阵
分布式矩阵由长整型的⾏列索引和双精度的值构成,分布式矩阵存储在1个或多个RDD中。到⽬前为⽌,在mllib中总共有4种分布式矩阵。需要注意的是,从⼀种分布式矩阵变换成另⼀种分布式矩阵可能需要数据的全局移动(在不同节点的机器间移动),因⽽可能需要极⼤的内存、cpu和⽹络通信开销。
分布矩阵所依赖的RDD必须是确定的,因为我们需要缓存矩阵的⼤⼩(⽪之不存,⽑将焉覆)。
4种分布式矩阵分别是RowMatrix、IndexRowMatrix、coordinateMatrix、BlockMatrix。RowMatrix是
最基本的分布式矩阵,它是基于⾏的,就是⼀⾏是完整的,完整的⾏存储在本地节点上,不同节点存储不同的⾏。⽽操作这种数据矩阵通常都是对每⾏都进⾏同样的操作,也就是说它是矩阵的哪⼀⾏并不重要,因⽽这种数据矩阵的⼀个特点就是⾏索引⽆意义。特征向量的集合就是这样的⼀类数据矩阵。⼀般情况下,RowMatrix的列数都不会太⼤,也就是单独的⼀⾏不会太长,这样便于在单节点存储和操作,也便于将某⼀⾏的数据传输到驱动器中去。IndexRowMatrix是在RowMatrix的基础上增加了⾏索引,这样可以定位到矩阵的某⼀⾏,否则就⽆法进⾏RDD的join操作。CoordinateMatrix是⽤COO格式来存储的,关于COO。BlockMatrix具有元组的结构(Int,Int,Matrix)。
RowMatrix:
RowMatrix可以通过以向量形式存储的RDD来创建。大象怀孕多久生产
from pyspark.mllib.linalg.distributed import RowMatrix
#⾸先创建⼀个以向量形式存储的RDD.
rows = sc.parallelize([[1,2,3],[4,5,6],[7,8,9],[10,11,12]])
#在以向量形式存储的RDD基础上创建RowMatrix
mat = RowMatrix(rows)
#查看矩阵的⾏数和列数
感谢老师m = mat.numRows()# 4
n = mat.numCols()# 3
#以另⼀种⽅式来查看矩阵的⾏数
rowsRDD = ws
print(m)
print(n)
llect())
输出结果如下:
4
3
[DenVector([1.0, 2.0, 3.0]), DenVector([4.0, 5.0, 6.0]), DenVector([7.0, 8.0, 9.0]), DenVector([10.0, 11.0, 12.0])]
IndexedRowMatrix
IndexedRowMatrix与RowMatrix很像,只不过有⾏索引。每⼀⾏都由长整型的索引和⼀个本地向量构成。
IndexedRowMatrix可以由IndexedRows构成的RDD转换成来,IndexedRow由(long,vector)形式的元组组成。IndexRowMatrix转化为RowMatrix.
from pyspark import SparkContext
from pyspark.sql import SparkSession
#import json
sc=SparkContext()
SparkSession(sc)
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix
#先创建⼀个由IndexedRow构成的RDD.
#可以通过IndexedRow这个类来创建
indexedRows = sc.parallelize([IndexedRow(0,[1,2,3]),IndexedRow(1,[4,5,6]),IndexedRow(2,[7,8,9]),IndexedRow(3,[10,11,12])])
#也可以直接⽤元组来创建
indexedRows = sc.parallelize([(0,[1,2,3]),(1,[4,5,6]),(2,[7,8,9]),(3,[10,11,12])])
#由IndexedRows构成的RDD来创建IndexedRowMatrix
mat = IndexedRowMatrix(indexedRows)
#测试⼀下该矩阵的⾏、列数
m=mat.numRows()
n=mat.numCols()
ws
print(m)
print(n)
llect())
#将IndexedRowMatrix类型数据转换成RowMatrix
RowMatrix()
运⾏结果如下:
4
3
上户口[IndexedRow(0, [1.0,2.0,3.0]), IndexedRow(1, [4.0,5.0,6.0]), IndexedRow(2, [7.0,8.0,9.0]), IndexedRow(3, [10.0,11.0,12.0])]
CoordinateMatrix
CoordinateMatrix由⾏、列索引和对应值构成,可以通过MatrixEntry来创建,传⼊MatrixEntry的参数形式为(long,long,float),即矩阵元素的⾏、列索和值。CoordinateMatrix可以通过函数toRowMatrix()转换为RowMatrix,通过toIndexRowMatrix()转换为IndexRowMatrix,能过toBlockMatrix()转换为BlockMatrix.
from pyspark import SparkContext
from pyspark.sql import SparkSession
#import json
sc=SparkContext()
SparkSession(sc)
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry
# 通过MatrixEntry类来创建⼀个由MatrixEntry元素构成的RDD
entries = sc.parallelize([MatrixEntry(0,0,1.2), MatrixEntry(1,0,2.1), MatrixEntry(6,1,3.7)])
#也可以通过形如(long, long, float)的元组来创建
entries = sc.parallelize([(0,0,1.2),(1,0,2.1),(2,1,3.7)])
#通过由MatrixEntries构成的RDD来创建CoordinateMatrix
mat = CoordinateMatrix(entries)
#查看矩阵的⾏列数
m = mat.numRows()# 3
n = mat.numCols()# 2
小学经典诵读print(m)
print(n)
#创建CoordinateMatrix后,后续也可以直接获取其⾮0元素
entriesRDD = ies
llect())
# 转换成RowMatrix.
rowMat = RowMatrix()
#转换成IndexedRowMatrix.
indexedRowMat = IndexedRowMatrix()
#转换成BlockMatrix.
blockMat = BlockMatrix()
运⾏结果如下:
龙华渔村
3
2
[MatrixEntry(0, 0, 1.2), MatrixEntry(1, 0, 2.1), MatrixEntry(2, 1, 3.7)]
BlockMatrix
BlockMatrix,我们可以把这类矩阵叫做由矩阵块组成的矩阵,可以由MatrixBlock来创建,MatrixBlock的形式为((Int,Int),Matrix),式中(Int,Int)代表⼦矩阵在⼤矩阵中的位置,⽽⼦矩阵Matrix本⾝本⾝⼜是rowPerBlock⾏,colPerBlock列的矩阵。BlockMatrix⽀持加法和乘法这样的操作,还可以⽤validate()函数去验证BlockMatrix的设置是否正确。
from pyspark import SparkContext
from pyspark.sql import SparkSession
#import json
sc=SparkContext()
SparkSession(sc)
from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix
#创建由⼦矩阵组的RDD
#blocks=sc.parallelize([((0,0),Matrices.den[1.0,2.0,3.0,4.0,5.0,6.0])),((0.1),Matrices.den[7, 8, 9, 10, 11, 12]))])
blocks=sc.parallelize([((0,0),Matrices.den(3,2,[1.0,2.0,3.0,4.0,5.0,6.0])),((0,1),Matrices.den(3,2,[7,8,9,10,11,12]))])
mat=BlockMatrix(blocks,3,2)#从这⾥可以看到每个⼦矩阵的⾏列数应该⼀样,⽽且传到BlockMatrix中的参数,也就是⼦矩阵的⾏列数
m=mat.numRows()
n=mat.numCols()
print(m)
print(n)
blocksRDD=mat.blocks
llect())
#转化为其他矩阵
LocalMatrix()
IndexedRowMatrix()
CoordinateMatrix()
结果如下:
3
4
[((0, 0), DenMatrix(3, 2, [1.0, 2.0, 3.0, 4.0, 5.0, 6.0], 0)), ((0, 1), DenMatrix(3, 2, [7.0, 8.0, 9.0, 10.0, 11.0, 12.0], 0))]
2、Spark 集成算法的数据格式及评估⽅法
不全不准,先放这⾥,以后再慢慢加
类别算法名称数据格式评估⽅法
聚类Kmeans RDD(Local Vector)
分类DecisionTree(决策树)RDD(LabelPoint)ACU,F-measure,ROC 分类RadomForest(随机森林)RDD(Local Vector)ACU,F-measure,ROC 分类LogisticRegression(逻辑回归RDD(Local Vector)ACU,F-measure,ROC 分类NaiveBayes(朴素贝叶斯)RDD(Local Vector)ACU,F-measure,ROC 分类SVM(⽀持向量机)RDD(Local Vector)ACU,F-measure,ROC 回归LinearRegression线性回归RDD(Local Vector)⾃定义
回归Lasso RDD(Local Vector)⾃定义
回归RidgeRegression岭回归RDD(Local Vector)⾃定义
推荐ALS RDD[rating]
ml中的数据类型