ASOF Join in Spark

Apache Spark has no native ASOF join operator. There is no Catalyst optimizer rule, no physical operator, and no SQL syntax for it. The Pandas-on-Spark ps.merge_asof() exists but is not distributed — it collects all data to a single partition, making it useless for large datasets.

This note covers why, what workarounds exist, and which one actually works at scale.

Why Spark Has No Native ASOF Join

Spark’s join infrastructure is built around equi-joins. The Catalyst optimizer (Spark’s query planner and optimizer) recognizes equality predicates in ON clauses and selects among hash join, sort-merge join, or broadcast join. Inequality predicates fall through to either:

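  • With an equality predicate (such as ON obs.entity_id = feat.entity_id): a hash or sort-merge join on that key, with the inequality evaluated as a residual filter on every matched pair
  • Without any equality predicate: a broadcast nested loop join or a Cartesian product, with the inequality applied as a filter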

Neither path has any awareness of the “pick best match” semantics that ASOF requires. Adding native ASOF support would require:

  1. A new logical join type in Catalyst’s relational algebra
  2. A new physical operator implementing the sort-merge ASOF algorithm
  3. Optimizer rules to recognize the pattern and select the operator
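For reference, the core of a sort-merge ASOF operator (item 2) is a single pass over both inputs once they are sorted by (entity_id, timestamp). The sketch below is a plain-Python, single-machine model, not Spark code; the tuple layouts ((entity_id, event_ts) for observations, (entity_id, feature_ts, value) for features) are hypothetical. It can also serve as a small test oracle for a distributed implementation.

```python
from typing import Any, List, Optional, Tuple

Observation = Tuple[Any, Any]   # hypothetical layout: (entity_id, event_ts)
Feature = Tuple[Any, Any, Any]  # hypothetical layout: (entity_id, feature_ts, value)


def sort_merge_asof(
    observations: List[Observation], features: List[Feature]
) -> List[Tuple[Any, Any, Optional[Any]]]:
    """Backward ASOF join: for each observation, the value of the latest
    feature with the same entity_id and feature_ts <= event_ts, else None."""
    obs_sorted = sorted(observations)
    feat_sorted = sorted(features, key=lambda f: (f[0], f[1]))

    result = []
    j = 0
    best: Optional[Feature] = None  # last feature consumed by the pointer
    for entity, ts in obs_sorted:
        # Advance past every feature that sorts at or before (entity, ts).
        # Both inputs are sorted, so the pointer never moves backwards.
        while j < len(feat_sorted) and (feat_sorted[j][0], feat_sorted[j][1]) <= (entity, ts):
            best = feat_sorted[j]
            j += 1
        # The last consumed feature may belong to an earlier entity.
        if best is not None and best[0] == entity:
            result.append((entity, ts, best[2]))
        else:
            result.append((entity, ts, None))
    return result
```

Sorting costs O((n + m) log(n + m)); the merge itself is linear. That single ordered pass is what a native operator would exploit and what the workarounds below have to approximate.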

SPARK-22947 proposed this as a SPIP (Spark Project Improvement Proposal — the formal process for proposing significant Spark changes) but was marked as duplicate without resolution. As of 2026, no native implementation exists.

The Naive Approaches (and Why They Fail)

1. Pandas-on-Spark merge_asof

import pyspark.pandas as ps
ps.merge_asof(observations, features, on="timestamp", by="entity_id")

This calls pd.merge_asof() under the hood. All data is collected to the driver node into a single pandas DataFrame. For anything beyond toy datasets, this causes OOM (out-of-memory) errors.

2. Union + Sort + Lag

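Union both tables, sort by (entity_id, timestamp), and use a window function to carry the most recent feature row forward onto each observation. A plain lag() looks back exactly one row, which is the wrong row whenever several observations fall between two feature updates; the usual fix is last(..., ignorenulls=True) over a frame running from the start of the entity's partition to the current row. This requires: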

  • A full shuffle to sort the unioned dataset
  • Window function evaluation across the entire dataset
  • High memory pressure from the sort

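Works for moderate sizes but doesn't scale gracefully: the window places each entity's entire history in a single partition, so heavily skewed entities become stragglers.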
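The mechanics of this approach can be modeled in plain Python: tag and union the rows, sort them per entity, then carry the latest feature value forward. This is a single-machine sketch, not Spark code; observations as (entity_id, event_ts) tuples and features as (entity_id, feature_ts, value) tuples are hypothetical layouts. In Spark, the carry-forward loop corresponds to a last(..., ignorenulls=True) window partitioned by entity_id.

```python
from typing import Any, List, Optional, Tuple

FEATURE, OBSERVATION = 0, 1  # sort order at equal timestamps


def union_sort_carry_forward(
    observations: List[Tuple[Any, Any]],
    features: List[Tuple[Any, Any, Any]],
) -> List[Tuple[Any, Any, Optional[Any]]]:
    """Model of approach 2: union, sort by (entity_id, ts), carry forward."""
    # Union: tag each row. Features sort before observations at the same
    # timestamp, so a feature with feature_ts == event_ts is visible (<=).
    rows = [(ent, ts, FEATURE, value) for ent, ts, value in features]
    rows += [(ent, ts, OBSERVATION, None) for ent, ts in observations]
    rows.sort(key=lambda r: (r[0], r[1], r[2]))

    result = []
    current_entity: Any = object()  # sentinel: equals no real entity_id
    last_value: Optional[Any] = None
    for ent, ts, kind, value in rows:
        if ent != current_entity:
            # New window partition: nothing carries over from the last entity.
            current_entity, last_value = ent, None
        if kind == FEATURE:
            last_value = value  # most recent feature so far for this entity
        else:
            result.append((ent, ts, last_value))
    return result
```

The tie-break matters: sorting features ahead of observations at equal timestamps is what makes the match inclusive (feature_ts <= event_ts). In Spark, that tie-break has to be an explicit extra sort key in the window's orderBy.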

3. Inequality Join + Window Dedup

SELECT * FROM (
  SELECT obs.*, feat.feature_value,
    ROW_NUMBER() OVER (
      PARTITION BY obs.entity_id, obs.event_ts
      ORDER BY feat.feature_ts DESC
    ) AS rn
  FROM observations obs
  JOIN features feat
    ON obs.entity_id = feat.entity_id
    AND obs.event_ts >= feat.feature_ts
) sub WHERE rn = 1;

The inequality condition obs.event_ts >= feat.feature_ts produces an intermediate result that can grow to O(n × m) rows per entity (n observations times m feature versions), a Cartesian-like blowup within each partition. The window function then discards all but one row per group. For large feature stores with millions of feature versions per entity, this causes extreme disk spill or OOM. The inner JOIN also silently drops observations that have no earlier feature row.
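To see the size of the blowup, the plain-Python model below materializes the inequality join the way the query does and then applies the rn = 1 filter. It is a single-machine sketch, not Spark code, with hypothetical tuple layouts. With 100 observations and 100 feature versions for one entity at matching timestamps, the intermediate has 100 × 101 / 2 = 5,050 rows to produce 100 results. With a million of each, the same shape is roughly 5 × 10^11 rows.

```python
from typing import Any, Dict, List, Tuple


def naive_inequality_join(
    observations: List[Tuple[Any, Any]],
    features: List[Tuple[Any, Any, Any]],
) -> Tuple[int, Dict[Tuple[Any, Any], Any]]:
    """Model of approach 3: inequality join, then keep rn = 1 per observation.
    Returns (intermediate_row_count, {(entity_id, event_ts): value})."""
    # Step 1: equi-join on entity_id with feature_ts <= event_ts as a residual
    # filter. Every surviving pair is materialized.
    pairs = [
        (o_ent, o_ts, f_ts, value)
        for o_ent, o_ts in observations
        for f_ent, f_ts, value in features
        if f_ent == o_ent and f_ts <= o_ts
    ]
    # Step 2: ROW_NUMBER() ... ORDER BY feature_ts DESC, keep rn = 1,
    # i.e. the pair with the largest feature_ts per (entity_id, event_ts).
    best: Dict[Tuple[Any, Any], Tuple[Any, Any]] = {}
    for o_ent, o_ts, f_ts, value in pairs:
        key = (o_ent, o_ts)
        if key not in best or f_ts > best[key][0]:
            best[key] = (f_ts, value)
    return len(pairs), {key: value for key, (_, value) in best.items()}
```

Running it with an extra observation ("b", 0) that has no earlier feature also shows the inner-join side effect: that observation is absent from the result.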

See “Why Standard SQL Can’t Express This Cleanly” for why this pattern is fundamentally wasteful.

Sinchenko’s Decomposition (The Approach That Works)

This approach comes from Semyon Sinchenko’s blog post “Effective asOfJoin in PySpark for Feature Store”. The key insight: decompose the ASOF join into three separate operations that Spark can each execute efficiently, even though Spark has no native ASOF operator.

The Idea: Find the Timestamp, Then Fetch the Row

The naive approach joins on the inequality (obs.event_ts >= feat.feature_ts) and then deduplicates with ROW_NUMBER(). The problem is the intermediate blowup — every observation matches every older feature row before the dedup throws most of them away.

Sinchenko’s decomposition avoids this by splitting the work into three CTEs (Common Table Expressions — named intermediate query results using WITH):

  1. CTE 1 — Find the best timestamp. Join observation keys with feature keys (lightweight — just two columns per side), filter by the inequality, and immediately aggregate with MAX to collapse to one row per observation. The blowup from the inequality join is short-lived because the GROUP BY + MAX runs in the same query stage.
  2. CTE 2 — Fetch the full feature row. Now that we know the exact (entity_id, feature_ts) pair, do a standard equi-join against the features table to pick up all the feature columns. This is a normal hash join — Spark’s sweet spot.
  3. Final query — Attach to observations. Left-join the enriched features back onto the observations table.
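The three steps can be modeled in plain Python to check the semantics on small data. This is a single-machine sketch with hypothetical dict-shaped rows, not Sinchenko's code and not Spark code; the nested loop in step 1 stands in for the keys-only join that Spark distributes.

```python
from typing import Any, Dict, List, Optional, Tuple

Row = Dict[str, Any]


def decomposed_asof(observations: List[Row], features: List[Row]) -> List[Row]:
    """Observations carry entity_id/event_ts plus payload; features carry
    entity_id/feature_ts/feature_value. Returns observations with
    feature_value attached (None when no earlier feature exists)."""
    # Step 1 (CTE 1): keys-only inequality join, collapsed right away by MAX.
    obs_keys = {(o["entity_id"], o["event_ts"]) for o in observations}
    feat_keys = [(f["entity_id"], f["feature_ts"]) for f in features]
    best_ts: Dict[Tuple[Any, Any], Any] = {}
    for ent, ev_ts in obs_keys:
        for f_ent, f_ts in feat_keys:
            if f_ent == ent and f_ts <= ev_ts:
                current = best_ts.get((ent, ev_ts))
                if current is None or f_ts > current:
                    best_ts[(ent, ev_ts)] = f_ts

    # Step 2 (CTE 2): equi-join on the exact (entity_id, feature_ts) pair.
    # A dict keeps one row per key; a Spark equi-join would return every duplicate.
    feat_by_key = {(f["entity_id"], f["feature_ts"]): f for f in features}
    matched = {key: feat_by_key[(key[0], f_ts)] for key, f_ts in best_ts.items()}

    # Step 3: left join back, keeping observations with no match.
    out = []
    for o in observations:
        f: Optional[Row] = matched.get((o["entity_id"], o["event_ts"]))
        out.append({**o, "feature_value": f["feature_value"] if f else None})
    return out
```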

SQL Implementation

WITH observation_keys AS (
    -- Helper: keys-only projection of observations (no payload columns).
    SELECT entity_id, event_ts FROM observations
),

feature_keys AS (
    -- Helper: keys-only projection of features.
    SELECT entity_id, feature_ts FROM features
),

best_feature_ts AS (
    -- CTE 1: For each observation, find the most recent feature timestamp.
    -- This is the only step with an inequality condition.
    -- The GROUP BY + MAX immediately collapses the intermediate blowup
    -- to one row per (entity_id, event_ts).
    SELECT
        obs.entity_id,
        obs.event_ts,
        MAX(feat.feature_ts) AS feature_ts
    FROM observation_keys obs
    JOIN feature_keys feat
        ON obs.entity_id = feat.entity_id
        AND feat.feature_ts <= obs.event_ts
    GROUP BY obs.entity_id, obs.event_ts
),

matched_features AS (
    -- CTE 2: Fetch the full feature row using an equi-join.
    -- We know the exact (entity_id, feature_ts) pair, so this is
    -- a standard hash/sort-merge join: no inequality, no blowup.
    -- entity_id comes only from f.*, so the CTE has no duplicate column
    -- that would make mf.entity_id ambiguous below.
    SELECT
        b.event_ts,
        f.*
    FROM best_feature_ts b
    JOIN features f
        ON b.entity_id = f.entity_id
        AND b.feature_ts = f.feature_ts
)

-- Final: left-join back to keep observations with no matching feature.
SELECT obs.*, mf.feature_value
FROM observations obs
LEFT JOIN matched_features mf
    ON obs.entity_id = mf.entity_id
    AND obs.event_ts = mf.event_ts;

observation_keys and feature_keys in CTE 1

In the SQL above, observation_keys and feature_keys are projections of the full tables onto just (entity_id, timestamp). You can either define them as additional CTEs or use subqueries. The point is to keep the inequality join lightweight by excluding payload columns — fewer bytes shuffled, smaller intermediate result.

PySpark Implementation

The same three-step logic expressed using the DataFrame API:

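from pyspark.sql import DataFrame
import pyspark.sql.functions as F

def asof_join(
    *,
    observations: DataFrame,
    features: DataFrame,
    entity_col: str = "entity_id",
    obs_ts_col: str = "event_ts",
    feat_ts_col: str = "feature_ts",
) -> DataFrame:
    """Backward ASOF join: attach to each observation the latest feature row
    of the same entity with feat_ts_col <= obs_ts_col.

    Assumes obs_ts_col != feat_ts_col and that (entity_col, feat_ts_col) is
    unique in `features`; duplicate keys would fan out the result.
    """
    # CTE 1: find the best feature timestamp per observation (keys only)
    obs_keys = observations.select(entity_col, obs_ts_col)
    feat_keys = features.select(entity_col, feat_ts_col)

    best_ts = (
        obs_keys  # optionally F.broadcast(obs_keys) when observations are small (see the note below)
        # Inner join: the inequality filter drops unmatched rows anyway;
        # observations without a match are restored by the final left join.
        .join(feat_keys, on=entity_col, how="inner")
        .filter(F.col(feat_ts_col) <= F.col(obs_ts_col))
        .groupBy(entity_col, obs_ts_col)
        .agg(F.max(feat_ts_col).alias(feat_ts_col))
    )

    # CTE 2: fetch the full feature row (equi-join on the exact key)
    with_features = best_ts.join(
        features, on=[entity_col, feat_ts_col], how="inner",
    )

    # Final: attach back to observations, keeping those with no match
    return observations.join(
        with_features, on=[entity_col, obs_ts_col], how="left",
    )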

Why This Is Better Than the Naive Approach

| Aspect | Naive (inequality + ROW_NUMBER) | Sinchenko decomposition |
|---|---|---|
| Intermediate size | Up to n × m pairs per entity (n observations, m feature versions), all carrying full rows | Same blowup, but on keys only (2 columns), collapsed immediately by MAX |
| Dedup mechanism | Window function over the bloated intermediate | GROUP BY + MAX: cheaper, no window sort |
| Feature fetch | Already materialized in the bloated join | Separate equi-join on the exact (entity_id, feature_ts) pair: small, targeted |

The decomposition doesn’t eliminate the inequality join, but it makes it cheap (keys only) and short-lived (MAX collapses it in the same stage). The expensive part — fetching full feature rows — becomes a precise equi-join instead of a Cartesian blowup.

Broadcast join only works when the observations table is small

In the PySpark version, you can wrap obs_keys with F.broadcast() to hint Spark to send it to every executor, so feat_keys does not need to be shuffled for that join. This only works when the observation keys fit comfortably in driver and executor memory. The 10MB default of spark.sql.autoBroadcastJoinThreshold only controls when Spark broadcasts on its own; an explicit F.broadcast() hint is honored even above that threshold, so hinting an oversized table risks OOM rather than a graceful fallback. For large observation tables, omit the broadcast: Spark then plans a sort-merge join, which is slower but won't OOM. The rest of the algorithm works identically either way.

Performance

Sinchenko reported ~3.5s for small feature stores and ~9s for large ones, scaling gracefully.

Databricks Range Join Hint

On Databricks (not open-source Spark), a range join optimization exists using bin bucketing (partitioning the value domain into equal-sized bins and only comparing rows in overlapping bins):

SELECT /*+ RANGE_JOIN(prices, 86400) */ *
FROM events e
JOIN prices p
  ON e.ticker = p.ticker
  AND e.ts BETWEEN p.ts AND p.ts + INTERVAL 1 DAY;

The hint parameter (86400) is the bin size in the column’s unit (seconds here).

-- Alternative: session variable
SET spark.databricks.optimizer.rangeJoin.binSize = 3600;  -- 1 hour

This is still a range join, not an ASOF join

The range join produces all matches within the range, not just the best one. You still need a dedup step (ROW_NUMBER() or equivalent) on top. It reduces the Cartesian blowup by filtering non-overlapping bins, but doesn’t eliminate it.

Choosing bin size: run APPROX_PERCENTILE over the range lengths (end minus start of each range) to guide the choice. Too small → each range is copied into many bins, adding overhead. Too large → bins don't filter enough.
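The trade-off can be seen in a generic plain-Python model of bin bucketing. This is a sketch of the idea described above, not Databricks' implementation; the (key, start, end) and (key, ts) tuple layouts are hypothetical. Each range is copied into every bin it overlaps, and each point is compared only against the ranges in its own bin.

```python
from collections import defaultdict
from typing import Any, Dict, List, Tuple

Interval = Tuple[Any, int, int]  # hypothetical: (key, start, end), inclusive
Point = Tuple[Any, int]          # hypothetical: (key, ts)


def binned_range_join(
    points: List[Point], intervals: List[Interval], bin_size: int
) -> Tuple[List[Tuple[Any, int, int]], int, int]:
    """Return (matches, candidate_pairs_checked, interval_copies)."""
    # Replicate each interval into every bin it overlaps (same key only).
    bins: Dict[Tuple[Any, int], List[Interval]] = defaultdict(list)
    copies = 0
    for key, start, end in intervals:
        for b in range(start // bin_size, end // bin_size + 1):
            bins[(key, b)].append((key, start, end))
            copies += 1

    # Each point looks only in its own bin; other bins are never compared.
    matches = []
    checked = 0
    for key, ts in points:
        for _, start, end in bins.get((key, ts // bin_size), []):
            checked += 1
            if start <= ts <= end:
                matches.append((key, ts, start))
    return matches, checked, copies
```

On the small input used in the checks, a bin size of 10 needs 4 comparisons and 15 interval copies. Raising it to 1,000 removes all filtering (7 comparisons, the same as a per-key nested loop), while lowering it to 1 inflates the copies to 123: the two failure modes named above. The output still contains every match in range, so an ASOF query would still need the dedup step.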

ML Feature Store Context

The ASOF join is the core operation in ML feature stores for point-in-time correctness (also called temporal correctness). When building training datasets:

  • Each observation has an (entity_id, event_timestamp)
  • Features must reflect only information available at or before event_timestamp
  • Using features from after event_timestamp is data leakage (the model sees the future during training) — producing inflated metrics that collapse in production

This is exactly the backward ASOF join semantic: for each observation, find the most recent feature version at or before the event time. Getting this wrong is one of the most common and hardest-to-detect bugs in ML pipelines.
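A cheap guard against this bug is to assert, after the join, that no attached feature is newer than its observation. The helper below is a hypothetical plain-Python sketch over dict rows with assumed field names (event_ts, feature_ts); in Spark, the same check is a filter on feature_ts > event_ts followed by a count.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def point_in_time_violations(joined: List[Row]) -> List[Row]:
    """Return joined rows whose attached feature is newer than the observation.

    Rows with no matched feature (feature_ts is None) are not violations.
    """
    return [
        row
        for row in joined
        if row.get("feature_ts") is not None and row["feature_ts"] > row["event_ts"]
    ]
```

An empty result means every attached feature was available at or before its event time.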
