spark wordcount.py 怎么用
展开全部 创建 maven 工程使用下面命令创建一个普通的 maven 工程:bash$ mvn archetype:generate -DgroupId=com.cloudera.sparkwordcount -DartifactId=sparkwordcount -DarchetypeArtifactId=maven-archetype-quickstart -DinteractiveMode=false将 sparkwordcount 目录重命名为simplesparkapp,然后,在 simplesparkapp 目录下添加 scala 源文件目录:bash$ mkdir -p sparkwordcount/src/main/scala/com/cloudera/sparkwordcount修改 pom.xml 添加 scala 和 spark 依赖:xml org.scala-lang scala-library 2.10.4 org.apache.spark spark-core_2.10 1.2.0-cdh5.3.0 添加编译 scala 的插件:xml org.scala-tools maven-scala-plugin compile testCompile 添加 scala 编译插件需要的仓库:xml scala-tools.org Scala-tools Maven2 Repository http://scala-tools.org/repo-releases 另外,添加 cdh hadoop 的仓库:xml scala-tools.org Scala-tools Maven2 Repository http://scala-tools.org/repo-releases maven-hadoop Hadoop Releases https://repository.cloudera.com/content/repositories/releases/ cloudera-repos Cloudera Repos https://repository.cloudera.com/artifactory/cloudera-repos/ 最后,完整的 pom.xml 文件见: https://github.com/javachen/simplesparkapp/blob/master/pom.xml 。
运行下面命令检查工程是否能够成功编译:bashmvn package编写示例代码以 WordCount 为例,该程序需要完成以下逻辑:读一个输入文件统计每个单词出现次数过滤少于一定次数的单词对剩下的单词统计每个字母出现次数在 MapReduce 中,上面的逻辑需要两个 MapReduce 任务,而在 Spark 中,只需要一个简单的任务,并且代码量会少 90%。
编写 Scala 程序 如下:scalaimport org.apache.spark.SparkContextimport org.apache.spark.SparkContext._import org.apache.spark.SparkConfobject SparkWordCount { def main(args: Array[String]) { val sc = new SparkContext(new SparkConf().setAppName("Spark Count")) val threshold = args(1).toInt // split each document into words val tokenized = sc.textFile(args(0)).flatMap(_.split(" ")) // count the occurrence of each word val wordCounts = tokenized.map((_, 1)).reduceByKey(_ + _) // filter out words with less than threshold occurrences val filtered = wordCounts.filter(_._2 >= threshold) // count characters val charCounts = filtered.flatMap(_._1.toCharArray).map((_, 1)).reduceByKey(_ + _) System.out.println(charCounts.collect().mkString(", ")) charCounts.saveAsTextFile("world-count-result") }}Spark 使用懒执行的策略,意味着只有当 动作 执行的时候, 转换 才会运行。
上面例子中的 动作 操作是 collect 和 saveAsTextFile ,前者是将数据推送给客户端,后者是将数据保存到 HDFS。
作为对比, Java 版的程序 如下:javaimport java.util.ArrayList;import java.util.Arrays;import org.apache.spark.api.java.*;import org.apache.spark.api.java.function.*;import org.apache.spark.SparkConf;import scala.Tuple2;public class JavaWordCount { public static void main(String[] args) { JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count")); final int threshold = Integer.parseInt(args[1]); // split each document into words JavaRDD tokenized = sc.textFile(args[0]).flatMap( new FlatMapFunction() { public Iterable call(String s) { return Arrays.asList(s.split(" ")); } } ); // count the occurrence of each word JavaPairRDD counts = tokenized.mapToPair( new PairFunction() { public Tuple2 call(String s) { return new Tuple2(s, 1); } } ).reduceByKey( new Function2() { public Integer call(Integer i1, Integer i2) { return i1 + i2; } } );另外, Python 版的程序 如下:pythonimport sysfrom pyspark import SparkContextfile="inputfile.txt"count=2if __name__ == "__main__": sc = SparkContext(appName="PythonWordCount") lines = sc.textFile(file, 1) counts = lines.flatMap(lambda x: x.split(' ')) \ .map(lambda x: (x, 1)) \ .reduceByKey(lambda a, b: a + b) \ .filter(lambda (a, b) : b >= count) \ .flatMap(lambda (a, b): list(a)) \ .map(lambda x: (x, 1)) \ .reduceByKey(lambda a, b: a + b) print ",".join(str(t) for t in cou...
如何使用Python为Hadoop编写一个简单的MapReduce程序
在这个实例中,我将会向大家介绍如何使用Python 为 Hadoop编写一个简单的MapReduce程序。
尽管Hadoop 框架是使用Java编写的但是我们仍然需要使用像C++、Python等语言来实现Hadoop程序。
尽管Hadoop官方网站给的示例程序是使用Jython编写并打包成Jar文件,这样显然造成了不便,其实,不一定非要这样来实现,我们可以使用Python与Hadoop 关联进行编程,看看位于/src/examples/python/WordCount.py 的例子,你将了解到我在说什么。
我们想要做什么?我们将编写一个简单的 MapReduce 程序,使用的是C-Python,而不是Jython编写后打包成jar包的程序。
我们的这个例子将模仿 WordCount 并使用Python来实现,例子通过读取文本文件来统计出单词的出现次数。
结果也以文本形式输出,每一行包含一个单词和单词出现的次数,两者中间使用制表符来想间隔。
先决条件编写这个程序之前,你学要架设好Hadoop 集群,这样才能不会在后期工作抓瞎。
如果你没有架设好,那么在后面有个简明教程来教你在Ubuntu Linux 上搭建(同样适用于其他发行版linux、unix)如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立单节点的 Hadoop 集群如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立多节点的 Hadoop 集群Python的MapReduce代码使用Python编写MapReduce代码的技巧就在于我们使用了 HadoopStreaming 来帮助我们在Map 和 Reduce间传递数据通过STDIN (标准输入)和STDOUT (标准输出).我们仅仅使用Python的sys.stdin来输入数据,使用sys.stdout输出数据,这样做是因为HadoopStreaming会帮我们办好其他事。
这是真的,别不相信!Map: mapper.py将下列的代码保存在/home/hadoop/mapper.py中,他将从STDIN读取数据并将单词成行分隔开,生成一个列表映射单词与发生次数的关系:注意:要确保这个脚本有足够权限(chmod +x /home/hadoop/mapper.py)。
#!/usr/bin/env pythonimport sys# input comes from STDIN (standard input)for line in sys.stdin: # remove leading and trailing whitespace line = line.strip() # split the line into words words = line.split() # increase counters for word in words: # write the results to STDOUT (standard output); # what we output here will be the input for the # Reduce step, i.e. the input for reducer.py # # tab-delimited; the trivial word count is 1 print '%s\\t%s' % (word, 1)在这个脚本中,并不计算出单词出现的总数,它将输出 " 1" 迅速地,尽管可能会在输入中出现多次,计算是留给后来的Reduce步骤(或叫做程序)来实现。
当然你可以改变下编码风格,完全尊重你的习惯。
Reduce: reducer.py将代码存储在/home/hadoop/reducer.py 中,这个脚本的作用是从mapper.py 的STDIN中读取结果,然后计算每个单词出现次数的总和,并输出结果到STDOUT。
同样,要注意脚本权限:chmod +x /home/hadoop/reducer.py#!/usr/bin/env pythonfrom operator import itemgetterimport sys# maps words to their countsword2count = {}# input comes from STDINfor line in sys.stdin: # remove leading and trailing whitespace line = line.strip() # parse the input we got from mapper.py word, count = line.split('\\t', 1) # convert count (currently a string) to int try: count = int(count) word2count[word] = word2count.get(word, 0) + count except ValueError: # count was not a number, so silently # ignore/discard this line pass# sort the words lexigraphically;## this step is NOT required, we just do it so that our# final output will look more like the official Hadoop# word count examplessorted_word2count = sorted(word2count.items(), key=itemgetter(0))# write the results to STDOUT (standard output)for word, count in sorted_word2count: print '%s\\t%s'% (word, count)测试你的代码(cat data | map | sort | reduce)我建议你在运行MapReduce job测试前尝试手工测试你的mapper.py 和 reducer.py脚本,以免得不到任何返回结果这里有一些建议,关于如何测试你的Map和Reduce的功能:——————————————————————————————————————————————\r\n # very basic test hadoop@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hadoop/mapper.py foo 1 foo 1 quux 1 labs 1 foo 1 bar 1—————————————————————————————————————————————— hadoop@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hadoop/mapper.py | sort | /home/hadoop/reducer.py bar 1 foo 3 labs 1—————————————————————————————————————————————— # using one of the ebooks as example input # (see below on where to get the ebooks) hadoop@ubuntu:~$ cat /tmp/gutenberg/20417-8.txt | /home/hadoop/mapper.py The 1 Project 1 Gutenberg 1 EBook 1 of 1 [...] (you get the idea) quux 2 quux 1 ————————————————————————————...
转载请注明出处51数据库 » wordcount.py
占戈灬礻申