解决方法:把你的项目打包成jar文件,提交集群运行即可,或者把uri的scheme改成file:///也可以,但是此时文件是创建在本地
众所周知,从hadoop 0.20.x之后,hadoop引入了新版的mapreduce
api,目前hadoop已经到了1.0版本,但是网上所有mapreduce教程还是使用的旧版mapreduce api,因此决定研究一下新版api。
首先是准备一下用于mapreduce的源文件,如下所示:
1900,35.3
1900,33.2
....
1905,38.2
1905,37.1
如上所示,记录的是每个年份的温度值,现在要求出每个年份最高的温度值,这是一个典型的mapreduce可以很好处理的问题,在map阶段,得出[1900,(35.3,
333.2,...)],....[1905, (38.2, 37.1, ....)],然后再通过reduce阶段求出每个年份最高温度值。
首先是写出mapreduce类,这和旧版api比较类似,但是需要注意的是,这里引用的新包:org.apache.hadoop.mapreduce.*而不是原来的org.apache.hadoop.mapred.*,具体程序如下所示:
package com.bjcic.hadoop.guide;
import java.io.bufferedreader;
import java.io.file;
import java.io.fileinputstream;
import java.io.ioexception;
import java.io.inputstreamreader;
import org.apache.hadoop.fs.path;
import org.apache.hadoop.io.doublewritable;
import org.apache.hadoop.io.longwritable;
import org.apache.hadoop.io.text;
import org.apache.hadoop.mapreduce.job;
import org.apache.hadoop.mapreduce.mapper;
import org.apache.hadoop.mapreduce.reducer;
import org.apache.hadoop.mapreduce.reducer.context;
import org.apache.hadoop.mapreduce.lib.input.fileinputformat;
import org.apache.hadoop.mapreduce.lib.output.fileoutputformat;
public class maxtptr {
public static class maxtptrmapper extends mapper
@override
public void map(longwritable key, text value, context context)
throws ioexception, interruptedexception {
string[] items = value.tostring().split(",");
context.write(new text(items[0]), new
doublewritable(double.parsedouble(items[1])));
}
}
public static class maxtptrreducer extends reducer
@override
public void reduce(text key, iterable
context)
throws ioexception, interruptedexception {
double maxtptr = double.min_value;
for (doublewritable val : values) {
maxtptr = math.max(val.get(), maxtptr);
}
context.write(key, new doublewritable(maxtptr));
}
}
public static void main(string[] argv) {
//jobconf conf = new jobconf(maxtptr.class);
job job = null;
try {
job = new job();
job.setjarbyclass(maxtptr.class);
fileinputformat.addinputpath(job, new path("input"));
fileoutputformat.setoutputpath(job, new path("output"));
job.setmapperclass(maxtptrmapper.class);
job.setreducerclass(maxtptrreducer.class);
job.setoutputkeyclass(text.class);
job.setoutputvalueclass(doublewritable.class);
system.exit( job.waitforcompletion(true) ? 0 : 1 );
} catch (ioexception e) {
// todo auto-generated catch block
e.printstacktrace();
} catch (interruptedexception e) {
// todo auto-generated catch block
e.printstacktrace();
} catch (classnotfoundexception e) {
// todo auto-generated catch block
e.printstacktrace();
}
}
}
对上面的代码有以下点需要说明:
扩展基类发生了变化,分别为mapper和reducer
引入了重要的context类
提交并运行任务采用的是新类job而不是旧版本的jobconf
将该源文件连同src目录上传到伪分布式部署的hadoop机器上,首先需要编译并打包hadoop任务:
javac -classpath $hadoop_home/hadoop-core-1.0.1.jar -d classes
src/com/bjcic/hadoop/guide/maxtptr.java
jar -cvf ./maxtptr.jar -c classes/ .
将在当前目录生成maxtptr.jar。
转到$hadoop_home目录,首先确保hadoop已经运行在伪分布式模式下,如果没有则启动hadoop(bin/start-all.sh,注意如果启动不成功,可以先运行bin/hadoop
namenode -format)。
上传温度文件到hadoop的hdfs文件系统中:
$>bin/hadoop dfs -put /data_dir/tempreture.txt input
运行hadoop任务:
$>bin/hadoop jar maxtptr.jar com.bjcic.hadoop.guide.maxtptr input
output
从hdfs中取下输出结果:
$>bin/hadoop dfs -get output /data_dir/
此时会在data_dir目录下建立output目录,其中就有生成的结果文件。