IndexTables Internals

This note covers the internal architecture of IndexTables for Spark — how it hooks into Spark’s DataSource V2 API, how data flows through the write and read paths, and how SQL predicates become native Tantivy queries.

Prerequisites

Package anatomy

IndexTables is a single Maven module (io.indextables:indextables_spark) built with Scala 2.12 on Spark 3.5. The source tree under src/main/scala/io/indextables/spark/ is organized by concern:

spark/
  core/           # DataSource V2: TableProvider, Table, ScanBuilder, Scan, Writers, Readers
  catalog/        # Catalog plugin (CatalogExtension, table resolution)
  extensions/     # SparkSessionExtensions registration (parser, rules, functions)
  catalyst/       # Catalyst rules for IndexQuery and bucket expression pushdown
  expressions/    # Custom Catalyst expressions (IndexQueryExpression, bucket expressions)
  filters/        # Filter tree types (IndexQueryFilter, MixedBooleanFilter)
  sql/            # ANTLR grammar + AST builder for custom SQL commands
  transaction/    # Transaction log (Actions, NativeTransactionLog via JNI)
  search/         # SplitSearchEngine wrapper around tantivy4java's SplitSearcher
  arrow/          # Arrow FFI bridges (read: import batches, write: export batches)
  schema/         # Spark ↔ Tantivy type mapping
  io/             # Cloud storage providers (S3, Azure, Hadoop), merge downloaders
  storage/        # Split cache management, locality manager
  sync/           # Companion mode (Delta/Iceberg/Parquet → IndexTables sync)
  merge/          # Merge-on-write (async background split merging)
  purge/          # Purge-on-write (auto-cleanup of tombstoned files)
  prewarm/        # Cache prewarming subsystem
  config/         # Configuration classes and SQL conf registration
  metrics/        # Spark UI metrics for data skipping and read pipeline
  stats/          # Min/max statistics for data skipping
  json/           # JSON field predicate translation
  write/          # Write-side config (Arrow FFI config, optimized writes)
  memory/         # Native memory initialization

Key external dependencies:

| Dependency | Role |
|---|---|
| tantivy4java (0.34.4) | JNI bridge to the Rust Tantivy search engine; provides SplitSearcher, QuickwitSplit, the transaction log, and merge operations |
| Arrow C Data (arrow-c-data) | Arrow C data interface for zero-copy batch exchange between the JVM and Rust |
| ANTLR (4.9.3) | Grammar-based parser for custom SQL commands |
| AWS SDK v2 / Azure SDK | Cloud storage I/O for split upload and download |

How Spark loads IndexTables

IndexTables registers itself through three Spark extension points:

  1. TableProvider: IndexTables4SparkTableProvider implements DataSourceRegister with shortName "indextables", so spark.read.format("indextables") resolves to it.
  2. SparkSessionExtensions: IndexTables4SparkExtensions injects a custom SQL parser, two Catalyst rules, and custom functions (configured via spark.sql.extensions).
  3. CatalogExtension: IndexTables4SparkCatalog wraps the default session catalog to resolve IndexTables tables by name.
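As background for item 1: Spark finds the class behind format("...") by loading every DataSourceRegister implementation listed in META-INF/services/org.apache.spark.sql.sources.DataSourceRegister through java.util.ServiceLoader and matching shortName() case-insensitively; if nothing matches, it treats the string as a class name (and also tries <name>.DefaultSource, omitted here). The sketch below models only that lookup rule. Register, the two provider stubs, Resolution, and resolveFormat are hypothetical names, and a plain List stands in for ServiceLoader.

```scala
// Hypothetical stand-in for org.apache.spark.sql.sources.DataSourceRegister.
trait Register { def shortName(): String }

// Hypothetical providers; in Spark these would be discovered via java.util.ServiceLoader.
final class IndexTablesProviderStub extends Register { def shortName(): String = "indextables" }
final class ParquetProviderStub extends Register { def shortName(): String = "parquet" }

sealed trait Resolution
final case class ByShortName(provider: Register) extends Resolution
final case class ByClassName(className: String) extends Resolution

// Case-insensitive short-name match first; otherwise fall back to treating the string as a class name.
def resolveFormat(format: String, registered: List[Register]): Resolution =
  registered.filter(_.shortName().equalsIgnoreCase(format)) match {
    case single :: Nil => ByShortName(single)
    case Nil           => ByClassName(format)
    case many =>
      throw new IllegalStateException(
        s"Multiple sources found for $format: " + many.map(_.getClass.getName).mkString(", "))
  }
```

A format string such as "IndexTables" therefore resolves to the provider whose shortName() is "indextables", while an unregistered name is left for reflective class loading.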

Write path

The write path from DataFrame to split file. Yellow nodes run in Rust via JNI; blue nodes are JVM-side Spark code.

Each executor writes independently. The writer buffers InternalRow objects into Arrow record batches, then exports them to Rust via the Arrow C data interface — no serialization, no copy. On the Rust side, QuickwitSplit.addArrowBatch() feeds the Arrow data into Tantivy’s indexing pipeline, producing a .split file. For cloud storage, splits are uploaded via multipart upload (S3) or block blob (Azure).
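A minimal sketch of the executor-side buffering, assuming a fixed row count per batch. Row, BufferingWriter, batchSize, and the sink callback are hypothetical: Row stands in for Spark's InternalRow, and the sink stands in for the Arrow C data export followed by QuickwitSplit.addArrowBatch().

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical row type standing in for Spark's InternalRow.
final case class Row(values: Vector[Any])

// Buffers rows and hands each full batch to `sink`; flush() also emits a partial final batch.
final class BufferingWriter(batchSize: Int, sink: Vector[Row] => Unit) {
  require(batchSize > 0, "batchSize must be positive")
  private val buffer = ArrayBuffer.empty[Row]

  def write(row: Row): Unit = {
    buffer += row
    if (buffer.size >= batchSize) flush() // batch is full: hand it off
  }

  // Called before commit so the last, possibly partial, batch is not lost.
  def flush(): Unit = if (buffer.nonEmpty) {
    sink(buffer.toVector)
    buffer.clear()
  }
}
```

With a batch size of 3, seven rows followed by a final flush() reach the sink as batches of 3, 3, and 1 rows.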

On commit, the driver collects AddAction messages from all executors and writes them to the transaction log in a single atomic version. The collection mechanism is Spark’s standard task result path: each executor’s DataWriter.commit() returns a WriterCommitMessage (a serializable object carrying the AddAction payload — min/max statistics, footer offsets, split UUID, document mapping). Spark serializes these messages back to the driver as part of the task result. The driver’s BatchWrite.commit(messages: Array[WriterCommitMessage]) receives all of them in one call, extracts the AddAction objects, and appends them atomically to the transaction log as a new version.
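The commit handshake can be sketched with simplified stand-ins. WriterCommitMessage here is a local trait mirroring Spark's serializable marker interface; AddAction keeps only a path and min/max maps (the footer offsets, split UUID, and document mapping are omitted); TransactionLogSketch and commit are hypothetical, and the synchronized append models only the all-or-nothing version bump, not how the real transaction log achieves atomicity.

```scala
// Local stand-in for Spark's WriterCommitMessage marker interface.
trait WriterCommitMessage extends Serializable

// Simplified AddAction: only a path and per-column min/max statistics.
final case class AddAction(path: String, minValues: Map[String, String], maxValues: Map[String, String])
final case class SplitCommitMessage(adds: Seq[AddAction]) extends WriterCommitMessage

// Hypothetical in-memory log: each version is the full set of actions committed together.
final class TransactionLogSketch {
  private var versions = Vector.empty[Seq[AddAction]]

  def latestVersion: Long = versions.size - 1L
  def actionsAt(version: Long): Seq[AddAction] = versions(version.toInt)

  // All actions become visible together under one new version number.
  def appendVersion(actions: Seq[AddAction]): Long = synchronized {
    versions = versions :+ actions
    latestVersion
  }
}

// Driver side, playing the role of BatchWrite.commit(messages).
def commit(log: TransactionLogSketch, messages: Array[WriterCommitMessage]): Long = {
  val adds = messages.toSeq.flatMap {
    case m: SplitCommitMessage => m.adds
    case other => throw new IllegalArgumentException(s"Unexpected commit message: $other")
  }
  log.appendVersion(adds)
}
```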

Read path

The read path from query to results. The driver negotiates pushdown and plans partitions; each executor runs a Tantivy query and streams Arrow batches zero-copy into Spark’s columnar format.

The scan builder implements five pushdown traits:

| Trait | What it pushes down |
|---|---|
| SupportsPushDownFilters | V1 Spark filters (EqualTo, GreaterThan, etc.) |
| SupportsPushDownV2Filters | V2 filter expressions |
| SupportsPushDownRequiredColumns | Column pruning |
| SupportsPushDownLimit | Row limit |
| SupportsPushDownAggregates | COUNT, SUM, AVG, MIN, MAX |

The key optimization lives in listFilesWithMetadata(): a single JNI call that evaluates min/max statistics in Rust and returns only the splits that could contain matching data. It plays the same role as Iceberg’s manifest filtering, but runs entirely in native code.
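To make the pruning decision concrete, here is a sketch of the per-split test under simplifying assumptions: only Long-valued columns; only equality, greater-than, less-than, and AND; and hypothetical names (SplitStats, the predicate case classes, mightMatch, prune). A split with no statistics for a column is kept, because skipping must never drop data that could match.

```scala
// Hypothetical per-split statistics: min/max per column (Long-valued for simplicity).
final case class SplitStats(path: String, min: Map[String, Long], max: Map[String, Long])

// Simplified predicate shapes, named after Spark's source filters.
sealed trait Predicate
final case class EqualTo(col: String, v: Long) extends Predicate
final case class GreaterThan(col: String, v: Long) extends Predicate
final case class LessThan(col: String, v: Long) extends Predicate
final case class And(left: Predicate, right: Predicate) extends Predicate

// True if some row in the split could satisfy the predicate, judging only by min/max.
// Missing statistics mean "unknown", so the split is kept.
def mightMatch(s: SplitStats, p: Predicate): Boolean = {
  def bounds(col: String): Option[(Long, Long)] =
    for { lo <- s.min.get(col); hi <- s.max.get(col) } yield (lo, hi)
  p match {
    case EqualTo(c, v)     => bounds(c).forall { case (lo, hi) => lo <= v && v <= hi }
    case GreaterThan(c, v) => bounds(c).forall { case (_, hi) => hi > v }
    case LessThan(c, v)    => bounds(c).forall { case (lo, _) => lo < v }
    case And(l, r)         => mightMatch(s, l) && mightMatch(s, r)
  }
}

def prune(splits: Seq[SplitStats], p: Predicate): Seq[SplitStats] = splits.filter(mightMatch(_, p))
```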

Filter-to-query conversion

FiltersToQueryConverter translates Spark filter predicates into Tantivy SplitQuery objects:

| Spark filter | Tantivy query |
|---|---|
| EqualTo(col, value) | Term query |
| GreaterThan / LessThan / Between | Range query (automatically combines paired filters on the same field) |
| StringStartsWith(col, prefix) | Prefix query |
| StringContains(col, substr) | Regex query |
| IsNull / IsNotNull | Existence query |
| IndexQueryFilter(col, queryString) | Direct Tantivy query syntax passthrough |
| And / Or / Not | Boolean query (must / should / must_not clauses) |

The converter preserves the boolean structure of compound predicates. A WHERE clause like status = 'error' AND message indexquery 'timeout OR connection' becomes a Tantivy boolean query with two must-clauses: a term query on status and a parsed query string on message.
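The sketch below converts a subset of the mapping table using hypothetical stand-in types: the filter case classes mirror Spark's source filter names, and TermQuery, RangeQuery, and the other query classes stand in for tantivy4java's SplitQuery types without reproducing its API. It flattens nested ANDs so that paired range bounds on one field merge into a single RangeQuery; keeping the first bound seen on each side is a simplification, since a real converter would keep the tighter bound.

```scala
// Simplified stand-ins for Spark's source filters plus IndexQueryFilter.
sealed trait Filter
final case class EqualTo(col: String, value: Any) extends Filter
final case class GreaterThan(col: String, value: Any) extends Filter
final case class LessThan(col: String, value: Any) extends Filter
final case class StringStartsWith(col: String, prefix: String) extends Filter
final case class IsNotNull(col: String) extends Filter
final case class IndexQueryFilter(col: String, queryString: String) extends Filter
final case class And(left: Filter, right: Filter) extends Filter
final case class Or(left: Filter, right: Filter) extends Filter
final case class Not(child: Filter) extends Filter

// Hypothetical query tree standing in for SplitQuery objects.
sealed trait Query
final case class TermQuery(field: String, value: Any) extends Query
final case class RangeQuery(field: String, lowerExclusive: Option[Any], upperExclusive: Option[Any]) extends Query
final case class PrefixQuery(field: String, prefix: String) extends Query
final case class ExistsQuery(field: String) extends Query
final case class ParsedQuery(field: String, queryString: String) extends Query
final case class BoolQuery(must: List[Query] = Nil, should: List[Query] = Nil, mustNot: List[Query] = Nil) extends Query

object FiltersToQuerySketch {
  // Flatten nested Ands so range bounds on the same field can be paired.
  private def conjuncts(f: Filter): List[Filter] = f match {
    case And(l, r) => conjuncts(l) ++ conjuncts(r)
    case other     => List(other)
  }

  def convert(f: Filter): Query = f match {
    case EqualTo(c, v)          => TermQuery(c, v)
    case GreaterThan(c, v)      => RangeQuery(c, Some(v), None)
    case LessThan(c, v)         => RangeQuery(c, None, Some(v))
    case StringStartsWith(c, p) => PrefixQuery(c, p)
    case IsNotNull(c)           => ExistsQuery(c)
    case IndexQueryFilter(c, q) => ParsedQuery(c, q) // query syntax passed through untouched
    case Or(l, r)               => BoolQuery(should = List(convert(l), convert(r)))
    case Not(child)             => BoolQuery(mustNot = List(convert(child)))
    case conj: And =>
      val parts = conjuncts(conj).map(convert)
      val ranges = parts.collect { case r: RangeQuery => r }
      val others = parts.filterNot(_.isInstanceOf[RangeQuery])
      // Merge e.g. ts > a AND ts < b into one RangeQuery on ts.
      val merged = ranges.groupBy(_.field).toList.sortBy(_._1).map { case (field, rs) =>
        RangeQuery(field, rs.flatMap(_.lowerExclusive).headOption, rs.flatMap(_.upperExclusive).headOption)
      }
      BoolQuery(must = others ++ merged)
  }
}
```

For the WHERE clause above, convert returns BoolQuery(must = List(TermQuery("status", "error"), ParsedQuery("message", "timeout OR connection"))), which matches the two must-clauses described.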

IndexQuery pushdown through Catalyst

The indexquery operator is IndexTables’ most distinctive feature — it lets users write Tantivy query syntax directly in SQL. Getting it from the SQL string to the executor involves four stages:

Stage 1 — Parser preprocessing. The custom SQL parser intercepts WHERE col indexquery 'query' and rewrites it to tantivy4spark_indexquery(col, 'query'), which Spark sees as a regular function call.
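One way to picture Stage 1 is a textual rewrite applied before Spark’s parser sees the statement. This regex-based version is an assumption for illustration only: the text does not say how the custom parser performs the interception, and this pattern handles just plain or dotted column names and single-quoted query strings without embedded quotes.

```scala
import scala.util.matching.Regex

object IndexQueryPreprocessor {
  // (?i): match "indexquery" case-insensitively. Group 1 = column, group 2 = query text.
  private val IndexQueryPattern: Regex =
    """(?i)([A-Za-z_][A-Za-z0-9_.]*)\s+indexquery\s+'([^']*)'""".r

  // In the replacement string, $1 and $2 refer to the captured groups.
  def rewrite(sql: String): String =
    IndexQueryPattern.replaceAllIn(sql, "tantivy4spark_indexquery($1, '$2')")
}
```

With this rewrite, message indexquery 'timeout OR connection' becomes tantivy4spark_indexquery(message, 'timeout OR connection'), which Spark then parses as an ordinary function call.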

Stage 2 — Resolution rule. V2IndexQueryExpressionRule (injected as a Catalyst resolution rule) finds IndexQueryExpression nodes in the logical plan. It resolves column references through SubqueryAlias and CTE chains to find the underlying DataSourceV2Relation.
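Stage 2’s alias unwrapping can be sketched over a toy logical plan. Plan, Relation, SubqueryAlias, Project, FilterNode, and findRelation are hypothetical stand-ins for Catalyst’s node types; the walk descends through aliases and filters, and through a projection only when the projection keeps the column (renamed columns are not handled).

```scala
// Toy logical plan; Catalyst's real node types carry far more information.
sealed trait Plan
final case class Relation(table: String, columns: Set[String]) extends Plan
final case class SubqueryAlias(alias: String, child: Plan) extends Plan
final case class Project(columns: Seq[String], child: Plan) extends Plan
final case class FilterNode(condition: String, child: Plan) extends Plan

// Returns the relation that provides `column`, unwrapping alias and filter layers on the way down.
def findRelation(plan: Plan, column: String): Option[Relation] = plan match {
  case r: Relation             => if (r.columns.contains(column)) Some(r) else None
  case SubqueryAlias(_, child) => findRelation(child, column)
  case Project(cols, child)    => if (cols.contains(column)) findRelation(child, column) else None
  case FilterNode(_, child)    => findRelation(child, column)
}
```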

Stage 3 — Optimizer rule. The same rule also runs as an optimizer rule. It converts the IndexQueryExpression into a MixedBooleanFilter tree, a custom filter type that can carry both regular Spark predicates and IndexQuery predicates in a single boolean structure. Because Spark’s pushdown API does not natively support custom filter types, the tree is handed off through a side channel on the ScanBuilder’s companion object: a WeakHashMap keyed by the DataSourceV2Relation, plus a ThreadLocal that records which relation the current thread is planning.

Stage 4 — Scan builder retrieval. When Spark calls pushFilters() on the IndexTables4SparkScanBuilder, the builder retrieves the MixedBooleanFilter stored for the relation it is planning and merges it with the standard Spark filters. Both are converted to Tantivy queries, which travel with the scan to the executors.

ThreadLocal coupling

The ThreadLocal handoff between the Catalyst rule and the ScanBuilder is a pragmatic workaround for the fact that Spark’s V2 filter pushdown API only supports a fixed set of filter types. It works because Catalyst rule execution and ScanBuilder creation happen on the same driver thread within a single query lifecycle.

How extensions share state at query time

The extension framework note explains the general problem: Spark’s three extension surfaces (SparkSessionExtensions, DataSource V2, Catalog) are registered independently and never hold references to each other. Configuration flows through SparkSession.conf, but query-time state — like the MixedBooleanFilter tree from the IndexQuery pushdown stages above — needs a side channel because Spark’s V2 pushdown API only supports a fixed set of filter types.

IndexTables solves this with the companion object pattern: a ThreadLocal plus a WeakHashMap on the ScanBuilder’s companion object. A WeakHashMap holds its keys through weak references, so an entry is removed automatically once its key object is garbage-collected, which keeps stale query plans from leaking memory. In outline:

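```scala
import java.util.{Collections, WeakHashMap}

// MixedBooleanFilter is IndexTables' filter-tree type (filters/ package); its import is omitted.
object IndexTables4SparkScanBuilder {
  // ThreadLocal: tracks which DataSourceV2Relation this thread is planning (None until set)
  private val currentRelation = new ThreadLocal[Option[AnyRef]] {
    override protected def initialValue(): Option[AnyRef] = None
  }

  // WeakHashMap: custom filter data keyed by the relation object. WeakHashMap is not
  // thread-safe, so it is wrapped for concurrent planning threads.
  private val customFilters =
    Collections.synchronizedMap(new WeakHashMap[AnyRef, MixedBooleanFilter]())

  def setCurrentRelation(rel: AnyRef): Unit = currentRelation.set(Some(rel))
  def getCurrentRelation: Option[AnyRef] = currentRelation.get
  def clearCurrentRelation(): Unit = currentRelation.remove()

  def storeFilters(rel: AnyRef, filter: MixedBooleanFilter): Unit =
    customFilters.put(rel, filter)
  def getFilters(rel: AnyRef): Option[MixedBooleanFilter] =
    Option(customFilters.get(rel))
}
```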

The flow connects the four IndexQuery pushdown stages to the scan builder:

  1. Catalyst rule writes. V2IndexQueryExpressionRule (Stage 3 above) extracts the MixedBooleanFilter tree and stores it in the companion object’s WeakHashMap, keyed by the DataSourceV2Relation node.

  2. ScanBuilder reads. When Spark calls pushFilters() on the ScanBuilder (Stage 4), it retrieves the MixedBooleanFilter using the current relation as the key and merges it with the standard V2 filters.

  3. ThreadLocal provides isolation. Catalyst analysis, optimization, and scan planning all happen on the same driver thread for a given query. The ThreadLocal gives each planning thread its own current relation, so concurrent queries on different threads never look up each other’s filter data. The ThreadLocal itself needs no locking, but the WeakHashMap is shared by every planning thread and java.util.WeakHashMap is not thread-safe, so concurrent queries still need synchronized access to it (for example through Collections.synchronizedMap).
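The three steps can be exercised end to end in a self-contained sketch. FilterHandoff is a hypothetical simplification of the companion object: the filter payload is a String instead of a MixedBooleanFilter, Relation is a plain class with reference equality, and the shared map is wrapped in Collections.synchronizedMap because java.util.WeakHashMap is not thread-safe. Each thread plays one query: its rule step stores a filter, and its ScanBuilder step reads it back through the thread’s current relation.

```scala
import java.util.{Collections, Map => JMap, WeakHashMap}

// Hypothetical plan-node stand-in: a plain class, so equality is reference identity.
final class Relation(val name: String)

// Hypothetical simplification of the companion-object side channel.
object FilterHandoff {
  // Shared by all planning threads; WeakHashMap is not thread-safe, hence the wrapper.
  private val filters: JMap[AnyRef, String] =
    Collections.synchronizedMap(new WeakHashMap[AnyRef, String]())

  // Per-thread pointer to the relation currently being planned.
  private val current = new ThreadLocal[Option[AnyRef]] {
    override protected def initialValue(): Option[AnyRef] = None
  }

  // Step 1 (Catalyst rule): remember this thread's relation and store its filter.
  def store(rel: AnyRef, filter: String): Unit = {
    current.set(Some(rel))
    filters.put(rel, filter)
  }

  // Step 2 (ScanBuilder): look up the filter for this thread's current relation.
  def retrieveForCurrentThread(): Option[String] =
    current.get.flatMap(rel => Option(filters.get(rel)))

  def clearCurrentThread(): Unit = current.remove()
}

// Plans each (relation, filter) pair on its own thread, all running concurrently.
def planConcurrently(queries: Seq[(Relation, String)]): Seq[Option[String]] = {
  val seen = Array.fill[Option[String]](queries.size)(None)
  val threads = queries.zipWithIndex.map { case ((rel, filter), i) =>
    new Thread(new Runnable {
      def run(): Unit = {
        FilterHandoff.store(rel, filter)
        seen(i) = FilterHandoff.retrieveForCurrentThread()
        FilterHandoff.clearCurrentThread()
      }
    })
  }
  threads.foreach(_.start())
  threads.foreach(_.join()) // join() also makes each thread's write to `seen` visible here
  seen.toSeq
}
```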

The dual-injection pattern

IndexTables injects V2IndexQueryExpressionRule as both a resolution rule and an optimizer rule. Why both?

Spark’s analyzer applies rules repeatedly until the plan stops changing — this is called fixed-point resolution. During this process, the DataSourceV2Relation node that the resolution rule stored filter data against may be replaced by analysis (Spark may re-resolve it, enrich it with metadata, or reconstruct it). If the key object in the WeakHashMap no longer matches what the ScanBuilder eventually sees, the lookup fails.

The optimizer rule is the safety net: it runs after analysis is complete and the plan is stable. It re-stores the filter data keyed by the final DataSourceV2Relation, the relation whose table Spark later asks for a ScanBuilder through newScanBuilder(). This guarantees the ScanBuilder finds the data regardless of what analysis did to the plan in between.

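The exact reason the original key stops working (identity mismatch? structural inequality from added metadata? GC collecting the unreferenced old object?) depends on Spark’s internal analyzer implementation and may vary across versions. Identity alone can be ruled out: java.util.WeakHashMap compares keys with equals() and hashCode(), not reference identity, and DataSourceV2Relation is a case class, so a rebuilt node that is field-for-field equal still matches. The lookup breaks only if some field compares unequal or the originally stored key object is collected. The dual-injection pattern is defensive: it doesn’t rely on understanding exactly when or why the key breaks.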
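A toy model of why the optimizer-time re-store helps, covering just one of the possible causes listed above: analysis rebuilding the relation with extra metadata, which changes its case-class equality. RelationNode, fixedPoint, and the enrichment rule are hypothetical; fixedPoint applies a rule until the value stops changing, which is the idea behind Spark’s fixed-point rule batches.

```scala
import java.util.WeakHashMap

// Hypothetical relation node with case-class (structural) equality, like Catalyst plan nodes.
final case class RelationNode(table: String, metadata: Map[String, String] = Map.empty)

// Apply `rule` until the result stops changing (a fixed point) or the iteration cap is hit.
def fixedPoint[A](start: A, maxIterations: Int = 100)(rule: A => A): A = {
  var current = start
  var iterations = 0
  var changed = true
  while (changed && iterations < maxIterations) {
    val next = rule(current)
    changed = next != current
    current = next
    iterations += 1
  }
  current
}

val filterStore = new WeakHashMap[AnyRef, String]()
val filterData = "message indexquery 'timeout'"

// Resolution-time rule: stores the filter under the relation as it looks at this point.
val early = RelationNode("logs")
filterStore.put(early, filterData)

// Analysis keeps going: this rule enriches the node once, after which the plan is stable.
val analyzed = fixedPoint(early)(r => if (r.metadata.isEmpty) r.copy(metadata = Map("resolved" -> "true")) else r)

// ScanBuilder-side lookup with the final node misses: the two keys are no longer equal.
val foundBeforeOptimizer = Option(filterStore.get(analyzed))

// Optimizer-time rule: re-store under the final, stable node.
filterStore.put(analyzed, filterData)
val foundAfterOptimizer = Option(filterStore.get(analyzed))
```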

Why WeakHashMap (not HashMap)

The keys are DataSourceV2Relation plan nodes: objects Spark creates while planning a query, needed only for the duration of that planning. Once the query finishes and its plan is discarded, the DataSourceV2Relation node has no remaining strong references. The garbage collector can then clear the weak reference to it, and the WeakHashMap drops the stale entry (by polling its internal ReferenceQueue) the next time the map is accessed, so filter data does not accumulate across queries.

A regular HashMap would require explicit cleanup (remove the entry after the scan completes), which is fragile — if the query fails or is cancelled, the cleanup code might not run. With WeakHashMap, the GC handles cleanup for free.
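A small demonstration of that cleanup property, assuming a JVM where System.gc() usually triggers a collection. The JVM is free to ignore the request, so the helper polls a bounded number of times rather than assuming one call suffices. PlanKey is a hypothetical plan-node stand-in with reference equality: the entry whose key is kept in a val survives, while the entry whose key was only ever held by the map becomes eligible for removal.

```scala
import java.util.WeakHashMap

// Hypothetical per-query plan node; reference equality, like an object used once per query.
final class PlanKey(val id: Int)

val filtersByPlan = new WeakHashMap[PlanKey, String]()
val liveKey = new PlanKey(1) // stays strongly reachable through this val

// The key created here is referenced only by the map once this method returns.
def storeTransient(): Unit = { filtersByPlan.put(new PlanKey(2), "dropped once unreachable"); () }

// Requests GCs until the map shrinks to `expected` entries or the attempts run out.
def awaitSize(expected: Int, attempts: Int = 50): Boolean = {
  var i = 0
  while (filtersByPlan.size != expected && i < attempts) {
    System.gc()
    Thread.sleep(20)
    i += 1
  }
  filtersByPlan.size == expected
}

filtersByPlan.put(liveKey, "kept while referenced")
storeTransient()
val transientCleared = awaitSize(1)
println(s"transient entry cleared: $transientCleared")
```

No remove() call appears anywhere: whether the transient entry disappears depends only on reachability of its key, which is the property that makes the pattern robust to failed or cancelled queries.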

What state flows where

| State | Mechanism | Example |
|---|---|---|
| Extension config (key/value) | SparkSession.conf | spark.indextables.read.mode = fast |
| Cloud credentials | Hadoop configuration via SparkSession | AWS region, S3 endpoints |
| Custom filter data (query-time) | Companion object ThreadLocal + WeakHashMap | MixedBooleanFilter carrying IndexQuery expressions |
| Table identity and schema | Table object returned by TableProvider or Catalog | Both paths return the same IndexTables4SparkTable class |
| Source table metadata (catalog path only) | CatalogManager → source catalog’s TBLPROPERTIES | Companion index storage path |

Arrow FFI bridge

Data crosses the JVM ↔ Rust boundary via the Arrow C data interface — the same zero-copy mechanism used by DuckDB, Polars, and DataFusion for cross-language interop.

Read side (ArrowFfiBridge): The Rust search engine produces Arrow record batches and exports them through ArrowSchema and ArrowArray C structs. The Java side imports these structs and wraps the Arrow memory as a Spark ColumnarBatch. No deserialization occurs — Spark reads directly from the memory allocated by Rust.

Write side (ArrowFfiWriteBridge): The Java writer builds Arrow record batches from Spark’s InternalRow objects, exports them as C structs, and the Rust side reads them directly. The Tantivy indexer consumes Arrow batches natively, so the entire write path from Spark row to indexed document involves zero serialization.

This is the standard interface defined by the Arrow C data interface specification, built on two C structs: ArrowSchema (a format string plus child schemas) and ArrowArray (buffer pointers, lengths, and null counts).
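To make the format string concrete, the sketch below maps a handful of Spark-like column types to format strings defined by the Arrow C data interface specification ("i" for int32, "l" for int64, "u" for UTF-8, "tdD" for date32, "tsu:<tz>" for microsecond timestamps, "+l" and "+s" for list and struct). SqlType, SchemaNode, and toArrowSchema are hypothetical; SchemaNode mirrors only the format, name, and children fields of ArrowSchema, and the child name "element" is an arbitrary choice.

```scala
// Hypothetical stand-in for Spark's DataType hierarchy.
sealed trait SqlType
case object BooleanT extends SqlType
case object IntT extends SqlType
case object LongT extends SqlType
case object FloatT extends SqlType
case object DoubleT extends SqlType
case object StringT extends SqlType
case object BinaryT extends SqlType
case object DateT extends SqlType
final case class TimestampT(timeZone: String) extends SqlType
final case class ArrayT(element: SqlType) extends SqlType
final case class StructT(fields: List[(String, SqlType)]) extends SqlType

// Mirrors the shape of ArrowSchema: a format string plus child schemas for nested types.
final case class SchemaNode(format: String, name: String, children: List[SchemaNode] = Nil)

def toArrowSchema(name: String, t: SqlType): SchemaNode = t match {
  case BooleanT       => SchemaNode("b", name)
  case IntT           => SchemaNode("i", name)          // int32
  case LongT          => SchemaNode("l", name)          // int64
  case FloatT         => SchemaNode("f", name)          // float32
  case DoubleT        => SchemaNode("g", name)          // float64
  case StringT        => SchemaNode("u", name)          // UTF-8 string
  case BinaryT        => SchemaNode("z", name)          // variable-size binary
  case DateT          => SchemaNode("tdD", name)        // date32, days since epoch
  case TimestampT(tz) => SchemaNode(s"tsu:$tz", name)   // timestamp, microseconds, with time zone
  case ArrayT(elem)   => SchemaNode("+l", name, List(toArrowSchema("element", elem))) // one child
  case StructT(fs)    => SchemaNode("+s", name, fs.map { case (n, ft) => toArrowSchema(n, ft) })
}
```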

Merge and purge lifecycle

MERGE SPLITS

MERGE SPLITS is analogous to Delta Lake’s OPTIMIZE or Iceberg’s rewrite_data_files procedure. Over time, many small splits accumulate (one per write batch per executor), and MERGE SPLITS compacts them:

  1. The driver reads the transaction log and groups live splits by partition
  2. Within each partition, splits below the target size (default 5 GB) are grouped for merging
  3. Each executor calls QuickwitSplit.mergeSplits() — a Rust operation that merges multiple Tantivy indexes into one
  4. The driver commits atomically: RemoveAction for old splits + AddAction for merged splits
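Steps 1 and 2 can be sketched as grouping plus greedy bin packing. SplitInfo, planMerges, the smallest-first packing order, and the rule that a group needs at least two splits are illustrative assumptions, not the documented algorithm; only the per-partition grouping and the size threshold come from the steps above.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical view of a live split from the transaction log.
final case class SplitInfo(path: String, partition: String, sizeBytes: Long)

def planMerges(live: Seq[SplitInfo], targetBytes: Long): Seq[Vector[SplitInfo]] =
  live.groupBy(_.partition).toSeq.sortBy(_._1).flatMap { case (_, splits) =>
    // Only splits below the target are candidates; pack smallest first.
    val candidates = splits.filter(_.sizeBytes < targetBytes).sortBy(_.sizeBytes)
    val groups = ArrayBuffer.empty[Vector[SplitInfo]]
    var current = Vector.empty[SplitInfo]
    var currentBytes = 0L
    for (s <- candidates) {
      // Close the current group if adding this split would overshoot the target.
      if (current.nonEmpty && currentBytes + s.sizeBytes > targetBytes) {
        groups += current
        current = Vector.empty
        currentBytes = 0L
      }
      current = current :+ s
      currentBytes += s.sizeBytes
    }
    if (current.nonEmpty) groups += current
    groups.filter(_.size >= 2) // a one-split "merge" would not reduce the split count
  }
```

With a 5 GB target, splits of 1, 2, and 3 GB in one partition yield a single group holding the 1 GB and 2 GB splits; the 3 GB split is left alone because adding it would exceed the target, and a 6 GB split is never a candidate.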

An async merge-on-write mode (AsyncMergeOnWriteManager) runs merges in the background after writes, keeping split count bounded without manual intervention.

PURGE INDEXTABLE

RemoveAction is a tombstone — the physical .split file remains on storage until purged. PURGE deletes split files whose tombstones are older than a configurable retention period. This is the same pattern as Delta Lake’s VACUUM.
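The retention rule reduces to a timestamp comparison. Tombstone and filesToPurge are hypothetical, and the sketch assumes each RemoveAction records when the split was removed.

```scala
import java.time.{Duration, Instant}

// Hypothetical tombstone: the removed split's path and the time its RemoveAction was committed.
final case class Tombstone(path: String, removedAt: Instant)

// A split file is physically deleted only once its tombstone is older than the retention period.
def filesToPurge(tombstones: Seq[Tombstone], now: Instant, retention: Duration): Seq[String] = {
  val cutoff = now.minus(retention)
  tombstones.filter(_.removedAt.isBefore(cutoff)).map(_.path)
}
```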

PREWARM CACHE

For latency-sensitive workloads, PREWARM downloads split files (or just their footers) to the local NVMe cache before queries arrive. The GlobalSplitCacheManager manages the L1 (memory) and L2 (disk) caches, with LRU eviction and configurable size limits.
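A minimal sketch of size-bounded LRU eviction using java.util.LinkedHashMap in access order. ByteBoundedLru is hypothetical and models a single cache tier; it does not reproduce GlobalSplitCacheManager’s API or its L1/L2 split.

```scala
import java.util.LinkedHashMap

// Hypothetical single-tier cache bounded by total bytes rather than entry count.
final class ByteBoundedLru(capacityBytes: Long) {
  // accessOrder = true: iteration runs from least to most recently used.
  private val entries = new LinkedHashMap[String, Array[Byte]](16, 0.75f, true)
  private var usedBytes = 0L

  // get() also marks the entry as most recently used.
  def get(key: String): Option[Array[Byte]] = Option(entries.get(key))

  def put(key: String, bytes: Array[Byte]): Unit = {
    val previous = entries.put(key, bytes)
    if (previous != null) usedBytes -= previous.length
    usedBytes += bytes.length
    // Evict least-recently-used entries until back under the limit.
    val it = entries.entrySet().iterator()
    while (usedBytes > capacityBytes && it.hasNext) {
      val eldest = it.next()
      usedBytes -= eldest.getValue.length
      it.remove()
    }
  }

  // Keys from least to most recently used.
  def keys: List[String] = {
    val out = List.newBuilder[String]
    val it = entries.keySet().iterator()
    while (it.hasNext) out += it.next()
    out.result()
  }

  def sizeBytes: Long = usedBytes
}
```

For example, with a 10-byte limit, inserting two 4-byte footers, re-reading the first, and then inserting a third 4-byte footer evicts the second one, since it is then the least recently used.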

See also