Spark 单元测试

Spark 单元测试

相比于传统代码,Spark是比较难调试的。程序运行在集群中,每次修改代码后,都要上传到集群进行测试,代价非常大,所以优先在本地进行单元测试,可以减少小模块的逻辑错误。

ScalaTest 测试框架

ScalaTest是比JUnit和TestNG更加高阶的测试编写工具,这个Scala应用在JVM上运行,可以测试Scala以及Java代码。ScalaTest一共提供了七种测试风格,分别为:FunSuite,FlatSpec,FunSpec,WordSpec,FreeSpec,PropSpec和FeatureSpec。


FunSuite的方式较为灵活,而且更符合传统测试方法的风格,区别仅在于test()方法可以接受一个闭包。而FlatSpec和FunSpec则通过提供诸如it、should、describe等方法,来规定书写测试的一种模式。


Maven中引入:

1
2
3
4
5
6
7
<!-- Test Dependency -->
<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_2.11</artifactId>
<version>3.1.1</version>
<scope>test</scope>
</dependency>


首先定义好我们要测试的函数,下面的程序中,我们需要对 count(rdd:RDD[String]): RDD[(String,Int)] 进行测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package unit_test

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object WordCount extends Serializable{

def main(args: Array[String]): Unit = {
val sparkConf: SparkConf = new SparkConf().setAppName("test_rdd").setMaster("local")

val sc = new SparkContext(sparkConf)

val make_rdd: RDD[String] = sc.parallelize(Array("Brent","HayLee","Henry"))

val result_rdd: RDD[(String, Int)] = count(make_rdd)

result_rdd.foreach(println)
}

def count(rdd:RDD[String]): RDD[(String,Int)]={
val wordcount_rdd: RDD[(String, Int)] =rdd.map((word: String) =>(word,1))
.reduceByKey((_: Int) + (_: Int))
wordcount_rdd
}

}


创建测试类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfter, FlatSpec}
import unit_test.WordCount
//引入scalatest建立一个单元测试类,混入特质BeforeAndAfter,在before和after中分别初始化sc和停止sc,
//初始化SparkContext时只需将Master设置为local(local[N],N表示线程)即可,无需本地配置或搭建集群,

class WordCountTests extends FlatSpec with BeforeAndAfter{
val master="local" //sparkcontext的运行master
var sc:SparkContext=_
"wordcount_class" should "map word ,1" in{
//其中参数为rdd或者dataframe可以通过通过简单的手动构造即可
val seq=Seq("Brent","HayLee","Henry")
val rdd=sc.parallelize(seq)
val wordCounts=WordCount.count(rdd)
wordCounts.map(p=>{
p._1 match {
case "Brent"=>
assert(p._2==1)// 断言
case "HayLee"=>
assert(p._2==1)
case "Henry"=>
assert(p._2==1)
case _=>
None
}
}).foreach(_=>())
}
//这里before和after中分别进行sparkcontext的初始化和结束,如果是SQLContext也可以在这里面初始化
before{
val conf=new SparkConf()
.setAppName("test").setMaster(master)
sc=new SparkContext(conf)
}

after{
if(sc!=null){
sc.stop()
}
}
}