Does Serialization Save Space in Spark?

Where Spark uses serialization

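  • When an operator function references an external variable, that variable is serialized and sent over the network.

  • When a custom type is used as the element type of an RDD (for example JavaRDD<Student>, where Student is a custom class), every object of that type is serialized. In this case the custom class must therefore implement the Serializable interface.

  • When a serialized storage level is used (for example MEMORY_ONLY_SER), Spark serializes each partition of the RDD into one large byte array.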
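The second point can be checked with plain Java serialization, independently of Spark. The sketch below uses two hypothetical classes, PlainStudent and SerializableStudent, and a hypothetical helper canSerialize. Writing an instance of a class that does not implement Serializable fails with java.io.NotSerializableException, which is the exception usually found underneath Spark's "Task not serializable" errors.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical class that does NOT implement Serializable
class PlainStudent {
    String name = "alice";
}

// Hypothetical class that does implement Serializable
class SerializableStudent implements Serializable {
    private static final long serialVersionUID = 1L;
    String name = "alice";
}

class SerializableCheck {
    // Returns true if Java serialization accepts the object,
    // false if it is rejected with NotSerializableException
    static boolean canSerialize(Object obj) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(obj);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("PlainStudent: " + canSerialize(new PlainStudent()));
        System.out.println("SerializableStudent: " + canSerialize(new SerializableStudent()));
    }
}
```

Running main prints false for PlainStudent and true for SerializableStudent.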

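Java serialization vs. Kryo serialization

  • Java serialization: uses ObjectOutputStream, and any class that implements java.io.Serializable can be serialized. Java serialization is very flexible, but it is slow and produces comparatively large serialized output.

  • Kryo serialization: faster than Java serialization and more compact. However, it does not support every serializable type, and the classes to be serialized should be registered with Kryo in the application beforehand; otherwise Kryo still works but has to store the full class name with each object.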
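To see where the extra size of Java serialization comes from, the sketch below writes one hypothetical PersonRecord twice: once with ObjectOutputStream, and once as bare field values through DataOutputStream. The second form is only a stand-in for what a serializer that already knows the class layout has to write; it is not Kryo's actual format. The Java form also carries a stream header and a class descriptor (class name, serialVersionUID, field names and types).

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical record mirroring the name/age fields of the Person objects in the tests
class PersonRecord implements Serializable {
    private static final long serialVersionUID = 1L;
    final String name;
    final int age;

    PersonRecord(String name, int age) {
        this.name = name;
        this.age = age;
    }
}

class EncodingSize {
    // Bytes produced by Java serialization: header + class descriptor + field values
    static int javaSerializedSize(PersonRecord p) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(p);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size(); // read after close(), so all buffered data has been flushed
    }

    // Bytes needed for the field values alone, with no class metadata
    static int fieldsOnlySize(PersonRecord p) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(p.name); // 2-byte length prefix + modified UTF-8 bytes
            out.writeInt(p.age);  // 4 bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        PersonRecord p = new PersonRecord("name1", 11);
        System.out.println("Java serialization: " + javaSerializedSize(p) + " bytes");
        System.out.println("fields only:        " + fieldsOnlySize(p) + " bytes");
    }
}
```

For this record the field values take 11 bytes; everything beyond that in the Java output is metadata.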

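How to enable Kryo

Set the serializer in the Spark configuration, for example as a spark-submit option:

--conf spark.serializer=org.apache.spark.serializer.KryoSerializer

or in code with conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").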

Tuning

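  • spark.kryoserializer.buffer: the initial size of Kryo's serialization buffer, default 64k. There is one such buffer per core on each worker, and it grows up to spark.kryoserializer.buffer.max when needed.
  • spark.kryoserializer.buffer.max: the maximum size the buffer may grow to, default 64m. It must be larger than any single object you serialize and less than 2048m (2 GB).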
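The two settings describe a buffer that starts small and grows on demand up to a hard cap. The class below is a hypothetical sketch of that behavior, not Kryo's or Spark's code: it starts at initialSize, doubles as needed, and fails once a write would exceed maxSize (in Spark, that situation surfaces as a Kryo buffer overflow error).

```java
import java.util.Arrays;

// Hypothetical illustration: initialSize plays the role of spark.kryoserializer.buffer,
// maxSize the role of spark.kryoserializer.buffer.max
class BoundedGrowableBuffer {
    private byte[] data;
    private int position;
    private final int maxSize;

    BoundedGrowableBuffer(int initialSize, int maxSize) {
        if (initialSize <= 0 || initialSize > maxSize) {
            throw new IllegalArgumentException("need 0 < initialSize <= maxSize");
        }
        this.data = new byte[initialSize];
        this.maxSize = maxSize;
    }

    void write(byte[] bytes) {
        int required = position + bytes.length;
        if (required > maxSize) {
            // The data does not fit even at the maximum size: fail instead of growing further
            throw new IllegalStateException("need " + required + " bytes, max is " + maxSize);
        }
        if (required > data.length) {
            // Double the capacity until the data fits, never going past maxSize
            int newSize = data.length;
            while (newSize < required) {
                newSize = (int) Math.min((long) newSize * 2, maxSize);
            }
            data = Arrays.copyOf(data, newSize);
        }
        System.arraycopy(bytes, 0, data, position, bytes.length);
        position = required;
    }

    int capacity() {
        return data.length;
    }

    int size() {
        return position;
    }

    public static void main(String[] args) {
        // Small sizes so the growth steps are easy to follow
        BoundedGrowableBuffer buffer = new BoundedGrowableBuffer(64, 256);
        buffer.write(new byte[100]);
        System.out.println("capacity after 100 bytes: " + buffer.capacity());
        buffer.write(new byte[100]);
        System.out.println("capacity after 200 bytes: " + buffer.capacity());
        try {
            buffer.write(new byte[100]);
        } catch (IllegalStateException e) {
            System.out.println("overflow: " + e.getMessage());
        }
    }
}
```

The capacity goes from 64 to 128 to 256, and the third write fails because 300 bytes exceed the 256-byte cap.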

Tests

Plain, unserialized MEMORY_ONLY

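import scala.collection.mutable.ArrayBuffer
import org.apache.spark.storage.StorageLevel

// The post does not show Person; this definition and its field names are an assumption
case class Person(name: String, age: Int, gender: String, city: String)

val persons = new ArrayBuffer[Person]
for (i <- 1 to 1000000) {
  persons += Person("name" + i, 10 + i, "male", "haerbin")
}
val personrdd = sc.parallelize(persons)
personrdd.persist(StorageLevel.MEMORY_ONLY)
personrdd.count() // persist is lazy; an action is needed to actually fill the cache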

Result: 95.3 MB


MEMORY_ONLY_SER with Java serialization

val persons=new ArrayBuffer[Person]
for(i<-1 to 1000000)
{
persons+=(Person("name"+i,10+i,"male","haerbin"))
}
val personrdd=sc.parallelize(persons)
personrdd.persist(StorageLevel.MEMORY_ONLY_SER)
personrdd.count()

Result: 39.8 MB
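Much of the gap between 95.3 MB and 39.8 MB likely comes from per-object JVM overhead (object headers, references, separate String objects), which a byte array does not have. A second effect is easy to show in isolation: within one stream, Java serialization writes a class descriptor the first time a class appears and afterwards refers back to it with a short handle, so serializing a partition as one byte array amortizes that metadata. Spark's JavaSerializer does reset its stream periodically (spark.serializer.objectStreamReset), so descriptors are re-emitted from time to time rather than once per partition. The sketch uses a hypothetical Row class and helper methods.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical record type standing in for Person
class Row implements Serializable {
    private static final long serialVersionUID = 1L;
    final String name;
    final int age;

    Row(String name, int age) {
        this.name = name;
        this.age = age;
    }
}

class BatchSerialization {
    // Writes all rows through one ObjectOutputStream into one byte array,
    // similar in spirit to one serialized partition
    static byte[] serializeAsOneBlock(List<Row> rows) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            for (Row r : rows) {
                out.writeObject(r);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Serializes every row in its own stream, so each repeats the header and class descriptor
    static int totalSizeSerializedSeparately(List<Row> rows) {
        int total = 0;
        for (Row r : rows) {
            total += serializeAsOneBlock(Collections.singletonList(r)).length;
        }
        return total;
    }

    static List<Row> sampleRows(int n) {
        List<Row> rows = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            rows.add(new Row("name" + i, 10 + i));
        }
        return rows;
    }

    public static void main(String[] args) {
        List<Row> rows = sampleRows(1000);
        System.out.println("one stream:  " + serializeAsOneBlock(rows).length + " bytes");
        System.out.println("one per row: " + totalSizeSerializedSeparately(rows) + " bytes");
    }
}
```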


MEMORY_ONLY_SER with Kryo, classes not registered

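// Assumes spark-shell was started with --conf spark.serializer=org.apache.spark.serializer.KryoSerializer
// and that no classes were registered with Kryo
val persons=new ArrayBuffer[Person]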
for(i<-1 to 1000000)
{
persons+=(Person("name"+i,10+i,"male","haerbin"))
}
val personrdd=sc.parallelize(persons)
personrdd.persist(StorageLevel.MEMORY_ONLY_SER)
personrdd.count()

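Result: 119.1 MB, even more than the unserialized data. This happens because no classes were registered: Kryo then has to write the fully qualified class name together with every object, which wastes a lot of space.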
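The cost of unregistered classes is per record, so it adds up quickly over a million objects. The sketch below is a simplified, hypothetical model of class tagging, not Kryo's real wire format: a registered class is tagged with a one-byte ID, an unregistered one with its full class name. ClassTagWriter and its nested Person class are illustrative names only.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model contrasting a numeric class ID with a full class name per object
class ClassTagWriter {
    // Hypothetical record type standing in for Person
    static class Person {
    }

    private final Map<Class<?>, Integer> ids = new HashMap<>();

    // Assigns the next free ID; a reader would need the same registrations in the same order
    void register(Class<?> type) {
        ids.putIfAbsent(type, ids.size());
    }

    // Number of bytes spent on the class tag of a single object
    int tagSize(Class<?> type) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            Integer id = ids.get(type);
            if (id != null) {
                out.writeByte(id);            // registered: one byte (enough for up to 256 IDs here)
            } else {
                out.writeUTF(type.getName()); // unregistered: 2-byte length + full class name
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        long records = 1_000_000L;
        ClassTagWriter writer = new ClassTagWriter();
        long unregisteredBytes = records * writer.tagSize(Person.class);
        writer.register(Person.class);
        long registeredBytes = records * writer.tagSize(Person.class);
        System.out.println("class tags, unregistered: " + unregisteredBytes + " bytes");
        System.out.println("class tags, registered:   " + registeredBytes + " bytes");
    }
}
```

Real application class names (with package prefixes) are usually longer than this one, which makes the unregistered case worse.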


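Kryo with registered classes, packaged and run on the cluster (registration has to happen before the SparkContext is created, so it cannot be demonstrated in spark-shell, where sc already exists)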

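// Kryo must be configured on the SparkConf before the SparkContext is created:
// sc.getConf returns a copy, so registering classes on it afterwards has no effect
val conf = new SparkConf()
  .setAppName("SerializerApp")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[Person]))
val sc = new SparkContext(conf)
val persons = new ArrayBuffer[Person]
for (i <- 1 to 1000000) {
  persons += Person("name" + i, 10 + i, "male", "haerbin")
}
val personrdd = sc.parallelize(persons)
personrdd.persist(StorageLevel.MEMORY_ONLY_SER)
personrdd.count()

Submit the jar with spark-submit:

spark-submit \
  --class sparkcore.SerializerApp \
  --name SerializerApp \
  --master yarn \
  /home/hadoop/lib2/scala6-1.0.jar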

Result: 27.5 MB, the smallest of the four runs. With registered classes, Kryo clearly does best.
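Putting the four reported sizes side by side makes the comparison concrete. The small program below only does arithmetic on the numbers measured above; StorageComparison and percentOf are hypothetical names.

```java
// Cached RDD sizes in MB, as reported in the four tests above
class StorageComparison {
    // Size of `other` as a percentage of `baseline`
    static double percentOf(double baseline, double other) {
        return other / baseline * 100.0;
    }

    public static void main(String[] args) {
        double memoryOnly = 95.3; // MEMORY_ONLY, deserialized Java objects
        String[] labels = {
            "MEMORY_ONLY_SER, Java serialization",
            "MEMORY_ONLY_SER, Kryo, unregistered",
            "MEMORY_ONLY_SER, Kryo, registered"
        };
        double[] sizesMb = {39.8, 119.1, 27.5};
        for (int i = 0; i < labels.length; i++) {
            System.out.printf("%-38s %6.1f MB = %5.1f%% of MEMORY_ONLY%n",
                    labels[i], sizesMb[i], percentOf(memoryOnly, sizesMb[i]));
        }
    }
}
```

This gives about 41.8%, 125.0% and 28.9% of the MEMORY_ONLY size; registered Kryo output is roughly 31% smaller than the Java-serialized cache (27.5 MB vs 39.8 MB).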

Serialization time

Entity class

package bhz.entity;

import java.io.Serializable;
import java.util.Map;

// Bean used by both benchmarks; Serializable is required for the Java native test
public class Simple implements Serializable {

    private static final long serialVersionUID = -4914434736682797743L;

    private String name;
    private int age;
    private Map<String, Integer> map;

    public Simple() {
    }

    public Simple(String name, int age, Map<String, Integer> map) {
        this.name = name;
        this.age = age;
        this.map = map;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public Map<String, Integer> getMap() {
        return map;
    }

    public void setMap(Map<String, Integer> map) {
        this.map = map;
    }
}

Java native serialization: OriginalSerializable.java

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

import bhz.entity.Simple;

public class OriginalSerializable {

    private static final int COUNT = 100000;

    public static void main(String[] args) throws IOException, ClassNotFoundException {
        long start = System.currentTimeMillis();
        setSerializableObject();
        System.out.println("Java native serialization time: " + (System.currentTimeMillis() - start) + " ms");
        start = System.currentTimeMillis();
        getSerializableObject();
        System.out.println("Java native deserialization time: " + (System.currentTimeMillis() - start) + " ms");
    }

    public static void setSerializableObject() throws IOException {
        // try-with-resources flushes and closes the stream even if writing fails
        try (ObjectOutputStream so = new ObjectOutputStream(new FileOutputStream("D:/file2.bin"))) {
            so.writeInt(COUNT); // record count first, so the reader knows when to stop
            for (int i = 0; i < COUNT; i++) {
                Map<String, Integer> map = new HashMap<String, Integer>(2);
                map.put("zhang0", i);
                map.put("zhang1", i);
                so.writeObject(new Simple("zhang" + i, (i + 1), map));
            }
        }
    }

    public static void getSerializableObject() throws IOException, ClassNotFoundException {
        try (ObjectInputStream si = new ObjectInputStream(new FileInputStream("D:/file2.bin"))) {
            // readObject() never returns null at end of stream (it throws EOFException),
            // so read exactly as many objects as were written
            int count = si.readInt();
            for (int i = 0; i < count; i++) {
                Simple simple = (Simple) si.readObject();
                //System.out.println(simple.getAge() + " " + simple.getName());
            }
        }
    }
}

Kryo serialization: KyroSerializable.java

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.objenesis.strategy.StdInstantiatorStrategy;

import bhz.entity.Simple;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class KyroSerializable {

    private static final int COUNT = 100000;

    public static void main(String[] args) throws IOException {
        long start = System.currentTimeMillis();
        setSerializableObject();
        System.out.println("Kryo serialization time: " + (System.currentTimeMillis() - start) + " ms");
        start = System.currentTimeMillis();
        getSerializableObject();
        System.out.println("Kryo deserialization time: " + (System.currentTimeMillis() - start) + " ms");
    }

    // Writer and reader must be configured identically, including class registrations
    private static Kryo newKryo() {
        Kryo kryo = new Kryo();
        kryo.setReferences(false);           // no reference tracking: these objects share nothing and have no cycles
        kryo.setRegistrationRequired(false); // unregistered classes (here HashMap) are allowed and written by name
        kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
        kryo.register(Simple.class);
        return kryo;
    }

    public static void setSerializableObject() throws IOException {
        Kryo kryo = newKryo();
        // close() flushes the Output buffer and closes the underlying file stream
        try (Output output = new Output(new FileOutputStream("D:/file1.bin"))) {
            output.writeInt(COUNT); // record count first, so the reader knows when to stop
            for (int i = 0; i < COUNT; i++) {
                Map<String, Integer> map = new HashMap<String, Integer>(2);
                map.put("zhang0", i);
                map.put("zhang1", i);
                kryo.writeObject(output, new Simple("zhang" + i, (i + 1), map));
            }
        }
    }

    public static void getSerializableObject() throws IOException {
        Kryo kryo = newKryo();
        try (Input input = new Input(new FileInputStream("D:/file1.bin"))) {
            // readObject() does not return null at end of input; read exactly the written count
            int count = input.readInt();
            for (int i = 0; i < count; i++) {
                Simple simple = kryo.readObject(input, Simple.class);
                //System.out.println(simple.getAge() + " " + simple.getName() + " " + simple.getMap().toString());
            }
        }
    }
}


Java native serialization time: 8281 ms
Java native deserialization time: 5899 ms

Kryo serialization time: 630 ms
Kryo deserialization time: 15 ms

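In this run Kryo serialized about 13 times faster than Java native serialization (630 ms vs 8281 ms), and its deserialization was faster by a much larger factor.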
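The timings above come from a single run measured with System.currentTimeMillis, with file I/O included and no JVM warm-up, so they are a rough indication only. A minimal sketch of a somewhat fairer harness is shown below for Java serialization, assuming an in-memory buffer instead of a file, a few warm-up rounds, and System.nanoTime. Kryo is not part of the JDK, so its calls are left out here; the same structure could wrap the Kryo writer and reader. SerializationTiming and Item are hypothetical names, and Item mirrors the Simple entity.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

class SerializationTiming {
    // Hypothetical stand-in for the Simple entity
    static class Item implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final int age;
        final Map<String, Integer> map;

        Item(String name, int age, Map<String, Integer> map) {
            this.name = name;
            this.age = age;
            this.map = map;
        }
    }

    // Serializes `count` items into memory, prefixed by the count
    static byte[] serialize(int count) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeInt(count);
            for (int i = 0; i < count; i++) {
                Map<String, Integer> map = new HashMap<>(2);
                map.put("zhang0", i);
                map.put("zhang1", i);
                out.writeObject(new Item("zhang" + i, i + 1, map));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads back all items and returns how many were read
    static int deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            for (int i = 0; i < count; i++) {
                in.readObject();
            }
            return count;
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int count = 100_000;
        // Warm-up rounds give the JIT a chance to compile the hot paths before measuring
        for (int round = 0; round < 3; round++) {
            deserialize(serialize(count));
        }
        long t0 = System.nanoTime();
        byte[] data = serialize(count);
        long t1 = System.nanoTime();
        int read = deserialize(data);
        long t2 = System.nanoTime();
        System.out.printf("serialize: %d ms, deserialize: %d ms, %d bytes, %d objects%n",
                (t1 - t0) / 1_000_000, (t2 - t1) / 1_000_000, data.length, read);
    }
}
```

Counting the objects read back also guards against a loop that silently stops early, which the original read loops could not detect.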


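The official documentation also recommends using the Kryo serialization library (version 2) wherever possible. According to it, Kryo is roughly 10 times faster than Java serialization. Spark does not use Kryo by default because Kryo cannot serialize every type, and because it expects users to register the classes to be serialized in advance, which is less convenient.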