Yet another Spark serialization article
Because seeing is believing
Well, the topic of serialization in Spark has been discussed hundreds of times, and the general advice is to always use Kryo instead of the default Java serializer. It’s claimed to be faster and to save disk space as well. But why should we care how Spark serializes its internal data frames or RDDs?
There are two main reasons:
- Joins and grouping operations are hugely impacted by serialization because they usually involve shuffling data among cluster nodes. The smaller and faster to serialize the shuffled data is, the quicker the join/grouping operation will be.
- Caching also depends on serialization, especially when caching to disk, when data spills over from memory to disk, or when using the MEMORY_ONLY_SER storage level, which caches in memory but keeps the data in serialized form (a tiny illustration follows this list).
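As a tiny illustration of that second point (the RDD and names here are made up, this is not part of the test app), these are the storage levels where the serializer choice shows up:
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Minimal illustration (hypothetical, not part of the test app):
// MEMORY_ONLY stores deserialized Java objects, so the serializer is not involved.
// MEMORY_ONLY_SER and DISK_ONLY store serialized bytes, so Java vs Kryo matters there.
object StorageLevelsDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("storage-levels").master("local[*]").getOrCreate()
    val numbers = spark.sparkContext.parallelize(1 to 1000000)

    numbers.persist(StorageLevel.MEMORY_ONLY_SER) // or StorageLevel.DISK_ONLY
    numbers.count() // an action is needed to actually materialize the cache

    spark.stop()
  }
}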
In big data scenarios, a slight improvement can translate into a huge performance gain because of the massive amount of data being processed; think of it as a chain reaction.
That sounds like good marketing talk, but it is yet to be verified practically. It’s a bit hard to benchmark such things in a quick blog post with a realistic application, especially for join or grouping use cases.
My goal is to compare both serialization options in a small Spark sandbox application that uses disk caching, to get a feeling for whether there really is a benefit in using Kryo, because seeing is believing.
Setup
Nothing too complicated is required. A local Spark 2.x environment is all that is needed, plus the tooling to write and compile Scala apps; IntelliJ IDEA Community Edition is good enough for that. By the way, I am running on Windows with Spark 2.3.2.
The code
It’s a canonical Scala console application using the SBT build tool. The first Scala file is a data generator class that produces some test data. One thing to highlight here: Kryo shines when there is a complex object graph to serialize. I initially started with simple classes (a flat case class with a bunch of simple fields), but in that case there was no major difference between Kryo and Java. Also, I will use RDDs rather than data frames, but I guess the same outcome would apply to a data frame with a similar schema.
Anyway, as the snippet below shows, the idea is to call a singleton object that generates a 100,000-element array of dummy invoice data.
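Here is a rough sketch of such a generator; the names and field layout (Invoice, Customer, InvoiceLine, DataGenerator) are assumptions of mine, not necessarily the original ones. The key point is a nested object graph rather than a flat record:
import scala.util.Random

// Hypothetical stand-in for the generator: field names are made up, but the shape
// (nested case classes, not a flat record) is what gives Kryo something to chew on.
case class Address(street: String, city: String, zip: String)
case class Customer(id: Long, name: String, address: Address)
case class InvoiceLine(sku: String, quantity: Int, unitPrice: Double)
case class Invoice(id: Long, customer: Customer, lines: Array[InvoiceLine])

object DataGenerator {
  private val rnd = new Random(42)

  def invoices(count: Int): Array[Invoice] =
    Array.tabulate(count) { i =>
      val customer = Customer(i, s"Customer $i", Address(s"$i Main St", "Springfield", "12345"))
      val lines = Array.tabulate(1 + rnd.nextInt(5)) { _ =>
        InvoiceLine(s"SKU-${rnd.nextInt(1000)}", 1 + rnd.nextInt(10), rnd.nextDouble() * 100)
      }
      Invoice(i, customer, lines)
    }
}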
The second and final class is the application class. Its main job is to call the above generator, wrap the generated array into an RDD, cache it to disk and then apply a couple of actions on that RDD. The actions themselves are not very relevant, but the first one is needed to trigger the caching behaviour. This class also parses the command line arguments and checks whether a certain parameter is passed; if so, it switches from the default (Java) serializer to Kryo.
The application class’s job is to:
- Parse the command line arguments and switch to Kryo if the application is called with a “kryo” keyword among its arguments
- Register with Kryo all the classes that appear in the RDD to be serialized. It’s also important to set the property on the next line: it makes Spark fail whenever it has to serialize a class that hasn’t been registered, so nothing in the object graph silently falls back to writing full class names
conf.set("spark.kryo.registrationRequired", "true")
- Create an RDD out of the dummy data from the generator and cache (persist) it to disk. We could also have used StorageLevel.MEMORY_ONLY_SER, but DISK_ONLY is good enough to inspect things both in the Spark UI and locally on the file system. A rough sketch of this class follows this list.
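A hedged sketch of such a driver, reusing the hypothetical generator classes from above. Only the object name Serializer is taken from the spark-submit command used later; everything else (paths, app name, actions) is an assumption:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Rough sketch of the driver class, not the exact original.
object Serializer {
  def main(args: Array[String]): Unit = {
    val useKryo = args.exists(_.equalsIgnoreCase("kryo"))

    val conf = new SparkConf()
      .setAppName("serializer-test")
      .set("spark.local.dir", "C:/tmp/spark-cache") // hypothetical custom temp dir

    if (useKryo) {
      conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Fail fast if anything in the object graph has not been registered with Kryo.
      conf.set("spark.kryo.registrationRequired", "true")
      // Depending on the Spark/Scala version, registrationRequired may complain about a
      // few extra internal classes; the error message names them so they can be added here.
      conf.registerKryoClasses(Array(
        classOf[Invoice], classOf[Customer], classOf[Address],
        classOf[InvoiceLine], classOf[Array[Invoice]], classOf[Array[InvoiceLine]]
      ))
    }

    val sc = new SparkContext(conf)
    val invoices = sc.parallelize(DataGenerator.invoices(100000))
    invoices.persist(StorageLevel.DISK_ONLY)

    // The first action materializes the cache, the second one reads from it.
    println(s"count = ${invoices.count()}")
    println(s"total lines = ${invoices.map(_.lines.length.toLong).reduce(_ + _)}")

    sc.stop()
  }
}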
Running the test
Let’s give the above code a go and see what we get. First we will try the default Java serializer. Assuming the application compiles correctly in IntelliJ and is packaged with SBT, we can run the following in a CMD/PowerShell window:
spark-submit --class Serializer --master local[*] serializer_2.11-0.1.jar
If everything is OK, the shell window will show something like:
Now let’s see how much disk space is used to cache this dataset. In local mode, the Spark UI is hosted at http://localhost:4040/. Opening this URL and heading to the Storage tab, we can see more details about the data cached in the Spark cluster.
Because the data is cached to disk and the application class specifies a custom temp directory, we can also inspect the local files used to cache this RDD. I have 8 logical cores on my laptop, so I get 8 files, as the application is submitted with local[*] and makes use of all available cores.
OK, Java scores 20 MB for this test app, so let’s see if Kryo does better.
To force Kryo we just need to call the same application but pass “kryo” at the end of the command.
spark-submit --class Serializer --master local[*] serializer_2.11-0.1.jar kryo
Spark UI shows much less disk space used to store the persisted RDD.
It’s also easy to verify that Kryo is the current serializer from the Environment tab.
Kryo scored 13.8 MB while Java scored 20 MB, roughly a 31% improvement on this small dataset with a not-too-deep object graph. This comparison covers only the disk/memory space used and does not include the CPU/compute factor, but I am happy with the current finding. In a nutshell: if I have a complex object graph with heaps of data and a likelihood of shuffling, then Kryo is a must.
Notes
- Kryo is not set as the default serializer because it needs an explicit registration step.
- Kryo can work without requiring all classes to be registered, but then there is probably little benefit, as full class names have to be persisted along with the data.
- For data frames, Spark uses its own encoders, neither Kryo nor Java, so the above discussion doesn’t apply to data frames unless they are converted to plain RDDs. Testing the same application tweaked to use a data frame instead of an RDD yields a serialized size of 30 MB for both Java and Kryo, so effectively there is no difference between them for data frames (a sketch of this tweak is shown after these notes).
The difference can also be spotted easily in the screenshots: RDDs show up as ParallelCollectionRDD, while data frames show up under their logical execution plan, which is LocalTableScan in our case. The cached partition count is different because I ran the data frame example on a different laptop, but trust me, that is not the root cause behind the 30 MB. 😂
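As a rough illustration of that tweak (again built on the hypothetical Invoice classes from earlier, so not the exact original code), the data frame variant can look like this:
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Hypothetical data frame variant of the test: the same generated invoices, but
// encoded by Spark SQL's own encoders, so switching the serializer changes nothing.
object SerializerDF {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("serializer-df-test").getOrCreate()
    import spark.implicits._

    val invoicesDF = spark.createDataset(DataGenerator.invoices(100000).toSeq).toDF()
    invoicesDF.persist(StorageLevel.DISK_ONLY)
    println(s"count = ${invoicesDF.count()}")

    spark.stop()
  }
}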
Source code
In case you would like to pull this super complex app and give it a go 😉