dbt tests vs Delta Live Tables expectations: a clickbait to Spark observable metrics

Yousry Mohamed · Published in Level Up Coding · 7 min read · Aug 5, 2023


Photo: https://unsplash.com/photos/fPxOowbR6ls

Testing is an essential component of software applications and the same applies to data pipelines. There are standalone libraries like Great Expectations or Soda to help quality check your data. Other data processing tools like Databricks Delta Live Tables (DLT) and dbt have built-in features to test the records flowing through a pipeline, persist the findings for later inspection, or take actions such as stopping the whole pipeline when bad data is found.

This post compares DLT expectations and dbt tests at a very high level, focusing on how they work and their performance implications. This is the clickbait that will lead us to a cool feature in Spark 3.

dbt tests

Let’s start with dbt, which has gained lots of traction in the data space. The dbt test feature is, like everything else in dbt, just SQL statements. There may be macros to refactor common functionality, but in the end everything boils down to SQL statements that run against your compute engine, whatever it is. So if we need to test that a sales order amount is positive and has a non-NULL customer ID, two SQL statements will be fired even though we are testing attributes of the same record. Let’s have a look.

Our dataset to test is the famous NYC taxi trips dataset, sampled down to around 50M records. The tests we would like to apply are:

  • Passenger count ≥ 1
  • Trip distance ≤ 100 (km or miles, does not matter here 😁)
  • Fare amount ≥ $2.5
  • Tolls amount ≤ $20
  • Vendor ID has to be CMT or VTS
  • Pickup longitude has to be between -180 and 180; we are not even interested in limiting it to any US region.

dbt has a few basic test rules for checking uniqueness or non-NULL values, but there are many community packages and we will use dbt-expectations. For simplicity we won’t persist test results in a table and we will stick to the warning severity level. The tests above can be encoded as follows:

version: 2

models:
  - name: trips
    description: "NYC trips"
    columns:
      - name: passenger_count
        tests:
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 1
      - name: trip_distance
        tests:
          - dbt_expectations.expect_column_values_to_be_between:
              max_value: 100
      - name: fare_amount
        tests:
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: 2.5
      - name: tolls_amount
        tests:
          - dbt_expectations.expect_column_values_to_be_between:
              max_value: 20
      - name: vendor_id
        tests:
          - dbt_expectations.expect_column_values_to_be_in_set:
              value_set: ["CMT", "VTS"]
      - name: pickup_longitude
        tests:
          - dbt_expectations.expect_column_values_to_be_between:
              min_value: -180
              max_value: 180

Let’s run dbt build and see what we get on our compute backend.

For my sampled data, the output looks as follows: the trips model is built and all tests pass except the longitude test, because the dataset has some rubbish values.

Image by author

If you inspect the SQL statements on your compute backend, you will see one statement for the model and one statement per test case, similar to the following.

/* {"app": "dbt", "dbt_version": "1.5.4", "dbt_databricks_version": "1.5.5", "databricks_sql_connector_version": "2.7.0", "profile_name": "user", "target_name": "default", "node_id": "test.my_new_project.dbt_expectations_expect_column_values_to_be_between_trips_tolls_amount__20.9849a0b6b7"} */
select
  count(*) as failures,
  count(*) != 0 as should_warn,
  count(*) != 0 as should_error
from
  (
    with grouped_expression as (
      select
        (
          1 = 1
          and tolls_amount <= 20
        ) as expression
      from
        `dbt_silver`.`trips`
    ),
    validation_errors as (
      select
        *
      from
        grouped_expression
      where
        not(expression = true)
    )
    select
      *
    from
      validation_errors
  ) dbt_internal_test

Other than having one SQL statement issued per test, some of those SQL statements might be written in a not-so-efficient manner for your specific compute backend. This is one of the trade-offs of code-generation approaches: there are productivity gains, but you need to weigh them against the performance implications. The next screenshot shows the execution plan for the vendor ID test. Although we just need to check that a column value belongs to a set of two values, a join operation is performed. It might not be super bad (thanks to a broadcast hash join), but it is still too much effort for this type of test. The generated query builds a single-column table of the allowed vendor IDs and then joins it with the source trips table.

Image by author
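For comparison, a hand-written version of that test is just a single predicate scan with no join at all. Here is a rough PySpark sketch (assuming a Spark session and reusing the dbt_silver.trips table name from the generated query); it illustrates the simpler query shape, it is not what dbt actually generates:

# Hand-written vendor ID check: a plain predicate, no join needed.
failures = spark.sql("""
    SELECT count(*) AS failures
    FROM dbt_silver.trips
    WHERE vendor_id IS NULL OR vendor_id NOT IN ('CMT', 'VTS')
""")
failures.show()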

DLT expectations

Databricks Delta Live Tables (DLT) follows a similar declarative approach. The same tests can be encoded in DLT as follows.

CREATE OR REFRESH LIVE TABLE trips
(
  CONSTRAINT valid_count EXPECT (passenger_count >= 1),
  CONSTRAINT trip_distance EXPECT (trip_distance <= 100),
  CONSTRAINT fare_amount EXPECT (fare_amount >= 2.5),
  CONSTRAINT tolls_amount EXPECT (tolls_amount <= 20),
  CONSTRAINT vendor_id EXPECT (vendor_id IN ('CMT', 'VTS')),
  CONSTRAINT pickup_longitude EXPECT (pickup_longitude BETWEEN -180 AND 180)
)
COMMENT "Sample NYC taxi trips" AS
SELECT *
FROM bronze.trips
WHERE passenger_count BETWEEN 1 AND 6;
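By the way, the same expectations can also be expressed with DLT’s Python API. The following is a rough equivalent of the SQL definition above (assuming the source table is reachable as bronze.trips), not an exact copy of the pipeline used in this post:

import dlt

@dlt.table(comment="Sample NYC taxi trips")
@dlt.expect_all(
    {
        "valid_count": "passenger_count >= 1",
        "trip_distance": "trip_distance <= 100",
        "fare_amount": "fare_amount >= 2.5",
        "tolls_amount": "tolls_amount <= 20",
        "vendor_id": "vendor_id IN ('CMT', 'VTS')",
        "pickup_longitude": "pickup_longitude BETWEEN -180 AND 180",
    }
)
def trips():
    # Expectations only warn by default; expect_all_or_drop / expect_all_or_fail
    # would drop bad records or fail the pipeline instead.
    return spark.read.table("bronze.trips").where("passenger_count BETWEEN 1 AND 6")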

Against the same source data used with the dbt build, a DLT run produces something like the following, which shows the number of records processed and one failing expectation for pickup longitude. Similar to dbt, we are not failing the pipeline on bad data, for simplicity.

Image by author

Now it is time to see how DLT runs the tests. There will be lots of noise in the DLT Spark cluster, but most of it is operational, as DLT uses internal Delta Lake tables to record operational data. Nevertheless, there will be a single Spark SQL query used to load the data and run the tests as well.

Image by author

As expected, source data is loaded from a Delta table and some basic filtering is applied, but that is not the cool thing here. Notice the CollectMetrics box at the end (top) of the execution plan. This is the secret sauce that allows running the tests while loading the data. The expanded textual representation of the execution plan shows that all the test logic we have written has been ported into the execution plan and belongs to that CollectMetrics node.

Image by author

For complicated test logic, DLT would still need separate test code written in Python or SQL, but for the majority of cases CollectMetrics does the job in a very efficient manner.

So what is that CollectMetrics thing?

With the introduction of Spark 3.0, lots of cool features were added, and many of them are about auto-piloting Spark jobs with things like adaptive query execution. Other features are not so shiny, yet they can be very useful in certain cases. One of those features is observable metrics. Basically, it resembles things like accumulators and listeners, but in a much more user-friendly way.

The core idea is to define some aggregation that you want to compute as part of your Spark action. In the case of DLT, for example, it is the count of rows violating the expression of each test case.

Let’s do a small example. Assume we have a dataset of 1M random numbers from 0 to 1. We want to verify that around 50% of such numbers are greater than 0.5.

from pyspark.sql import Observation
import pyspark.sql.functions as F

# 1M random numbers between 0 and 1
df = spark.sql("SELECT rand() AS value FROM RANGE(1000000)")
obs = Observation("how many records greater than 0.5")

# 1 if the row satisfies the condition, 0 otherwise
cond = F.when(F.expr("value > 0.5"), F.lit(1)).otherwise(F.lit(0))
# attach the aggregation to the DataFrame; it is computed as part of the next action
wrapped_df = df.observe(obs, F.sum(cond).alias("metric_value"))
wrapped_df.count()

# read the observed metric once the action completes
obs.get["metric_value"]

The cond variable defines an expression that is evaluated per row and produces a 1 or a 0 based on the result. The wrapped_df variable takes the DataFrame and calls observe on it; the second parameter of that call has to be an aggregation expression, which in our case simply counts how many records satisfy the condition. In DLT, the expressions are negated to count how many records violate the test. After a Spark action is executed against the wrapped DataFrame object, whatever metrics you defined can be extracted from the observation object.
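To mimic what DLT does, we can negate each expectation and collect one violation count per constraint in a single observe call. Below is a simple sketch (assuming the trips data is loaded into a DataFrame called trips_df and using made-up table names), not DLT’s actual implementation:

from pyspark.sql import Observation
import pyspark.sql.functions as F

def violations(expr):
    # 1 when the expectation is violated (or NULL), 0 when it holds
    return F.sum(F.when(F.expr(expr), F.lit(0)).otherwise(F.lit(1)))

trips_df = spark.read.table("bronze.trips")  # hypothetical source table
obs = Observation("expectations")

checked_df = trips_df.observe(
    obs,
    violations("passenger_count >= 1").alias("valid_count"),
    violations("fare_amount >= 2.5").alias("fare_amount"),
    violations("vendor_id IN ('CMT', 'VTS')").alias("vendor_id"),
    violations("pickup_longitude BETWEEN -180 AND 180").alias("pickup_longitude"),
)
checked_df.write.mode("overwrite").saveAsTable("silver.trips")  # made-up target table
print(obs.get)  # one violation count per expectation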

Image by author

The execution plan of the above cell contains the below entry which is just a translation of the metric we want to calculate.

(9) CollectMetrics
Input [1]: [value#2]
Arguments: how many records greater than 0.5, [sum(CASE WHEN (value#2 > 0.5) THEN 1 ELSE 0 END) AS metric_value#6L]

The observable metrics feature is super useful for cases like testing or profiling the data while the pipeline runs, rather than reloading the data and doing such checks explicitly. It is something you get “nearly” for free while a Spark action is executed.
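Profiling is a nice example: a handful of summary statistics can piggyback on whatever action the pipeline runs anyway. A minimal sketch, reusing the hypothetical trips_df DataFrame from the earlier sketch:

from pyspark.sql import Observation
import pyspark.sql.functions as F

profile = Observation("fare profile")
profiled_df = trips_df.observe(
    profile,
    F.min("fare_amount").alias("min_fare"),
    F.max("fare_amount").alias("max_fare"),
    F.avg("fare_amount").alias("avg_fare"),
    F.sum(F.col("fare_amount").isNull().cast("int")).alias("null_fares"),
)
profiled_df.count()  # any action the pipeline runs anyway triggers the metrics
print(profile.get)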

I have seen some Spark pipelines that write a DataFrame to disk and then issue a df.count() afterwards to compare the number of records in the source DataFrame to the number of records written to the target. Due to the lazy evaluation nature of Spark, that extra count call can be expensive. An observable metric is useful in such cases.
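A sketch of that reconciliation pattern, with the source count piggybacking on the write itself instead of a second pass over the data (the table and output path are made up):

from pyspark.sql import Observation
import pyspark.sql.functions as F

rows_in = Observation("rows written")
source_df = spark.read.table("bronze.trips")  # made-up source table

# The count is computed during the write action itself, no extra job needed.
observed_df = source_df.observe(rows_in, F.count(F.lit(1)).alias("rows"))
observed_df.write.format("delta").mode("overwrite").save("/mnt/silver/trips")  # made-up path

written = spark.read.format("delta").load("/mnt/silver/trips").count()
assert rows_in.get["rows"] == written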

The observable metrics feature is also resilient to task retries caused by transient failures, such as network issues or a spot instance being taken away from the cluster. The screenshots below show a simulated temporary (i.e. recoverable) failure and how the observable metric, a simple record count, still reports the right value even when the task is retried.
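If you want to reproduce something similar yourself, one way to simulate a recoverable failure is a UDF that throws only on the first attempt of a single task. This is an illustrative sketch, not the exact setup behind those screenshots, and it needs task retries enabled (the default on a cluster; locally you would need something like a local[4, 4] master):

from pyspark import TaskContext
from pyspark.sql import Observation
import pyspark.sql.functions as F

@F.udf("long")
def flaky(value):
    ctx = TaskContext.get()
    # Fail only the first attempt of the first partition to simulate a transient error.
    if ctx.partitionId() == 0 and ctx.attemptNumber() == 0:
        raise RuntimeError("simulated transient failure")
    return value

rows = Observation("row count")
df = spark.range(1_000_000).withColumn("v", flaky(F.col("id")))
observed_df = df.observe(rows, F.count(F.lit(1)).alias("rows"))
observed_df.write.format("noop").mode("overwrite").save()

print(rows.get["rows"])  # still 1000000 even though one task attempt failed and was retried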

Further reading


Yousry is a lead consultant working for Cuusoo. He is very passionate about all things data including Big Data, Machine Learning and AI.