使用idea打包spark jar后运行出错

rilweic 发布于 2016/04/12 10:53
阅读 2K+
收藏 0

在idea上使用scala插件,创建scala-maven工程成功运行,打jar包出错

代码:


package com.chanct.idap.ssm.spark


import com.chanct.idap.ssm.common.{HbaseUtils, PlatformConfig}
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}


/**
  * Created by lichao on 16-4-4.
  *
  */
object KafKa2SparkStreaming2Hbase {
   def main(args: Array[String]) {
     val zkQuorum = "c1d8:2181,c1d9:2181,c1d10:2181"
     val group = "1"
     val topics = "scala_api_topic"
     val numThreads = 2


//     val sparkConf = new SparkConf().setAppName("KafkaWordCount2Hbase").setMaster("local[2]").set("spark.eventLog.overwrite","true")
     val sparkConf = new SparkConf().setAppName("KafkaWordCount2Hbase").set("spark.eventLog.overwrite","true")
     val ssc = new StreamingContext(sparkConf, Seconds(5))
     ssc.checkpoint("checkpoint")


     val topicpMap = topics.split(",").map((_, numThreads.toInt)).toMap
     val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap).map(_._2)
     val words = lines.flatMap(_.split(" "))




     val pairs = words.map(word => (word, 1))


     val wordCounts = pairs.reduceByKey(_ + _)






     wordCounts.print()


     wordCounts.foreachRDD {
       rdd => rdd.foreachPartition {


         partition =>
           val conf = PlatformConfig.loadHbaseConf() //加载hbase配置文件
           val conn = HbaseUtils.getConnection(conf) //获取hbase连接


           val userTable = TableName.valueOf("WCTest")
           val table = conn.getTable(userTable)
           partition.foreach {
             w =>
               try {
                 val put = new Put(Bytes.toBytes(System.currentTimeMillis().toString))
                 put.addColumn(Bytes.toBytes("A"), Bytes.toBytes("value"), Bytes.toBytes(w._1.toString))
                 put.addColumn(Bytes.toBytes("A"), Bytes.toBytes("count"), Bytes.toBytes(w._2.toString))
                 table.put(put)


               } catch {
                 case _: Exception => println("raw error!")
               }
           }
           table.close()
           conn.close()


       }
     }
     wordCounts.print()


     ssc.start()
     ssc.awaitTermination()
   }
 }
	

 

 
 



pom文件

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.chanct.idap</groupId>
    <artifactId>ssm</artifactId>
    <packaging>jar</packaging>
    <version>1.0-SNAPSHOT</version>
    <inceptionYear>2016</inceptionYear>
    <properties>
        <scala.version>2.10.4</scala.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <!-- <repositories>
         <repository>
             <id>scala-tools.org</id>
             <name>Scala-Tools Maven2 Repository</name>
             <url>http://scala-tools.org/repo-releases</url>
         </repository>
     </repositories>

     <pluginRepositories>
         <pluginRepository>
             <id>scala-tools.org</id>
             <name>Scala-Tools Maven2 Repository</name>
             <url>http://scala-tools.org/repo-releases</url>
         </pluginRepository>
     </pluginRepositories>-->

    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0-cdh5.4.5</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet.jsp</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <artifactId>servlet-api</artifactId>
                    <groupId>javax.servlet</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-core</artifactId>
            <version>2.6.0-mr1-cdh5.4.5</version>
            <exclusions>
                <exclusion>
                    <groupId>javax.servlet.jsp</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
                <exclusion>
                    <artifactId>servlet-api</artifactId>
                    <groupId>javax.servlet</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
            <version>1.0.0-cdh5.4.5</version>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>1.0.0-cdh5.4.5</version>
            <exclusions>
                <exclusion>
                    <artifactId>servlet-api-2.5</artifactId>
                    <groupId>org.mortbay.jetty</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.4</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.specs</groupId>
            <artifactId>specs</artifactId>
            <version>1.2.5</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.10</artifactId>
            <version>0.8.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-assembly_2.10</artifactId>
            <version>1.3.0-cdh5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.8.2.0</version>
        </dependency>
        <dependency>
            <groupId>org.codehaus.jettison</groupId>
            <artifactId>jettison</artifactId>
            <version>1.3.7</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-yarn_2.10</artifactId>
            <version>1.3.0-cdh5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.3.0-cdh5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.3.0-cdh5.4.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.8.2.0</version>
        </dependency>
    </dependencies>

    <build>
        <!-- <sourceDirectory>src/main/scala</sourceDirectory>
         <testSourceDirectory>src/test/scala</testSourceDirectory>-->
        <plugins>

            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                    <args>
                        <arg>-target:jvm-1.7</arg>
                    </args>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.7</source>
                    <target>1.7</target>
                </configuration>
            </plugin>
            <!--<plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-eclipse-plugin</artifactId>
                <configuration>
                    <downloadSources>true</downloadSources>
                    <buildcommands>
                        <buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
                    </buildcommands>
                    <additionalProjectnatures>
                        <projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
                    </additionalProjectnatures>
                    <classpathContainers>
                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
                        <classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
                    </classpathContainers>
                </configuration>
            </plugin>-->

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer
                                        implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                </transformer>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>
    </build>
    <!-- <reporting>
         <plugins>
             <plugin>
                 <groupId>org.scala-tools</groupId>
                 <artifactId>maven-scala-plugin</artifactId>
                 <configuration>
                     <scalaVersion>${scala.version}</scalaVersion>
                 </configuration>
             </plugin>
         </plugins>
     </reporting>-->
</project>

maven打包方法:



spark 提交命令:

spark-submit --class com.chanct.idap.ssm.spark.KafKa2SparkStreaming2Hbase --master yarn-cluster --num-executors 3 --driver-memory 1g --executor-memory 512m --executor-cores 1 /home/lc/ssm-1.0-SNAPSHOT.jar



最后的错误:

Exception in thread "Thread-63" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:699)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:698)
	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:698)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1411)
	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:81)
	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1346)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:1386)
	at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$3.run(ApplicationMaster.scala:107)
	at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54)
16/04/12 10:33:55 INFO cluster.YarnClusterSchedulerBackend: Shutting down all executors
16/04/12 10:33:55 INFO cluster.YarnClusterSchedulerBackend: Asking each executor to shut down
16/04/12 10:33:55 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorActor: OutputCommitCoordinator stopped!
16/04/12 10:33:55 INFO spark.MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
16/04/12 10:33:55 INFO storage.MemoryStore: MemoryStore cleared
16/04/12 10:33:55 INFO storage.BlockManager: BlockManager stopped
16/04/12 10:33:55 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
16/04/12 10:33:55 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/04/12 10:33:55 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
16/04/12 10:33:55 WARN scheduler.EventLoggingListener: Event log hdfs://nameservice1/user/spark/applicationHistory/application_1460352776638_0056 already exists. Overwriting...
16/04/12 10:33:55 INFO Remoting: Remoting shut down
16/04/12 10:33:55 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
16/04/12 10:33:55 INFO spark.SparkContext: Successfully stopped SparkContext
16/04/12 10:33:55 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: Job aborted due to stage failure: Task 2 in stage 7.0 failed 4 times, most recent failure: Lost task 2.3 in stage 7.0 (TID 84, c1d3): java.lang.NullPointerException
	at com.chanct.idap.ssm.spark.KafKa2SparkStreaming2Hbase$$anonfun$main$1$$anonfun$apply$1.apply(KafKa2SparkStreaming2Hbase.scala:47)
	at com.chanct.idap.ssm.spark.KafKa2SparkStreaming2Hbase$$anonfun$main$1$$anonfun$apply$1.apply(KafKa2SparkStreaming2Hbase.scala:42)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
	at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:806)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
	at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
	at org.apache.spark.scheduler.Task.run(Task.scala:64)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)

Driver stacktrace:)
16/04/12 10:33:55 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
16/04/12 10:33:55 INFO yarn.ApplicationMaster: Deleting staging directory .sparkStaging/application_1460352776638_0056




加载中
0
rilweic
rilweic

问题解决了,spark与hbase衔接由于版本的问题,会有jar包找不到的问题,在spark配置中加入spark.executor.extraClassPath=/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar

在spark提交任务的时候:spark-submit --master yarn-cluster --driver-class-path /etc/hbase/conf:/opt/cloudera/parcels/CDH/lib/hbase/lib/htrace-core-3.1.0-incubating.jar ....

http://community.cloudera.com/t5/Apache-Hadoop-Concepts-and/Class-not-found-running-Spark-sample-hbase-importformat-py-in/td-p/27084


终于搞完了,花了一天时间还是很有收获的

OSCHINA
登录后可查看更多优质内容
返回顶部
顶部