Hadoop: the map function is never executed after switching to a custom input format

Posted by gongshaojie on 2014/12/05 14:32

First, the code. These snippets do Chinese word segmentation on Hadoop using Lucene (the Paoding analyzer).
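
(For context, the tokenization step can be tried outside Hadoop with the same Paoding/Lucene calls used in the mapper further down. This is only a minimal local sketch, assuming paoding-analysis and its dictionary are set up on the classpath; the class name and sample sentence are purely illustrative.)

import java.io.StringReader;

import net.paoding.analysis.analyzer.PaodingAnalyzer;

import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;

public class LocalTokenizeCheck {
    public static void main(String[] args) throws Exception {
        PaodingAnalyzer analyzer = new PaodingAnalyzer();
        // Same calls as in TokenizerMapper below: build a token stream and collect each term.
        TokenStream ts = analyzer.tokenStream("", new StringReader("中文分词测试"));
        // Depending on the Lucene version bundled with paoding, ts.reset() may be required here.
        StringBuilder sb = new StringBuilder();
        while (ts.incrementToken()) {
            CharTermAttribute cta = ts.getAttribute(CharTermAttribute.class);
            sb.append(cta.toString()).append(" ");
        }
        System.out.println(sb.toString());
    }
}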

1. The custom MyInputFormat class:

package MyInput;

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.CombineFileRecordReader;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

/**
 * Input format that packs many small files into combined splits.
 * Each file is handed to MyRecordReader and read as a whole.
 */
public class MyInputFormat extends CombineFileInputFormat<Text, Text> {

    // never split an individual file
    @Override
    public boolean isSplitable(JobContext context, Path file) {
        return false;
    }

    @Override
    public RecordReader<Text, Text> createRecordReader(InputSplit split,
            TaskAttemptContext context) throws IOException {
        CombineFileRecordReader<Text, Text> recordreader =
                new CombineFileRecordReader<Text, Text>((CombineFileSplit) split, context, MyRecordReader.class);
        return recordreader;
    }
}


2. The custom RecordReader class:

package MyInput;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.CombineFileSplit;

public class MyRecordReader extends RecordReader<Text, Text> {

    private CombineFileSplit combineFileSplit;  // the split being processed
    private int totalLength;                    // number of files in the split
    private int currentIndex;                   // index of the file being processed
    private float currentProgress = 0;          // progress of the current file
    private Text currentKey = new Text();       // current key
    private Text currentValue = new Text();     // current value
    private Configuration conf;                 // job configuration
    private boolean processed;                  // whether the current file has been read

    /**
     * Constructor: the split to be processed, the task context,
     * and the index of the current file within the split.
     */
    public MyRecordReader(CombineFileSplit split, TaskAttemptContext context, Integer index) {
        super();
        this.currentIndex = index;
        this.combineFileSplit = split;
        conf = context.getConfiguration();
        totalLength = combineFileSplit.getPaths().length;
        processed = false;
    }

    // clean-up before the reader is closed (nothing to release here)
    @Override
    public void close() throws IOException {
    }

    @Override
    public Text getCurrentKey() throws IOException, InterruptedException {
        return currentKey;
    }

    @Override
    public Text getCurrentValue() throws IOException, InterruptedException {
        return currentValue;
    }

    // progress = index of the current file / total number of files in the split
    @Override
    public float getProgress() throws IOException, InterruptedException {
        if (currentIndex > 0 && currentIndex < totalLength) {
            currentProgress = (float) currentIndex / totalLength;
            return currentProgress;
        }
        return currentProgress;
    }

    // everything is set up in the constructor, so there is nothing to do here
    @Override
    public void initialize(InputSplit arg0, TaskAttemptContext arg1)
            throws IOException, InterruptedException {
    }

    @Override
    public boolean nextKeyValue() throws IOException, InterruptedException {
        if (!processed) {  // if the file has not been processed yet, read it and set the key/value
            Path file = combineFileSplit.getPath(currentIndex);
            // the key is the name of the file's parent directory
            currentKey.set(file.getParent().getName());
            // the value is the full content of the file
            FSDataInputStream in = null;
            byte[] contents = new byte[(int) combineFileSplit.getLength(currentIndex)];
            try {
                FileSystem fs = file.getFileSystem(conf);
                in = fs.open(file);
                in.readFully(contents);
                currentValue.set(contents);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (in != null) {  // guard against an NPE when fs.open() failed
                    in.close();
                }
            }
            processed = true;
            return true;
        }
        return false;  // once the file has been processed, false must be returned
    }
}


3. The MapReduce driver class:

package tokenize;

import java.io.IOException;
import java.io.StringReader;
import java.util.Map;

import net.paoding.analysis.analyzer.PaodingAnalyzer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.lucene.analysis.TokenStream;
import org.apache.lucene.analysis.tokenattributes.CharTermAttribute;

import MyInput.MyInputFormat;
import util.Recommend;
import util.hdfsDao;


public class TokenizerDriver {

    // Counter for documents that fail tokenization (assumed: this enum exists elsewhere
    // in the original project; it is included here so the class compiles on its own).
    public enum Counter {
        FAILDOCS
    }

    public static class TokenizerMapper extends Mapper<Text, Text, Text, Text> {
        // output key/value
        private static Text outKey = new Text();
        private static Text outValue = new Text();

        PaodingAnalyzer analyzer = new PaodingAnalyzer();

        @Override
        public void map(Text key, Text value, Context context)
                throws IOException, InterruptedException {
            System.out.println("Tokenizer started ...");
            outKey.set(key);
            String line = value.toString();
            StringReader sr = new StringReader(line);
            TokenStream ts = analyzer.tokenStream("", sr);
            StringBuilder sb = new StringBuilder();
            try {
                while (ts.incrementToken()) {
                    CharTermAttribute cta = ts.getAttribute(CharTermAttribute.class);
                    sb.append(cta.toString());
                    sb.append(" ");
                }
            } catch (Exception e) {
                context.getCounter(Counter.FAILDOCS).increment(1);
                e.printStackTrace();
            }
            // set the output value
            outValue.set(sb.toString());
            context.write(outKey, outValue);
        }
    }

    public static void run(Map<String, String> path) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        conf.setLong("mapreduce.input.fileinputformat.split.maxsize", 4000000);
        String inputPath = path.get("input");
        String outputPath = path.get("output");
        // path of the data that needs to be tokenized
        // String dataPath = path.get("data");
        hdfsDao dao = new hdfsDao(Recommend.HDFS, conf);
        // check whether the input/output paths exist in HDFS and create the directories
        // dao.rmdirs(inputPath);
        // dao.mkdirs(inputPath);
        dao.rmdirs(outputPath);
        // copy the training data from the local file system into HDFS
        // dao.copyFile(dataPath, inputPath);
        Job job = Job.getInstance(conf, "lucene");
        job.setJarByClass(TokenizerDriver.class);
        job.setInputFormatClass(MyInputFormat.class);
        job.setMapperClass(TokenizerMapper.class);
        // job.setMapOutputKeyClass(Text.class);
        // job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        // wrap the input/output paths as Path objects
        Path input = new Path(inputPath);
        Path output = new Path(outputPath);
        // the input files live in different sub-directories, so every directory has to be added
        try {
            FileSystem fs = input.getFileSystem(conf);
            FileStatus[] stats = fs.listStatus(input);
            for (FileStatus fileStatus : stats) {
                FileInputFormat.addInputPath(job, fileStatus.getPath());
            }
        } catch (Exception e) {
            e.printStackTrace();
            return;
        }
        FileOutputFormat.setOutputPath(job, output);
        // delete the output directory if it already exists
        try {
            FileSystem hdfs = output.getFileSystem(conf);
            if (hdfs.exists(output))
                hdfs.deleteOnExit(output);
            hdfs.close();
        } catch (Exception e) {
            e.printStackTrace();
            return;
        }
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
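
(For completeness, run() expects a map with "input" and "output" entries. A minimal sketch of how it might be invoked; the paths are hypothetical, the output path simply mirrors the directory that shows up in the log below.)

package tokenize;

import java.util.HashMap;
import java.util.Map;

public class TokenizerMain {
    public static void main(String[] args) throws Exception {
        Map<String, String> path = new HashMap<String, String>();
        // hypothetical paths, for illustration only
        path.put("input", "hdfs://hdm:9000/lucene/data/");
        path.put("output", "hdfs://hdm:9000/lucene/result/");
        TokenizerDriver.run(path);
    }
}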


The run produces the following output:

log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Deleting directory: hdfs://hdm:9000/lucene/result/


The map function is never executed. What could be causing this?
