Java 远程调用 Spark 服务出错

flink58 发布于 2015/11/17 16:29
阅读 2K+
收藏 0

 

/**
 * 
 */


import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

/**
 * @author Administrator
 *
 */
/**
 * Minimal Spark SQL demo: connects to a standalone Spark master, loads a JSON
 * file into a DataFrame, and runs a few DataFrame / SQL operations on it.
 *
 * NOTE(review): the RejectedExecutionException seen when submitting remotely
 * indicates the driver never managed to register with the master (all
 * registration retries were exhausted and the retry pool was shut down). Per
 * the accepted answer on this thread: use the exact master URL displayed on
 * the Spark master web UI (typically the hostname form, e.g.
 * "spark://host:7077") rather than a raw IP, and ensure the driver host is
 * reachable from the cluster. Packaging this class into a jar and running it
 * on the cluster with spark-submit is the reliable alternative.
 *
 * @author Administrator
 */
public class Ahjt {

	/**
	 * Entry point. Builds a SparkContext against a remote standalone master and
	 * exercises basic DataFrame operations on a sample people.json dataset.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		// Master URL: prefer the URL shown on the Spark master web UI
		// (hostname form); a raw IP here has been observed to fail registration.
		SparkConf conf = new SparkConf()
				.setAppName("test")
				.setMaster("spark://192.168.1.251:7077");
		JavaSparkContext sc = new JavaSparkContext(conf);
		try {
			SQLContext sqlContext = new org.apache.spark.sql.SQLContext(sc);

			// The path can be either a single text file or a directory of text
			// files, holding one JSON object per line.
			DataFrame people = sqlContext.read()
					.json("/bigdata/spark/examples/src/main/resources/people.json");

			// Display the contents and the inferred schema.
			people.show();
			people.printSchema();
			// root
			//  |-- age: long (nullable = true)
			//  |-- name: string (nullable = true)

			// Register as a temp table so it can be queried with SQL below.
			people.registerTempTable("people");

			// Select only the "name" column.
			people.select("name").show();

			// Select everybody, incrementing the age by 1.
			people.select(people.col("name"), people.col("age").plus(1)).show();

			// Select people older than 21.
			people.filter(people.col("age").gt(21)).show();

			// Count people by age.
			people.groupBy("age").count().show();

			// SQL statements can be run via the sqlContext.
			DataFrame teenagers = sqlContext.sql(
					"SELECT name FROM people WHERE age >= 13 AND age <= 19");
			teenagers.show(); // was computed but never displayed in the original

			// Alternatively, a DataFrame can be created from an RDD[String]
			// storing one JSON object per element.
			List<String> jsonData = Arrays.asList(
					"{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
			JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData);
			DataFrame anotherPeople = sqlContext.read().json(anotherPeopleRDD);
			anotherPeople.printSchema();
			anotherPeople.show(); // was created but never used in the original
		} finally {
			// Always release the SparkContext; the original leaked it.
			sc.stop();
		}
	}

}



Connecting to master spark://192.168.1.251:7077...

15/11/17 16:30:06 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[appclient-registration-retry-thread,5,main]
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@d919109 rejected from java.util.concurrent.ThreadPoolExecutor@7b42ce0a[Running, pool size = 1, active threads = 0, queued tasks = 0, completed tasks = 1]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.reject(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.execute(Unknown Source)
at java.util.concurrent.AbstractExecutorService.submit(Unknown Source)
at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1.apply(AppClient.scala:96)
at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1.apply(AppClient.scala:95)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
at org.apache.spark.deploy.client.AppClient$ClientEndpoint.tryRegisterAllMasters(AppClient.scala:95)
at org.apache.spark.deploy.client.AppClient$ClientEndpoint.org$apache$spark$deploy$client$AppClient$ClientEndpoint$$registerWithMaster(AppClient.scala:121)
at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anon$2$$anonfun$run$1.apply$mcV$sp(AppClient.scala:132)
at org.apache.spark.util.Utils$.tryOrExit(Utils.scala:1119)
at org.apache.spark.deploy.client.AppClient$ClientEndpoint$$anon$2.run(AppClient.scala:124)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.runAndReset(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
15/11/17 16:30:06 INFO DiskBlockManager: Shutdown hook called
15/11/17 16:30:06 INFO ShutdownHookManager: Shutdown hook called
15/11/17 16:30:06 INFO ShutdownHookManager: Deleting directory C:\Users\Administrator\AppData\Local\Temp\spark-4ac49489-3094-4ae5-8fd8-8bf5e8f117f2\httpd-2c8cf7f6-7c31-4354-aa86-97cbd0953e4d
15/11/17 16:30:06 INFO ShutdownHookManager: Deleting directory C:\Users\Administrator\AppData\Local\Temp\spark-4ac49489-3094-4ae5-8fd8-8bf5e8f117f2\
加载中
0
flink58
flink58

引用来自“sky_solo”的评论

请问你解决这个问题了吗?


解决了:把 Java 的 main 方法打成 jar 包,放到 Spark 上直接运行,结果可以正常出来;远程提交貌似一直没有成功过!
Carvendy
Carvendy
你现在可以远程了吗?
0
s
sky_solo

请问你解决这个问题了吗?


0
陈召东
master 的 URL 要用 Spark web UI 中标识的那个,例如 URL: spark://stresstest161***:7077 (cluster mode);直接设置成 IP 就会出现这样的问题。
返回顶部
顶部