Hive UDF error: UDFArgumentException: The UDF implementation class 'xxxxx' is not present in the class path

Posted by 卓越的明天 on 2017/07/13 21:04

Over the last couple of days I have run into a rather puzzling problem: after writing a Hive UDF and submitting a SQL query that needs MapReduce, it fails with
org.apache.hadoop.hive.ql.exec.UDFArgumentException: The UDF implementation class 'xxxxx' is not present in the class path
The code is as follows:


```
package com.mzm.transformer.hive;

import com.mzm.common.GlobalConstants;
import com.mzm.utils.JdbcManager;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.UDF;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * UDF that looks up order information by order ID.
 * Created by Administrator on 2017/7/12.
 */
public class OrderInfoUDF extends UDF {
    // Database connection
    private Connection conn = null;
    // Bounded cache of recently fetched orders; the eldest entry is evicted once the size exceeds 100
    private Map<String, InnerOrderInfo> cache = new LinkedHashMap<String, InnerOrderInfo>() {
        @Override
        protected boolean removeEldestEntry(Map.Entry<String, InnerOrderInfo> eldest) {
            return size() > 100;
        }
    };

    public OrderInfoUDF() {
        Configuration conf = new Configuration();
        conf.addResource("transformer-env.xml");
        try {
            conn = JdbcManager.getConnection(conf, GlobalConstants.WAREHOUSE_OF_REPORT);
        } catch (SQLException e) {
            throw new RuntimeException("创建MySQL连接异常", e);
        }

        // Register a shutdown hook to close the connection when the JVM exits
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                JdbcManager.close(conn, null, null);
            }
        }));
    }

    /**
     * Returns the requested order field for the given order ID and flag.
     *
     * @param orderId the order ID
     * @param flag    which field to return: "pl" (platform), "cut" (currency type) or "pt" (payment type)
     * @return the field value, or the default value if it is missing
     */
    public Text evaluate(Text orderId, Text flag) {
        if (orderId == null || flag == null || StringUtils.isBlank(orderId.toString().trim()) ||
                StringUtils.isBlank(flag.toString().trim())) {
            throw new IllegalArgumentException("参数异常,订单id不能为空");
        }

        String order = orderId.toString();
        InnerOrderInfo info = fetchInnerOrderInfo(order);
        Text defaultValue = new Text(GlobalConstants.DEFAULT_VALUE);
        String str = flag.toString();
        if ("pl".equals(str)) {
            return info == null || StringUtils.isBlank(info.getPlatform()) ? defaultValue : new Text
                    (info.getPlatform());
        }
        if ("cut".equals(str)) {
            return info == null || StringUtils.isBlank(info.getCurrencyType()) ? defaultValue : new
                    Text(info.getCurrencyType());
        }
        if ("pt".equals(str)) {
            return info == null || StringUtils.isBlank(info.getPaymentType()) ? defaultValue : new Text
                    (info.getPaymentType());
        }
        throw new IllegalArgumentException("参数异常flag必须为(pl,cut,pt)中的一个,给定的是:" + flag);

    }

    /**
     * Returns the order amount for the given order ID.
     *
     * @param orderId the order ID
     * @return the amount, or 0 if the order is not found
     */
    public IntWritable evaluate(Text orderId) {
        if (orderId == null || StringUtils.isBlank(orderId.toString().trim())) {
            throw new IllegalArgumentException("参数异常,订单id不能为空");
        }

        String order = orderId.toString();

        InnerOrderInfo info = fetchInnerOrderInfo(order);
        return info == null ? new IntWritable(0) : new IntWritable(info.getAmount());
    }

    /**
     * Looks up the order info for the given order ID, consulting the in-memory cache first
     * and falling back to the database.
     *
     * @param orderId the order ID
     * @return the order info; fields stay unset if no matching row exists
     */
    private InnerOrderInfo fetchInnerOrderInfo(String orderId) {
        InnerOrderInfo info = cache.get(orderId);
        if (info != null) {
            return info;
        }
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        info = new InnerOrderInfo();

        try {
            pstmt = conn.prepareStatement("select order_id,platform,s_time,currency_type,payment_type," +
                    "amount from order_info where order_id=?");
            int i = 0;
            pstmt.setString(++i, orderId.toString().trim());
            rs = pstmt.executeQuery();
            if (rs.next()) {
                info.setOrderId(rs.getString("order_id"));
                info.setPlatform(rs.getString("platform"));
                info.setCurrencyType(rs.getString("currency_type"));
                info.setPaymentType(rs.getString("payment_type"));
                info.setsTime(rs.getLong("s_time"));
                info.setAmount(rs.getInt("amount"));
            }
            // Cache the result so repeated lookups for the same order skip the database
            cache.put(orderId, info);
            return info;
        } catch (SQLException e) {
            throw new RuntimeException("查询数据库时发生异常", e);
        } finally {
            JdbcManager.close(null, pstmt, rs);
        }
    }

    /**
     * Value object holding the fields of one order_info row.
     */
    private static class InnerOrderInfo {
        private String OrderId;
        private String currencyType;
        private String paymentType;
        private String platform;
        private long sTime;
        private int amount;

        public InnerOrderInfo() {
        }

        public InnerOrderInfo(String orderId, String currencyType, String paymentType, String platform,
                              long sTime, int amount) {
            OrderId = orderId;
            this.currencyType = currencyType;
            this.paymentType = paymentType;
            this.platform = platform;
            this.sTime = sTime;
            this.amount = amount;
        }

        public String getOrderId() {
            return OrderId;
        }

        public void setOrderId(String orderId) {
            OrderId = orderId;
        }

        public String getCurrencyType() {
            return currencyType;
        }

        public void setCurrencyType(String currencyType) {
            this.currencyType = currencyType;
        }

        public String getPaymentType() {
            return paymentType;
        }

        public void setPaymentType(String paymentType) {
            this.paymentType = paymentType;
        }

        public String getPlatform() {
            return platform;
        }

        public void setPlatform(String platform) {
            this.platform = platform;
        }

        public long getsTime() {
            return sTime;
        }

        public void setsTime(long sTime) {
            this.sTime = sTime;
        }

        public int getAmount() {
            return amount;
        }

        public void setAmount(int amount) {
            this.amount = amount;
        }
    }
}

```
The error message is as follows:


```
 hive (default)> select order_information(oid,'pl') as pl,from_unixtime(cast(s_time/1000 as bigint),'yyyy-MM-dd') as date,order_information(oid,'cut') as cut,order_information(oid,'pt') as pt,count(distinct oid) as orders from event_logs where en='e_cs' and pl is not null and s_time>=unix_timestamp('2017-07-05','yyyy-MM-dd')*1000 and s_time<unix_timestamp('2017-07-06','yyyy-MM-dd')*1000 group by order_information(oid,'pl'),from_unixtime(cast(s_time/1000 as bigint),'yyyy-MM-dd'),order_information(oid,'cut'),order_information(oid,'pt');

Diagnostic Messages for this Task:
Error: java.lang.RuntimeException: Error in configuring object
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:446)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:343)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:168)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1642)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:163)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
    ... 9 more
Caused by: java.lang.RuntimeException: Error in configuring object
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:75)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
    at org.apache.hadoop.mapred.MapRunner.configure(MapRunner.java:38)
    ... 14 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:106)
    ... 17 more
Caused by: java.lang.RuntimeException: Map operator initialization failed
    at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.configure(ExecMapper.java:157)
    ... 22 more
Caused by: org.apache.hadoop.hive.ql.exec.UDFArgumentException: The UDF implementation class 'com.mzm.transformer.hive.OrderInfoUDF' is not present in the class path
    at org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge.initialize(GenericUDFBridge.java:143)
    at org.apache.hadoop.hive.ql.udf.generic.GenericUDF.initializeAndFoldConstants(GenericUDF.java:116)
    at org.apache.hadoop.hive.ql.exec.ExprNodeGenericFuncEvaluator.initialize(ExprNodeGenericFuncEvaluator.java:127)
    at org.apache.hadoop.hive.ql.exec.GroupByOperator.initializeOp(GroupByOperator.java:216)
    at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:376)
    at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:460)
    at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:416)
    at org.apache.hadoop.hive.ql.exec.SelectOperator.initializeOp(SelectOperator.java:65)
    at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:376)
    at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:460)
    at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:416)
    at org.apache.hadoop.hive.ql.exec.FilterOperator.initializeOp(FilterOperator.java:83)
    at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:376)
    at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:460)
    at org.apache.hadoop.hive.ql.exec.Operator.initializeChildren(Operator.java:416)
    at org.apache.hadoop.hive.ql.exec.TableScanOperator.initializeOp(TableScanOperator.java:189)
    at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:376)
    at org.apache.hadoop.hive.ql.exec.MapOperator.initializeOp(MapOperator.java:427)
    at org.apache.hadoop.hive.ql.exec.Operator.initialize(Operator.java:376)
    at org.apache.hadoop.hive.ql.exec.mr.ExecMapper.configure(ExecMapper.java:136)
    ... 22 more


FAILED: Execution Error, return code 2 from org.apache.hadoop.hive.ql.exec.mr.MapRedTask
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 1  Reduce: 1   HDFS Read: 0 HDFS Write: 0 FAIL
Total MapReduce CPU Time Spent: 0 msec

```
I have rebuilt and re-deployed the jar several times and always get the same error, and only this UDF is affected. The other UDFs work fine and their HQL runs normally whether or not it needs MapReduce. Even HQL that uses this UDF but does not need MapReduce, such as select order_info(oid) from event_logs, runs fine; only HQL that needs MapReduce fails.
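
For reference, a jar-backed UDF is normally registered along these lines before the query is run; the jar path below is a placeholder and the post does not show the exact statements used, so this is a sketch rather than a reproduction of the actual setup:

```
-- Sketch only: the jar path is a placeholder, not the poster's actual path
ADD JAR /path/to/transformer-udf.jar;
CREATE TEMPORARY FUNCTION order_information AS 'com.mzm.transformer.hive.OrderInfoUDF';
```

ADD JAR registers the jar as a session resource, which Hive also ships to the MapReduce tasks via the distributed cache. A jar that is only on the local client classpath is visible to queries Hive answers without launching a job, but not to the map tasks, which would be consistent with the symptom described above of failing only on queries that need MapReduce.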

This problem has been driving me crazy. Could someone please take a look?

黄飞鸿大大

Copy hadoop-lzo.jar from the hadoop lib directory into the hive lib directory.
