Spark Session and the singleton misconception

What structured streaming reveals about Spark Session

Yousry Mohamed
Analytics Vidhya


Photo by Pixabay: https://www.pexels.com/photo/yellow-and-black-butterflies-cocoon-39862/

Why even care?

In the old days of Spark, the entry point to a Spark application was SparkContext which, as per the Spark source code comments, "represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster". Since Spark 2.0, a new entry point called SparkSession has been introduced. Spark session is the preferred way of accessing most Spark functionality, especially as the focus has shifted towards high-level APIs like the SQL API and away from the low-level RDD API.
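
For reference, here is a minimal PySpark sketch of how the two entry points relate (the app name is arbitrary):

from pyspark.sql import SparkSession

# SparkSession is the modern entry point, built (or reused) via the builder.
spark = SparkSession.builder.appName("entry-points").getOrCreate()

# The old entry point is still there, hanging off the session.
sc = spark.sparkContext
print(sc.applicationId)

# High-level APIs (DataFrames, SQL) are reached through the session itself.
spark.range(3).show()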

At first glance, Spark session looks like a singleton: there seems to be a single instance for the whole application regardless of how you grab a reference to that session. Sometimes you just use the spark variable and sometimes you get it from a DataFrame variable, but in all cases it appears to be the same object.

Image by author

As you can see in the above screenshot, the variable address and the sessionUUID property are identical whether you use the famous spark variable or grab the session from a DataFrame.
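
If you want to repeat that check yourself in PySpark, a rough equivalent is to compare the underlying JVM sessions. The handles used below (_jdf and _jsparkSession) are internal, underscore-prefixed attributes, so treat this as a quick sanity check rather than a recommended pattern:

# Assumes `spark` is the usual global session, e.g. in a notebook.
df = spark.range(10)

# Both handles resolve to the very same JVM SparkSession object.
print(spark._jsparkSession.toString())
print(df._jdf.sparkSession().toString())
print(df._jdf.sparkSession().equals(spark._jsparkSession))  # True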

Do you think this pattern is universal? In other words, is Spark session a singleton?

For a long time, I thought it was indeed a singleton, and in most cases you don't even need to care about its nature. But I recently hit a case that proved me wrong, and that case is Spark structured streaming.

https://imgflip.com/i/6a6mit

Let’s try this snippet which uses foreachBatch to run batch operations on a streaming micro-batch.
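
Something along the lines of the following PySpark sketch does the trick; the rate source and the print-based comparison of the two sessions are just illustrative stand-ins:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("session-clone-demo").getOrCreate()

def process_batch(batch_df, batch_id):
    # Compare the JVM session attached to the micro-batch DataFrame with the global one.
    batch_session = batch_df._jdf.sparkSession()
    root_session = spark._jsparkSession
    print("root session       :", root_session.toString())
    print("micro-batch session:", batch_session.toString())
    print("same object?       :", root_session.equals(batch_session))

stream_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

(stream_df.writeStream
    .foreachBatch(process_batch)
    .trigger(once=True)
    .start()
    .awaitTermination())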

Here is the result.

Image by author

The session memory address and UUID property differ between the global spark variable and the Spark session attached to the micro-batch DataFrame. But why does this happen, and how might it impact your code?

The why first!

After a bit of digging in the source code (of the Spark version I use, which is 3.2.1), I found this line in a file named StreamExecution.scala.

/** Isolated spark session to run the batches with. */
private val sparkSessionForStream = sparkSession.cloneSession()

Tip: if you use Databricks, expect their Spark implementation to not be 100% identical to the open-source Spark repo on GitHub.

I first checked the SparkSession class to find how it can be constructed, and spotted a cloneSession method which seems to be called from a single location: the StreamExecution file mentioned above. Cloning means there will be an isolated session for the Spark SQL code running the streaming query. That session inherits its state from the root Spark session but works in isolation, because it needs to enforce some behaviour that could differ from the configuration of the root session.

For example, it seems that Adaptive Query Execution has to be disabled for streaming.

// Adaptive execution can change num shuffle partitions, disallow
sparkSessionForStream.conf.set(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key, "false")
// Disable cost-based join optimization as we do not want stateful operations
// to be rearranged
sparkSessionForStream.conf.set(SQLConf.CBO_ENABLED.key, "false")

Cool, now we understand that there can be multiple sessions per application, each with its own configuration. This is used by the structured streaming API, but it can also be used in user application code, as there are APIs (newSession, plus the cloneSession method we just saw) that allow you to create a new session. Multiple sessions can live happily in the same Spark application, which maps to a single SparkContext.

The two sessions shown above have different memory addresses but they are tied to the same SparkContext. They can still be used to create DataFrames and, generally speaking, will behave like the root Spark session, although they can have different configuration settings if needed.
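
A quick way to see this for yourself is the public newSession API (a small sketch; the config key and view name are arbitrary, and `spark` is assumed to be the usual root session):

other = spark.newSession()                             # new session, same SparkContext

print(spark.sparkContext is other.sparkContext)        # True: one context per application

# Runtime SQL configuration is per session.
spark.conf.set("spark.sql.shuffle.partitions", "8")
other.conf.set("spark.sql.shuffle.partitions", "64")
print(spark.conf.get("spark.sql.shuffle.partitions"))  # 8
print(other.conf.get("spark.sql.shuffle.partitions"))  # 64

# Temporary views are per session too, which is exactly what bites us below.
spark.range(3).createOrReplaceTempView("t")
print([t.name for t in spark.catalog.listTables()])    # includes 't'
print([t.name for t in other.catalog.listTables()])    # does not include 't'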

But why do I need to care?

https://imgflip.com/i/6a6oc3

Well, sometimes you may need to care. First, it's nice to learn something new, have a new tool in your toolbox or know how things work under the hood. If that's not really enough, let's look at the following case, which is written using PySpark for a reason I will mention shortly.
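
The snippet below is a minimal reconstruction of that case (the rate source and the wiring are just stand-ins); the interesting part is the body of the batch function:

def process_batch(batch_df, batch_id):
    batch_df.createOrReplaceTempView("microBatch")                     # line 2
    count = batch_df.sql_ctx.sql("select * from microBatch").count()   # line 3
    print(f"batch {batch_id}: {count} rows")

# Hook the function up to a toy stream (assumes the global `spark` session).
(spark.readStream.format("rate").load()
      .writeStream
      .foreachBatch(process_batch)
      .trigger(once=True)
      .start()
      .awaitTermination())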

The above will fail with the error Table or view not found: microBatch. In this snippet, say we need to use a SQL statement to run some logic, for example a MERGE statement. Yes, it could be done using the functional API, but sometimes SQL code is more readable.

In PySpark, there is no sparkSession attribute on a DataFrame instance, at least up to version 3.2.1 (Scala has it, by the way). The other option is the df.sql_ctx property, which can point to a Spark session, but unfortunately it seems to point to the root Spark session. That's why we get the error message: the microBatch temporary view is registered on the streaming micro-batch session, while the SQL statement on line 3 runs against the root Spark session, so the view is not visible there.

If you google the words "spark streaming foreachBatch createOrReplaceTempView", you will probably get a result from the Databricks website with some notebook that has code like this.

# NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe

microBatchOutputDF._jdf.sparkSession().sql("""
MERGE INTO aggregates t
USING updates s
ON s.key = t.key
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")

Well, that solves the problem using the microBatchOutputDF._jdf.sparkSession() approach. The comment talks about using the SparkSession that was used to define the updates DataFrame, but it is pretty vague. Plus, it is not very idiomatic (or recommended) in Python to use attributes starting with an underscore, because they are considered private.

Actually, if you check the PySpark documentation for DataFrame, you will find neither a public property called _jdf nor one called sql_ctx. I still prefer the sql_ctx approach because it behaves in a natural Spark manner: if you collect some records from a SQL statement, you get a list of Row elements.

Image by author

Doing the same using _jdf produces Java objects, which is quite weird, or at least not so natural if you use type annotations and develop in an IDE like VS Code.

Image by author
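
To make the difference concrete, here is roughly what the two screenshots compare (the query is just an illustrative one, run from inside the batch function):

# Collecting through sql_ctx behaves like normal PySpark: you get Row objects back.
rows = batch_df.sql_ctx.sql("select 1 as id").collect()
print(rows)  # [Row(id=1)]

# Collecting through the underlying Java DataFrame hands back py4j wrappers around
# Java Row objects, which look odd in Python and confuse type hints and IDE tooling.
java_rows = batch_df._jdf.sparkSession().sql("select 1 as id").collect()
print(type(java_rows[0]))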

Short-term solution

Back to our streaming issue, what would be the solution to run SQL statements against the right Spark session?

The easiest solution is to use global temporary views. The view can be accessed from any Spark session as long as its name is prefixed with global_temp. in the SQL statement.
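
Here is a sketch of the reworked batch function; the streaming wiring with foreachBatch stays the same as before:

def process_batch(batch_df, batch_id):
    batch_df.createOrReplaceGlobalTempView("microBatch")                           # line 2
    count = batch_df.sql_ctx.sql("select * from global_temp.microBatch").count()   # line 3
    print(f"batch {batch_id}: {count} rows")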

The major difference here is on lines 2 and 3. The view is created as a global temp view on line 2. On line 3, the view is accessed using the root Spark session, but its name has to be prefixed with global_temp. Once that's done, things work fine without any errors.

Image by author

Using sql_ctx is not really optimal, but it is way better than the _jdf approach.

Long-term solution

The community behind Spark is aware of these limitations in the PySpark DataFrame interface and how they make developers' lives hard, especially since it does not really match the equivalent Scala API, which provides easy access to the Spark session. That's why there is a long-term solution, but it will probably come with the next (hopefully minor) Spark version.

There is a JIRA issue named Use SparkSession instead of SQLContext inside PySpark. This issue has a recently merged PR with the following description:

This PR proposes to use SparkSession within PySpark. This is a base work for respecting runtime configurations, etc. Currently, we rely on old deprecated SQLContext internally that doesn't respect Spark session's runtime configurations correctly.

This PR also contains related changes (and a bit of refactoring in the code this PR touches) as below:

- Expose DataFrame.sparkSession like Scala API does.

- Move SQLContext._conf -> SparkSession._jconf.

- Rename rdd_array to df_array at DataFrame.randomSplit.

- Issue warnings to discourage to use DataFrame.sql_ctx and DataFrame(..., sql_ctx).

Once this PR is available in whatever Spark installation you use, it will be easy to update the code and remove the dependency on sql_ctx, plus we won't have to use global temp views anymore. The below will work smoothly (fingers crossed).

df.createOrReplaceTempView("microBatch")  
count = df.sparkSession.sql("select * from microBatch").count()

Update 16/06/2022:

Spark 3.3 has been released and is available on Databricks Runtime 11.0 (beta), so a PySpark DataFrame now has a sparkSession attribute.

That’s all folks

It might not be a very common use case or something that makes a huge difference in how you write Spark applications. Nonetheless, knowing a bit more about Spark session and how it behaves in different situations is quite useful. It may help you diagnose a silly problem or even give you an idea for an innovative solution to a hard task.

Hope that helps! 🙂


Yousry is a lead consultant working for Cuusoo. He is very passionate about all things data including Big Data, Machine Learning and AI.