IndexTables Internals
This note covers the internal architecture of IndexTables for Spark — how it hooks into Spark’s DataSource V2 API, how data flows through the write and read paths, and how SQL predicates become native Tantivy queries.
Prerequisites
- IndexTables for Spark: what IndexTables is, core concepts (splits, transaction log, field types)
- Apache Spark Query Planning and Execution: logical/physical plans, Catalyst optimizer, predicate pushdown
- Spark SQL Extension Framework: parser injection, resolution rules, optimizer rules
- Arrow C data interface: the C structs (ArrowSchema, ArrowArray) used for zero-copy FFI
Package anatomy
IndexTables is a single Maven module (io.indextables:indextables_spark) built with Scala 2.12 on Spark 3.5. The source tree under src/main/scala/io/indextables/spark/ is organized by concern:
```
spark/
  core/         # DataSource V2: TableProvider, Table, ScanBuilder, Scan, Writers, Readers
  catalog/      # Catalog plugin (CatalogExtension, table resolution)
  extensions/   # SparkSessionExtensions registration (parser, rules, functions)
  catalyst/     # Catalyst rules for IndexQuery and bucket expression pushdown
  expressions/  # Custom Catalyst expressions (IndexQueryExpression, bucket expressions)
  filters/      # Filter tree types (IndexQueryFilter, MixedBooleanFilter)
  sql/          # ANTLR grammar + AST builder for custom SQL commands
  transaction/  # Transaction log (Actions, NativeTransactionLog via JNI)
  search/       # SplitSearchEngine wrapper around tantivy4java's SplitSearcher
  arrow/        # Arrow FFI bridges (read: import batches, write: export batches)
  schema/       # Spark ↔ Tantivy type mapping
  io/           # Cloud storage providers (S3, Azure, Hadoop), merge downloaders
  storage/      # Split cache management, locality manager
  sync/         # Companion mode (Delta/Iceberg/Parquet → IndexTables sync)
  merge/        # Merge-on-write (async background split merging)
  purge/        # Purge-on-write (auto-cleanup of tombstoned files)
  prewarm/      # Cache prewarming subsystem
  config/       # Configuration classes and SQL conf registration
  metrics/      # Spark UI metrics for data skipping and read pipeline
  stats/        # Min/max statistics for data skipping
  json/         # JSON field predicate translation
  write/        # Write-side config (Arrow FFI config, optimized writes)
  memory/       # Native memory initialization
```
Key external dependencies:
| Dependency | Role |
|---|---|
| tantivy4java (0.34.4) | JNI bridge to the Rust Tantivy search engine — provides SplitSearcher, QuickwitSplit, transaction log, merge operations |
| Arrow C Data (arrow-c-data) | Arrow C data interface for zero-copy batch exchange between JVM and Rust |
| ANTLR (4.9.3) | Grammar-based parser for custom SQL commands |
| AWS SDK v2 / Azure SDK | Cloud storage I/O for split upload/download |
How Spark loads IndexTables
IndexTables registers itself through three Spark extension points:
- TableProvider: IndexTables4SparkTableProvider implements DataSourceRegister with shortName "indextables", so spark.read.format("indextables") resolves to it
- SparkSessionExtensions: IndexTables4SparkExtensions injects a custom SQL parser, two Catalyst rules, and custom functions (configured via spark.sql.extensions)
- CatalogExtension: IndexTables4SparkCatalog wraps the default session catalog to resolve IndexTables tables by name
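A rough model of the first extension point: Spark finds DataSourceRegister implementations through java.util.ServiceLoader and, for spark.read.format(name), keeps the one whose shortName() equals the name, ignoring case. The sketch below reproduces only that matching step; ProviderLike and the two Toy* providers are hypothetical stand-ins, not Spark or IndexTables classes.

```scala
// Hypothetical stand-in for Spark's DataSourceRegister trait (only the shortName() member).
trait ProviderLike {
  def shortName(): String
}

// Hypothetical providers. In Spark, real implementations are discovered with
// java.util.ServiceLoader from META-INF/services/org.apache.spark.sql.sources.DataSourceRegister.
final class ToyIndexTablesProvider extends ProviderLike {
  def shortName(): String = "indextables"
}
final class ToyParquetProvider extends ProviderLike {
  def shortName(): String = "parquet"
}

object ToyFormatResolver {
  private val registered: Seq[ProviderLike] =
    Seq(new ToyParquetProvider, new ToyIndexTablesProvider)

  // Picks the provider whose shortName matches the format string, ignoring case.
  def resolve(format: String): Option[ProviderLike] =
    registered.filter(_.shortName().equalsIgnoreCase(format)) match {
      case Seq()       => None
      case Seq(single) => Some(single)
      case many        => throw new IllegalStateException(s"Ambiguous format '$format': ${many.size} providers")
    }
}
```

Treating more than one match as an error is a simplification here; Spark applies its own rules when several providers share a shortName.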
Write path
The write path from DataFrame to split file. Yellow nodes run in Rust via JNI; blue nodes are JVM-side Spark code.
Each executor writes independently. The writer buffers InternalRow objects into Arrow record batches, then exports them to Rust via the Arrow C data interface — no serialization, no copy. On the Rust side, QuickwitSplit.addArrowBatch() feeds the Arrow data into Tantivy’s indexing pipeline, producing a .split file. For cloud storage, splits are uploaded via multipart upload (S3) or block blob (Azure).
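The buffer-then-export step can be sketched as an accumulator that turns rows into a column-oriented batch every N rows. This is a toy under stated assumptions: Row, ColumnBatch, BatchingWriter, and the batch size of 3 are hypothetical, and the sink callback stands in for the Arrow C data export to Rust (QuickwitSplit.addArrowBatch() in the text), which this code does not perform.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical row and batch types; the real writer uses InternalRow and Arrow vectors.
final case class Row(id: Long, message: String)
final case class ColumnBatch(ids: Vector[Long], messages: Vector[String]) {
  def numRows: Int = ids.size
}

// Buffers rows and hands a column-oriented batch to `sink` every `batchSize` rows.
final class BatchingWriter(batchSize: Int, sink: ColumnBatch => Unit) {
  require(batchSize > 0, "batchSize must be positive")
  private val pending = ArrayBuffer.empty[Row]

  def write(row: Row): Unit = {
    pending += row
    if (pending.size >= batchSize) flush()
  }

  // Transposes the buffered rows into columns, emits the batch, and clears the buffer.
  def flush(): Unit =
    if (pending.nonEmpty) {
      sink(ColumnBatch(pending.map(_.id).toVector, pending.map(_.message).toVector))
      pending.clear()
    }

  // On task commit, emit the final partial batch so no rows are dropped.
  def close(): Unit = flush()
}

val exported = ArrayBuffer.empty[ColumnBatch]
val writer = new BatchingWriter(3, batch => { exported += batch; () })
(1L to 7L).foreach(i => writer.write(Row(i, s"msg-$i")))
writer.close()
```

Seven rows with a batch size of 3 produce batches of 3, 3, and 1 rows; the last one only appears because close() flushes the remainder.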
On commit, the driver collects AddAction messages from all executors and writes them to the transaction log in a single atomic version. The collection mechanism is Spark’s standard task result path: each executor’s DataWriter.commit() returns a WriterCommitMessage (a serializable object carrying the AddAction payload — min/max statistics, footer offsets, split UUID, document mapping). Spark serializes these messages back to the driver as part of the task result. The driver’s BatchWrite.commit(messages: Array[WriterCommitMessage]) receives all of them in one call, extracts the AddAction objects, and appends them atomically to the transaction log as a new version.
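The driver-side half of this protocol can be modeled in a few lines: every task returns a message carrying its actions, and one commit call appends all of them as a single new version. AddAction, TaskCommitMessage, and ToyTransactionLog below are simplified hypothetical types. The in-memory lock only illustrates the all-or-nothing shape; it says nothing about how the real log achieves atomicity on shared storage.

```scala
// Simplified, hypothetical stand-ins: the real AddAction also carries min/max statistics,
// footer offsets, a split UUID, and the document mapping. Case classes are Serializable,
// as a WriterCommitMessage must be.
final case class AddAction(path: String, numRecords: Long)
final case class TaskCommitMessage(adds: Seq[AddAction])

// An append-only log in which each driver-side commit becomes exactly one new version.
final class ToyTransactionLog {
  private var versions = Vector.empty[Seq[AddAction]]

  // Mirrors BatchWrite.commit(messages): gather every task's actions, then append them together.
  def commit(messages: Array[TaskCommitMessage]): Long = synchronized {
    versions = versions :+ messages.toSeq.flatMap(_.adds)
    versions.size - 1L // number of the version just written
  }

  def actionsAt(version: Long): Seq[AddAction] = synchronized(versions(version.toInt))
  def latestVersion: Long = synchronized(versions.size - 1L)
}

val txLog = new ToyTransactionLog
val fromTasks = Array(
  TaskCommitMessage(Seq(AddAction("part-0.split", 1000L))),
  TaskCommitMessage(Seq(AddAction("part-1.split", 800L)))
)
val committedVersion = txLog.commit(fromTasks)
```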
Read path
The read path from query to results. The driver negotiates pushdown and plans partitions; each executor runs a Tantivy query and streams Arrow batches zero-copy into Spark’s columnar format.
The scan builder implements five pushdown traits:
| Trait | What it pushes down |
|---|---|
| SupportsPushDownFilters | V1 Spark filters (EqualTo, GreaterThan, etc.) |
| SupportsPushDownV2Filters | V2 filter expressions |
| SupportsPushDownRequiredColumns | Column pruning |
| SupportsPushDownLimit | Row limit |
| SupportsPushDownAggregates | COUNT, SUM, AVG, MIN, MAX |
The critical optimization is in listFilesWithMetadata(): a single JNI call that evaluates min/max statistics in Rust and returns only the splits that could contain matching data. It plays the same role as Iceberg’s manifest filtering, but runs entirely in native code.
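Min/max pruning itself is easy to state. The sketch keeps a (min, max) range per column for each split and drops a split only when its ranges prove no row can satisfy the predicate; a split without statistics for a column is always kept. It is written in Scala purely for illustration (the text says IndexTables does this in Rust inside listFilesWithMetadata()), and SplitStats and the P* predicate types are hypothetical.

```scala
// Hypothetical per-split statistics: column -> (min, max), numeric columns only for brevity.
final case class SplitStats(path: String, ranges: Map[String, (Long, Long)])

sealed trait Pred
final case class PEq(col: String, v: Long) extends Pred
final case class PGt(col: String, v: Long) extends Pred
final case class PLt(col: String, v: Long) extends Pred
final case class PAnd(left: Pred, right: Pred) extends Pred

object MinMaxSkipping {
  // False only when the statistics prove no row can match; a missing column keeps the split.
  def mightMatch(p: Pred, s: SplitStats): Boolean = p match {
    case PEq(c, v)  => s.ranges.get(c).forall { case (lo, hi) => lo <= v && v <= hi }
    case PGt(c, v)  => s.ranges.get(c).forall { case (_, hi) => hi > v }
    case PLt(c, v)  => s.ranges.get(c).forall { case (lo, _) => lo < v }
    case PAnd(l, r) => mightMatch(l, s) && mightMatch(r, s)
  }

  def prune(p: Pred, splits: Seq[SplitStats]): Seq[SplitStats] =
    splits.filter(mightMatch(p, _))
}

val statsBySplit = Seq(
  SplitStats("a.split", Map("latency_ms" -> ((0L, 90L)))),
  SplitStats("b.split", Map("latency_ms" -> ((100L, 900L)))),
  SplitStats("c.split", Map.empty) // no statistics: must be scanned
)
```

For latency_ms > 500, a.split is skipped because its maximum is 90, while c.split survives only because nothing is known about it.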
Filter-to-query conversion
FiltersToQueryConverter translates Spark filter predicates into Tantivy SplitQuery objects:
| Spark filter | Tantivy query |
|---|---|
| EqualTo(col, value) | Term query |
| GreaterThan / LessThan / Between | Range query (automatically combines paired filters on the same field) |
| StringStartsWith(col, prefix) | Prefix query |
| StringContains(col, substr) | Regex query |
| IsNull / IsNotNull | Existence query |
| IndexQueryFilter(col, queryString) | Direct Tantivy query syntax passthrough |
| And / Or / Not | Boolean query (must/should/must_not clauses) |
The converter preserves the boolean structure of compound predicates. A WHERE clause like status = 'error' AND message indexquery 'timeout OR connection' becomes a Tantivy boolean query with two must-clauses: a term query on status and a parsed query string on message.
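The mapping in the table, including the step that merges a GreaterThan and LessThan on the same field into one range, can be sketched as a pattern match from a filter tree to a query tree. All types here are hypothetical simplifications (only a subset of the table's rows is modeled); the real FiltersToQueryConverter emits tantivy4java SplitQuery objects and may handle edge cases differently.

```scala
// Hypothetical filter tree (input) and query tree (output).
sealed trait Filter
final case class EqualTo(col: String, value: String) extends Filter
final case class GreaterThan(col: String, value: Long) extends Filter
final case class LessThan(col: String, value: Long) extends Filter
final case class StartsWith(col: String, prefix: String) extends Filter
final case class IsNotNull(col: String) extends Filter
final case class IndexQuery(col: String, query: String) extends Filter
final case class And(l: Filter, r: Filter) extends Filter
final case class Or(l: Filter, r: Filter) extends Filter
final case class Not(f: Filter) extends Filter

sealed trait Query
final case class TermQ(field: String, value: String) extends Query
final case class RangeQ(field: String, lowerExclusive: Option[Long], upperExclusive: Option[Long]) extends Query
final case class PrefixQ(field: String, prefix: String) extends Query
final case class ExistsQ(field: String) extends Query
final case class ParsedQ(field: String, query: String) extends Query
final case class BoolQ(must: Seq[Query] = Nil, should: Seq[Query] = Nil, mustNot: Seq[Query] = Nil) extends Query

object ToyConverter {
  def convert(f: Filter): Query = f match {
    case And(_, _) =>
      // Flatten nested ANDs, then merge range filters on the same column into one RangeQ.
      val parts = flattenAnd(f)
      val (ranges, others) = parts.partition {
        case _: GreaterThan | _: LessThan => true
        case _                            => false
      }
      val merged = ranges.groupBy(colOf).toSeq.sortBy(_._1).map { case (col, fs) =>
        RangeQ(col,
          fs.collect { case GreaterThan(_, v) => v }.reduceOption(_ max _), // tightest lower bound
          fs.collect { case LessThan(_, v) => v }.reduceOption(_ min _))    // tightest upper bound
      }
      BoolQ(must = others.map(convert) ++ merged)
    case Or(l, r)          => BoolQ(should = Seq(convert(l), convert(r)))
    case Not(inner)        => BoolQ(mustNot = Seq(convert(inner)))
    case EqualTo(c, v)     => TermQ(c, v)
    case GreaterThan(c, v) => RangeQ(c, Some(v), None)
    case LessThan(c, v)    => RangeQ(c, None, Some(v))
    case StartsWith(c, p)  => PrefixQ(c, p)
    case IsNotNull(c)      => ExistsQ(c)
    case IndexQuery(c, q)  => ParsedQ(c, q) // query string passed through unparsed
  }

  private def flattenAnd(f: Filter): Seq[Filter] = f match {
    case And(l, r) => flattenAnd(l) ++ flattenAnd(r)
    case other     => Seq(other)
  }

  private def colOf(f: Filter): String = f match {
    case GreaterThan(c, _) => c
    case LessThan(c, _)    => c
    case _                 => throw new IllegalArgumentException(s"not a range filter: $f")
  }
}

val whereClause = And(EqualTo("status", "error"), IndexQuery("message", "timeout OR connection"))
val converted = ToyConverter.convert(whereClause)
```

The usage input is the WHERE clause from the paragraph above; it converts to a boolean query with two must-clauses, a term query on status and a parsed query string on message.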
IndexQuery pushdown through Catalyst
The indexquery operator is IndexTables’ most distinctive feature — it lets users write Tantivy query syntax directly in SQL. Getting it from the SQL string to the executor involves four stages:
Stage 1: Parser preprocessing. The custom SQL parser intercepts WHERE col indexquery 'query' and rewrites it to tantivy4spark_indexquery(col, 'query'), which Spark sees as a regular function call.
Stage 2: Resolution rule. V2IndexQueryExpressionRule (injected as a Catalyst resolution rule) finds IndexQueryExpression nodes in the logical plan. It resolves column references through SubqueryAlias and CTE chains to find the underlying DataSourceV2Relation.
Stage 3: Optimizer rule. The same rule also runs as an optimizer rule. It converts the IndexQueryExpression into a MixedBooleanFilter tree, a custom filter type that can carry both regular Spark predicates and IndexQuery predicates in a single boolean structure. Because Spark’s pushdown API does not natively support custom filter types, the filter tree is handed off through a side channel on the ScanBuilder’s companion object: a WeakHashMap keyed by the DataSourceV2Relation, plus a ThreadLocal that records which relation the current thread is planning.
Stage 4: Scan builder retrieval. When Spark calls pushFilters() on the IndexTables4SparkScanBuilder, it retrieves the MixedBooleanFilter from that side channel and merges it with the standard Spark filters. Both are converted to Tantivy queries and sent to the executors.
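The input/output shape of Stage 1 can be approximated with a regular expression. This is an illustration only, with hypothetical names: the actual parser is ANTLR-based (the sql/ package), and a regex like this one would mishandle escaped quotes, comments, or string literals that happen to contain the word indexquery.

```scala
import scala.util.matching.Regex

object IndexQueryRewrite {
  // <identifier> indexquery '<single-quoted text without embedded quotes>', case-insensitive.
  private val IndexQueryPattern: Regex =
    """(?i)\b([A-Za-z_][A-Za-z0-9_.]*)\s+indexquery\s+'([^']*)'""".r

  // Rewrites each match into the function-call form that Spark's own parser accepts.
  def rewrite(sql: String): String =
    IndexQueryPattern.replaceAllIn(sql, m =>
      Regex.quoteReplacement(s"tantivy4spark_indexquery(${m.group(1)}, '${m.group(2)}')"))
}
```

Regex.quoteReplacement keeps characters such as $ or \ in the query text from being read as replacement syntax.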
ThreadLocal coupling
The ThreadLocal handoff between the Catalyst rule and the ScanBuilder is a pragmatic workaround for the fact that Spark’s V2 filter pushdown API only supports a fixed set of filter types. It works because Catalyst rule execution and ScanBuilder creation happen on the same driver thread within a single query lifecycle.
How extensions share state at query time
The extension framework note explains the general problem: Spark’s three extension surfaces (SparkSessionExtensions, DataSource V2, Catalog) are registered independently and never hold references to each other. Configuration flows through SparkSession.conf, but query-time state — like the MixedBooleanFilter tree from the IndexQuery pushdown stages above — needs a side channel because Spark’s V2 pushdown API only supports a fixed set of filter types.
IndexTables solves this with the companion object pattern: a ThreadLocal plus a WeakHashMap on the ScanBuilder’s companion object. A WeakHashMap holds its keys through weak references, so an entry is dropped once its key object has been garbage-collected, which prevents memory leaks from stale query plans:
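```scala
import java.util.{Collections, WeakHashMap}

object IndexTables4SparkScanBuilder {
  // ThreadLocal: tracks which DataSourceV2Relation the current driver thread is planning
  private val currentRelation = new ThreadLocal[Option[AnyRef]] {
    override protected def initialValue(): Option[AnyRef] = None
  }
  // WeakHashMap: stores custom filter data keyed by the relation object.
  // java.util.WeakHashMap is not thread-safe, so wrap it for concurrent planning threads.
  private val customFilters =
    Collections.synchronizedMap(new WeakHashMap[AnyRef, MixedBooleanFilter]())

  def setCurrentRelation(rel: AnyRef): Unit = currentRelation.set(Some(rel))
  def getCurrentRelation: Option[AnyRef] = currentRelation.get
  def storeFilters(rel: AnyRef, filter: MixedBooleanFilter): Unit =
    customFilters.put(rel, filter)
  // WeakHashMap.get returns null for a missing key; Option(...) maps that to None
  def getFilters(rel: AnyRef): Option[MixedBooleanFilter] =
    Option(customFilters.get(rel))
}
```
The flow connects the four IndexQuery pushdown stages to the scan builder: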
- Catalyst rule writes. V2IndexQueryExpressionRule (Stage 3 above) extracts the MixedBooleanFilter tree and stores it in the companion object’s WeakHashMap, keyed by the DataSourceV2Relation node.
- ScanBuilder reads. When Spark calls pushFilters() on the ScanBuilder (Stage 4), it retrieves the MixedBooleanFilter using the current relation as the key and merges it with the standard V2 filters.
- ThreadLocal provides isolation. Catalyst analysis, optimization, and scan planning all happen on the same driver thread for a given query, so the ThreadLocal records which relation that thread is planning, and a query running on another thread never picks up this query’s relation or its filter data. This is isolation, not synchronization: each query’s planning runs single-threaded on the driver, so the per-thread slot has no contention. The WeakHashMap itself is shared by all threads, and java.util.WeakHashMap is not synchronized, so concurrent writes to it still need a lock (for example, Collections.synchronizedMap).
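The handoff can be exercised outside Spark. The sketch below uses the same ThreadLocal + WeakHashMap shape, but as a class so a test can create a fresh instance, and plans two queries on separate threads; each thread reads back only its own filter. Relation, FilterTree, FilterRegistry, and planConcurrently are hypothetical placeholders for DataSourceV2Relation, MixedBooleanFilter, and the companion object.

```scala
import java.util.{Collections, WeakHashMap}
import java.util.concurrent.ConcurrentHashMap

// Hypothetical placeholders for DataSourceV2Relation and MixedBooleanFilter.
final class Relation(val table: String)
final class FilterTree(val description: String)

// Same shape as the companion-object pattern, as a class so each demo gets its own instance.
final class FilterRegistry {
  private val currentRelation = new ThreadLocal[Option[AnyRef]] {
    override protected def initialValue(): Option[AnyRef] = None
  }
  private val customFilters =
    Collections.synchronizedMap(new WeakHashMap[AnyRef, FilterTree]())

  // "Catalyst rule" side: remember this thread's relation and store its filter under it.
  def storeForCurrentThread(rel: AnyRef, filter: FilterTree): Unit = {
    currentRelation.set(Some(rel))
    customFilters.put(rel, filter)
  }

  // "pushFilters()" side: look up the filter using this thread's current relation.
  def filtersForCurrentThread(): Option[FilterTree] =
    currentRelation.get.flatMap(rel => Option(customFilters.get(rel)))
}

// Plans each (table, filter) pair on its own thread and records what each thread reads back.
def planConcurrently(registry: FilterRegistry, queries: Seq[(String, String)]): Map[String, String] = {
  val seen = new ConcurrentHashMap[String, String]()
  val threads = queries.map { case (table, filter) =>
    new Thread(new Runnable {
      def run(): Unit = {
        registry.storeForCurrentThread(new Relation(table), new FilterTree(filter))
        registry.filtersForCurrentThread() match {
          case Some(f) => seen.put(table, f.description)
          case None    => ()
        }
      }
    })
  }
  threads.foreach(_.start())
  threads.foreach(_.join())
  queries.map { case (table, _) => table -> seen.get(table) }.toMap
}
```

The calling thread never registers a relation, so its own lookup on the same registry returns None even after both worker threads have stored filters.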
The dual-injection pattern
IndexTables injects V2IndexQueryExpressionRule as both a resolution rule and an optimizer rule. Why both?
Spark’s analyzer applies rules repeatedly until the plan stops changing; this is called fixed-point resolution. During this process, the DataSourceV2Relation node that the resolution rule stored filter data against may be replaced by analysis (Spark may re-resolve it, enrich it with metadata, or reconstruct it). If the key object in the WeakHashMap no longer matches what the ScanBuilder eventually sees, the lookup fails.
The optimizer rule is the safety net: it runs after analysis is complete and the plan is stable. It re-stores the filter data keyed by the final DataSourceV2Relation, the node whose table Spark asks for a ScanBuilder (via newScanBuilder()) during pushdown. This guarantees the ScanBuilder finds the data regardless of what analysis did to the plan in between.
The exact reason the original key stops working (identity mismatch? structural inequality from added metadata? GC collecting the unreferenced old object?) depends on Spark’s internal analyzer implementation and may vary across versions. The dual-injection pattern is defensive: it doesn’t rely on understanding exactly when or why the key breaks.
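One possible failure mode is easy to reproduce, assuming analysis hands back a copy of the relation whose fields differ from the one the resolution rule saw. WeakHashMap compares keys with equals(), and a case class (as DataSourceV2Relation is) compares structurally, so an enriched copy no longer finds the entry. RelationNode and the resolvedBy option are hypothetical; whether this is what actually happens depends on the Spark version, as noted above.

```scala
import java.util.WeakHashMap

// Hypothetical stand-in for DataSourceV2Relation: a case class, so equality is structural.
final case class RelationNode(table: String, options: Map[String, String])

val filters = new WeakHashMap[AnyRef, String]()

// Resolution rule runs first and stores filter data against the node it saw.
val seenDuringAnalysis = RelationNode("logs", Map.empty)
filters.put(seenDuringAnalysis, "message indexquery 'timeout'")

// Later analysis produces an enriched copy: different field values, so not equal.
val finalNode = seenDuringAnalysis.copy(options = Map("resolvedBy" -> "catalog"))
val lookupBeforeOptimizer = Option(filters.get(finalNode))

// Optimizer rule re-stores against the stable, post-analysis node.
filters.put(finalNode, "message indexquery 'timeout'")
val lookupAfterOptimizer = Option(filters.get(finalNode))
```

If analysis instead produced an equal copy, the first lookup would already succeed; re-storing in the optimizer rule covers both cases.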
Why WeakHashMap (not HashMap)
The keys are DataSourceV2Relation plan nodes, objects created by Spark’s query planner that live only for the duration of query planning. Once the query finishes and the plan is discarded, the DataSourceV2Relation node has no more strong references. The garbage collector can then clear the weak key, which enqueues it on the map’s internal ReferenceQueue, and the WeakHashMap expunges the stale entry the next time the map is accessed. Filter data therefore does not accumulate across queries.
A regular HashMap would require explicit cleanup (remove the entry after the scan completes), which is fragile — if the query fails or is cancelled, the cleanup code might not run. With WeakHashMap, the GC handles cleanup for free.
What state flows where
| State | Mechanism | Example |
|---|---|---|
| Extension config (key/value) | SparkSession.conf | spark.indextables.read.mode = fast |
| Cloud credentials | Hadoop configuration via SparkSession | AWS region, S3 endpoints |
| Custom filter data (query-time) | Companion object ThreadLocal + WeakHashMap | MixedBooleanFilter carrying IndexQuery expressions |
| Table identity and schema | Table object returned by TableProvider or Catalog | Both paths return the same IndexTables4SparkTable class |
| Source table metadata (catalog path only) | CatalogManager → source catalog’s TBLPROPERTIES | Companion index storage path |
Arrow FFI bridge
Data crosses the JVM ↔ Rust boundary via the Arrow C data interface — the same zero-copy mechanism used by DuckDB, Polars, and DataFusion for cross-language interop.
Read side (ArrowFfiBridge): The Rust search engine produces Arrow record batches and exports them through ArrowSchema and ArrowArray C structs. The Java side imports these structs and wraps the Arrow memory as a Spark ColumnarBatch. No deserialization occurs — Spark reads directly from the memory allocated by Rust.
Write side (ArrowFfiWriteBridge): The Java writer builds Arrow record batches from Spark’s InternalRow objects, exports them as C structs, and the Rust side reads them directly. The Tantivy indexer consumes Arrow batches natively, so the entire write path from Spark row to indexed document involves zero serialization.
This is the same interface described in Arrow C data interface, using ArrowSchema (format string + child schemas) and ArrowArray (buffer pointers + lengths + null counts).
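To make "buffer pointers + lengths + null counts" concrete, the sketch below decodes a nullable Int32 column from the two buffers an ArrowArray exposes for it, following the Arrow columnar layout: validity bit i sits in byte i / 8 at bit position i % 8 (least-significant bit first, 1 meaning non-null), the validity buffer may be absent when there are no nulls, and the array's offset shifts every index. It reads java.nio.ByteBuffer values instead of native pointers and assumes little-endian data, so it shows the memory layout only, not what ArrowFfiBridge actually does.

```scala
import java.nio.{ByteBuffer, ByteOrder}

// Decodes an Arrow Int32 column from its validity and values buffers, mirroring the
// length, offset, and buffers fields of ArrowArray. `validity` is None when null_count is 0.
def readInt32Column(length: Int, offset: Int, validity: Option[ByteBuffer], values: ByteBuffer): Vector[Option[Int]] = {
  val data = values.duplicate().order(ByteOrder.LITTLE_ENDIAN)
  (0 until length).map { i =>
    val slot = offset + i
    // Bit (slot % 8) of byte (slot / 8), least-significant bit first; 1 = valid.
    val isValid = validity.forall(bm => ((bm.get(slot / 8) >> (slot % 8)) & 1) == 1)
    if (isValid) Some(data.getInt(slot * 4)) else None // 4 bytes per Int32 value
  }.toVector
}

// Three logical values [7, null, 42]: validity bits 0b101, i.e. byte 0x05.
val validityBuf = ByteBuffer.wrap(Array[Byte](0x05))
val valuesBuf = ByteBuffer.allocate(12).order(ByteOrder.LITTLE_ENDIAN).putInt(7).putInt(0).putInt(42)
valuesBuf.flip()
val decoded = readInt32Column(3, 0, Some(validityBuf), valuesBuf)
```

The slot for the null value still occupies four bytes in the values buffer; only the validity bit says to ignore it.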
Merge and purge lifecycle
MERGE SPLITS
Analogous to Delta Lake’s OPTIMIZE or Iceberg’s rewrite_data_files. Over time, many small splits accumulate (one per write batch per executor). MERGE SPLITS compacts them:
- The driver reads the transaction log and groups live splits by partition
- Within each partition, splits below the target size (default 5 GB) are grouped for merging
- Each executor calls QuickwitSplit.mergeSplits(), a Rust operation that merges multiple Tantivy indexes into one
- The driver commits atomically: RemoveAction for old splits + AddAction for merged splits
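The planning steps can be sketched as grouping by partition and then packing small splits into groups that stay under the target size. The greedy first-fit-decreasing packing, the rule that a one-split group is not worth merging, and the Split and MergePlanner names are assumptions made for illustration; only the per-partition grouping and the 5 GB default come from the text.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical split descriptor: partition value, path, and size in bytes.
final case class Split(partition: String, path: String, sizeBytes: Long)

object MergePlanner {
  val DefaultTargetBytes: Long = 5L * 1024 * 1024 * 1024 // "5 GB" default from the text (binary GB assumed)

  // Returns merge groups; each group would become one mergeSplits() call on an executor.
  def plan(splits: Seq[Split], targetBytes: Long = DefaultTargetBytes): Seq[Seq[Split]] =
    splits
      .filter(_.sizeBytes < targetBytes)       // only splits below the target are candidates
      .groupBy(_.partition).toSeq.sortBy(_._1) // never merge across partitions
      .flatMap { case (_, candidates) => pack(candidates, targetBytes) }
      .filter(_.size > 1)                      // a group of one has nothing to merge

  // Greedy first-fit decreasing: place each split in the first group that still has room.
  private def pack(candidates: Seq[Split], targetBytes: Long): Seq[Seq[Split]] = {
    val groups = ArrayBuffer.empty[Vector[Split]]
    for (s <- candidates.sortBy(-_.sizeBytes)) {
      val idx = groups.indexWhere(g => g.map(_.sizeBytes).sum + s.sizeBytes <= targetBytes)
      if (idx >= 0) groups(idx) = groups(idx) :+ s else groups += Vector(s)
    }
    groups.toList
  }
}

val candidateSplits = Seq(
  Split("date=2024-01-01", "a.split", 40L),
  Split("date=2024-01-01", "b.split", 30L),
  Split("date=2024-01-01", "c.split", 50L),
  Split("date=2024-01-02", "d.split", 20L),
  Split("date=2024-01-02", "e.split", 120L)
)
val mergeGroups = MergePlanner.plan(candidateSplits, targetBytes = 100L)
```

With a 100-byte target, only c.split and a.split are grouped; e.split is already over the target and d.split has no partner left in its partition. Each resulting group would then be merged on an executor, and the driver would record RemoveAction entries for the inputs and an AddAction for the output in one commit.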
An async merge-on-write mode (AsyncMergeOnWriteManager) runs merges in the background after writes, keeping split count bounded without manual intervention.
PURGE INDEXTABLE
RemoveAction is a tombstone — the physical .split file remains on storage until purged. PURGE deletes split files whose tombstones are older than a configurable retention period. This is the same pattern as Delta Lake’s VACUUM.
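Choosing what PURGE may delete comes down to comparing each tombstone's timestamp with a cutoff of now minus the retention period. Tombstone and PurgePlanner are hypothetical, and the seven-day retention in the usage is an arbitrary example value, not the IndexTables default.

```scala
import java.time.{Duration, Instant}

// Hypothetical record of a RemoveAction: which split was tombstoned and when.
final case class Tombstone(path: String, removedAt: Instant)

object PurgePlanner {
  // Files whose tombstones are strictly older than the retention window are eligible for deletion.
  def eligible(tombstones: Seq[Tombstone], retention: Duration, now: Instant): Seq[String] = {
    val cutoff = now.minus(retention)
    tombstones.filter(_.removedAt.isBefore(cutoff)).map(_.path)
  }
}

val now = Instant.parse("2024-06-15T00:00:00Z")
val tombstones = Seq(
  Tombstone("old.split", Instant.parse("2024-06-01T00:00:00Z")),
  Tombstone("recent.split", Instant.parse("2024-06-14T00:00:00Z"))
)
val toDelete = PurgePlanner.eligible(tombstones, Duration.ofDays(7), now)
```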
PREWARM CACHE
For latency-sensitive workloads, PREWARM downloads split files (or just their footers) to the local NVMe cache before queries arrive. The GlobalSplitCacheManager manages the L1 (memory) and L2 (disk) caches, with LRU eviction and configurable size limits.
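LRU eviction under a size limit can be sketched with java.util.LinkedHashMap in access order, evicting from the least recently used end until the cache fits. This models a single tier with a byte budget; ByteLruCache and its methods are hypothetical and do not reflect GlobalSplitCacheManager's API or how it divides memory and disk.

```scala
import java.util.LinkedHashMap

// A byte-bounded LRU cache: with accessOrder = true, iteration starts at the least recently used entry.
final class ByteLruCache(maxBytes: Long) {
  private val entries = new LinkedHashMap[String, Array[Byte]](16, 0.75f, true)
  private var usedBytes = 0L

  // A hit moves the entry to the most-recently-used end.
  def get(key: String): Option[Array[Byte]] = synchronized(Option(entries.get(key)))

  def put(key: String, value: Array[Byte]): Unit = synchronized {
    val previous = entries.put(key, value)
    if (previous != null) usedBytes -= previous.length
    usedBytes += value.length
    evict()
  }

  // Keys from least to most recently used.
  def keys: List[String] = synchronized {
    val it = entries.keySet.iterator()
    var out = List.empty[String]
    while (it.hasNext) out = it.next() :: out
    out.reverse
  }

  // Drop LRU entries until the total fits; the entry just inserted is most recent, so it goes last.
  private def evict(): Unit = {
    val it = entries.entrySet.iterator()
    while (usedBytes > maxBytes && it.hasNext) {
      usedBytes -= it.next().getValue.length
      it.remove()
    }
  }
}

val cache = new ByteLruCache(maxBytes = 10)
cache.put("split-a/footer", new Array[Byte](4))
cache.put("split-b/footer", new Array[Byte](4))
cache.get("split-a/footer")                     // touch a, so b becomes least recently used
cache.put("split-c/footer", new Array[Byte](4)) // 12 bytes > 10: evicts b
```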
See also
- IndexTables for Spark: overview, concepts, configuration, SQL extensions
- Apache Spark Query Planning and Execution: the query lifecycle that IndexTables hooks into
- Spark SQL Extension Framework: how SparkSessionExtensions enables custom parsers and rules
- Arrow C data interface: the FFI mechanism powering zero-copy data exchange
- Introduction to Apache Iceberg: comparable table format with different storage trade-offs