1) 创建Map(可以是任何名字)类和map函数,map函数是在org.apache.hadoop.mapreduce.Mapper.class类中,以抽象方法定义的。
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class Map extends Mapper<LongWritable, Text, Text,IntWritable> { private final static IntWritable one = new IntWritable(1); private Text word = new Text(); public void map(LongWritable key,Text value,Context context) throws IOException, InterruptedException { word.set(value.toString()); context.write(word, one); } }
解释:
Mapper类是一个泛型类,带有4个参数(输入的键,输入的值,输出的键,输出的值)。在这里输入的键为LongWritable(hadoop中的Long类型),输入的值为Text(hadoop中的String类型),输出的键为Text(关键字)和输出的值为Intwritable(hadoop中的int类型)。以上所有hadoop数据类型和java的数据类型都很相像,除了它们是针对网络序列化而做的特殊优化。
2) 创建Reducer(任何名字)类和reduce函数,reduce函数是在org.apache.hadoop.mapreduce.Reducer.class类中,以抽象方法定义的。
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; import java.io.IOException; import java.util.Iterator; public class Reduce extends Reducer<Text, IntWritable, Text,IntWritable> { @Override protected void reduce(Text key, Iterable values, Context context) throws IOException, InterruptedException { int sum = 0; for(IntWritable intWritable : values){ sum += intWritable.get(); } context.write(key, new IntWritable(sum)); } }
解释:
Reducer类是一个泛型类,带有4个参数(输入的键,输入的值,输出的键,输出的值)。在这里输入的键和输入的值必须跟Mapper的输出的类型相匹配,输出的键是Text(关键字),输出的值是Intwritable(出现的次数)
3)我们已经准备号了Map和Reduce的实现类,现在我们需要invoker来配置Hadoop任务,调用Map Reduce程序。
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class WordCount{ public static void main(String[] args) throws Exception { Configuration configuration = new Configuration(); configuration.set("fs.default.name", "hdfs://localhost:10011"); configuration.set("mapred.job.tracker","localhost:10012"); Job job = new Job(configuration, "Word Count"); job.setJarByClass(WordCount.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class); job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.class); FileInputFormat.addInputPath(job, new Path(args[0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); //Submit the job to the cluster and wait for it to finish. System.exit(job.waitForCompletion(true) ? 0 : 1); } }4)编译代码:
mkdir WordCount javac -classpath ${HADOOP_HOME}/hadoop-0.20.2+228-core.jar -d WordCount path/*.java5)创建jar包
jar -cvf ~/WordCount.jar -C WordCount/ .6)在本地文件系统中创建输入文件
例如:mkdir /home/user1/wordcount/input
cd /wordcount/input gedit file01 gedit file027)复制本地的输入文件到HDFS
$HADOOP_HOME/bin/hadoop fs -cp ~/wordcount/input/file01 /home/user1/dfs/input/file01 $HADOOP_HOME/bin/hadoop fs -cp ~/wordcount/input/file02 /home/user1/dfs/input/file028) 执行jar包
$HADOOP_HOME/bin/hadoop jar WordCount.jar WordCount /home/user1/dfs/input /home/user1/dfs/output9)执行完毕后,以下的命令是用于查看reduce的输出文件
$HADOOP_HOME/bin/hadoop fs -ls /home/user1/dfs/output/10)使用如下命令来查看文件:
$HADOOP_HOME/bin/hadoop fs -cat hdfs:///home/user1/dfs/output/part-00000 $HADOOP_HOME/bin/hadoop fs -cat hdfs:///home/user1/dfs/output/part-00001 $HADOOP_HOME/bin/hadoop fs -cat hdfs:///home/user1/dfs/output/part-00002接下来的文章:在Java Hadoop MapReduce中使用Distributed Cache
评论删除后,数据将无法恢复
评论(2)