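Where serialization is used in Spark

1. When an operator function uses an external variable, that variable is serialized and sent over the network.
2. When a custom type is used as the element type of an RDD (for example JavaRDD<Student>, where Student is a custom class), every object of that type is serialized. In this case the custom class must also implement the Serializable interface.
3. When a serialized storage level is used (for example MEMORY_ONLY_SER), Spark serializes each partition of the RDD into one large byte array.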
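The first case above (an external variable captured by an operator function) is the one most often hit in practice. Spark serializes task closures with Java serialization, so if a captured variable does not implement Serializable, the job fails with a "Task not serializable" error. The following is a minimal sketch of the underlying mechanism using only java.io, without Spark. `Task` is a hypothetical stand-in for a serialized closure, and `Config` / `SerConfig` are hypothetical captured variables.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class ClosureCaptureDemo {
    // Hypothetical captured variable that does NOT implement Serializable
    static class Config {
        int threshold = 10;
    }

    // Hypothetical captured variable that does implement Serializable
    static class SerConfig implements Serializable {
        private static final long serialVersionUID = 1L;
        int threshold = 10;
    }

    // Stand-in for a closure: Serializable itself, but it drags the captured value along
    static class Task implements Serializable {
        private static final long serialVersionUID = 1L;
        final Object captured;

        Task(Object captured) {
            this.captured = captured;
        }
    }

    // Returns true if the whole object graph can be written with Java serialization
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false; // some object reachable from o is not Serializable
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(new Task(new SerConfig()))); // true
        System.out.println(canSerialize(new Task(new Config())));    // false
    }
}
```

The `Task` class itself is Serializable in both calls; only the type of the captured value decides whether serialization succeeds, which mirrors why the variable used inside an operator function must be Serializable.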
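Java serialization vs. Kryo serialization

Java serialization: uses ObjectOutputStream, and any class that implements the java.io.Serializable interface can be serialized. Java serialization is very flexible, but it is slow and produces large serialized output.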
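To make "large serialized output" concrete, the sketch below (plain JDK, no Spark) serializes one small object with ObjectOutputStream and compares the byte count with writing only its field values through DataOutputStream. The two-field `Person` class is hypothetical. The difference comes from the stream header and the class descriptor (class name, serialVersionUID, field names and types) that Java serialization writes in addition to the data.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class JavaSerializationOverhead {
    // Hypothetical two-field record used only for this size comparison
    static class Person implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final int age;

        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    // Java built-in serialization: stream header + class descriptor + field values
    static int javaSerializedSize(Person p) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(p);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Field values only, assuming the reader already knows the type and field order
    static int fieldsOnlySize(Person p) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(p.name); // 2-byte length + modified UTF-8 bytes
            out.writeInt(p.age);  // 4 bytes
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        Person p = new Person("name1", 11);
        System.out.println("Java serialization: " + javaSerializedSize(p) + " bytes");
        System.out.println("fields only:        " + fieldsOnlySize(p) + " bytes"); // 11 bytes
    }
}
```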
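Kryo serialization: Kryo is faster than Java serialization and produces more compact output, but it does not support every Serializable type, and for best performance the classes to be serialized should be registered in the application in advance.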
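The Spark tuning guide notes that Kryo still works with unregistered classes, but then it has to store the full class name with each object. The toy model below illustrates only that effect. It is not Kryo's real wire format; `RegistrationSketch` and its one-byte IDs are a hypothetical construction. A registered class costs a marker byte plus a small ID per object, while an unregistered one costs a marker plus its full class name every time.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

// Toy model only (NOT Kryo's format): registered classes are tagged with a small
// integer ID, unregistered classes with their fully qualified name.
public class RegistrationSketch {
    private final Map<Class<?>, Integer> ids = new HashMap<>();

    // Assigns the next free ID (toy: assumes fewer than 256 registered classes)
    void register(Class<?> type) {
        ids.putIfAbsent(type, ids.size());
    }

    // Writes the type tag that would precede each object's field data
    void writeTypeTag(DataOutputStream out, Class<?> type) throws IOException {
        Integer id = ids.get(type);
        if (id != null) {
            out.writeByte(0);             // marker: registered
            out.writeByte(id);            // 1-byte ID
        } else {
            out.writeByte(1);             // marker: unregistered
            out.writeUTF(type.getName()); // full class name, repeated for every object
        }
    }

    // Total tag bytes needed to write `count` objects of the given type
    int tagBytes(Class<?> type, int count) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (int i = 0; i < count; i++) {
                writeTypeTag(out, type);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        RegistrationSketch unregistered = new RegistrationSketch();
        RegistrationSketch registered = new RegistrationSketch();
        registered.register(HashMap.class);
        System.out.println(unregistered.tagBytes(HashMap.class, 1000)); // 20000
        System.out.println(registered.tagBytes(HashMap.class, 1000));   // 2000
    }
}
```

With a short name like `java.util.HashMap` the unregistered tag is already 10 times larger here; application classes with long package names widen the gap further.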
Enabling Kryo

Set the serializer in the Spark configuration:

```
spark.serializer=org.apache.spark.serializer.KryoSerializer
```
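Tuning

spark.kryoserializer.buffer: the initial size of Kryo's serialization buffer. There is one buffer per core on each worker. Default: 64k.

spark.kryoserializer.buffer.max: the maximum size the buffer may grow to. Default: 64m. It must be larger than any object you serialize and less than 2048m (2G).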
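The two settings work together: per the Spark configuration docs, the buffer starts at spark.kryoserializer.buffer and grows up to spark.kryoserializer.buffer.max when needed, and buffer.max should be raised if Kryo reports that the buffer limit was exceeded. A toy Java model of that behavior follows. `CappedBuffer` and its doubling growth policy are assumptions for illustration, not Kryo's implementation.

```java
import java.util.Arrays;

// Toy model only (not Kryo's code): a buffer with an initial size that grows on
// demand up to a hard maximum, mirroring spark.kryoserializer.buffer (initial)
// and spark.kryoserializer.buffer.max (cap).
public class CappedBuffer {
    private byte[] data;
    private int size;
    private final int maxSize;

    CappedBuffer(int initialSize, int maxSize) {
        if (initialSize <= 0 || initialSize > maxSize) {
            throw new IllegalArgumentException("need 0 < initialSize <= maxSize");
        }
        this.data = new byte[initialSize];
        this.maxSize = maxSize;
    }

    void write(byte[] chunk) {
        int needed = size + chunk.length;
        if (needed > maxSize) {
            // Analogous to Kryo failing when the object does not fit under buffer.max
            throw new IllegalStateException("buffer limit exceeded: need " + needed + ", max " + maxSize);
        }
        if (needed > data.length) {
            // Grow by doubling, never past maxSize
            int newCapacity = data.length;
            while (newCapacity < needed) {
                newCapacity = (int) Math.min(2L * newCapacity, maxSize);
            }
            data = Arrays.copyOf(data, newCapacity);
        }
        System.arraycopy(chunk, 0, data, size, chunk.length);
        size = needed;
    }

    int capacity() { return data.length; }

    int size() { return size; }

    public static void main(String[] args) {
        CappedBuffer buf = new CappedBuffer(64, 256);
        buf.write(new byte[100]);           // grows 64 -> 128
        System.out.println(buf.capacity()); // 128
        try {
            buf.write(new byte[200]);       // would need 300 > 256
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // buffer limit exceeded: need 300, max 256
        }
    }
}
```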
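Test

Plain MEMORY_ONLY (no serialization):

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.storage.StorageLevel

// Field names are assumed; only the constructor arguments appear in the original
case class Person(name: String, age: Int, sex: String, city: String)

val persons = new ArrayBuffer[Person]
for (i <- 1 to 1000000) {
  persons += Person("name" + i, 10 + i, "male", "haerbin")
}
val personrdd = sc.parallelize(persons)
personrdd.persist(StorageLevel.MEMORY_ONLY)
personrdd.count() // an action, so the RDD is actually computed and cached
```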
Result: 95.3M

MEMORY_ONLY_SER with Java serialization:

```scala
val persons = new ArrayBuffer[Person]
for (i <- 1 to 1000000) {
  persons += Person("name" + i, 10 + i, "male", "haerbin")
}
val personrdd = sc.parallelize(persons)
personrdd.persist(StorageLevel.MEMORY_ONLY_SER)
personrdd.count()
```
Result: 39.8M

MEMORY_ONLY_SER with Kryo, classes not registered (the same code, run with Kryo as the serializer):

```scala
val persons = new ArrayBuffer[Person]
for (i <- 1 to 1000000) {
  persons += Person("name" + i, 10 + i, "male", "haerbin")
}
val personrdd = sc.parallelize(persons)
personrdd.persist(StorageLevel.MEMORY_ONLY_SER)
personrdd.count()
```
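Result: 119.1M. This happens because the class was not registered: for an unregistered class, Kryo has to store the full class name along with every object, which takes a lot of extra memory.

Kryo with registered classes: register the class, then package the application and run it on the cluster (done this way, the effect cannot be observed from spark-shell):

```scala
package sparkcore

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Field names are assumed; only the constructor arguments appear in the original
case class Person(name: String, age: Int, sex: String, city: String)

object SerializerApp {
  def main(args: Array[String]): Unit = {
    // Register on the SparkConf before creating the SparkContext:
    // sc.getConf returns a copy, so registering on it afterwards has no effect
    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array(classOf[Person]))
    val sc = new SparkContext(conf)
    val persons = new ArrayBuffer[Person]
    for (i <- 1 to 1000000) {
      persons += Person("name" + i, 10 + i, "male", "haerbin")
    }
    val personrdd = sc.parallelize(persons)
    personrdd.persist(StorageLevel.MEMORY_ONLY_SER)
    personrdd.count()
  }
}
```

Submit the jar with spark-submit:

```shell
spark-submit \
  --class sparkcore.SerializerApp \
  --name SerializerApp \
  --master yarn \
  /home/hadoop/lib2/scala6-1.0.jar
```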
Result: 27.5M, the smallest of the four runs. Kryo with registered classes clearly performs best.
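MEMORY_ONLY_SER keeps each partition as one serialized byte array rather than as separate objects. A JDK-only sketch of one reason a single stream is compact: within one ObjectOutputStream the class descriptor is written once and later records refer back to it, whereas serializing each record separately repeats the stream header and descriptor. The `Person` type is hypothetical, and the sketch does not model Spark's or Kryo's actual storage layout.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PartitionBytes {
    // Hypothetical record type for the illustration
    static class Person implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final int age;

        Person(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    // All records of a "partition" written into one stream -> one byte array;
    // the class descriptor is written once and then referenced by handle
    static byte[] serializePartition(List<Person> partition) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            for (Person p : partition) {
                out.writeObject(p);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Each record serialized on its own: header and descriptor repeated per record
    static int perRecordTotal(List<Person> partition) {
        int total = 0;
        for (Person p : partition) {
            total += serializePartition(Collections.singletonList(p)).length;
        }
        return total;
    }

    static List<Person> samplePartition(int n) {
        List<Person> list = new ArrayList<>();
        for (int i = 1; i <= n; i++) {
            list.add(new Person("name" + i, 10 + i));
        }
        return list;
    }

    public static void main(String[] args) {
        List<Person> partition = samplePartition(1000);
        System.out.println("one byte array: " + serializePartition(partition).length + " bytes");
        System.out.println("per record:     " + perRecordTotal(partition) + " bytes");
    }
}
```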
Serialization time

Entity class:

```java
package bhz.entity;

import java.io.Serializable;
import java.util.Map;

public class Simple implements Serializable {
    private static final long serialVersionUID = -4914434736682797743L;

    private String name;
    private int age;
    private Map<String, Integer> map;

    public Simple() {
    }

    public Simple(String name, int age, Map<String, Integer> map) {
        this.name = name;
        this.age = age;
        this.map = map;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public Map<String, Integer> getMap() {
        return map;
    }

    public void setMap(Map<String, Integer> map) {
        this.map = map;
    }
}
```
Java native serialization: OriginalSerializable.java

```java
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

import bhz.entity.Simple;

public class OriginalSerializable {
    private static final int COUNT = 100000;

    public static void main(String[] args) throws IOException, ClassNotFoundException {
        long start = System.currentTimeMillis();
        setSerializableObject();
        System.out.println("Java native serialization time: " + (System.currentTimeMillis() - start) + " ms");

        start = System.currentTimeMillis();
        getSerializableObject();
        System.out.println("Java native deserialization time: " + (System.currentTimeMillis() - start) + " ms");
    }

    public static void setSerializableObject() throws IOException {
        // Unbuffered file stream, as in the measured run; wrapping it in a
        // BufferedOutputStream would likely make the Java side faster
        try (ObjectOutputStream so = new ObjectOutputStream(new FileOutputStream("D:/file2.bin"))) {
            for (int i = 0; i < COUNT; i++) {
                Map<String, Integer> map = new HashMap<String, Integer>(2);
                map.put("zhang0", i);
                map.put("zhang1", i);
                so.writeObject(new Simple("zhang" + i, (i + 1), map));
            }
            so.flush();
        }
    }

    public static void getSerializableObject() throws IOException, ClassNotFoundException {
        try (ObjectInputStream si = new ObjectInputStream(new FileInputStream("D:/file2.bin"))) {
            // readObject never returns null at the end of the stream (it throws
            // EOFException), so read exactly as many objects as were written
            for (int i = 0; i < COUNT; i++) {
                Simple simple = (Simple) si.readObject();
                //System.out.println(simple.getAge() + " " + simple.getName());
            }
        }
    }
}
```
Kryo serialization: KyroSerializable.java

```java
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.objenesis.strategy.StdInstantiatorStrategy;

import bhz.entity.Simple;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;

public class KyroSerializable {
    private static final int COUNT = 100000;

    public static void main(String[] args) throws IOException {
        long start = System.currentTimeMillis();
        setSerializableObject();
        System.out.println("Kryo serialization time: " + (System.currentTimeMillis() - start) + " ms");

        start = System.currentTimeMillis();
        getSerializableObject();
        System.out.println("Kryo deserialization time: " + (System.currentTimeMillis() - start) + " ms");
    }

    // The writer and the reader must configure Kryo (and register classes) the same way
    private static Kryo newKryo() {
        Kryo kryo = new Kryo();
        kryo.setReferences(false);           // no reference tracking; this data has no shared objects
        kryo.setRegistrationRequired(false); // allow classes that were not registered
        kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
        kryo.register(Simple.class);
        return kryo;
    }

    public static void setSerializableObject() throws IOException {
        Kryo kryo = newKryo();
        try (Output output = new Output(new FileOutputStream("D:/file1.bin"))) {
            for (int i = 0; i < COUNT; i++) {
                Map<String, Integer> map = new HashMap<String, Integer>(2);
                map.put("zhang0", i);
                map.put("zhang1", i);
                kryo.writeObject(output, new Simple("zhang" + i, (i + 1), map));
            }
            output.flush();
        }
    }

    public static void getSerializableObject() throws IOException {
        Kryo kryo = newKryo();
        try (Input input = new Input(new FileInputStream("D:/file1.bin"))) {
            // readObject never returns null here; reading past the end throws a
            // KryoException, so read exactly as many objects as were written
            for (int i = 0; i < COUNT; i++) {
                Simple simple = kryo.readObject(input, Simple.class);
                //System.out.println(simple.getAge() + " " + simple.getName() + " " + simple.getMap());
            }
        }
    }
}
```
Results:

Java native serialization time: 8281 ms, Java native deserialization time: 5899 ms

Kryo serialization time: 630 ms, Kryo deserialization time: 15 ms
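In this comparison, Kryo serialized about 13 times faster than Java's built-in serialization (630 ms vs. 8281 ms), and the deserialization gap was even larger. The official Spark documentation likewise recommends using the Kryo library (version 2) wherever possible, and states that Kryo is roughly 10 times faster than Java serialization. Spark does not use Kryo as its default serializer because Kryo does not support serializing every object, and it requires users to register the classes to be serialized before use, which is less convenient.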