API,目前Hadoop已经到了1.0版本,但是网上所有MapReduce教程还是使用的旧版MapReduce API,因此决定研究一下新版API。
首先是准备一下用于MapReduce的源文件,如下所示:
1900,35.3
1900,33.2
....
1905,38.2
1905,37.1
如上所示,记录的是每个年份的温度值,现在要求出每个年份最高的温度值,这是一个典型的MapReduce可以很好处理的问题,在Map阶段,得出[1900,(35.3,
333.2,...)],....[1905, (38.2, 37.1, ....)],然后再通过Reduce阶段求出每个年份最高温度值。
首先是写出MapReduce类,这和旧版API比较类似,但是需要注意的是,这里引用的新包:org.apache.hadoop.mapreduce.*而不是原来的org.apache.hadoop.mapred.*,具体程序如下所示:
package com.bjcic.hadoop.guide;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MaxTptr {
public static class MaxTptrMapper extends Mapper
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String[] items = value.toString().split(",");
context.write(new Text(items[0]), new
DoubleWritable(Double.parseDouble(items[1])));
}
}
public static class MaxTptrReducer extends Reducer
@Override
public void reduce(Text key, Iterable
context)
throws IOException, InterruptedException {
double maxTptr = Double.MIN_VALUE;
for (DoubleWritable val : values) {
maxTptr = Math.max(val.get(), maxTptr);
}
context.write(key, new DoubleWritable(maxTptr));
}
}
public static void main(String[] argv) {
//JobConf conf = new JobConf(MaxTptr.class);
Job job = null;
try {
job = new Job();
job.setJarByClass(MaxTptr.class);
FileInputFormat.addInputPath(job, new Path("input"));
FileOutputFormat.setOutputPath(job, new Path("output"));
job.setMapperClass(MaxTptrMapper.class);
job.setReducerClass(MaxTptrReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(DoubleWritable.class);
System.exit( job.waitForCompletion(true) ? 0 : 1 );
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ClassNotFoundException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
对上面的代码有以下点需要说明:
扩展基类发生了变化,分别为Mapper和Reducer
引入了重要的Context类
提交并运行任务采用的是新类Job而不是旧版本的JobConf
将该源文件连同src目录上传到伪分布式部署的Hadoop机器上,首先需要编译并打包Hadoop任务:
javac -classpath $HADOOP_HOME/hadoop-core-1.0.1.jar -d classes
src/com/bjcic/hadoop/guide/MaxTptr.java
jar -cvf ./MaxTptr.jar -C classes/ .
将在当前目录生成MaxTptr.jar。
转到$HADOOP_HOME目录,首先确保Hadoop已经运行在伪分布式模式下,如果没有则启动Hadoop(bin/start-all.sh,注意如果启动不成功,可以先运行bin/hadoop
namenode -format)。
上传温度文件到Hadoop的HDFS文件系统中:
$>bin/hadoop dfs -put /data_dir/tempreture.txt input
运行Hadoop任务:
$>bin/hadoop jar MaxTptr.jar com.bjcic.hadoop.guide.MaxTptr input
output
从HDFS中取下输出结果:
$>bin/hadoop dfs -get output /data_dir/
此时会在data_dir目录下建立output目录,其中就有生成的结果文件。
你看日志的第三行,你的job的jobid是job_local_0001说明你的job是在本地运行的,并不是在分布式环境下,但是你的url是hdfs://master:9000/说明你是在hdfs上创建文件。这个问题说明当job运行在local环境下时不能操作hdfs。
解决方法:把你的项目打包成jar文件,提交集群运行即可,或者把uri的scheme改成file:///也可以,但是此时文件是创建在本地