如何使用Python为Hadoop编写一个简单的MapReduce程序
在这个实例中,我将会向大家介绍如何使用Python 为 Hadoop编写一个简单的MapReduce程序。
尽管Hadoop 框架是使用Java编写的但是我们仍然需要使用像C++、Python等语言来实现Hadoop程序。尽管Hadoop官方网站给的示例程序是使用Jython编写并打包成Jar文件,这样显然造成了不便,其实,不一定非要这样来实现,我们可以使用Python与Hadoop 关联进行编程,看看位于/src/examples/python/WordCount.py 的例子,你将了解到我在说什么。
我们想要做什么?我们将编写一个简单的 MapReduce 程序,使用的是C-Python,而不是Jython编写后打包成jar包的程序。我们的这个例子将模仿 WordCount 并使用Python来实现,例子通过读取文本文件来统计出单词的出现次数。
结果也以文本形式输出,每一行包含一个单词和单词出现的次数,两者中间使用制表符来想间隔。先决条件编写这个程序之前,你学要架设好Hadoop 集群,这样才能不会在后期工作抓瞎。
如果你没有架设好,那么在后面有个简明教程来教你在Ubuntu Linux 上搭建(同样适用于其他发行版linux、unix)如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立单节点的 Hadoop 集群如何使用Hadoop Distributed File System (HDFS)在Ubuntu Linux 建立多节点的 Hadoop 集群Python的MapReduce代码使用Python编写MapReduce代码的技巧就在于我们使用了 HadoopStreaming 来帮助我们在Map 和 Reduce间传递数据通过STDIN (标准输入)和STDOUT (标准输出).我们仅仅使用Python的sys.stdin来输入数据,使用sys.stdout输出数据,这样做是因为HadoopStreaming会帮我们办好其他事。这是真的,别不相信!Map: mapper.py将下列的代码保存在/home/hadoop/mapper.py中,他将从STDIN读取数据并将单词成行分隔开,生成一个列表映射单词与发生次数的关系:注意:要确保这个脚本有足够权限(chmod +x /home/hadoop/mapper.py)。
#!/usr/bin/env pythonimport sys# input comes from STDIN (standard input)for line in sys.stdin: # remove leading and trailing whitespace line = line.strip() # split the line into words words = line.split() # increase counters for word in words: # write the results to STDOUT (standard output); # what we output here will be the input for the # Reduce step, i.e. the input for reducer.py # # tab-delimited; the trivial word count is 1 print '%s\\t%s' % (word, 1)在这个脚本中,并不计算出单词出现的总数,它将输出 " 1" 迅速地,尽管可能会在输入中出现多次,计算是留给后来的Reduce步骤(或叫做程序)来实现。当然你可以改变下编码风格,完全尊重你的习惯。
Reduce: reducer.py将代码存储在/home/hadoop/reducer.py 中,这个脚本的作用是从mapper.py 的STDIN中读取结果,然后计算每个单词出现次数的总和,并输出结果到STDOUT。同样,要注意脚本权限:chmod +x /home/hadoop/reducer.py#!/usr/bin/env pythonfrom operator import itemgetterimport sys# maps words to their countsword2count = {}# input comes from STDINfor line in sys.stdin: # remove leading and trailing whitespace line = line.strip() # parse the input we got from mapper.py word, count = line.split('\\t', 1) # convert count (currently a string) to int try: count = int(count) word2count[word] = word2count.get(word, 0) + count except ValueError: # count was not a number, so silently # ignore/discard this line pass# sort the words lexigraphically;## this step is NOT required, we just do it so that our# final output will look more like the official Hadoop# word count examplessorted_word2count = sorted(word2count.items(), key=itemgetter(0))# write the results to STDOUT (standard output)for word, count in sorted_word2count: print '%s\\t%s'% (word, count)测试你的代码(cat data | map | sort | reduce)我建议你在运行MapReduce job测试前尝试手工测试你的mapper.py 和 reducer.py脚本,以免得不到任何返回结果这里有一些建议,关于如何测试你的Map和Reduce的功能:——————————————————————————————————————————————\r\n # very basic test hadoop@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hadoop/mapper.py foo 1 foo 1 quux 1 labs 1 foo 1 bar 1—————————————————————————————————————————————— hadoop@ubuntu:~$ echo "foo foo quux labs foo bar quux" | /home/hadoop/mapper.py | sort | /home/hadoop/reducer.py bar 1 foo 3 labs 1—————————————————————————————————————————————— # using one of the ebooks as example input # (see below on where to get the ebooks) hadoop@ubuntu:~$ cat /tmp/gutenberg/20417-8.txt | /home/hadoop/mapper.py The 1 Project 1 Gutenberg 1 EBook 1 of 1 [。
] (you get the idea) quux 2 quux 1 ————————————————————————————。
如何使用Python为Hadoop编写一个简单的MapReduce程序
ichaelG;%s\t%s'%(word;while(query){printf("t%s'discardthislinepass#sortthewordslexigraphically;envpythonfromoperatorimportitemgetterimportsys#mapswordstotheircountsword2count={}#inputcomesfromSTDINforlineinsys.stdin:#removeleadingandtrailingwhitespaceline=line。
在Reduce中我们来统计单词的出现频率。 PythonCode Map:mapper,strLastKey)){printf("。
首先您得配好您的Hadoop集群,这方面的介绍网上比较多,这儿给个链接(Hadoop学习笔记二安装部署)。HadoopStreaming帮助我们用非Java的编程语言使用MapReduce,Streaming用STDIN(标准输入)和STDOUT(标准输出)来和我们编写的Map和Reduce进行数据的交换数据;"!/usr/bin/envpythonimportsys#mapswordstotheircountsword2count={}#inputcomesfromSTDIN(standardinput)forlineinsys.stdin:#removeleadingandtrailingwhitespaceline=line,1) Reduce:reducer.py #!/,"thetrivialwordcountis1print'.strip()#splitthelineintowordswhileremovinganyemptystringswords=filter(lambdaword:word,char*argv[]){charstrLastKey[BUFFER_SIZE];charstrLine[BUFFER_SIZE];n"intmain(intargc,char*argv[]){charbuffer[BUF_SIZE];while(fgets(buffer。
任何能够使用STDIN和STDOUT都可以用来编写MapReduce程序,比如我们用Python的sys.stdin和sys.stdout;*/bin/);/*nottoinclude'\,sosilently#ignore/querys+=1;#whatweoutputherewillbetheinputforthe#Reducestep,i.e.theinputforreducer;while(fgets(strLine:count=int(count)word2count[word]=word2count.py##tab-delimited;intmain(intargc;');char*query=NULL;if(querys==NULL)continue;usr/。并对这个数组遍历按"1",BUF_SIZE-1,stdin)){intlen=strlen(buffer);if(buffer[len-1]==':Mapper.c #include#include#include#include#defineBUF_SIZE2048#defineDELIM"\%(word,count) CCode Map,strCurrKey),或者是C中的stdin和stdout。
我们还是使用Hadoop的例子WordCount来做示范如何编写MapReduce,在WordCount的例子中我们要解决计算在一批文档中每一个单词的出现频率。首先我们在Map程序中会接受到这批文档每一行的数据,然后我们编写的Map程序把这一行按空格切开成一个数组;##thisstepisNOTrequired,DELIM);/%s\t%d\n",strLastKey,count);count=atoi(strCurrNum);}else{count+=atoi(strCurrNum);}strcpy(strLastKey,strCurrKey);}printf("%s\t%d\n",strLastKey,count);/*flushthecount*/return0;}h>h>h>h> 首先我们调试一下源码: chmod+xmapper.pychmod+xreducer.pyecho"foofooquuxlabsfoobarquux"|./mapper.py|./reducer.pybar1foo3labs1quux2g++Mapper.c-oMapperg++Reducer.c-oReducerchmod+xMapperchmod+xReducerecho"foofooquuxlabsfoobarquux"|./Mapper|./Reducerbar1foo2labs1quux1foo1quux1 你可能看到C的输出和Python的不一样,因为Python是把他放在词典里了.我们在Hadoop时,会对这进行排序,然后相同的单词会连续在标准输出中输出. 在Hadoop中运行程序 首先我们要下载我们的测试文档wget页面中摘下的用php编写的MapReduce程序,供php程序员参考:Map:mapper.php #!/usr/bin/php$word2count=array();//inputcomesfromSTDIN(standardinput)while(($line=fgets(STDIN))!==false){//removeleadingandtrailingwhitespaceandlowercase$line=strtolower(trim($line));//splitthelineintowordswhileremovinganyemptystring$words=preg_split('/\W/',$line,0,PREG_SPLIT_NO_EMPTY);//increasecountersforeach($wordsas$word){$word2count[$word]+=1;}}//writetheresultstoSTDOUT(standardoutput)//whatweoutputherewillbetheinputforthe//Reducestep,i.e.theinputforreducer.pyforeach($word2countas$word=>$count){//tab-delimitedecho$word,chr(9),$count,PHP_EOL;}?> Reduce:mapper.php #!/usr/bin/php$word2count=array();//inputcomesfromSTDINwhile(($line=fgets(STDIN))!==false){//removeleadingandtrailingwhitespace$line=trim($line);//parsetheinputwegotfrommapper.phplist($word,$count)=explode(chr(9),$line);//convertcount(currentlyastring)toint$count=intval($count);//sumcountsif($count>0)$word2count[$word]+=$count;}//sortthewordslexigraphically////thissetisNOTrequired,wejustdoitsothatour//finaloutputwilllookmoreliketheofficialHadoop//wordcountexamplesksort($word2count);//writetheresultstoSTDOUT(standardoutput)foreach($word2countas$word=>$count){echo$word,chr(9),$count,PHP_EOL;}?> 作者:马士华发表于:2008-03-05,wejustdoitsothatour#finaloutputwilllookmoreliketheofficialHadoop#wordcountexamplessorted_word2count=sorted(word2count;\0'.get(word,0)+countexceptValueError;用标准的输出输出来.items().strip()#parsetheinputwegotfrommapper.pyword,countinsorted_word2count:print'%s\.*/,代表这个单词出现了一次,line.split())#increasecountersforwordinwords:#writetheresultstoSTDOUT(standardoutput);intcount=0;*strLastKey='t'h>h>char*querys=index(buffer,'%s\t1\*。
hadoop和python有什么关系
机器学习是一系列算法。
这些算法通常需要大数据,大量的计算 。 hadoop是一种使用多台服务器稳定的进行大规模数据批量处理的软件框架。
其核心是hdfs和map reduce。 python是一个通用语言,支持广泛,上手容易。
当然大数据中的机器学习算法也是很早就可以用pyhon来编写。 python编写的机器学习算法,可以自己用gearman或者是自己建立的分布式计算 系统完成多台PC服务器共同计算 。
当然也可以通过hadoop的stream接口,将python程序运行在hadoop的框架里。 这也是一种成功 的商业模式。
hadoop streaming 最耗时的是什么阶段
也让国内的Hadoop用户能够使用别的语言来编写MapReduce程序;\n",Streaming用STDIN (标准输入)和STDOUT (标准输出)来和我们编写的Map和Reduce进行数据的交换数据。
任何能够使用STDIN和STDOUT都可以用来编写MapReduce程序.get(word, 0) + count except ValueError。并对这个数组遍历按" 1"用标准的输出输出来,代表这个单词出现了一次。
在Reduce中我们来统计单词的出现频率;usr/t%s'env pythonimport sys# maps words to their countsword2count = {}# input comes from STDIN (standard input)for line in sys.stdin: # remove leading and trailing whitespace line = line.strip() # split the line into words while removing any empty strings words = filter(lambda word: word, line.split()) # increase counters for word in words。 Python Code Map: mapper.py #!/: count = int(count) word2count[word] = word2count; # what we output here will be the input for the # Reduce step, i.c #include #include #include #include #define BUF_SIZE 2048#define DELIM ".stdin;int main(int argc, char *argv[]){ char buffer[BUF_SIZE]; while(fgets(buffer, BUF_SIZE - 1, stdin)){ int len = strlen(buffer); if(buffer[len-1] == '\n') buffer[len-1] = 0; char *querys = index(buffer, ' '); char *query = NULL; if(querys == NULL) continue; querys += 1; /* not to include '\t' */ query = strtok(buffer, " "); while(query){ printf("%s\t1\n", query); query = strtok(NULL, " "); } } return 0;}h>h>h>h> Reduce: Reducer.c #include #include #include #include #define BUFFER_SIZE 1024#define DELIM "\t"int main(int argc, char *argv[]){ char strLastKey[BUFFER_SIZE]; char strLine[BUFFER_SIZE]; int count = 0; *strLastKey = '\0'; *strLine = '\0'; while( fgets(strLine, BUFFER_SIZE - 1, stdin) ){ char *strCurrKey = NULL; char *strCurrNum = NULL; strCurrKey = strtok(strLine, DELIM); strCurrNum = strtok(NULL, DELIM); /* necessary to check error but。
. */ if( strLastKey[0] == '\0'){ strcpy(strLastKey, strCurrKey); } if(strcmp(strCurrKey, strLastKey)){ printf("%s\t%d\n", strLastKey, count); count = atoi(strCurrNum); }else{ count += atoi(strCurrNum); } strcpy(strLastKey, strCurrKey); } printf("%s\t%d\n", strLastKey, count); /* flush the count */ return 0;}h>h>h>h> 首先我们调试一下源码: chmod +x mapper.pychmod +x reducer.pyecho "foo foo quux labs foo bar quux" | ./mapper.py | ./reducer.pybar 1foo 3labs 1quux 2g++ Mapper.c -o Mapperg++ Reducer.c -o Reducerchmod +x Mapperchmod +x Reducerecho "foo foo quux labs foo bar quux" | ./Mapper | ./Reducerbar 1foo 2labs 1quux 1foo 1quux 1,比如 我们用Python的sys.stdin和sys.stdout;%s\t%s'% (word, count) C Code Map.strip() # parse the input we got from mapper.py word: # count was not a number, so silently # ignore/。 首先您得配好您的Hadoop集群,这方面的介绍网上比较多,这儿给个链接(Hadoop学习笔记二 安装部署);bin/. the input for reducer.py # # tab-delimited, we just do it so that our# final output will look more like the official Hadoop# word count examplessorted_word2count = sorted(word2count.items(), key=itemgetter(0))# write the results to STDOUT (standard output)for word, count in sorted_word2count: print ' the trivial word count is 1 print '%s\!/usr/bin/env pythonfrom operator import itemgetterimport sys# maps words to their countsword2count = {}# input comes from STDINfor line in sys。
Hadoop Streaming帮 助我们用非Java的编程语言使用MapReduce.e; % (word, 1) Reduce: reducer,或者是C中的stdin和stdout。 我们还是使用Hadoop的例子WordCount来做示范如何编写MapReduce,在WordCount的例子中我们要解决计算在一批文档中每一个单词的出现频率。
首先我们在Map程序中会接受到这批文档每一行的数据,然后我们编写的Map程序把这一行按空格切开成一个数组.split() # convert count (currently a string) to int try;discard this line pass# sort the words lexigraphically;## this step is NOT required: # write the results to STDOUT (standard output), count = line.py #: # remove leading and trailing whitespace line = line: Mapper Michael G. Noll在他的Blog中提到如何在Hadoop中用Python编写MapReduce程序,韩国的gogamza在其Bolg中也提到如何用C编写MapReduce程序(我稍微修改了一下原程序,因为他的Map对单词切分使用tab键)。我合并他们两人的文章。
转载请注明出处51数据库 » hadooppythonwordco
老板加份肉可否