Spark speculative execution in 10 lines of code

Yousry Mohamed
8 min read · Sep 1, 2019


Resiliency against random software or hardware glitches.

Spark is a general-purpose analytics engine targeted mainly at big data scenarios. In most cases, Spark runs in a big data cluster where a cluster manager is responsible for resource allocation and orchestration. Classically this cluster manager is YARN on Hadoop, but more recently Spark can run on Kubernetes (K8s) as well. Anyway, let me provide an analogy for one of the problems that can happen in a big data environment like this. Say we have a department store with 10 cashiers. The department manager notices that 9 of the 10 cashiers can handle a single customer in less than a minute, so their queues are pretty short. The last cashier needs more than 5 minutes per customer, which leaves the customers in his queue delayed and very upset. That cashier might be sick and deserve a day off. So the manager has a quick chat with him, sends him home to rest and moves all the customers in his queue to the other cashiers’ queues, and things are stable and healthy again.

The same kind of scenario can happen in big data applications, where dozens of cluster nodes are supposed to work collectively on a certain job but one machine has a hardware or software issue that terribly slows down the whole process. Spark has a solution to this problem, and it’s called speculative execution.

How it works

At the level of a single stage in a Spark job, Spark monitors the time needed to complete the tasks in that stage. If one or more tasks take much longer (more on that later) than the other tasks in the same stage, Spark launches a new copy of each slow task on another worker node. Now we have 2 identical tasks running in parallel, and when one of them completes successfully, Spark kills the other one, picks the output of the successful task and moves on.
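As an analogy only, the "run two copies, keep whichever finishes first, kill the other" idea can be sketched with plain Scala futures. This is not how Spark schedules tasks internally; `SpeculativeRaceSketch`, `race` and `attempt` are hypothetical names, and the sleeps stand in for real work.

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SpeculativeRaceSketch {
  // Run two attempts of the "same task" and return the name of the first one to finish.
  def race(originalMillis: Long, speculativeMillis: Long): String = {
    // Two threads play the role of two worker nodes.
    val pool = Executors.newFixedThreadPool(2)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)

    // An attempt just sleeps for its duration; an interrupt means it was "killed".
    def attempt(name: String, millis: Long): Future[String] = Future {
      try { Thread.sleep(millis); name }
      catch { case _: InterruptedException => s"$name (killed)" }
    }

    try {
      val attempts = Seq(
        attempt("original", originalMillis),
        attempt("speculative", speculativeMillis)
      )
      // Keep the result of whichever attempt completes first.
      Await.result(Future.firstCompletedOf(attempts), 30.seconds)
    } finally {
      // Interrupt the attempt that is still running, like killing the losing task.
      pool.shutdownNow()
    }
  }
}
```

Calling `SpeculativeRaceSketch.race(2000, 100)` models a stuck original attempt that loses to a healthy second copy.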

For the low level details of how that works, there are 4 configuration values controlling this behaviour:

  • spark.speculation: Boolean flag that controls whether speculative execution is enabled. Its default value is false (at least for v2.4.4).
  • spark.speculation.interval: Copying the docs 😂, it’s how often Spark will check for tasks to speculate.
  • spark.speculation.multiplier: A number that should be greater than one. It’s how many times longer than the median duration of the completed tasks a task must run before Spark picks it for re-submission.
  • spark.speculation.quantile: Fraction of the tasks in a stage (for example, 0.75 for 75%) that must be complete before Spark starts to apply speculative execution to that stage.
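To make the interplay of the quantile and the multiplier concrete, here is a simplified model of the decision in plain Scala. It is a sketch of the rule as described above, not Spark’s scheduler code: the names `SpeculationSketch`, `median` and `shouldSpeculate` are hypothetical, the median handling is simplified, and rounding the required number of finished tasks down is an assumption.

```scala
object SpeculationSketch {
  // Median of the completed durations (upper middle element for even sizes, a simplification).
  def median(xs: Seq[Double]): Double = {
    val sorted = xs.sorted
    sorted(sorted.length / 2)
  }

  // Decide whether a still-running task qualifies for a speculative copy.
  def shouldSpeculate(
      totalTasks: Int,            // number of tasks in the stage
      completedSecs: Seq[Double], // durations of the successfully completed tasks
      runningSecs: Double,        // how long the candidate task has been running
      quantile: Double,           // spark.speculation.quantile
      multiplier: Double          // spark.speculation.multiplier
  ): Boolean = {
    // Assumption: the required count is rounded down, with a minimum of one task.
    val minFinished = math.max((quantile * totalTasks).floor.toInt, 1)
    completedSecs.length >= minFinished &&
      runningSecs > multiplier * median(completedSecs)
  }
}
```

For example, with 8 tasks, 7 of them finished in 3 seconds each, a quantile of 0.75 and a multiplier of 5, a task that has been running for 16 seconds qualifies, while one that has been running for 10 seconds does not.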

Speculative execution doesn’t work in local mode, as that is just a single machine, so we need a cluster with more than one machine.

I was very keen to try speculative execution in action as it’s a very cool feature and reflects how big data applications are designed for resiliency to random environment issues.

Playground setup

The easiest environment for this experiment is a Databricks Spark cluster. I happen to have an Azure subscription, so I used it, but I think the whole concept is applicable to Databricks on AWS or similar environments.

The first step is to create a cluster with the below specs:

Cluster Specs

The key settings are having more than one worker node and setting some Spark config values to enable speculative execution. Those values simply say that speculation is enabled and that it will kick in once 75% of the tasks in a stage are complete. Spark will check every 2 seconds and resubmit any task that has been running for more than 5 times the median duration of the successfully completed tasks. Phew, that was a mouthful.

By the way, those config values can also be provided as command line arguments to spark-submit or spark-shell.
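As a sketch, the four settings described above can be written down once and rendered in both forms: one `key value` pair per line for the cluster’s Spark config box, and repeated `--conf key=value` arguments for spark-submit or spark-shell. The object and method names are hypothetical, and `2s` is an assumed spelling of the 2-second interval.

```scala
object SpeculationConfigSketch {
  // Values described in the text; "2s" is an assumed spelling of the 2-second interval.
  val settings: Seq[(String, String)] = Seq(
    "spark.speculation"            -> "true",
    "spark.speculation.interval"   -> "2s",
    "spark.speculation.multiplier" -> "5",
    "spark.speculation.quantile"   -> "0.75"
  )

  // One "key value" pair per line, as entered in the cluster's Spark config box.
  def asClusterConfig: String =
    settings.map { case (k, v) => s"$k $v" }.mkString("\n")

  // Repeated --conf key=value arguments for spark-submit or spark-shell.
  def asSubmitArgs: Seq[String] =
    settings.flatMap { case (k, v) => Seq("--conf", s"$k=$v") }

  def main(args: Array[String]): Unit = {
    println(asClusterConfig)
    println(asSubmitArgs.mkString(" "))
  }
}
```

When building a session in code instead, the same pairs could be handed to `SparkConf.setAll`.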

The other thing this cluster needs is an external library for making HTTP REST calls (I will mention why later). Databricks makes it extremely easy to search for a library in different repositories like Maven, CRAN and PyPI and then attach it to a certain cluster. You can even upload your own Jar or Python packages directly.

Import external library
Library attached to cluster

10 lines of code to speculate

By now you might be wondering where those 10 lines of silly code mentioned in the title of the post are. As I don’t want to keep you waiting, here you go:

I will explain them from the bottom up.

  • Lines 9 & 10 create a data frame of 10,000 records of a Person class and print the length of the data frame, which is a Spark action that triggers the execution.
  • The Person class is a simple Scala case class, and it simulates some processing on the workers using a small sleep statement on line 7 that sleeps for 2 milliseconds per Person.
  • To simulate one of the tasks facing a severe slowness issue for whatever reason, there is another sleep statement on line 5 that is triggered for only one of the tasks (should be task 0). The sleep duration comes from a config value stored in a text file on a static website.
  • I chose this approach because it keeps the config value in a place that is easy to edit and makes it available to any node in the cluster.
  • Azure provides static websites where you can dump content like text or HTML files. So if you would like to try the above script yourself, you need to host that config.txt file somewhere and update the URL accordingly.
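Below is a minimal sketch of code matching the description above. It is not the original listing, so its line numbers do not line up with the bullets. It swaps the external HTTP library for `scala.io.Source` from the standard library, the URL is a placeholder you would replace with your own hosted config.txt, and the names `SlowTaskSketch`, `parseDelaySeconds` and `fetchDelaySeconds` are hypothetical. The Id 5 condition follows the demo below, where the slow task holds the Person with Id 5.

```scala
import scala.io.Source
import scala.util.Try

object SlowTaskSketch {
  // Hypothetical placeholder: host your own config.txt and put its URL here.
  val configUrl = "https://example.com/config.txt"

  // Parse the delay (in seconds) from the config file body; fall back to 0 on bad input.
  def parseDelaySeconds(body: String): Int = Try(body.trim.toInt).getOrElse(0)

  // Fetch the delay over HTTP using only the standard library.
  def fetchDelaySeconds(url: String): Int = {
    val src = Source.fromURL(url)
    try parseDelaySeconds(src.mkString) finally src.close()
  }

  // Constructing a Person simulates per-record work; only Id 5 pays the injected delay.
  case class Person(id: Int) {
    if (id == 5) {
      val delay = fetchDelaySeconds(configUrl)
      println(s"Injected delay: $delay second(s)")
      Thread.sleep(delay * 1000L)
    }
    Thread.sleep(2) // 2 milliseconds of simulated processing per record
  }
}
```

In a notebook attached to the cluster, the last two lines could then be something along the lines of `val df = spark.range(0, 10000, 1, 8).map(i => SlowTaskSketch.Person(i.intValue))` followed by `println(df.count())`. This assumes `spark.implicits._` is in scope, and the 8 partitions are an assumption chosen to match the 8 tasks seen in the demo.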

Demo time

First, make sure the config text file holds a small value like 1 or 2, or even zero (I picked 1 in my case). This means there will be no major difference between the time needed to run task 0 and the time needed for any other task.

Second, we need to create a new notebook and attach it to the current cluster. Make sure the cluster is running.

In the new notebook, paste the first section of the code as below and run it. It will just define the Person class.

Then create a new cell and paste the last two lines to create the data frame and count its rows. The cell runs successfully and prints 10,000 as the number of rows.

Databricks is very neat on accessibility as you can click inline within the executed cell to reach the logs and Spark history details.

From the above, it appears we had 8 tasks, and task 0 (which has the Person with Id 5) took 6 seconds instead of the 3 seconds taken by every other task. You might wonder why 6 and not 4, as we have a 1 second delay. It seems this was library bootstrapping time or something similar, because when I tried again it really took 4 seconds.

Side note: as our cluster has 2 workers, you will see two host IPs in the host column.

Anyway, the above is our happy scenario. Let’s repeat the whole thing once again but follow this updated flow:

  • Start with a delay config of 100, this would cause severe delay for task 0
  • Run both cells but don’t wait for completion
  • Wait 5 seconds and update the delay config to a value of 1 once again
  • Wait for second cell to complete and inspect the logs

This time we have two tasks with index 0, one marked killed and the other succeeded. What happened here is that task 0 initially had an injected delay of 100 seconds. Spark speculative execution is enabled, and it kicked off after 75% of the tasks in the current stage had completed. I think 7 out of 8 tasks is more than 75% 😃. The median of all completed tasks is 3 seconds and our multiplier is 5, so tasks running for more than 15 (or it could be 18) seconds are to be resubmitted. Please note that Spark performs this check every two seconds. Anyway, Spark resubmitted task 0, and by that time I had already tweaked the delay back to one second. The resubmitted task (marked speculative) completed before the old delayed task, hence Spark killed the old one, picked the output of the new one and life goes on. One other thing to notice: the resubmitted task was sent to a different worker node. This way Spark tries as much as possible to eliminate the possibility of the task being slow due to a problem specific to one node in the cluster.
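As a rough worked example of that arithmetic, the sketch below computes the first periodic check at which a slow task would exceed the threshold. It assumes the checks fall on exact multiples of the interval measured from the moment the task started, which is a simplification (real check times are not aligned with task start); `SpeculationTimeline` and `firstSpeculationCheck` are hypothetical names.

```scala
object SpeculationTimeline {
  // Earliest check time (in seconds since the task started) at which the task's
  // runtime is strictly greater than multiplier * median.
  def firstSpeculationCheck(
      medianSecs: Double,  // median duration of the completed tasks
      multiplier: Double,  // spark.speculation.multiplier
      intervalSecs: Double // spark.speculation.interval, in seconds
  ): Double = {
    val threshold = multiplier * medianSecs
    // Smallest multiple of the interval that is strictly above the threshold.
    (math.floor(threshold / intervalSecs) + 1) * intervalSecs
  }
}
```

With a median of 3 seconds, a multiplier of 5 and a 2-second interval, the threshold is 15 seconds and the first qualifying check in this model is at 16 seconds.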

We can also inspect the stdout logs of both tasks to check the amount of delay injected and verify our assumptions.

Hopefully that explains how Spark speculative execution works, how to tweak its parameters and how to inspect it from the logs and the history server.

Caveats

Speculative execution is not a silver bullet that solves every performance issue in your applications. For example, if the data is skewed and one task is loaded with far more input data than the others, there is no value in using speculative execution: the speculative copy has to process the same oversized input, so it will be just as slow. It may actually make things worse by taking up cluster resources for duplicate work.
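The skew case can be illustrated with a toy model in which task time depends only on the number of records, so a copy of the task is no faster than the original. The names and the numbers in the usage note are hypothetical.

```scala
object SkewSketch {
  // Seconds to process a partition when every record costs the same.
  def taskSecs(records: Long, secsPerRecord: Double): Double =
    records * secsPerRecord

  // Finish time of a speculative copy launched at launchedAtSecs on the same partition.
  def speculativeFinish(records: Long, secsPerRecord: Double, launchedAtSecs: Double): Double =
    launchedAtSecs + taskSecs(records, secsPerRecord)
}
```

For a skewed partition of 100,000 records at 2 milliseconds each, the original task needs about 200 seconds, while a copy launched 16 seconds in would finish around the 216-second mark, so the copy can never win the race.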


Written by Yousry Mohamed

Yousry is a lead consultant working for Cuusoo. He is very passionate about all things data including Big Data, Machine Learning and AI.