Hadoop job throws "Unable to initialize any output collector"

醉灬清风 posted on 2016/03/24 10:42

I'm developing a MapReduce program on my Mac and it fails with an exception. This is my first time writing one of these and I really can't find the problem. Could someone take a look at where it goes wrong? Many thanks.

The Hadoop version is 2.6.0. The code is below.

Main class:


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TelecomLogAnalyse {

    public static class AnalyseMap extends Mapper<IntWritable, Text, Text, FlowBean> {

        @Override
        protected void map(IntWritable key, Text value, Mapper<IntWritable, Text, Text, FlowBean>.Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            String phoneNo = value.toString().split("/t")[0];
            String upFlow = value.toString().split("/t")[7];
            String downFlow = value.toString().split("/t")[8];
            FlowBean fb = new FlowBean();
            fb.setPhoneNo(new Text(""));
            fb.setUpFlow(new IntWritable(Integer.parseInt(upFlow)));
            fb.setDownFlow(new IntWritable(Integer.parseInt(downFlow)));
            context.write(new Text(phoneNo), fb);
        }
    }

    public static class AnalyseReduce extends Reducer<Text, FlowBean, Text, FlowBean> {

        @Override
        protected void reduce(Text key, Iterable<FlowBean> arg1, Reducer<Text, FlowBean, Text, FlowBean>.Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
            int u = 0;
            int d = 0;
            for (FlowBean f : arg1) {
                u += f.getUpFlow().get();
                d += f.getDownFlow().get();
            }
            FlowBean fb = new FlowBean();
            fb.setPhoneNo(new Text());
            fb.setUpFlow(new IntWritable(u));
            fb.setDownFlow(new IntWritable(d));
            context.write(key, fb);
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "telecomloganalyse");
        job.setJarByClass(TelecomLogAnalyse.class);
        job.setMapperClass(AnalyseMap.class);
        job.setReducerClass(AnalyseReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(FlowBean.class);
        FileInputFormat.addInputPath(job, new Path("/Users/SeanYin/Desktop/input1"));
        FileOutputFormat.setOutputPath(job, new Path("/Users/SeanYin/Desktop/output1"));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Custom bean:


import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

public class FlowBean {

    private Text phoneNo;
    private IntWritable upFlow;
    private IntWritable downFlow;

    public FlowBean() {
    }

    public FlowBean(Text phoneNo, IntWritable upFlow, IntWritable downFlow) {
        this.phoneNo = phoneNo;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
    }

    public Text getPhoneNo() {
        return phoneNo;
    }

    public void setPhoneNo(Text phoneNo) {
        this.phoneNo = phoneNo;
    }

    public IntWritable getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(IntWritable upFlow) {
        this.upFlow = upFlow;
    }

    public IntWritable getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(IntWritable downFlow) {
        this.downFlow = downFlow;
    }

    @Override
    public String toString() {
        // TODO Auto-generated method stub
        return phoneNo.toString() + "/t" + upFlow.toString() + "/t" + downFlow.toString();
    }
}

Here is the error output:


2016-03-24 10:16:13,652 WARN  [LocalJobRunner Map Task Executor #0] mapred.MapTask (MapTask.java:createSortingCollector(409)) - Unable to initialize MapOutputCollector org.apache.hadoop.mapred.MapTask$MapOutputBuffer
java.lang.NullPointerException
	at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.init(MapTask.java:1008)
	at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:401)
	at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81)
	at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:695)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:767)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
	at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
2016-03-24 10:16:13,653 INFO  [Thread-13] mapred.LocalJobRunner (LocalJobRunner.java:runTasks(456)) - map task executor complete.
2016-03-24 10:16:13,654 WARN  [Thread-13] mapred.LocalJobRunner (LocalJobRunner.java:run(560)) - job_local904980030_0001
java.lang.Exception: java.io.IOException: Unable to initialize any output collector
	at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
	at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:522)
Caused by: java.io.IOException: Unable to initialize any output collector
	at org.apache.hadoop.mapred.MapTask.createSortingCollector(MapTask.java:412)
	at org.apache.hadoop.mapred.MapTask.access$100(MapTask.java:81)
	at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:695)
	at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:767)
	at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
	at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
	at java.util.concurrent.FutureTask.run(FutureTask.java:262)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
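
Reading the stack trace: the NullPointerException comes from MapOutputBuffer.init(), which is where the map-side collector looks up serializers for the map output key and value classes. With Hadoop's default serialization settings that lookup only works for types implementing org.apache.hadoop.io.Writable, and in my experience this NPE almost always means the map output value class has no usable serialization. The FlowBean posted above does not implement Writable, which would explain the failure. Below is a minimal sketch of the missing plumbing, keeping the field types from the question; the write/readFields bodies are my own guess, not code from the original post:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

// Sketch only: FlowBean with Writable implemented so the map output buffer can serialize it.
public class FlowBean implements Writable {

    private Text phoneNo = new Text();
    private IntWritable upFlow = new IntWritable();
    private IntWritable downFlow = new IntWritable();

    public FlowBean() {
        // Hadoop instantiates the bean reflectively, so a no-arg constructor is required.
    }

    @Override
    public void write(DataOutput out) throws IOException {
        // Serialize the fields in a fixed order.
        phoneNo.write(out);
        upFlow.write(out);
        downFlow.write(out);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Deserialize in exactly the same order as write().
        phoneNo.readFields(in);
        upFlow.readFields(in);
        downFlow.readFields(in);
    }

    // getters, setters and toString() as in the original class ...
}

Two smaller things in the posted code also look off, although they are masked by the collector failure and would only surface afterwards: the split calls use "/t" where a tab ("\t") is presumably intended, and with the default TextInputFormat the mapper's input key is a LongWritable byte offset, not an IntWritable.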
