Introduction
Our machine learning system predicts a continuous variable for four different products. The pipeline was built in Databricks and has two major components:
- Feature preparation in SQL with serverless compute.
- Inference on an ensemble of several hundred models, using job clusters for control over compute power.
In our first attempt, a 420-core cluster spent nearly 10 hours processing just 18 partitions.
The objective is to tune the data flow to maximize cluster usage and ensure scalability. Inference runs on four sets of ML models, one set per product. However, we will focus on how the data is saved, since that determines how much parallelism we can leverage for inference; we will not cover the inner workings of the inference itself.
If there are too few file partitions, the cluster spends a long time scanning large files, and unless you repartition (which adds network latency and data shuffling), each partition may also carry a very large set of rows into inference, again resulting in long run times.
However, the business has limited patience when shipping ML pipelines with a direct impact on the org, so our tests were limited.
In this article, we will review our feature data landscape, then provide an overview of the ML inference, and present the results and discussions of the inference performance based on four dataset treatment scenarios:
- Partitioned table, no salt, no row limit per file (Non-salted and Partitioned)
- Partitioned table, salted, with a 1M row limit (Salty and Partitioned)
- Liquid-clustered table, no salt, no row limit per file (Non-salted and Liquid)
- Liquid-clustered table, salted, with a 1M row limit (Salty and Liquid)
Data Landscape
The dataset contains features that the set of ML models uses for inference. It has ~550M rows and contains four products identified in the attribute ProductLine:
- Product A: ~10.45M (1.9%)
- Product B: ~4.4M (0.8%)
- Product C: ~100M (17.6%)
- Product D: ~354M (79.7%)
The dataset also has another low-cardinality attribute, AttrB, which contains only two distinct values and is used as a filter to extract subsets of the dataset for every part of the ML system.
Moreover, RunDate logs the date when the features were generated; the features are append-only. Finally, the dataset is read using the following query:
SELECT
    Id,
    ProductLine,
    AttrB,
    AttrC,
    RunDate,
    {model_features}
FROM
    catalog.schema.FeatureStore
WHERE
    ProductLine = :product AND
    AttrB = :attributeB AND
    RunDate = :RunDate
Salt Implementation
The salting here is generated dynamically. Its purpose is to distribute the data according to the volumes: large products receive more buckets and smaller products receive fewer. For instance, Product D should receive around 80% of the buckets, given the proportions in the data landscape. We do this to get predictable inference run times and to maximize cluster utilization.
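Before the PySpark implementation below, here is a minimal pure-Python sketch of the allocation formula, using the approximate product shares from the data-landscape section and the min/max bucket values used later in the snippet (for simplicity, the AttrB split is ignored, so the real per-key bucket counts differ):

```python
# Linear interpolation between min and max buckets by product share.
shares = {"Product A": 0.019, "Product B": 0.008,
          "Product C": 0.176, "Product D": 0.797}

min_buckets, max_buckets = 10, 1160

buckets = {
    product: int(min_buckets + share * (max_buckets - min_buckets))
    for product, share in shares.items()
}
print(buckets)
# → {'Product A': 31, 'Product B': 19, 'Product C': 212, 'Product D': 926}
```

Product D ends up with roughly 80% of the bucket range, matching its share of the rows.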
from pyspark.sql import functions as F

# Calculate percentage of each (ProductLine, AttrB) based on row counts
brand_cat_counts = df_demand_price_grid_load.groupBy(
    "ProductLine", "AttrB"
).count()
total_count = df_demand_price_grid_load.count()
brand_cat_percents = brand_cat_counts.withColumn(
    "percent", F.col("count") / F.lit(total_count)
)
# Collect percentages as a dict with string keys (this will later determine
# the number of salt buckets each product receives)
brand_cat_percent_dict = {
    f"{row['ProductLine']}|{row['AttrB']}": row['percent']
    for row in brand_cat_percents.collect()
}
# Collect counts as a dict with string keys (this will help to add an
# additional bucket if the count is not divisible by the number of
# buckets for the product)
brand_cat_count_dict = {
    f"{row['ProductLine']}|{row['AttrB']}": row['count']
    for row in brand_cat_percents.collect()
}
# Helper to flatten key-value pairs for create_map
def dict_to_map_expr(d):
    expr = []
    for k, v in d.items():
        expr.append(F.lit(k))
        expr.append(F.lit(v))
    return expr
percent_case = F.create_map(*dict_to_map_expr(brand_cat_percent_dict))
count_case = F.create_map(*dict_to_map_expr(brand_cat_count_dict))
# Add string key column in PySpark
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
    "product_cat_key",
    F.concat_ws("|", F.col("ProductLine"), F.col("AttrB"))
)
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
    "percent", percent_case.getItem(F.col("product_cat_key"))
).withColumn(
    "product_count", count_case.getItem(F.col("product_cat_key"))
)
# Set min/max buckets
min_buckets = 10
max_buckets = 1160
# Calculate buckets per row based on the (ProductLine, AttrB) percentage
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
    "buckets_base",
    (F.lit(min_buckets) + (F.col("percent") * (max_buckets - min_buckets))).cast("int")
)
# Add an extra bucket if product_count is not divisible by buckets_base
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
    "buckets",
    F.when(
        (F.col("product_count") % F.col("buckets_base")) != 0,
        F.col("buckets_base") + 1
    ).otherwise(F.col("buckets_base"))
)
# Generate salt per row based on the (ProductLine, AttrB) bucket count
df_demand_price_grid_load = df_demand_price_grid_load.withColumn(
    "salt",
    (F.rand(seed=42) * F.col("buckets")).cast("int")
)
# Perform the repartition using the core attributes and the salt column,
# then drop the helper columns before writing
df_demand_price_grid_load = df_demand_price_grid_load.repartition(
    1200, "AttrB", "ProductLine", "salt"
).drop("product_cat_key", "percent", "product_count", "buckets_base", "buckets", "salt")
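To see why the salt evens things out, here is a small pure-Python simulation of the per-row salt assignment (the row count and bucket count are hypothetical; the real job does this per (ProductLine, AttrB) group with F.rand):

```python
import random
from collections import Counter

rows = 100_000           # hypothetical row count for one (ProductLine, AttrB) group
buckets = 20             # hypothetical bucket count for that group
rng = random.Random(42)  # fixed seed, mirroring F.rand(seed=42)

# Assign each row a salt in [0, buckets), as the PySpark job does per row.
salts = Counter(int(rng.random() * buckets) for _ in range(rows))

# Every bucket is populated and bucket sizes stay close to rows / buckets,
# so downstream tasks receive roughly equal amounts of work.
print(len(salts), min(salts.values()), max(salts.values()))
```

With a uniform random salt, bucket sizes cluster tightly around `rows / buckets` (5,000 here), which is exactly the property that keeps inference tasks balanced.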
Finally, we save our dataset to the feature table and enforce a maximum number of rows per file. This prevents Spark from generating files with too many rows, which it can do even after we have computed the salt.
Why enforce 1M rows? The primary focus is model inference time, not file size. After a few tests with 1M, 1.5M, and 2M, the first yielded the best performance in our case. Again, the project was very budget- and time-constrained, so we had to make the most of our resources.
df_demand_price_grid_load.write \
    .mode("overwrite") \
    .option("replaceWhere", f"RunDate = '{params['RunDate']}'") \
    .option("maxRecordsPerFile", 1_000_000) \
    .partitionBy("RunDate", "AttrB", "ProductLine") \
    .saveAsTable(f"{params['catalog']}.{params['schema']}.FeatureStore")
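As a quick sanity check on the 1M-row limit, a ceiling division over the row counts from the data-landscape section gives a lower bound on the number of files per product (the actual count is higher, since each salt value within a partition can produce its own files):

```python
import math

# Approximate row counts per product from the data landscape
row_counts = {"Product A": 10_450_000, "Product B": 4_400_000,
              "Product C": 100_000_000, "Product D": 354_000_000}
max_records_per_file = 1_000_000

# Minimum number of files each product can occupy under maxRecordsPerFile
min_files = {p: math.ceil(n / max_records_per_file) for p, n in row_counts.items()}
print(min_files["Product D"])  # → 354
```

So even before salting, Product D must span at least ~354 files, which hints at the parallelism the cluster should be able to exploit.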
Why not just rely on Spark's Adaptive Query Execution (AQE)?
Recall that the primary focus is inference time, not metrics tuned for regular Spark SQL queries, such as file size. Relying only on AQE was actually our initial attempt. As you will see in the results, the run times were very undesirable and did not maximize cluster utilization given our data proportions.
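For reference, the AQE behavior discussed here is governed by a handful of Spark settings. A sketch of the relevant knobs, assuming a Databricks notebook where `spark` is predefined (the values shown are illustrative defaults, not our tuned configuration):

```python
# AQE is on by default in recent Spark / Databricks runtimes.
spark.conf.set("spark.sql.adaptive.enabled", "true")

# Lets AQE merge small shuffle partitions after a stage finishes.
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")

# Target size AQE aims for when coalescing shuffle partitions.
spark.conf.set("spark.sql.adaptive.advisoryPartitionSizeInBytes", "64m")
```

These targets are tuned around query throughput and file sizes, not around how many rows a downstream model scores per task, which is why AQE alone did not help us here.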
Machine Learning inference
There is a pipeline with four tasks, one per product. Every task performs the following general steps:
- Loads the features for the corresponding product
- Loads the subset of ML models for the corresponding product
- Performs inference on half the subset, sliced by AttrB
- Performs inference on the other half, sliced by AttrB
- Saves the results to the results table
We will focus on one of the inference stages so as not to overwhelm this article with numbers; the other stage is very similar in structure and results. You can see the DAG for the inference stage under evaluation in Fig. 2.

It seems very straightforward, but the run times can vary depending on how your data is saved and the size of your cluster.
Cluster configuration
For the inference stage we are analyzing, there is one cluster per product, tuned for the infrastructure limitations of the project and for the distribution of the data:
- Product A: 1 worker (Standard_DS14v2, 14 cores)
- Product B: 1 worker (Standard_DS14v2, 14 cores)
- Product C: 5 workers (Standard_DS14v2, 70 cores)
- Product D: 35 workers (Standard_DS14v2, 420 cores)
In addition, Adaptive Query Execution (AQE) is enabled by default; among other optimizations, it lets Spark coalesce shuffle partitions at run time, which influences how the data is saved given the context you provide.
Results and discussion
For each scenario, you will see a depiction of the number of file partitions per product and the average number of rows per partition, which indicates how many rows the ML system will run inference on per Spark task. Furthermore, we present Spark UI metrics to observe run-time performance and the distribution of data at inference time. We show the Spark UI portion only for Product D, the largest product, to avoid an excess of information; depending on the scenario, inference on Product D also becomes the run-time bottleneck, which is another reason it is the primary focus of the results.
Non-Salted and Partitioned
You can see in Fig. 3 that the average file partition has tens of millions of rows, which means considerable run time for a single executor. The largest on average is Product C, with more than 45M rows in a single partition. The smallest is Product B, with approximately 12M rows on average.

Fig. 4 depicts the number of partitions per product, 26 in total. Looking at Product D, 18 partitions fall very short of the 420 cores we have available, and on average every partition will perform inference on ~40M rows.
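The gap is easy to quantify: with one Spark task per file partition, at most 18 of the 420 cores can be busy at once.

```python
cores = 420       # Product D cluster
partitions = 18   # file partitions in the non-salted, partitioned layout

# One Spark task per partition → at most `partitions` cores busy at once.
utilization = partitions / cores
idle_cores = cores - partitions
print(f"{utilization:.1%} utilization, {idle_cores} idle cores")
# → 4.3% utilization, 402 idle cores
```

In other words, we pay for a 420-core cluster while roughly 96% of it sits idle for the duration of the stage.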

Take a look at Fig. 5. In total, the cluster spent 9.9 hours and the job still wasn't complete; we had to kill it because it was becoming expensive and was blocking other people's tests.

From the summary statistics in Fig. 6 for the tasks that did finish, we can see heavy skew in the partitions for Product D. The maximum input size was ~56M rows, with a run time of 7.8 hours.

Non-salted and Liquid
In this scenario, we can observe very similar results in terms of average number of rows per file partition and number of partitions per product, as seen in Fig. 7 and Fig. 8, respectively.

Product D has 19 file partitions, still very short of 420 cores.

We could already anticipate that this experiment was going to be very expensive, so we decided to skip the inference test for this scenario. In an ideal situation we would carry it forward, but there is a backlog of tickets on our board.
Salty and Partitioned
After applying the salting and repartition process, we end up with ~2.5M average records per partition for Products A and B, and ~1M for Products C and D, as depicted in Fig. 9.

Moreover, we can see in Fig. 10 that the number of file partitions increased to approximately 860 for product D, which gives 430 for each inference stage.

This results in a run time of 3 hours for inference on Product D, with 360 tasks, as seen in Fig. 11.

Checking the summary statistics in Fig. 12, the distribution looks balanced, with run times around 1.7 hours, but with a maximum task taking 3 hours, which is worth investigating further in the future.

One great benefit is that the salt distributes the data according to the proportions of the products. If we had more resources available, we could increase the number of shuffle partitions in repartition() and add workers according to the proportions of the data. This ensures that our process scales predictably.
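That scaling rule can be sketched directly: allocate workers in proportion to each product's share of the rows, with a floor of one worker per product (the total worker budget below is hypothetical; the shares come from the data-landscape section):

```python
shares = {"Product A": 0.019, "Product B": 0.008,
          "Product C": 0.176, "Product D": 0.797}
total_workers = 42   # hypothetical overall worker budget

# Proportional allocation with a minimum of one worker per product.
workers = {p: max(1, round(s * total_workers)) for p, s in shares.items()}
print(workers)
# → {'Product A': 1, 'Product B': 1, 'Product C': 7, 'Product D': 33}
```

Because the salt keeps per-partition row counts stable, doubling the budget roughly halves the wall-clock time of the bottleneck product instead of just adding idle cores.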
Salty and Liquid
This scenario combines the two strongest levers we have explored so far: salting to control file size and parallelism, and liquid clustering to keep related data colocated without rigid partition boundaries.
After applying the same salting strategy and a 1M row limit per partition, the liquid-clustered table shows a very similar average partition size to the salted and partitioned case, as shown in Fig. 13. Products C and D remain close to the 1M-row target, while Products A and B settle slightly above that threshold.

However, the main difference appears in how these partitions are distributed and consumed by Spark. As shown in Fig. 14, product D again reaches a high number of file partitions, providing enough parallelism to saturate the available cores during inference.

Unlike the partitioned counterpart, liquid clustering allows Spark to adapt file layout over time while still benefiting from the salt. This results in a more even distribution of work across executors, with fewer extreme outliers in both input size and task duration.
From the summary statistics in Fig. 15, we observe that the majority of tasks are completed within a tight runtime window, and the maximum task duration is lower than in the salty and partitioned scenario. This indicates reduced skew and better load balancing across the cluster.


An important side effect is that liquid clustering preserves data locality for the filtered columns without enforcing strict partition boundaries. This allows Spark to still benefit from data skipping, while the salt ensures that no single executor is overwhelmed with tens of millions of rows.
Overall, salty and liquid emerges as the most robust setup: it maximizes parallelism, minimizes skew, and reduces operational risk when inference workloads grow or cluster configurations change.
Key Takeaways
- Inference scalability is often limited by data layout, not model complexity. Poorly sized file partitions can leave hundreds of cores idle while a few executors process tens of millions of rows.
- Partitioning alone is not enough for large-scale inference. Without controlling file size, partitioned tables can still produce massive partitions that lead to long-running, skewed tasks.
- Salting is an effective tool to unlock parallelism. Introducing a salt key and enforcing a row limit per partition dramatically increases the number of runnable tasks and stabilizes runtimes.
- Liquid clustering complements salting by reducing skew without rigid boundaries. It allows Spark to adapt file layout over time, making the system more resilient as data grows.