ASOF Join in Spark
Apache Spark has no native ASOF join operator. There is no Catalyst optimizer rule, no physical operator, and no SQL syntax for it. The Pandas-on-Spark ps.merge_asof() exists but is not distributed — it collects all data to a single partition, making it useless for large datasets.
This note covers why, what workarounds exist, and which one actually works at scale.
Prerequisites
- ASOF Join — what an ASOF join is and why it matters
- Apache Spark Query Planning and Execution — how Catalyst translates logical plans to physical operators
Why Spark Has No Native ASOF Join
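Spark’s join infrastructure is built around equi-joins. The Catalyst optimizer (Spark’s query planner and optimizer) extracts equality predicates in ON clauses as join keys and selects among broadcast hash join, shuffle hash join, or sort-merge join. Inequality predicates get no dedicated operator. Depending on what else is in the ON clause, they end up in one of two places:
- With no equality predicate: a broadcast nested loop join, or a Cartesian product followed by a filter
- With an equality predicate alongside the inequality (for example entity_id plus a timestamp comparison): an equi-join on the equality keys, with the inequality evaluated as a residual filter on every pair that shares a key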
Neither path has any awareness of the “pick best match” semantics that ASOF requires. Adding native ASOF support would require:
- A new logical join type in Catalyst’s relational algebra
- A new physical operator implementing the sort-merge ASOF algorithm
- Optimizer rules to recognize the pattern and select the operator
SPARK-22947 proposed this as a SPIP (Spark Project Improvement Proposal — the formal process for proposing significant Spark changes) but was marked as duplicate without resolution. As of 2026, no native implementation exists.
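To make "the sort-merge ASOF algorithm" concrete, here is a minimal single-machine sketch in plain Python (not Spark code) of what such a physical operator would do inside one partition, assuming both sides have already been shuffled by entity_id. The function name and the tuple layouts ((entity_id, event_ts) for observations, (entity_id, feature_ts, value) for features) are illustrative assumptions.

```python
from collections import defaultdict
from typing import Any, Optional


def sort_merge_asof(
    observations: list[tuple[str, int]],
    features: list[tuple[str, int, Any]],
) -> list[tuple[str, int, Optional[Any]]]:
    """Backward ASOF join: for each (entity_id, event_ts), return the value of
    the feature row with the largest feature_ts <= event_ts (None if none)."""
    # Group both sides by entity, as a shuffle on entity_id would.
    obs_by_entity: dict[str, list[int]] = defaultdict(list)
    feat_by_entity: dict[str, list[tuple[int, Any]]] = defaultdict(list)
    for entity, event_ts in observations:
        obs_by_entity[entity].append(event_ts)
    for entity, feature_ts, value in features:
        feat_by_entity[entity].append((feature_ts, value))

    result: list[tuple[str, int, Optional[Any]]] = []
    for entity, event_times in obs_by_entity.items():
        feats = sorted(feat_by_entity.get(entity, []), key=lambda f: f[0])
        i = 0           # next feature row not yet consumed
        current = None  # value of the latest feature at or before the cursor
        for event_ts in sorted(event_times):
            # Advance the feature pointer past every row at or before event_ts.
            while i < len(feats) and feats[i][0] <= event_ts:
                current = feats[i][1]
                i += 1
            result.append((entity, event_ts, current))
    return result
```

Because both sides are sorted once and scanned with a single forward pointer, each feature row is visited once per entity rather than once per observation. That single pass is the "pick best match" behavior the nested loop and Cartesian paths lack.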
The Naive Approaches (and Why They Fail)
1. Pandas-on-Spark merge_asof
```python
import pyspark.pandas as ps
ps.merge_asof(observations, features, on="timestamp", by="entity_id")
```
This calls pd.merge_asof() under the hood. All data is collected to the driver node into a single pandas DataFrame. For anything beyond toy datasets, this causes OOM (out-of-memory) errors.
2. Union + Sort + Lag
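Union both tables, sort by (entity_id, timestamp), and carry the most recent feature row forward with a window function such as last(feature_value, ignorenulls=True) over a frame from unbounded preceding to the current row. A plain lag() only looks back one row, so it returns null whenever two observations are adjacent in the sort order. At equal timestamps, feature rows must sort before observation rows so that an exact-time match counts. This requires: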
- A full shuffle to sort the unioned dataset
- Window function evaluation across the entire dataset
- High memory pressure from the sort
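Works for moderate sizes but doesn’t scale gracefully: the window is partitioned by entity_id, so each entity’s entire history is sorted and scanned by a single task, and heavily skewed entities become stragglers.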
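A plain-Python model of the union + sort + carry-forward pattern follows, as a sketch under assumptions: rows are tuples, the kind tag (0 for feature, 1 for observation) is a hypothetical helper column, and the loop stands in for the window function. The PySpark analogue in the comment is an assumption about how one would likely write it, not a verified query.

```python
from typing import Any, Optional

# Rough PySpark analogue (assumption, not run here):
#   w = (Window.partitionBy("entity_id").orderBy("ts", "kind")
#        .rowsBetween(Window.unboundedPreceding, Window.currentRow))
#   unioned.withColumn("carried", F.last("feature_value", ignorenulls=True).over(w))


def union_sort_carry_forward(
    observations: list[tuple[str, int]],
    features: list[tuple[str, int, Any]],
) -> list[tuple[str, int, Optional[Any]]]:
    # Union both sides; kind 0 = feature row, kind 1 = observation row.
    tagged = [(entity, ts, 0, value) for entity, ts, value in features]
    tagged += [(entity, ts, 1, None) for entity, ts in observations]
    # Sort by (entity, ts, kind) so a feature at the same instant comes first.
    tagged.sort(key=lambda row: (row[0], row[1], row[2]))

    result: list[tuple[str, int, Optional[Any]]] = []
    current_entity: Optional[str] = None
    carried: Optional[Any] = None
    for entity, ts, kind, value in tagged:
        if entity != current_entity:
            # New window partition: nothing to carry forward yet.
            current_entity, carried = entity, None
        if kind == 0:
            carried = value  # most recent feature seen in this partition
        else:
            # Unlike lag(), this still works when observations are adjacent.
            result.append((entity, ts, carried))
    return result
```

With features for entity "a" at ts 2 and 5 and observations at ts 2, 3 and 4, a lag() over the same ordering would return null for the observations at ts 3 and 4, because the row immediately before each of them is another observation; the carried value gives the ts 2 feature for all three.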
3. Inequality Join + Window Dedup
```sql
SELECT * FROM (
    SELECT obs.*, feat.feature_value,
        ROW_NUMBER() OVER (
            PARTITION BY obs.entity_id, obs.event_ts
            ORDER BY feat.feature_ts DESC
        ) AS rn
    FROM observations obs
    JOIN features feat
        ON obs.entity_id = feat.entity_id
        AND obs.event_ts >= feat.feature_ts
) sub WHERE rn = 1;
```
The inequality condition obs.event_ts >= feat.feature_ts produces an intermediate result that can reach (observations × feature versions) rows per entity: a Cartesian-like blowup within each partition. The window function then discards all but one row per group. For large feature stores with millions of feature versions per entity, this causes extreme disk spill or OOM.
See Why Standard SQL Can’t Express This Cleanly for why this pattern is fundamentally wasteful.
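The blowup is easy to quantify with a small plain-Python model of the query above (a sketch, not Spark code; the function name and tuple layout are hypothetical). It counts the rows the inequality join emits before the ROW_NUMBER() step keeps one per (entity_id, event_ts). The nested loop is only for clarity: an equi-join on entity_id with a residual filter produces the same number of rows.

```python
from typing import Any


def naive_inequality_dedup(
    observations: list[tuple[str, int]],
    features: list[tuple[str, int, Any]],
) -> tuple[dict[tuple[str, int], Any], int]:
    """Model of JOIN ... ON entity match AND event_ts >= feature_ts, then rn = 1.
    Returns ({(entity_id, event_ts): feature_value}, intermediate_row_count)."""
    # Step 1: every observation pairs with every older-or-equal feature row.
    intermediate = [
        (o_entity, event_ts, feature_ts, value)
        for o_entity, event_ts in observations
        for f_entity, feature_ts, value in features
        if o_entity == f_entity and event_ts >= feature_ts
    ]
    # Step 2: ROW_NUMBER() OVER (PARTITION BY entity_id, event_ts
    #         ORDER BY feature_ts DESC) = 1, i.e. keep the newest feature.
    best: dict[tuple[str, int], tuple[int, Any]] = {}
    for entity, event_ts, feature_ts, value in intermediate:
        key = (entity, event_ts)
        if key not in best or feature_ts > best[key][0]:
            best[key] = (feature_ts, value)
    # Inner join: observations with no older feature do not appear at all.
    return {key: value for key, (_, value) in best.items()}, len(intermediate)
```

With 100 observations and 100 feature versions for a single entity (timestamps 1 to 100 on both sides), the join emits 5,050 rows to keep 100; doubling both sides roughly quadruples the intermediate result.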
Sinchenko’s Decomposition (The Approach That Works)
From Semyon Sinchenko’s blog post “Effective asOfJoin in PySpark for Feature Store”. The key insight: decompose the ASOF join into three separate operations that Spark can each execute efficiently — even though Spark has no native ASOF operator.
The Idea: Find the Timestamp, Then Fetch the Row
The naive approach joins on the inequality (obs.event_ts >= feat.feature_ts) and then deduplicates with ROW_NUMBER(). The problem is the intermediate blowup — every observation matches every older feature row before the dedup throws most of them away.
Sinchenko’s decomposition shrinks this blowup by splitting the work into two CTEs (Common Table Expressions: named intermediate query results introduced with WITH) and a final query:
- CTE 1: Find the best timestamp. Join observation keys with feature keys (lightweight: just two columns per side), filter by the inequality, and immediately aggregate with MAX to collapse to one row per observation. The blowup from the inequality join is short-lived because the GROUP BY + MAX runs in the same query stage.
- CTE 2: Fetch the full feature row. Now that the exact (entity_id, feature_ts) pair is known, do a standard equi-join against the features table to pick up all the feature columns. This is a normal hash or sort-merge join, Spark’s sweet spot.
- Final query: Attach to observations. Left-join the enriched features back onto the observations table.
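The three steps can be modeled in plain Python to check the logic on toy data before running it on a cluster. This is a sketch under stated assumptions: rows are dicts, the column names entity_id, event_ts, feature_ts and feature_value match the SQL in this note, (entity_id, feature_ts) is assumed unique in the features table, and only feature_value is copied onto the output for brevity.

```python
from collections import defaultdict
from typing import Any

Row = dict[str, Any]


def decomposed_asof(observations: list[Row], features: list[Row]) -> list[Row]:
    # CTE 1: keys only. Inequality "join" on entity_id, then MAX per observation.
    feature_ts_by_entity: dict[Any, list[Any]] = defaultdict(list)
    for feat in features:
        feature_ts_by_entity[feat["entity_id"]].append(feat["feature_ts"])
    best_ts: dict[tuple[Any, Any], Any] = {}
    for o in observations:
        key = (o["entity_id"], o["event_ts"])
        candidates = [
            ts for ts in feature_ts_by_entity.get(o["entity_id"], [])
            if ts <= o["event_ts"]
        ]
        if candidates:  # inner join + GROUP BY: no candidate, no row
            best_ts[key] = max(candidates)

    # CTE 2: equi-join on the exact (entity_id, feature_ts) pair.
    feature_by_key = {(feat["entity_id"], feat["feature_ts"]): feat for feat in features}
    matched: dict[tuple[Any, Any], Row] = {
        key: feature_by_key[(key[0], ts)] for key, ts in best_ts.items()
    }

    # Final: left join back, so observations without a match keep None.
    out: list[Row] = []
    for o in observations:
        match = matched.get((o["entity_id"], o["event_ts"]))
        out.append({
            **o,
            "feature_ts": match["feature_ts"] if match else None,
            "feature_value": match["feature_value"] if match else None,
        })
    return out
```

Only the first step ever compares timestamps with <=; the second and third are lookups on exact keys, the in-memory counterpart of the equi-joins Spark handles well.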
SQL Implementation
```sql
WITH observation_keys AS (
    -- Keys only: no payload columns enter the inequality join.
    SELECT entity_id, event_ts FROM observations
),
feature_keys AS (
    SELECT entity_id, feature_ts FROM features
),
best_feature_ts AS (
    -- CTE 1: For each observation, find the most recent feature timestamp.
    -- This is the only step with an inequality condition.
    -- The GROUP BY + MAX immediately collapses the intermediate blowup
    -- to one row per (entity_id, event_ts).
    SELECT
        obs.entity_id,
        obs.event_ts,
        MAX(feat.feature_ts) AS feature_ts
    FROM observation_keys obs
    JOIN feature_keys feat
        ON obs.entity_id = feat.entity_id
        AND feat.feature_ts <= obs.event_ts
    GROUP BY obs.entity_id, obs.event_ts
),
matched_features AS (
    -- CTE 2: Fetch the full feature row using an equi-join.
    -- We know the exact (entity_id, feature_ts) pair, so this is
    -- a standard hash/sort-merge join: no inequality, no blowup.
    -- Feature columns are listed explicitly: f.* would repeat entity_id
    -- and make mf.entity_id ambiguous in the final query.
    SELECT
        b.entity_id,
        b.event_ts,
        f.feature_ts,
        f.feature_value
    FROM best_feature_ts b
    JOIN features f
        ON b.entity_id = f.entity_id
        AND b.feature_ts = f.feature_ts
)
-- Final: left-join back to keep observations with no matching feature.
SELECT obs.*, mf.feature_value
FROM observations obs
LEFT JOIN matched_features mf
    ON obs.entity_id = mf.entity_id
    AND obs.event_ts = mf.event_ts;
```
observation_keys and feature_keys in CTE 1
In the SQL above, observation_keys and feature_keys are projections of the full tables onto just (entity_id, timestamp). You can either define them as additional CTEs or use subqueries. The point is to keep the inequality join lightweight by excluding payload columns: fewer bytes shuffled, smaller intermediate result.
PySpark Implementation
The same three-step logic expressed using the DataFrame API:
```python
from pyspark.sql import DataFrame
import pyspark.sql.functions as F


def asof_join(
    *,
    observations: DataFrame,
    features: DataFrame,
    entity_col: str = "entity_id",
    obs_ts_col: str = "event_ts",
    feat_ts_col: str = "feature_ts",
) -> DataFrame:
    """Backward ASOF join: attach the latest feature row with
    feat_ts_col <= obs_ts_col for the same entity (nulls if none).

    Assumes obs_ts_col and feat_ts_col differ and that observations and
    features share no column names other than entity_col.
    """
    # CTE 1: find the best feature timestamp per observation (keys only)
    obs_keys = observations.select(entity_col, obs_ts_col)
    feat_keys = features.select(entity_col, feat_ts_col)
    best_ts = (
        obs_keys
        # Inner join: observations with no earlier feature drop out here
        # and are restored by the final left join.
        .join(feat_keys, on=entity_col, how="inner")
        .filter(F.col(feat_ts_col) <= F.col(obs_ts_col))
        .groupBy(entity_col, obs_ts_col)
        .agg(F.max(feat_ts_col).alias(feat_ts_col))
    )
    # CTE 2: fetch the full feature row (equi-join on the exact key pair)
    with_features = best_ts.join(
        features, on=[entity_col, feat_ts_col], how="inner",
    )
    # Final: attach back to observations, keeping unmatched ones
    return observations.join(
        with_features, on=[entity_col, obs_ts_col], how="left",
    )
```
Why This Is Better Than the Naive Approach
| | Naive (inequality + ROW_NUMBER) | Sinchenko decomposition |
|---|---|---|
| Intermediate size | Up to (observations × feature versions) rows per entity, full rows | Same blowup, but on keys only (2 columns), collapsed immediately by MAX |
| Dedup mechanism | Window function over the bloated intermediate | GROUP BY + MAX: cheaper, no window sort |
| Feature fetch | Already materialized in the bloated join | Separate equi-join on exact (entity_id, ts): small, targeted |
The decomposition doesn’t eliminate the inequality join, but it makes it cheap (keys only) and short-lived (MAX collapses it in the same stage). The expensive part — fetching full feature rows — becomes a precise equi-join instead of a Cartesian blowup.
Broadcast join only works when observations is small
In the PySpark version, you can wrap obs_keys with F.broadcast() to hint Spark to send it to every executor, avoiding a shuffle for that join. The explicit hint bypasses spark.sql.autoBroadcastJoinThreshold (default 10MB, which only governs automatic broadcast decisions), so it is up to you to make sure the observation keys fit in driver and executor memory. For large observation tables, omit the broadcast: Spark falls back to a sort-merge join, which is slower but spills to disk rather than failing with OOM. The rest of the algorithm works identically either way.
Performance
Sinchenko reported ~3.5s for small feature stores and ~9s for large ones, scaling gracefully.
Databricks Range Join Hint
On Databricks (not open-source Spark), a range join optimization exists using bin bucketing (partitioning the value domain into equal-sized bins and only comparing rows in overlapping bins):
```sql
SELECT /*+ RANGE_JOIN(prices, 86400) */ *
FROM events e
JOIN prices p
    ON e.ticker = p.ticker
    AND e.ts BETWEEN p.ts AND p.ts + INTERVAL 1 DAY;
```
The hint parameter (86400) is the bin size in the column’s unit (seconds here).
```sql
-- Alternative: session variable
SET spark.databricks.optimizer.rangeJoin.binSize = 3600; -- 1 hour
```
This is still a range join, not an ASOF join
The range join produces all matches within the range, not just the best one. You still need a dedup step (ROW_NUMBER() or equivalent) on top. It reduces the Cartesian blowup by filtering non-overlapping bins, but doesn’t eliminate it.
Choosing bin size: Use APPROX_PERCENTILE on the lengths of the join ranges (range end minus range start) to guide selection. Too small → overhead from too many bins. Too large → bins don’t filter enough.
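The bin-bucketing idea can be illustrated with a plain-Python model of the events/prices query above. This is a sketch of the general technique, not Databricks' implementation: integer timestamps in seconds, the function name and the tuple layouts are assumptions. Each price range is copied into every bin it overlaps, each event is looked up in its single bin, and the exact BETWEEN predicate is still applied to the surviving pairs.

```python
from collections import defaultdict
from typing import Any


def binned_range_join(
    events: list[tuple[str, int]],
    prices: list[tuple[str, int, Any]],
    width: int,
    bin_size: int,
) -> tuple[list[tuple[str, int, int, Any]], int]:
    """Model of: e.ticker = p.ticker AND e.ts BETWEEN p.ts AND p.ts + width.
    Returns (all matching pairs, number of exact comparisons performed)."""
    # Copy each range [p_ts, p_ts + width] into every bin it overlaps.
    buckets: dict[tuple[str, int], list[tuple[int, Any]]] = defaultdict(list)
    for ticker, p_ts, price in prices:
        for b in range(p_ts // bin_size, (p_ts + width) // bin_size + 1):
            buckets[(ticker, b)].append((p_ts, price))

    matches: list[tuple[str, int, int, Any]] = []
    comparisons = 0
    for ticker, e_ts in events:
        # Equi-join on (ticker, bin): ranges in other bins cannot contain e_ts.
        for p_ts, price in buckets.get((ticker, e_ts // bin_size), []):
            comparisons += 1
            if p_ts <= e_ts <= p_ts + width:  # exact predicate is still needed
                matches.append((ticker, e_ts, p_ts, price))
    # Every in-range price is returned, not only the latest: dedup still needed.
    return matches, comparisons
```

Each event is compared only with the ranges that share its bin, so the comparison count stays far below len(events) × len(prices). A smaller bin_size copies each range into more bins; a larger one lets more non-matching ranges share a bin with the event.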
ML Feature Store Context
The ASOF join is the core operation in ML feature stores for point-in-time correctness (also called temporal correctness). When building training datasets:
- Each observation has an (entity_id, event_timestamp)
- Features must reflect only information available at or before event_timestamp
- Using features from after event_timestamp is data leakage (the model sees the future during training), producing inflated metrics that collapse in production
This is exactly the backward ASOF join semantic: for each observation, find the most recent feature version at or before the event time. Getting this wrong is one of the most common and hardest-to-detect bugs in ML pipelines.
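One way to catch this class of bug is to audit the joined output directly. The sketch below (plain Python, hypothetical function names, tuples instead of DataFrames) computes a backward point-in-time lookup with binary search and flags any row whose attached feature version is newer than its observation.

```python
from bisect import bisect_right
from collections import defaultdict
from typing import Any, Optional

Match = Optional[tuple[int, Any]]  # (feature_ts, value) or None


def point_in_time_join(
    observations: list[tuple[str, int]],
    features: list[tuple[str, int, Any]],
) -> list[tuple[str, int, Match]]:
    """Attach the latest feature version with feature_ts <= event_ts."""
    ts_by_entity: dict[str, list[int]] = defaultdict(list)
    value_by_entity: dict[str, list[Any]] = defaultdict(list)
    for entity, feature_ts, value in sorted(features, key=lambda f: (f[0], f[1])):
        ts_by_entity[entity].append(feature_ts)
        value_by_entity[entity].append(value)

    joined: list[tuple[str, int, Match]] = []
    for entity, event_ts in observations:
        history = ts_by_entity.get(entity, [])
        # bisect_right counts versions with feature_ts <= event_ts (inclusive).
        i = bisect_right(history, event_ts)
        match = (history[i - 1], value_by_entity[entity][i - 1]) if i else None
        joined.append((entity, event_ts, match))
    return joined


def leaked_rows(joined: list[tuple[str, int, Match]]) -> list[tuple[str, int, Match]]:
    """Rows whose attached feature version was created after the observation."""
    return [row for row in joined if row[2] is not None and row[2][0] > row[1]]
```

Applied to a join that instead attaches each entity's newest feature snapshot regardless of event time, leaked_rows flags every observation that predates that snapshot. On a real pipeline the same check is a filter for feature_ts > event_ts on the joined output, expected to return zero rows.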
See also
- ASOF Join — concept overview and system support matrix
- ASOF Join — Implementation Strategies — how native implementations avoid the problems Spark hits
- IEJoin Algorithm — the general inequality join algorithm; Spark’s community has discussed IEJoin as a path toward ASOF
- Apache Spark Query Planning and Execution — Catalyst optimizer internals