Spark在⽂本统计中的简单应⽤
问题起因
学长之前⽤Java写了⼀个程序,有两个⽂档,其中⼀个⽂档是regular expression,⼤概有8万⾏,每⼀⾏是⼀个regular expression,我们称之为pattern;另⼀个⽂档其实是⽂件夹⾥的⼀个,该⽂件夹⾥的所有⽂档都是从twitter上抓取下来的数据,数量从7万~30万不等。⽂件夹中的每⼀个⽂档代表⼀个topic,共有8个⽂档,即8个topic.
⽬的:
⽤每⼀个pattern去匹配每⼀个topic中的每⼀条tweet,统计每个pattern在每个topic中出现的次数。
白蜻蜓以上问题跟统计词频类似。程序⽤的是常规的嵌套循环,如下:
for (each Topic){ // 8 topics
for (each LineOfTopic){ // average 200,000 lines
for (each pattern){ // about 80,000 patterns
形容很多的成语
while (match){
patternCount++; // count the frequency of the pattern in this topic
}
}
}
}
⼀共有8个topic,每个topic的⾏数平均约20万,pattern数量约8万,以上代码⼀共需要循环次,即100多亿次。同学5天前开始跑,到现在还没跑完。慢的原因⼤概有两个:
1. 循环次数太多
2. 正则表达式的匹配⽐较耗费时间
整个程序看起来跟MapReduce的经典例⼦WordCount很像,于是我觉得⽤MapReduce的⽅法可以优化⼀下。但其实,我没有学过Java,也没有学过并⾏运算。算是学过MapReduce,在Data Mining的
课上有MapReduce的讲解和上机作业。但是上机讲解我有事没有去上课,然后我就去爬⼭了,等到下⼭当天才发现第⼆天就是MapReduce作业的deadline…
MapReduce与Spark
Apache Spark是由加州⼤学伯克利分校AMPLab开发的⽤于处理⼤数据处理的框架,可以在上⾯实现MapReduce. spark是『闪光,闪烁』的意思,表⽰Spark处理数据其实很快哦!
据资料讲,Spark⽐Hadoop更⾼,更快,更强。之前Data Mining课程上我们实现MapReduce应⽤时⽤的是Hadoop,第⼆年⽼师就换成Spark了。
没有打算在这⾥介绍MapReduce和Spark,因为我也不是很懂。我参考的资料并且觉得⽐较好的有下⾯这些:
详细讲解了MapReduce的原理与运作过程。官⽅⽂档,介绍了Spark的基本原理,可以对Saprk的框架有⼀个⼤概的了解。讲解了如何在Ubuntu上安装hadoop,步骤很详细。因为Saprk是基于Hadoop的,因此安装Spark之前要先安装Hadoop.详细讲解了如何在Ubuntu上安装Spark.
Spark使⽤了⼀种抽象的数据类型RDD (Resilient Distributed Datats, 弹性分布数据集),各种操作都是基于RDD进⾏。Spark将其分发到集群的各个结点上进⾏并⾏操作。Spark中⼀种经常使⽤到的数据
结构是Key/Value结构,这些数据结构也是RDD,称为pair RDD. 这篇⽂档详细讲解了pair RDD的各种操作。
介绍Saprk基本的开发流程与步骤。
⼀个Saprk应⽤是b运⾏包含了⼀个在⽤户定义的main函数中的驱动程序(driver program),然后在集群(cluster)上并⾏执⾏各种操作。driver program由SaprkContext对象定义,所以要使⽤Spark⾸先要⽤SparkContext创建⼀个driver program,然后才在该驱动程序上运⾏cluster. ⽽运⾏cluster时Spark要先连接Cluster Manager来分配资源。⼀旦连接上Cluster Manager,Saprk会在这个cluster 的每⼀个节点(node)上建⽴executor. executor是⽤于处理数据计算数据存储数据的处理器(processor)。之后Spark会将⽤户写的代码发送到各个节点的executor上。这时,executor有了代码就可以执⾏各种操作了,Spark让executor执⾏指定操作(由代码决定)的
开眼看世界
过程叫做任务(task)。下图是Spark执⾏⼀个任务的框架图。
程序优化
创建Spark on Java
我没有学过Java,因此只能⼀步⼀步照着各种⽂档操作。Spark on Java是通过Maven建⽴的(Maven是什么?我不懂……)。创建Maven时官⽅⽂档说需要在l中加上如下信息:
但我使⽤NetBeans 8.1添加以上信息后编译没有通过,后来⽹上查到还需要在l中的properties节点下添加以下内容才可以(不知道原因):
<dependencies >
<dependency >
<groupId >org.apache.spark </groupId >
<artifactId >spark-core_2.11</artifactId >
少数民族的风俗<version >2.0.0</version >
</dependency >
</dependencies >
<spark.version >2.0.0</spark.version >
⾸先在使⽤到Spark的函数中创建driver program.
上⾯代码中的变量sc就是driver program啦!
读取数据
Spark读取数据很简单:
注意到text是⼀个RDD. 以上代码会将⽂本⼀次性全部读⼊内存,然后⾃动按⾏分割。
map 与reduce 操作
map操作将数据分为若⼲⼦数据集,使得Spark可以将这些数据集分配到不同的节点上执⾏并⾏操作。执⾏map操作后返回的是RDD数据类型。Spark的map操作有很多种,这⾥⽤的是mapToPair,返回的是⼀个pair RDD,即Key/Value结构。上述代码中的JavaPairRDD<String, Integer>表⽰这个pair RDD的key是String类型,即pattern名称;value是int类型,即pattern的出现次数。ur_defined_func 是⾃⼰定义的map算⼦,它的输⼊默认是⽂本的每⼀⾏。
我的问题是:有⼀个pattern⽂档,有若⼲tweet⽂档,计算pattern⽂档中的每⼀个pattern在每⼀个tweet⽂档中出现次数。相当于是给定
⼀个word list,统计这个word list中的每个单词的词频。如果只是统计word list的词频,那⽐较简单,可以直接⽤Spark的WordCount例⼦统计出词频,然后再⽤Spark提供的filter函数过滤掉不在word list中的单词即可。
但⽬前并不是简单统计词频,这个任务还有⼀个正则表达式匹配的过程。最开始的想法是:对pattern⽂档做⼀个map操作,得到Pattern RDD,同时⽣成tweet⽂档的Tweet RDD,这样得到两个RDD,然后在Pattern RDD的map函数中对tweet的RDD做统计。像是这样: 上图中,分别⽣成了两个RDD,但只对Pattern RDD做map和reduce操作,且在Pattern RDD的map中引⽤了Tweet RDD. 注意到Saprk 中所有的操作都是在抽象数据类型RDD中进⾏,因此map和reduce其实是RDD的对象(上⾯的代码中可以看到),因此图中的map操作实际上是在Pattern RDD中引⽤了Tweet RDD.听起来好完美,然⽽并不是。Saprk中RDD不可以嵌套,具体来说就是不可以在map或reduce操作中引⽤另⼀个RDD. 因为RDD是不可序列化的(non-rializable)。Spark要将任务分配到不同的节点,需要序列化所有⽤到的变量,然后再反序列化(其实我并不懂什么是java的序列化……)。如果这样操作会报『任务未序列化的错误』。
SaprkConf conf = new SaprkConf().tAppName("countPatternPerTopic").tMaster("local[4]");JavaSparkContext sc = new JavaSparkContex(conf);
JavaRDD<String> patternRDD = sc.textFile(filePath);
JavaPairRDD<String, Integer> PattPair = patternRDD.mapToPair(ur_defined_func);
因为对Java不熟,对Spark也不熟,因此我⽤了很直接的⽅法:只对Pattern做map操作,在Pattern RDD的reduce操作中遍历每条tweet,统计当前pattern在该tweet⽂档中出现的次数。即将原来代码中的三个for循环缩减到两个,原来pattern的循环通过Spark的分布式计算来完成,原来对tweet的循环放到了Pattern RDD的⾥⾯完成。
但是这样看起来效率仍然很低,只是对其中⼀个⽂档进⾏MapReduce的处理,⽽且还是pattern⽂档,通常tweet⽂档中的tweet数量会远⼤于pattern数量,这样与原来代码相⽐循环次数并没有降低多少。不过短时间我想不出其它办法了,当我测试完成后,8个topic已经跑完6个了。
其实可以将tweet⽂档分成若⼲份物理⽂档,然后统计pattern出现的次数,最后再合并。但如果可以⾃
动将⽂档分割为什么要⼿动呢?⽽且我发现实验室的那些⼈,跑数据的过程正好是休息的过程,开会时可以跟⽼师说『数据还在跑』,何乐⽽不为呢?
测试结果
只在本机上做了测试,不知道如果布设多台机器会不会更快⼀点呢?
Spark优化:
pattern数量:88028
topic数量:1(即只有⼀个tweet⽂档)
tweet数量:67883
测试环境:
Ubuntu 14.04 64bit 16GB
8 Intel(R) Core(TM) i7-4770 CUP @ 3.40GHz
时间:约10个⼩时
原始代码:
pattern数量:88028
topic数量:1(即只有⼀个tweet⽂档)
如何制作ppt
tweet数量:67883
测试环境:Ubuntu 14.04 64bit 16GB
其余未知(另⼀个同学跑的,没有问)
时间: 38个⼩时
虽然⽤了Spark仍然要跑10个⼩时,但是毕竟缩短了将近20个⼩时啊!
代码
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.SparkConf;
高考哥import scala.Tuple2;
import scala.Tuple2;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
import java.util.logging.Level;
import java.util.logging.Logger;
import Matcher;
import Pattern;
public static void countPatternsPerTopicMapReduce(String pattPath, String topsPath, String outPath){
// 定义driver program
SparkConf conf = new SparkConf().tAppName("countPatternsPerEmotion").tMaster("local[4]");
JavaSparkContext sc = new JavaSparkContext(conf);
}
try{
String line = "";
int size = 0;
// 将tweet读⼊内存供Pattern RDD的map函数使⽤
车遥遥篇// 由于tweet并不是很⼤,因此可以完全读⼊内存中。但如果tweet超过内存⼤⼩,该⽅法并不可⾏
BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(topsPath), "UTF8"));
ArrayList<String> textList = new ArrayList<>();
声卡怎么使用while ((line = br.readLine()) != null){
line = LowerCa().trim();
textList.add(line);
}
int size = textList.size();
// 定义map函数
class GetPattPair implements PairFunction<String, String, Integer> {
public Tuple2<String, Integer> call(String s) { // s is pattern
int patCount = 0;
for (int i = 0; i < size; i++){
String text = (i);
Matcher matcher;
matcher = pile(s).matcher(text);
while (matcher.find()) { // 统计pattern在该⽂档中出现的次数
patCount++;
}
}
// 返回⼀个pair RDD,使⽤的类型是scala Tuple2类型
return new Tuple2<String, Integer>(s, patCount);
}
}
// 读取pattern⽂档
JavaRDD<String> patterns = sc.textFile(pattPath);
// map函数,将pattern映射到⼀个pair RDD
JavaPairRDD<String, Integer> PattPair = patterns.mapToPair(new GetPattPair());
/
/ reduce
// 这⾥⽤到reduceByKey,实现对相同key的RDD在value上进⾏加总。
// 实际上这⼀步骤略多余。mapToPair中每次只返回⼀条pattern⽽且不会重复,整个过程不会有重复的pattern出现,就⽆必要reduce
JavaPairRDD<String, Integer> PattCount = duceByKey(new Function2<Integer, Integer, Integer>() { public Integer call(Integer a, Integer b) {