如何在Windows下的Eclipse中直接运行Storm的WordCountTopology
展开全部 1.找到examples例子我们需要找打这个例子的位置:首先需要找到你的hadoop文件夹,然后依照下面路径:/hadoop/share/hadoop/mapreduce第二步:我们需要需要做一下运行需要的工作,比如输入输出路径,上传什么文件等。
1.先在HDFS创建几个数据目录:1.hadoop fs -mkdir -p /data/wordcount2.hadoop fs -mkdir -p /output/2.目录/data/wordcount用来存放Hadoop自带的WordCount例子的数据文件,运行这个MapReduce任务的结果输出到/output/wordcount目录中。
首先新建文件inputWord:1.vi /usr/inputWord新建完毕,查看内容:将本地文件上传到HDFS中:可以查看上传后的文件情况,执行如下命令:1.hadoop fs -ls /data/wordcount可以看到上传到HDFS中的文件。
登录到Web控制台,访问链接可以看到任务记录情况。
...
如何在eclipse调试storm程序
一、介绍 storm提供了两种运行模式:本地模式和分布式模式。
本地模式针对开发调试storm topologies非常有用。
Storm has two modes of operation: local mode and distributed mode. In local mode, Storm executes completely in process by simulating worker nodes with threads. Local mode is useful for testing and development of topologies 因为多数程序开发者都是使用windows系统进行程序开发,如果在本机不安装storm环境的情况下,开发、调试storm程序。
如果你正在为此问题而烦恼,请使用本文提供的方法。
二、实施步骤 如何基于eclipse+maven调试storm程序,步骤如下: 1.搭建好开发环境(eclipse+maven,本人使用的是eclipse Kepler 与maven3.1.1) 2.创建maven项目,并修改pom.xml,内容如pom.xml(机器联网,下载所需的依赖jar) Github上的pom.xml,引入的依赖太多,有些不需要, 3. 编写storm程序,指定为本地模式运行。
本文提供的程序是wordcount 重要的是LocalCluster cluster = new LocalCluster();这一句 Config conf = new Config(); conf.setDebug(true); conf.setNumWorkers(2); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", conf, builder.createTopology()); Utils.sleep(10000); cluster.killTopology("test"); cluster.shutdown(); pom.xml文件 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0http://maven.apache.org/xsd/maven-4.0.0.xsd"> 4.0.0 storm.starter storm-starter 0.0.1-SNAPSHOT jar UTF-8 github-releases http://oss.sonatype.org/content/repositories/github-releases/ clojars.org http://clojars.org/repo junit junit 4.11 test storm storm 0.9.0.1 provided commons-collections commons-collections 3.2.1 storm程序 package storm.starter; import java.util.HashMap; import java.util.Map; import storm.starter.spout.RandomSentenceSpout; import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; import backtype.storm.topology.BasicOutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Values;/** * This topology demonstrates Storm's stream groupings and multilang * capabilities. */ public class WordCountTopology { public static class SplitSentence extends BaseBasicBolt { @Override public void execute(Tuple input, BasicOutputCollector collector) { try { String msg = input.getString(0); System.out.println(msg + "-------------------"); if (msg != null) { String[] s = msg.split(" "); for (String string : s) { collector.emit(new Values(string)); } } } catch (Exception e) { e.printStackTrace(); } } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word")); } } public static class WordCount extends BaseBasicBolt { Map counts = new HashMap(); @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getString(0); Integer count = counts.get(word); if (count == null) count = 0; count++; counts.put(word, count); collector.emit(new Values(word, count)); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } } public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("spout", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping( "spout"); builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word")); Config conf = new Config(); conf.setDebug(true); if (args != null && args.length > 0) { conf.setNumWorkers(3); StormSubmitter.submitTopology(args[0], conf, builder.createTopology()); } else { conf.setMaxTaskParallelism(3); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word-count", conf, builder.createTopology()); Thread.sleep(10000); cluster.shutdown(); } } } package storm.starter.spout; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.utils.Utils; import java.util.Map; import java.util.Random; public class RandomSentenceSpout extends BaseRichSpout { SpoutOutputCollector _collector; Random _rand; @Override public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { _collector = collector; _rand = new Random(); } @Override public ...
hadoop中wordcount的代码求解释
1. 创建本地的示例数据文件:依次进入【Home】-【hadoop】-【hadoop-1.2.1】创建一个文件夹file用来存储本地原始数据。
并在这个目录下创建2个文件分别命名为【myTest1.txt】和【myTest2.txt】或者你想要的任何文件名。
分别在这2个文件中输入下列示例语句:2. 在HDFS上创建输入文件夹呼出终端,输入下面指令:bin/hadoop fs -mkdir hdfsInput执行这个命令时可能会提示类似安全的问题,如果提示了,请使用bin/hadoop dfsadmin -safemode leave来退出安全模式。
当分布式文件系统处于安全模式的情况下,文件系统中的内容不允许修改也不允许删除,直到安全模式结 束。
安全模式主要是为了系统启动的时候检查各个DataNode上数据块的有效性,同时根据策略必要的复制或者删除部分数据块。
运行期通过命令也可以进入 安全模式。
意思是在HDFS远程创建一个输入目录,我们以后的文件需要上载到这个目录里面才能执行。
3. 上传本地file中文件到集群的hdfsInput目录下在终端依次输入下面指令:cd hadoop-1.2.1bin/hadoop fs -put file/myTest*.txt hdfsInput4. 运行例子:在终端输入下面指令:bin/hadoop jar hadoop-examples-1.2.1.jar wordcount hdfsInput hdfsOutput注意,这里的示例程序是1.2.1版本的,可能每个机器有所不一致,那么请用*通配符代替版本号bin/hadoop jar hadoop-examples-*.jar wordcount hdfsInput hdfsOutput应该出现下面结果:Hadoop命令会启动一个JVM来运行这个MapReduce程序,并自动获得Hadoop的配置,同时把类的路径(及其依赖关系)加入到Hadoop的库中。
以上就是Hadoop Job的运行记录,从这里可以看到,这个Job被赋予了一个ID号:job_201202292213_0002,而且得知输入文件有两个(Total input paths to process : 2),同时还可以了解map的输入输出记录(record数及字节数),以及reduce输入输出记录。
查看HDFS上hdfsOutput目录内容:在终端输入下面指令:bin/hadoop fs -ls hdfsOutput从上图中知道生成了三个文件,我们的结果在"part-r-00000"中。
使用下面指令查看结果输出文件内容bin/hadoop fs -cat output/part-r-00000
转载请注明出处51数据库 » storm wordcount 运行