2
回答
关于mapreduce生成hfile文件
华为云4核8G,高性能云服务器,免费试用   
我用mapreduce处理数据然后入库到hbase当中,由于put比较慢,所以采用生成hfile文件的方式入库,在生成hfile文件大小的时候总是报一下错误:

java.io.IOException: Added a key not lexically larger than previous key=\x00\x02Mi\x0BsearchIndexuserId\x00\x00\x01>\xD5\xD6\xF3\xA3\x04, lastkey=\x00\x01w\x0BsearchIndexuserId\x00\x00\x01>\xD5\xD6\xF3\xA3\x04
        at org.apache.hadoop.hbase.io.hfile.AbstractHFileWriter.checkKey(AbstractHFileWriter.java:203)
        at org.apache.hadoop.hbase.io.hfile.HFileWriterV2.append(HFileWriterV2.java:328)
        at org.apache.hadoop.hbase.io.hfile.HFileWriterV2.append(HFileWriterV2.java:293)
        at org.apache.hadoop.hbase.regionserver.StoreFile$Writer.append(StoreFile.java:962)
        at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:167)
        at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat$1.write(HFileOutputFormat.java:123)
        at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:587)
        at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
        at com.ciwong.test.BuildTest$BuildIndexAfterReduce.reduce(BuildTest.java:110)
        at com.ciwong.test.BuildTest$BuildIndexAfterReduce.reduce(BuildTest.java:1)
        at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
        at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:417)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
        at org.apache.hadoop.mapred.Child.main(Child.java:249)


我的代码是:

                        
Configuration conf2 = new Configuration();
                        conf2.set("mapred.input.dir", args[2]);
                        conf2.set("mapred.output.dir", args[3]);
                        Job jobAfter = new Job(conf2);
                        jobAfter.setJarByClass(BuildTest.class);
                        jobAfter.setMapperClass(BuildIndexAfterMap.class);
                        jobAfter.setReducerClass(BuildIndexAfterReduce.class);
                        jobAfter.setNumReduceTasks(1);
                        jobAfter.setPartitionerClass(SimpleTotalOrderPartitioner.class);
                        jobAfter.setMapOutputKeyClass(ImmutableBytesWritable.class);
                        jobAfter.setMapOutputValueClass(Text.class);
                        jobAfter.setSortComparatorClass(ByteArrayComparator.class);
                        jobAfter.setGroupingComparatorClass(ByteArrayComparator.class);
                        FileOutputFormat.setOutputPath(jobAfter, new Path(args[3]));
                        jobAfter.setOutputFormatClass(HFileOutputFormat.class);
                        jobAfter.setInputFormatClass(TextInputFormat.class);
                        HTable table=new HTable(conf,conf.get("tableName"));
                        HFileOutputFormat.configureIncrementalLoad(jobAfter, table);
                        jobAfter.waitForCompletion(true);


map代码:
   
                      
public static class BuildIndexAfterMap extends                        Mapper<LongWritable, Text, ImmutableBytesWritable, Text> {

                @Override
                public void map(LongWritable key, Text value, Context output)
                                throws IOException, InterruptedException {
                        String outKey = value.toString().split("\t")[0];
                        String outValue = value.toString().split("\t")[1];
                        if(!outKey.trim().isEmpty()){
                                output.write(new ImmutableBytesWritable(Bytes.toBytes(outKey)), new Text(outValue));
                        }
                }
        }


reduce代码:
                      
public static class BuildIndexAfterReduce extends Reducer<ImmutableBytesWritable, Text, ImmutableBytesWritable,KeyValue> {                @Override
                public void reduce(ImmutableBytesWritable key, Iterable<Text> values, Context output)
                                throws IOException, InterruptedException {
                        String outValue = "";
                        Iterator<Text> iterator = values.iterator();
                        while (iterator.hasNext()) {
                                String v = iterator.next().toString();
                                outValue = v + Constants.FIELD_SEPERATOR + outValue;
                        }
                         KeyValue column=new KeyValue(key.get(),
                         Bytes.toBytes("searchIndex"), Bytes.toBytes("userId"),
                         Bytes.toBytes(outValue));
                        output.write(key,column);
                }
        }


请各位大侠帮忙看看是肿么回事,不胜感激
举报
夺命烈火
发帖于5年前 2回/2K+阅
共有2个答案 最后回答: 5年前

真是高级的东西, 一直在玩还没有正式的排上用场.

根据异常的提示:

Added a key not lexically larger than previous key=\x00\x02Mi\x0BsearchIndexuserId\x00\x00\x01>\xD5\xD6\xF3\xA3\x04, lastkey=\x00\x01w\x0BsearchIndexuserId\x00\x00\x01>\xD5\xD6\xF3\xA3\x04

我估计数据中出现了相同的Key了. 就像主键重复一样.

按说在Reduce中不应该存在相同的Key.

但是有种情况,比如:

在Hadoop中的MapReduce中, "key"和"  key"是两个key,可能到了HBaseAPI HFileOutputFormat中可能进行了trim, 因此两个就成一样的了.

--- 共有 5 条评论 ---
夺命烈火回复 @震秦 : 我是用hbase来生成索引的,目前还在开发环境,数据量不是很大,等下一阶段用上线上数据再看,我们是把信息隐藏在rowkey里面的,暂时还没有通过family来查询 5年前 回复
震秦回复 @yjydmlh : 你搞HBase数据量有多大? 向你问一下,在大数据量(1000W+)的情况下,通过Family扫描的那种查询快不? 就是类似数据库索引的那种? 5年前 回复
夺命烈火回复 @震秦 : 很多坑 5年前 回复
震秦回复 @yjydmlh : 看来你已经玩的不错了. 感觉怎么样? 呵呵... 5年前 回复
夺命烈火从map到reduce的时候有一个shuffle的过程,这个过程会把map的输出合并排序并把有相同key的value放在一个迭代器里,所以,不会有相同的key出现,那个错误是说key的排序不对,新加入的key比已经存在的key大,hbase的rowkey是强制升序排序的 5年前 回复
顶部