Navigating the Void: Unraveling the Mysteries and Pitfalls of the ‘Void’ Data Type in Apache Spark

Yousry Mohamed
Published in Level Up Coding
8 min read · Dec 5, 2023

Photo by Pixabay from Pexels: https://www.pexels.com/photo/gray-and-black-galaxy-wallpaper-2150/

While not a very exciting topic, data types are important. Every programming language book usually starts by discussing the language's built-in data types to set the stage for further topics. In the old days, there was a joke that if you know what void main(void) is, then you are a C programmer.

Quoting Wikipedia:

The void type, in several programming languages derived from C and Algol68, is the return type of a function that returns normally, but does not provide a result value to its caller. Usually such functions are called for their side effects, such as performing some task or writing to their output parameters.

Cool, but why does this matter in a data pipeline built with Spark? Data is usually pulled from files, databases or message queues, and every column is assigned a type either explicitly or via schema inference, which should be used with care (or better, not at all).

Let’s discuss a few cases in which the data type of a column really is void.

On-demand void generation 😄

The first case is when a data pipeline has some piece of logic implemented in plain SQL, possibly written by an analyst or someone not familiar with Spark and its quirks. That’s really not so uncommon.

>>> df = spark.range(5).withColumnRenamed("id", "value")
>>> df.createOrReplaceTempView("source")
>>> df_final = spark.sql("""
    SELECT value, value * value AS squared, NULL AS other_column
    FROM source
""")
>>> df_final.show()
+-----+-------+------------+
|value|squared|other_column|
+-----+-------+------------+
|    0|      0|        NULL|
|    1|      1|        NULL|
|    2|      4|        NULL|
|    3|      9|        NULL|
|    4|     16|        NULL|
+-----+-------+------------+

Think of the spark.sql call as a transformation prepared by some analyst and injected dynamically into the data pipeline. The source data has a numeric column, and the requirement is to add one column holding the squared value and another column holding NULL for some business reason. As shown above, the outcome looks valid and can be saved or passed to the next step in the pipeline. But before that, let’s have a look at the schema of df_final.

>>> df_final.printSchema()
root
|-- value: long (nullable = false)
|-- squared: long (nullable = false)
|-- other_column: void (nullable = true)

The first two columns are fine but other_column has a type of void.

A void column can also come from a classic PySpark transformation.

>>> import pyspark.sql.functions as F
>>> df_final = (
    df
    .withColumn("squared", F.col("value") * F.col("value"))
    .withColumn("other_column", F.lit(None))
)

>>> df_final.printSchema()

root
|-- value: long (nullable = false)
|-- squared: long (nullable = false)
|-- other_column: void (nullable = true)

Using the None literal will also produce a void column.

Are there any docs on that stuff?

Unfortunately, the OSS Spark documentation on data types makes no reference to the void type. There is some material on NULL literals, but it does not say what data type to expect. Fortunately, Databricks has a small page on the void type, and it states clearly that the void type represents the untyped NULL value.

With that information, the obvious solution is to always provide an explicit type for the NULL literal to get rid of the void type and live happily ever after. I will shortly discuss some silly implications of having the void type in the lakehouse. But for now, let’s adapt the previous two cases with explicit types.

For the SQL case:

>>> df_final = spark.sql("""
    SELECT value, value * value AS squared, CAST(NULL AS string) AS other_column
    FROM source
""")

>>> df_final.printSchema()
root
|-- value: long (nullable = false)
|-- squared: long (nullable = false)
|-- other_column: string (nullable = true)

For the DataFrame API case:

>>> df_final = (
    df
    .withColumn("squared", F.col("value") * F.col("value"))
    .withColumn("other_column", F.lit(None).cast("boolean"))
)

>>> df_final.printSchema()
root
|-- value: long (nullable = false)
|-- squared: long (nullable = false)
|-- other_column: boolean (nullable = true)

Another option is to inspect the schema of the final DataFrame before writing or merging, and fail the pipeline (or log a warning, or whatever fits) if the void data type is found.

Let’s ignore the potential cases listed above that may result in a void column appearing in the DataFrame. Are there any other practical cases where the void type just appears out of nowhere?
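The schema check mentioned above could look roughly like the sketch below. It walks the dictionary returned by df.schema.jsonValue() rather than the PySpark type objects, so it can be tried without a Spark session. The helper name find_void_columns is hypothetical, and accepting "null" next to "void" as a type name is an assumption meant to cover older Spark versions.

```python
from typing import Any, List

# Assumption: recent Spark versions serialize NullType as "void";
# "null" is included in case an older version names it differently.
VOID_TYPE_NAMES = {"void", "null"}


def find_void_columns(data_type: Any, path: str = "") -> List[str]:
    """Return the paths of every void-typed field in a schema dict.

    `data_type` is expected to have the shape of `df.schema.jsonValue()`.
    """
    # Primitive types are serialized as plain strings such as "long" or "void".
    if isinstance(data_type, str):
        return [path] if data_type in VOID_TYPE_NAMES else []

    kind = data_type.get("type")
    found: List[str] = []
    if kind == "struct":
        for field in data_type["fields"]:
            child = f"{path}.{field['name']}" if path else field["name"]
            found += find_void_columns(field["type"], child)
    elif kind == "array":
        found += find_void_columns(data_type["elementType"], f"{path}[]")
    elif kind == "map":
        # Only map values are checked here; keys are ignored in this sketch.
        found += find_void_columns(data_type["valueType"], f"{path}{{value}}")
    return found


# Same shape as the schema of df_final shown earlier.
example_schema = {
    "type": "struct",
    "fields": [
        {"name": "value", "type": "long", "nullable": False, "metadata": {}},
        {"name": "squared", "type": "long", "nullable": False, "metadata": {}},
        {"name": "other_column", "type": "void", "nullable": True, "metadata": {}},
    ],
}

void_columns = find_void_columns(example_schema)
if void_columns:
    print(f"Void columns found: {void_columns}")
```

In a pipeline, the call would be find_void_columns(df_final.schema.jsonValue()), followed by raising an exception or logging a warning when the list is not empty.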

CDC tools with limitations

Well, there are such cases. One concrete example is GCP Datastream, a serverless CDC service inside GCP. For a PostgreSQL source, Datastream does not support pulling geometry types, user-defined types or arrays of unsupported data types. See the Datastream documentation for more details. Datastream can write data in Avro or JSON format. If the data is produced as JSON, there will be an additional schema file that can be used to parse the JSON documents and make things explicit.

Say there is a table in Postgres where one of the columns has a user-defined type.

CREATE TYPE public."complex_type" AS (
    x double precision,
    y double precision
);

CREATE TABLE my_table(
    id integer PRIMARY KEY,
    complex_data public."complex_type",
    payment_method varchar(100)
);

CDC data sourced from such a table will have a schema file similar to the snippet below, which is a trimmed version with the CDC metadata removed.

{
  "type": "record",
  "name": "payload",
  "fields": [
    {
      "name": "id",
      "type": [
        "null",
        "int"
      ]
    },
    {
      "name": "complex_data",
      "type": {
        "type": "null",
        "logicalType": "unsupported"
      }
    },
    {
      "name": "payment_method",
      "type": [
        "null",
        "string"
      ]
    }
  ]
}

The column called complex_data appears in the schema file with a type of null and a logical type of unsupported. To convert that CDC-provided schema into a Spark schema, we need the Python avro library.

import json

import avro.schema
from pyspark.sql.types import StructType

# Make sure to install Avro in your Python env.
# Assumption: the spark-avro package is also available to the Spark session,
# since SchemaConverters lives in that module.

cdc_provided_schema = "<fill the json document in the previous snippet>"

# Parse with the Python avro library first to validate the document
avro_schema_instance = avro.schema.parse(cdc_provided_schema)

# Reach into the JVM to reuse Spark's own Avro-to-Spark schema conversion
jvm = spark.sparkContext._jvm
schema_converters = jvm.org.apache.spark.sql.avro.SchemaConverters
spark_avro_schema_helper = jvm.org.apache.avro.Schema
java_avro_schema = spark_avro_schema_helper.Parser().parse(str(avro_schema_instance))
java_spark_schema = schema_converters.toSqlType(java_avro_schema)

# Round-trip through JSON to get a PySpark StructType
java_schema_json = java_spark_schema.dataType().json()
json_schema = json.loads(java_schema_json)
spark_schema = StructType.fromJson(json_schema)
print(spark_schema)

There might be a warning that unsupported is not a valid logical type, but other than that the output clearly shows that the complex_data column is of type NullType, which is another way of saying it is a void type.

StructType(
    [
        StructField('id', IntegerType(), True),
        StructField('complex_data', NullType(), True),
        StructField('payment_method', StringType(), True)
    ]
)

Let’s see the result of parsing sample JSON data with such a schema.

# Prepare a json file with the following two lines:
# {"id":1,"complex_data":null,"payment_method":"card"}
# {"id":2,"complex_data":null,"payment_method":"cash"}

>>> df = spark.read.json("<JSON_DATA_FILE_PATH>", schema=spark_schema)
>>> df.printSchema()
>>> df.show()


root
|-- id: integer (nullable = true)
|-- complex_data: void (nullable = true)
|-- payment_method: string (nullable = true)

+---+------------+--------------+
| id|complex_data|payment_method|
+---+------------+--------------+
|  1|        NULL|          card|
|  2|        NULL|          cash|
+---+------------+--------------+

This practical example shows that we may get a void type due to limitations of an ingestion tool or some upstream issue. In the above case, the problem is not the null values appearing in the data entries themselves; rather, it is the schema provided by the CDC tool. It is important to be aware of such nuances because many data pipelines depend on schemas that are prepared externally and not controlled from source code.
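Since the void type originates in the schema file here, one option is to inspect that file before converting it. The sketch below assumes the schema has the shape shown earlier (a top-level record with a fields list) and flags fields whose Avro type can only ever hold null. The helper names are hypothetical.

```python
import json
from typing import Any, List


def _base_type(avro_type: Any) -> Any:
    """Unwrap {"type": "null", "logicalType": ...} style declarations."""
    return avro_type.get("type") if isinstance(avro_type, dict) else avro_type


def null_only_fields(avro_schema_json: str) -> List[str]:
    """List top-level record fields whose Avro type can only hold null."""
    schema = json.loads(avro_schema_json)
    flagged = []
    for field in schema.get("fields", []):
        declared = _base_type(field["type"])
        # A union is a list of branches; anything else is a single branch.
        branches = declared if isinstance(declared, list) else [declared]
        if branches and all(_base_type(branch) == "null" for branch in branches):
            flagged.append(field["name"])
    return flagged


# Trimmed schema in the same shape as the CDC-provided file above.
cdc_schema = json.dumps({
    "type": "record",
    "name": "payload",
    "fields": [
        {"name": "id", "type": ["null", "int"]},
        {"name": "complex_data",
         "type": {"type": "null", "logicalType": "unsupported"}},
        {"name": "payment_method", "type": ["null", "string"]},
    ],
})

print(null_only_fields(cdc_schema))
```

A non-empty result is a hint that the matching Spark column will come out as void, so the pipeline can drop the column, assign it an explicit type, or stop early.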

Implications of the void data type

Having one or more columns treated as a void type has implications. First, let’s set aside joins and meaningful calculations because the data is NULL anyway. The focus here will be on persistence and downstream consumption.

Going back to the beginning of this post, we will use the dummy DataFrame with a void type.

>>> df = spark.range(5).withColumnRenamed("id", "value")
>>> df.createOrReplaceTempView("source")
>>> df_final = spark.sql("""
    SELECT value, value * value AS squared, NULL AS other_column
    FROM source
""")

The first step is to pretend we are still in the pre delta-lake era and write data to parquet. 👴

>>> df_final.write.mode("overwrite").parquet("/tmp/some-random-location")

AnalysisException: [UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE]
The Parquet datasource doesn't support the column `other_column` of the type "VOID".

That would not work because parquet does not support the void type.

What about storing it in a delta-lake table?

>>> df_final.write.format("delta").mode("overwrite").save("/tmp/another-random-location")

That should work fine, assuming the Spark session is already configured for Delta Lake. But there are some gotchas!!! The created Delta Lake table completely ignores the void column for batch queries but includes it for streaming queries. 🤷‍♂️

>>> spark.read.format("delta").load("/tmp/another-random-location").printSchema()

root
|-- value: long (nullable = true)
|-- squared: long (nullable = true)

>>> spark.readStream.format("delta").load("/tmp/another-random-location").printSchema()

root
|-- value: long (nullable = true)
|-- squared: long (nullable = true)
|-- other_column: void (nullable = true)

The batch query behavior above is slightly strange because the Delta Lake transaction log metadata shows that the void column is there. Just open the /tmp/another-random-location/_delta_log/00000000000000000000.json file in a text editor, search for schemaString and then unescape it by replacing \" with ".

{
  "type": "struct",
  "fields": [
    {
      "name": "value",
      "type": "long",
      "nullable": true,
      "metadata": {}
    },
    {
      "name": "squared",
      "type": "long",
      "nullable": true,
      "metadata": {}
    },
    {
      "name": "other_column",
      "type": "void",
      "nullable": true,
      "metadata": {}
    }
  ]
}

Delta Lake table column metadata still has the void-typed column
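The same inspection can be scripted instead of unescaping the string by hand. This is a minimal sketch, assuming the commit file is newline-delimited JSON and that one of its lines is a metaData action carrying schemaString, as in the table created above. The helper name read_delta_schema is hypothetical.

```python
import json
from pathlib import Path
from typing import Optional


def read_delta_schema(commit_file: str) -> Optional[dict]:
    """Return the table schema recorded in a Delta commit file, if any."""
    for line in Path(commit_file).read_text().splitlines():
        if not line.strip():
            continue
        action = json.loads(line)
        # Only the metaData action carries the schema.
        if "metaData" in action:
            # schemaString is itself a JSON document stored as a string.
            return json.loads(action["metaData"]["schemaString"])
    return None


# Path used earlier in this post; adjust it to your own table location.
log_file = "/tmp/another-random-location/_delta_log/00000000000000000000.json"
if Path(log_file).exists():
    schema = read_delta_schema(log_file)
    if schema:
        print([(field["name"], field["type"]) for field in schema["fields"]])
```

Parsing schemaString with json.loads takes care of the unescaping, so the void column shows up directly in the printed list.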

Batch query may be working this way because, after all, parquet files don’t have the void column.

>>> # Use your own file path
>>> some_file = "/tmp/another-random-location/part-00001-3ad8c41e-e27a-4bf5-b0af-decef6011933-c000.snappy.parquet"
>>> df = spark.read.parquet(some_file)
>>> df.printSchema()
>>> df.show()


root
|-- value: long (nullable = true)
|-- squared: long (nullable = true)

+-----+-------+
|value|squared|
+-----+-------+
|    0|      0|
Alright, at least we know batch queries will completely ignore void columns, although that may cause confusion or unanticipated behavior in any workload pulling data from such a table.

Let’s come back to the streaming query and see if it works end-to-end, not just printing the schema.

query = spark.readStream.format("delta").load("/tmp/another-random-location")

query.writeStream.outputMode("append").format("console").start()

The above streaming query will fail with an error.

Invalid batch: value#1544L,squared#1545L,other_column#1546 != value#1740L,squared#1741L

The error simply shows a mismatch between the columns defined in the Delta Lake schema and the columns pulled from disk. If other_column in the initial DataFrame had been cast to a specific data type, decimal for example, the streaming query would run successfully.

-------------------------------------------
Batch: 0
-------------------------------------------
+-----+-------+------------+
|value|squared|other_column|
+-----+-------+------------+
|    1|      1|        NULL|
|    0|      0|        NULL|
|    2|      4|        NULL|
|    3|      9|        NULL|
|    4|     16|        NULL|
+-----+-------+------------+
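To apply such a cast without hand-editing every query, the select expressions could be generated from the schema just before the write. The sketch below is one possible approach, not a standard Spark utility: the helper name cast_void_expressions and the default target type are assumptions, and only top-level columns are handled. It works on the dictionary returned by df.schema.jsonValue().

```python
from typing import List

# Assumption: "void" is the serialized name of NullType in recent Spark
# versions; "null" is included in case an older version names it differently.
VOID_TYPE_NAMES = {"void", "null"}


def cast_void_expressions(schema_json: dict, target_type: str = "string") -> List[str]:
    """Build selectExpr strings that cast top-level void columns to target_type."""
    expressions = []
    for field in schema_json["fields"]:
        # Escape backticks so unusual column names stay valid identifiers.
        name = field["name"].replace("`", "``")
        field_type = field["type"]
        # Nested types are dicts, so only plain string type names are compared.
        if isinstance(field_type, str) and field_type in VOID_TYPE_NAMES:
            expressions.append(f"CAST(`{name}` AS {target_type}) AS `{name}`")
        else:
            expressions.append(f"`{name}`")
    return expressions


# Same shape as the schema of the df_final used in this section.
example_schema = {
    "type": "struct",
    "fields": [
        {"name": "value", "type": "long", "nullable": False, "metadata": {}},
        {"name": "squared", "type": "long", "nullable": False, "metadata": {}},
        {"name": "other_column", "type": "void", "nullable": True, "metadata": {}},
    ],
}

for expression in cast_void_expressions(example_schema, "decimal(10,2)"):
    print(expression)
```

The resulting list is meant to be passed to df_final.selectExpr(*expressions) before writing, so the Delta table is created with a concrete type instead of void. Picking the target type is still a business decision; string is only a placeholder default here.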

Understanding the intricacies of the void data type in Apache Spark is crucial for effective data pipeline development. This post covered some scenarios where void types may emerge, such as SQL transformations and PySpark operations. We have also seen a practical case where void types can appear unexpectedly because of limitations in a Change Data Capture (CDC) tool, which is a reminder to be vigilant about schema nuances, especially when dealing with external schema sources.

We also explored the implications of having void columns in a DataFrame, particularly for data persistence and downstream consumption. Parquet rejects void columns outright, while Delta Lake accepts them but then shows different schemas to batch and streaming queries.

It is always useful to be strict (and suspicious 😉) about schema and data types.

Hope that helps!

Yousry is a lead consultant working for Cuusoo. He is very passionate about all things data including Big Data, Machine Learning and AI.