# Solving Some Classic MapReduce Problems with Spark

### Maximum and Minimum

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.junit.Test

@Test
def testMaxMin(): Unit = {
  val sconf = new SparkConf().setAppName("test")
  val sc = new SparkContext(sconf)
  // Initialize the test data
  val data = sc.parallelize(Array(10, 7, 3, 4, 5, 6, 7, 8, 1001, 6, 2))

  // Approach 1: map everything under a single key, then scan each group
  data.map(x => ("key", x)).groupByKey().map(x => {
    var min = Integer.MAX_VALUE
    var max = Integer.MIN_VALUE
    for (num <- x._2) {
      if (num > max) {
        max = num
      }
      if (num < min) {
        min = num
      }
    }
    (max, min)
  }).collect.foreach(x => {
    println("max\t" + x._1)
    println("min\t" + x._2)
  })

  // Approach 2: a rather sneaky shortcut -- let reduce do the work
  val max = data.reduce((a, b) => Math.max(a, b))
  val min = data.reduce((a, b) => Math.min(a, b))
  println("max : " + max)
  println("min : " + min)
  sc.stop
}
```

```
max : 1001
min : 2
```
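The reduce-based shortcut works because `max` and `min` are both associative and commutative, so Spark can combine partial results from any partitioning. The same logic can be sketched on a plain Scala collection, with no Spark cluster needed:

```scala
// A local sketch of approach 2 above: Seq.reduce behaves like
// RDD.reduce for associative, commutative functions.
object MaxMinSketch {
  def main(args: Array[String]): Unit = {
    val data = Seq(10, 7, 3, 4, 5, 6, 7, 8, 1001, 6, 2)

    val max = data.reduce((a, b) => Math.max(a, b))
    val min = data.reduce((a, b) => Math.min(a, b))

    println("max : " + max) // max : 1001
    println("min : " + min) // min : 2
  }
}
```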

### Averages

```scala
@Test
def testAvg(): Unit = {
  val sconf = new SparkConf().setAppName("test")
  val sc = new SparkContext(sconf)
  // Initialize the test data
  val foo = sc.parallelize(List(("a", 1), ("a", 3), ("b", 2), ("b", 8)))

  // combineByKey accumulates a (sum, count) pair per key;
  // see the Spark documentation if you are not familiar with it
  val results = foo.combineByKey(
    (v) => (v, 1),
    (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
    (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
  ).map { case (key, value) => (key, value._1 / value._2.toDouble) }

  results.collect().foreach(println)
  sc.stop
}
```
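To make `combineByKey`'s three functions concrete: `createCombiner` turns the first value seen for a key into a `(sum, count)` accumulator, `mergeValue` folds further values into it, and `mergeCombiners` merges accumulators built on different partitions. A minimal local sketch of the same (sum, count) idea, using plain Scala `groupBy` and `foldLeft` instead of an RDD:

```scala
// Local equivalent of the combineByKey average: build a (sum, count)
// accumulator per key, then divide. No Spark dependency.
object AvgSketch {
  def main(args: Array[String]): Unit = {
    val foo = List(("a", 1), ("a", 3), ("b", 2), ("b", 8))

    val results = foo
      .groupBy(_._1)
      .map { case (key, pairs) =>
        // Fold each key's values into a (sum, count) pair
        val (sum, count) = pairs.foldLeft((0, 0)) {
          case ((s, n), (_, v)) => (s + v, n + 1)
        }
        (key, sum / count.toDouble)
      }

    results.toList.sortBy(_._1).foreach(println) // (a,2.0) then (b,5.0)
  }
}
```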

### Top-N

```scala
@Test
def testTopN(): Unit = {
  val sconf = new SparkConf().setAppName("test")
  val sc = new SparkContext(sconf)
  // Initialize the test data
  val foo = sc.parallelize(Array(
    ("a", 1),
    ("a", 2),
    ("a", 3),
    ("b", 3),
    ("b", 1),
    ("a", 4),
    ("b", 4),
    ("b", 2)
  ))

  // Take the top 2 values per key
  val groupsSort = foo.groupByKey().map(tu => {
    val key = tu._1
    val values = tu._2
    val sortValues = values.toList.sortWith(_ > _).take(2)
    (key, sortValues)
  })

  // Flatten back to (key, value) pairs for printing
  val flattenedTopNPerGroup =
    groupsSort.flatMap({ case (key, numbers) => numbers.map(key -> _) })
  flattenedTopNPerGroup.foreach((value: Any) => {
    println(value)
  })
  sc.stop
}
```

```
(a,4)
(a,3)
(b,4)
(b,3)
```
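The group-sort-take pipeline above can be sketched on a plain Scala collection: group the pairs by key, sort each key's values in descending order, keep the first two, then flatten back to `(key, value)` pairs. This mirrors the Spark job's logic without a cluster (the sort-by-key at the end is only to make the print order deterministic):

```scala
// Local sketch of the per-key top-N: groupBy, sort descending, take(n).
object TopNSketch {
  def main(args: Array[String]): Unit = {
    val foo = Seq(("a", 1), ("a", 2), ("a", 3), ("b", 3),
                  ("b", 1), ("a", 4), ("b", 4), ("b", 2))
    val n = 2

    val topN = foo
      .groupBy(_._1)
      .map { case (key, pairs) =>
        // Keep only the n largest values for this key
        (key, pairs.map(_._2).toList.sortWith(_ > _).take(n))
      }

    // Flatten back to (key, value) pairs, as in the Spark version
    val flattened = topN.toList.sortBy(_._1)
      .flatMap { case (key, values) => values.map(key -> _) }
    flattened.foreach(println) // (a,4) (a,3) (b,4) (b,3)
  }
}
```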

MaxLeap Data Analysis Team member: Tan Yang (original article)