Delta Lake Z-Ordering from A to Z

Yousry Mohamed
15 min read · Sep 19, 2022

Understand how to optimise delta lake tables for high cardinality queries

https://unsplash.com/photos/3mY4U7FIevc

The needle in the haystack

Many performance tuning techniques for big data workloads revolve around finding a way to process smaller amounts of data. For example, in Spark you may see predicate pushdown, disk partition pruning, using columnar files to pull only the columns required for an aggregation, and generally applying filters as early as possible in a complicated query.

The Delta Lake format has become very popular for building lakehouse architectures that mix the benefits of both data lakes and data warehouses. Because delta allows ACID transactions (DML operations on a single table), those tables are no longer considered immutable and can handle use cases like backfilling, updating records in-place, or deleting an individual record for compliance reasons such as GDPR requirements. This was not easily doable with plain parquet tables without rebuilding the whole table. With that flexibility and power comes the requirement to optimise not only low cardinality queries like aggregations but also high cardinality queries like updating or selecting a few records out of a massive table. Without proper tuning, such high cardinality queries will need to read (aka scan) the whole table to find the needle in the haystack.

Databricks added the Z-Ordering data arrangement to handle this problem, and it has been available in its proprietary delta offering for quite some time. The technique is a well-known data storage method to achieve data locality, which means it helps with data skipping. Delta was completely open sourced after the Data and AI Summit 2022. Z-Ordering is now available in the OSS version of delta lake and the source code is also available to understand how it works. Quoting the Delta docs:

Z-Ordering is a technique to colocate related information in the same set of files. This co-locality is automatically used by Delta Lake in data-skipping algorithms. This behavior dramatically reduces the amount of data that Delta Lake on Apache Spark needs to read.

It is very useful to understand how such methods work behind the scenes for a couple of reasons:

  • If you know how it is built and what problems it is meant to solve, you will be able to make the most out of it and use it in the right situations.
  • When you dissect such features, you will probably hit some new APIs and ideas that can help you later solve other unrelated difficult problems.

In this post, we will begin from the bare basics and build a step by step understanding of delta lake Z-Ordering.

Environment Setup

Nothing too fancy is required here and we will stick with the free Databricks community edition offering. So this is our setup for reproducibility:

  1. Databricks community edition
  2. New cluster with runtime 11.1 (That would give us 2 cores and 16GB of memory; not really big but good enough)
  3. A fresh Python notebook
  4. If running in a paid Databricks workspace, disable the delta cache because it will interfere with the analysis here. It is not that the cache is bad, but it will make observations hard to analyse. To disable it, run: spark.conf.set("spark.databricks.io.cache.enabled", "false")

Poor man's Z-Ordering

The first step in the process is to build some fictional test data. There are sophisticated tools to do that, like the Databricks Labs Data Generator, but it is much simpler to use SQL functions like RANGE to build something useful for our exercise.
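
Below is a minimal sketch of the data generation step; the exact column expressions and variable names are assumptions and the original notebook may differ.

```python
from pyspark.sql import functions as F

# 100M records with a random id in the range of 50M, a random person name
# and a random heart rate. Column names are illustrative.
df = (
    spark.range(100_000_000)
    .withColumn("id", (F.rand() * 50_000_000).cast("long"))
    .withColumn("name", F.concat(F.lit("person_"), (F.rand() * 10_000).cast("int").cast("string")))
    .withColumn("heart_rate", (F.rand() * 60 + 60).cast("int"))
)
```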

The above creates a 100M records table with a random numeric Id in the range of 50M and a random person name and heart rate.

Image by author

Next, save the fictional data into a delta table.
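
A sketch of the save step, assuming a hypothetical table name of people:

```python
df.write.format("delta").saveAsTable("people")
```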

Tables saved to the default database will normally live under dbfs:/user/hive/warehouse, so we can inspect the underlying parquet files and the delta transaction log.
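
For example, listing the table folder (the path assumes the hypothetical table name used above):

```python
display(dbutils.fs.ls("dbfs:/user/hive/warehouse/people"))
display(dbutils.fs.ls("dbfs:/user/hive/warehouse/people/_delta_log"))
```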

Image by author

There are 8 parquet files and each one of them is around 160MB. Let’s run a SELECT query with a high cardinality filter.
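
Something along these lines; the filter value is arbitrary and the table name is the hypothetical one used above:

```python
display(spark.sql("SELECT * FROM people WHERE id = 1000"))
```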

Image by author

That query took 27 seconds to run but the important aspect is that Spark had to scan all the files to find that individual record.

P.S. If you do this tutorial yourself, the query above may return a different number of records due to random data generation. I will not talk about seeds here.

The metrics popup to the right of the query in the screenshot above is reachable from the links under the Spark jobs section. It shows the same metrics you get if you navigate to the Spark UI SQL tab and find the last SQL query there; the popup is just a nice UX shortcut.

Why did Spark have to scan all files?

Delta stores metadata in the transaction log for each parquet file added in a certain transaction. We can inspect this metadata and find the min/max values of all columns including Id. If you open one of those JSON files in a text editor or load it in a Spark DataFrame, you can see details for each added parquet file like this.

Image by author

To make the stats for all files easy to visualise, define the below function to collect those metrics. The function is just plumbing code around the JSON data contained in transaction log files.
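
Here is a sketch of such a function; the schema and column aliases are assumptions based on the delta log layout, and the original helper may differ.

```python
from pyspark.sql import functions as F

def show_file_stats(log_file_path):
    # Parse a single delta transaction log JSON file and return per-file
    # record counts plus min/max values for the id column.
    stats_schema = "numRecords long, minValues struct<id: long>, maxValues struct<id: long>"
    return (
        spark.read.json(log_file_path)
        .where(F.col("add").isNotNull())
        .select("add.path", F.from_json("add.stats", stats_schema).alias("stats"))
        .select(
            "path",
            F.col("stats.numRecords").alias("num_records"),
            F.col("stats.minValues.id").alias("min_id"),
            F.col("stats.maxValues.id").alias("max_id"),
        )
        .orderBy("path")
    )
```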

Now run the function against the first transaction, which created the source table. You can use the %fs magic or dbutils.fs.ls to list all the files in the transaction log folder.
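
For example (the path assumes the hypothetical table name used earlier; the first commit is always version 0):

```python
first_commit = "dbfs:/user/hive/warehouse/people/_delta_log/00000000000000000000.json"
display(show_file_stats(first_commit))
```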

Image by author

Every parquet file has 12.5M records as expected, but each one of them has a range of Ids spanning the whole spectrum of 0 to 50M. So, based on the file statistics, Spark only knows that the record(s) with Id 1000 could be in any one of the files, hence all of them have to be scanned.

I hear you now saying “Let’s repartition on the Id column, that should produce some data locality!” Unfortunately, that is not the case, and here is the result for another table repartitioned on Id.
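
A sketch of that attempt, reusing the stats helper from above (the table name is again hypothetical):

```python
# Hash-repartition on id into 8 partitions before writing.
df.repartition(8, "id").write.format("delta").saveAsTable("people_hashed")

display(show_file_stats(
    "dbfs:/user/hive/warehouse/people_hashed/_delta_log/00000000000000000000.json"
))
```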

Image by author

Partitioning simply means the same Ids will land in the same file (that's good for compression, especially for strings or complex types like arrays). But because this partitioning is done using hashing, which assigns records to partitions based on the hash remainder, it will not group consecutive ranges of Ids in the same file. Yes, all records with Id=1000 will live in a single file, but the range of Ids in each file will still cover the whole spectrum as shown above.

Let’s try a different method to re-order data points. Spark DataFrames have a method called repartitionByRange that takes a number of partitions plus one or more columns. The function will group data into different partitions based on disjoint ranges of those columns. The bounds are calculated using a reservoir sampling method behind the scenes for performance reasons. See the RangePartitioner object in the Spark repo for more details.
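
Something like the following sketch, again with a hypothetical table name:

```python
# Range-repartition on id into 8 partitions before writing.
df.repartitionByRange(8, "id").write.format("delta").saveAsTable("people_ranged")

display(show_file_stats(
    "dbfs:/user/hive/warehouse/people_ranged/_delta_log/00000000000000000000.json"
))
```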

Image by author

Each parquet file now has data for a disjoint range of Ids. You can see that min_id of a certain file is equal to max_id of the previous file plus 1.

Now run the same SQL query against the new table.

Image by author

Spark can now pull the statistics for each file in the delta table and know for sure the range of values in each file for certain columns like the Id column. So there is no need to scan all files to pull the record with Id = 1000; it is definitely found in the first file named part-00000-.... . So instead of reading 8 files, 100M records and around 1.3GB of data, we could run the query with 1 file, ~12.5M records and around 160MB only. The query took 5 seconds instead of 27 seconds. If many downstream queries involve a filter on the Id column, then this data re-ordering results in an obvious performance gain and cost saving. Writing data after range partitioning it is a simple way to help achieve data skipping. Please remember that range partitioning involves a shuffle, so it will take some time to re-order the data, especially if the volume is big.

So if it is that simple to implement the whole thing with a single repartitionByRange, why was there a need to implement delta Z-Ordering?

Going from a single dimension to multi-dimension

https://unsplash.com/photos/902vnYeoWS4

Range partitioning as in the previous section is good for cases where most of your queries involve a single high cardinality column filter, although there is also a bit of risk around skew. But the major problem comes when we have more than one column for data co-location. Let's have a look at the below sample data, which is just a cartesian product of all integer values from 0 to 3.
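
A small sketch to reproduce this; the column and variable names are illustrative:

```python
from pyspark.sql import functions as F

# Cartesian product of all integer values from 0 to 3 across x and y.
small = spark.range(4).toDF("x").crossJoin(spark.range(4).toDF("y"))

# Range partition by x and y into 4 partitions and tag each record with the
# partition it landed in.
display(
    small.repartitionByRange(4, "x", "y")
         .withColumn("partition", F.spark_partition_id())
)
```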

Range partitioning by x and y will produce partitions that are simply bound to x only (at least for this sample data).

Image by author

Linear data placement

By now you should be aware that the problem to be solved is around data placement on disk and which file should be chosen to store a certain record of data. Let’s start with a hypothetical use case.

  • 64 records of data across two dimensions x and y
  • The records are cartesian product of x and y where both of them are integers ranging from 0 to 7
  • It is required to store the data in 16 files (4 records per file)
  • The queries we want to optimise can involve both x and y

Add the following snippet to the current notebook. It's plain Python, not really Spark/Delta related in any way, so you can also run it in your preferred Python environment.
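
A minimal sketch of that preparation; the exact class definitions in the original notebook may differ:

```python
from dataclasses import dataclass, field
from typing import List, Optional

@dataclass
class DataItem:
    x: int
    y: int

@dataclass
class FileInfo:
    name: str
    min_x: Optional[int] = None
    max_x: Optional[int] = None
    min_y: Optional[int] = None
    max_y: Optional[int] = None
    records: List[DataItem] = field(default_factory=list)

# 64 records: cartesian product of x and y, both ranging from 0 to 7.
data = [DataItem(x, y) for x in range(8) for y in range(8)]
```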

The above is data preparation. DataItem is our record to be stored and FileInfo is similar to the metadata we find in the delta transaction log, where it holds column-level statistics. The first 4 records should look like this.

Image by author

Next we want to find out which of the 16 allowed files each record should go into. Part of that as well is to calculate the statistics of each file. Statistics here means the min and max values of x and y. In this stage, linear data placement will be used. Linear here means we process one dimension at a time in a linear fashion. For example, starting with x=0 there are 8 records with y ranging from 0 to 7. So the records having x=0 and y = 0 to 3 will go into the first file. Records having x=0 and y = 4 to 7 will go into the second file. Then we move on to x=1 and repeat the process. The next snippet loops over all records and assigns each record to a certain file. It also populates the column-level statistics for each file.
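
A sketch of that logic; helper names like place_records are made up for this post:

```python
RECORDS_PER_FILE = 4

def place_records(ordered_records):
    # Chunk the records, in the given order, into files of RECORDS_PER_FILE
    # and compute min/max statistics per file for both columns.
    files = []
    for i in range(0, len(ordered_records), RECORDS_PER_FILE):
        chunk = ordered_records[i:i + RECORDS_PER_FILE]
        files.append(FileInfo(
            name=f"file_{i // RECORDS_PER_FILE:02d}",
            min_x=min(r.x for r in chunk),
            max_x=max(r.x for r in chunk),
            min_y=min(r.y for r in chunk),
            max_y=max(r.y for r in chunk),
            records=chunk,
        ))
    return files

# Linear placement: the records are already ordered by x first, then y.
linear_files = place_records(data)

for f in linear_files:
    print(f.name, f"x:[{f.min_x},{f.max_x}]", f"y:[{f.min_y},{f.max_y}]",
          [(r.x, r.y) for r in f.records])
```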

Here is the output of the above showing the records linked with each file plus the statistics.

Image by author

Now let’s say we want to assess how the collected column statistics will help with a query like SELECT * FROM <TABLE> WHERE x=2 OR y=3.

The following snippet loops over all file metadata records prepared before (16 FileInfo objects) to check if each file can be skipped or not. Any file that cannot be skipped will have to be read from disk. There is also a small piece of logic to count how many false positives there are in the files that are read. In general, this is just a very basic method of checking the effectiveness of the data placement approach we follow.
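
A sketch of that check; function and variable names are made up:

```python
def assess_query(files, x_filter=2, y_filter=3):
    # Count how many files must be scanned for the query
    # `x = x_filter OR y = y_filter` and the false positives inside them.
    files_to_scan = 0
    false_positives = 0
    for f in files:
        may_contain_x = f.min_x <= x_filter <= f.max_x
        may_contain_y = f.min_y <= y_filter <= f.max_y
        if not (may_contain_x or may_contain_y):
            continue  # file can be skipped based on its statistics alone
        files_to_scan += 1
        false_positives += sum(
            1 for r in f.records if not (r.x == x_filter or r.y == y_filter)
        )
    print(f"Files to scan: {files_to_scan} out of {len(files)}, "
          f"false positives: {false_positives}")

assess_query(linear_files)
```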

The above will produce the following result, which shows that 9 out of the 16 files have to be scanned because they may contain the required records based on the statistics stored for each file. Inside those 9 files, there will also be 21 false positives.

Image by author

Z-Order data placement

Now let’s try a different data placement method. Surprise! We will try Z-Order. It works by applying a bit-interleaving assignment of the column values we would like to co-locate data on. This is an over-simplification, and we will discuss shortly how it is implemented in delta lake, but here is a visual representation. Each binary value shown diagonally at the crossing of an x,y value is the Z-Order value of those two points.

Each data point has a Z-Order value as shown above, hence if you connect a line starting from Z-Order 0 onwards you will get that zigzag shape. You may recall that for linear placement the first file contained the values (0,0),(0,1),(0,2),(0,3). In contrast, for Z-Order placement, the first file will contain (0,0),(0,1),(1,0),(1,1).

Here is the equivalent logic for finding the file placement of each record, this time using the Z-Order approach.
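
A sketch of that, reusing the hypothetical place_records helper from above. Which bits of x and y land in the even versus odd positions is an arbitrary choice in this sketch:

```python
def z_value(x, y, bits=3):
    # Interleave the bits of x and y to get their Z-Order value.
    z = 0
    for i in range(bits):
        z |= ((x >> i) & 1) << (2 * i)        # bits of x go to even positions
        z |= ((y >> i) & 1) << (2 * i + 1)    # bits of y go to odd positions
    return z

# Sort the records by their Z-value, then reuse the same chunking logic.
zorder_files = place_records(sorted(data, key=lambda r: z_value(r.x, r.y)))

for f in zorder_files:
    print(f.name, f"x:[{f.min_x},{f.max_x}]", f"y:[{f.min_y},{f.max_y}]",
          [(r.x, r.y) for r in f.records])
```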

And here is the file placement and column-statistics.

Image by author

Now, let’s assess the search performance of this approach using the same query SELECT * FROM <TABLE> WHERE x=2 OR y=3.
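
Reusing the hypothetical assess_query helper from before:

```python
assess_query(zorder_files)
```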

Image by author

That’s a bit better than linear placement but bear in mind that in big data scenarios a single file usually contains millions of records. So if all files have decent data locality across all the columns required in common queries, then there will be an obvious benefit of limiting the number of files to scan.

Here is a visual representation of the linear vs Z-Order methods for the sample case above.

https://docs.google.com/document/d/1TYFxAUvhtYqQ6IHAZXjliVuitA5D1u793PMnzsH_3vs/edit#

The above screenshot is from the design document published by the delta team and linked from the relevant PR.

Z-Order in the delta land

How it is implemented

Delta Lake has a command called OPTIMIZE which is used to compact small files into bigger ones. Z-Ordering has to be done as part of the OPTIMIZE command. This operation can be done both on flat tables and on partitioned tables. For simplicity, we will focus on tables without partitions, although the same concept applies to partitioned tables as well. The obvious takeaway is that you cannot Z-Order on a partition column. Partition columns are represented as disk folders and they are usually low cardinality columns like dates or department names.

We have seen how Z-Order can be used to achieve data locality for a couple of dimensions of integer values. To have the same in Delta Lake, the method has to be generic and support non-integer columns like strings or even timestamps without any special handling from the user.

The process of running OPTIMIZE ZORDER BY (columns) on a table without partitions can be roughly explained as follows:

  • All non-tombstone files of the delta table are considered and will be rewritten after compaction and Z-Ordering. Z-Ordering forces touching and rewriting all files, which is not the case if the optimisation is done just to compact small files.
  • The number of output files is controlled by the config value spark.databricks.delta.optimize.maxFileSize. If we have 10GB of data and that config value is 1GB, then 10 files will be written.
  • Next, the data is range-partitioned into the calculated number of files according to a Z-Order calculation (to be explained shortly), hence producing data locality across multiple dimensions which are the columns used in the ZORDER BY clause.
  • The repartitioned DataFrame is written to disk which involves also writing column-level statistics in delta log.

Let’s talk a bit about how the Z-Order is calculated for multiple columns in a delta lake table. This requires a bit of help from Spark Session Extensions. It's an extension API in Spark that allows a developer to plug in their own query execution rules. Many ISVs like Immuta use this feature to build some extra added value on top of Spark.

The process of calculating Z-Order goes as follows:

  • Delta lake table data is read into a DataFrame.
  • For each column in the Z-Order list, a new column is added to the DataFrame containing a range ID expression, which is just a marker saying that range IDs need to be calculated here. An extension class extends Catalyst to add the required logic to calculate the range values. In other words, each column will get a range partition integer value without really repartitioning the data. This involves using reservoir sampling on top of the underlying DataFrame column. We cannot really use repartitionByRange for each column because it will not produce the required effect.
  • Now we have a set of new columns where each column holds the integer value of the relevant range partition a certain row belongs to. This is a smart way of converting the values of all those columns of different data types into a group of columns with a single integer data type. Effectively, the dimensions we need to Z-Order on are now a group of integer dimensions with a consistent range of values. The number of ranges is controlled by spark.databricks.io.skipping.mdc.rangeId.max. The default value for this config is 1000, which is good for most cases.
  • Then a new binary column is added containing the bit-interleaving value of all the columns added in the previous step.
  • Finally, the delta table Dataframe is range partitioned using that binary column value (Z-Order of multiple dimensions) and written into the calculated number of files.
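
To make the list above concrete, here is a heavily simplified conceptual sketch of those steps using plain DataFrame operations. It is not the actual Delta implementation: the column and table names are made up, the z_value helper from the toy example is reused as a UDF, and ntile over a global ordering is only a rough stand-in for the range ID expression backed by reservoir sampling.

```python
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.types import LongType

# Reuse the toy bit-interleaving helper as a UDF over the integer range ids.
interleave_udf = F.udf(lambda a, b: z_value(a, b, bits=32), LongType())

num_ranges = 1000   # mirrors spark.databricks.io.skipping.mdc.rangeId.max
num_files = 8       # would come from table size / maxFileSize

df = spark.table("some_table")

# 1. Replace each Z-Order column with an integer range id (rough stand-in).
with_ranges = (
    df.withColumn("col1_range", F.ntile(num_ranges).over(Window.orderBy("col1")) - 1)
      .withColumn("col2_range", F.ntile(num_ranges).over(Window.orderBy("col2")) - 1)
)

# 2. Interleave the bits of the range ids into a single Z-value per row.
with_z = with_ranges.withColumn("z", interleave_udf("col1_range", "col2_range"))

# 3. Range-partition by the Z-value and write the files back out.
(with_z.repartitionByRange(num_files, "z")
       .drop("col1_range", "col2_range", "z")
       .write.format("delta").mode("overwrite").saveAsTable("some_table_zordered"))
```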

Enough theory, just put it into action!

https://unsplash.com/photos/xdBkTMLw8O8

Now let’s see what happens when the rubber hits the road.

First, an explicit non-default value will be set for the max file size of the OPTIMIZE command. For demo purposes, I will not stick with the default of 1GB but will go for 160MB, which is roughly the same file size as the original source table files. We are not really after the compaction effect here, just the Z-Order effect.
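
Something like the following, with the value expressed in bytes:

```python
spark.conf.set("spark.databricks.delta.optimize.maxFileSize", 160 * 1024 * 1024)
```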

Next, the OPTIMIZE command is called to do a Z-Order operation on the id column. That can be done in Python but it is easier to use SQL here.
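
For example, against the hypothetical table name used throughout this post:

```python
display(spark.sql("OPTIMIZE people ZORDER BY (id)"))
```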

The output of the command will show some metrics about the operation done which involves logically deleting the previous 8 files and adding 8 new files.

Image by author

The column-level statistics for id now show a similar pattern to our original range partitioning sample: each file contains a disjoint range of Ids. By the way, there is a config value that tells what the last delta lake commit in the current session was. It's a neat shortcut to know the latest version of a delta table involved in the last transaction in the session, which is the optimize call in our case. The other option is to DESCRIBE HISTORY <table> and get the maximum version, which usually appears in the first row.
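
A sketch of using that shortcut together with the earlier stats helper; the config key is my assumption of the one being referred to, and the path assumes the hypothetical table name:

```python
# Version created by the last delta commit in this session (the OPTIMIZE call).
version = spark.conf.get("spark.databricks.delta.lastCommitVersionInSession")
log_file = f"dbfs:/user/hive/warehouse/people/_delta_log/{int(version):020d}.json"
display(show_file_stats(log_file))
```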

Image by author

Now if we run the same SQL query, there will be only 1 file to read and 7 files skipped, thanks to data locality on disk plus the supporting column-level metadata.

Image by author

Practical considerations

  • Z-Ordering is geared towards columns with high cardinality. For a radical anti-pattern, you should not Z-Order by a low cardinality column like gender or marital status.
  • Effectiveness of the technique decreases with more columns added to Z-Order list.
  • Z-Order requires column statistics to be collected. If a column does not have statistics collected, then Z-Ordering on it is a waste of compute/time resources. In Delta, statistics are collected for the first 32 columns by default, and that's configurable.
  • Because even high cardinality data can have skew, there is a config value spark.databricks.io.skipping.mdc.addNoise that can be used to add some randomness to the process to decrease the effect of the skew. That would slightly degrade the effectiveness of Z-Ordering though.

Closing notes

Z-Ordering is a tool like any other tool; it will be very effective when used in the right situation. Remember, to a hammer everything looks like a nail. Assess the need for Z-Ordering first, and how often to apply it, because it requires shuffling and rewriting the whole table or at least one or more partitions. Nevertheless, it will save lots of time and compute cost if there are lots of workloads involving high cardinality column filters.

It is also quite cool to understand how such features are implemented because you will come across several patterns or APIs that you never saw before. Z-Ordering touches on concepts like range partitioning, reservoir sampling and Spark session extensions.

Update May 2023:

Credit to the awesome post by Nick Karpov: I presented Z-Ordering at a data engineering meetup and visualised things using the same approach as in his post. It used the famous NYC taxi dataset, and before Z-Ordering every file covered the whole geo-spatial area of NYC. After applying Z-Ordering, files were more spatially localised, such that a single point on the map is covered by a very small number of files, hence helping with data skipping.

NYC taxi data without Z-Ordering
NYC taxi data after Z-Ordering

Source code of the above can be found here.

Further Reading


Yousry Mohamed

Yousry is a lead consultant working for Cuusoo. He is very passionate about all things data including Big Data, Machine Learning and AI.