spark streaming中建立线程池

木木木yanyanyan 发布于 2017/11/13 23:09
阅读 259
收藏 1

为了避免延迟以及更快速的处理业务,在spark streaming中建立线程池,避免每条信息等待处理,代码大致如下
stream.foreachRDD(rdd=> {
      rdd.foreachPartition { rddPartition => {
        val client: Client = ESClient.getInstance.getClient
        var num = Random.nextInt()
        val threadPool: ExecutorService = Executors.newFixedThreadPool(5)
        val confs = new Configuration()
        rddPartition.foreach(x => {
          threadPool.execute(new esThread(x._2, num, client, confs))
        }        )      }     }    }    )

其中线程程序esThread主要对ES进行查询访问,然后把访问后的结果存入hdfs中,大概每个partition中平均有5条数据,所以线程池中设置为5,但是最后的hdfs中的文件的处理数据比实际少了很多,感觉有很多线程都没有执行,这是什么原因呢?

求大神分析指导一下,还有能否在spark streaming建立线程池?线程数的设定和vcore数量有关吗?

 

加载中
返回顶部
顶部