Tremor is an event processing runtime written in Rust for high-throughput data pipelines. It was originally built at Wayfair for observability workloads (logs, metrics, traces).

Current version: 0.12.0 (as of 2024)

Not distributed: Single-node runtime. Scale horizontally by running multiple instances with partitioned input.

No company: Open source project, no managed service. Deploy yourself (EC2, K8s, bare metal).

Two-Language System

1. tremor-script - Data transformation language

  • Compiles to custom bytecode
  • Runs in embedded VM written in Rust
  • JSON-like syntax with pattern matching
  • Used for: filtering, transforming, enriching events

2. tremor-query (Troy) - Pipeline topology language

  • Compiles to Rust operator graph (no VM)
  • Declarative flow definition
  • Used for: connecting components, defining routing

Relationship: Troy defines what connects to what, tremor-script defines what to do with data.

The tremor-script VM Architecture

Why a VM?

Tremor needs to execute user-defined transformation logic with these requirements:

  1. Hot reload - Update scripts without restarting runtime
  2. Sandboxing - Limit resource usage (CPU, memory, recursion)
  3. Safety - User scripts can’t crash the runtime
  4. Dynamic typing - JSON data has no static schema
  5. Performance - Fast enough for high-throughput pipelines
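Requirement 1 (hot reload) can be sketched as an atomically swappable handle to the compiled program. This is a minimal illustration, not Tremor's actual mechanism: `Program`, `ScriptSlot`, and their fields are hypothetical names introduced here.

```rust
use std::sync::{Arc, RwLock};

/// Hypothetical compiled script: a version tag plus a constant result.
#[derive(Debug)]
pub struct Program {
    pub version: u32,
    pub result: i64,
}

/// Holds the active program. Readers take a cheap Arc clone per event.
pub struct ScriptSlot {
    active: RwLock<Arc<Program>>,
}

impl ScriptSlot {
    pub fn new(program: Program) -> Self {
        Self { active: RwLock::new(Arc::new(program)) }
    }

    /// Snapshot the current program. Events already in flight keep
    /// the snapshot they started with.
    pub fn current(&self) -> Arc<Program> {
        let guard = self.active.read().unwrap();
        Arc::clone(&guard)
    }

    /// Swap in a newly compiled program without stopping the runtime.
    pub fn reload(&self, program: Program) {
        *self.active.write().unwrap() = Arc::new(program);
    }
}
```

An event that called `current()` before `reload()` finishes with the old program, and the next event picks up the new one.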

Why not the alternatives:

  • Direct Rust compilation: No hot reload, and a faulty script could take down the runtime
  • Rhai/Lua: Too general-purpose, not optimized for JSON
  • WASM: Serialization overhead at the boundary, and less mature in 2018
  • Custom VM: Optimized for the exact use case ✓

VM is Not a Separate Process

The VM is a Rust module within the same process. No separate process, no IPC.

┌─────────────────────────────────────────┐
│         Single Rust Process             │
│                                         │
│  ┌─────────────────────────────────┐   │
│  │  Tremor Runtime (Rust)          │   │
│  │                                 │   │
│  │  ┌─────────────────────────┐   │   │
│  │  │  tremor-script VM       │   │   │
│  │  │  (Rust module)          │   │   │
│  │  │                         │   │   │
│  │  │  - Bytecode interpreter │   │   │
│  │  │  - Stack machine        │   │   │
│  │  │  - Operates on Value    │   │   │
│  │  └─────────────────────────┘   │   │
│  └─────────────────────────────────┘   │
└─────────────────────────────────────────┘

Zero data copying - VM operates directly on Value structs allocated in the same heap.

Execution Pipeline

tremor-script source
       ↓
    Lexer (Rust)
       ↓
    Parser (Rust) → AST
       ↓
    Compiler (Rust) → Bytecode + Constant Pool
       ↓
    VM Executor (Rust) → Result
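The first stage of the pipeline above can be illustrated with a tiny lexer. This is a sketch that covers only the tokens needed for a statement such as `let x = event.http.status + 100;`. The `Token` enum and `lex` function are assumptions made for illustration and do not reflect the real tremor-script lexer.

```rust
#[derive(Debug, PartialEq, Clone)]
pub enum Token {
    Let,
    Ident(String),
    Int(i64),
    Dot,
    Plus,
    Assign,
    Semi,
}

/// Split source text into tokens, or report the first unexpected character.
pub fn lex(src: &str) -> Result<Vec<Token>, String> {
    let chars: Vec<char> = src.chars().collect();
    let mut tokens = Vec::new();
    let mut i = 0;
    while i < chars.len() {
        let c = chars[i];
        if c.is_whitespace() {
            i += 1;
        } else if c.is_ascii_digit() {
            // Integer literal: consume consecutive digits
            let start = i;
            while i < chars.len() && chars[i].is_ascii_digit() {
                i += 1;
            }
            let text: String = chars[start..i].iter().collect();
            tokens.push(Token::Int(text.parse::<i64>().map_err(|e| e.to_string())?));
        } else if c.is_ascii_alphabetic() || c == '_' {
            // Keyword or identifier
            let start = i;
            while i < chars.len() && (chars[i].is_ascii_alphanumeric() || chars[i] == '_') {
                i += 1;
            }
            let word: String = chars[start..i].iter().collect();
            tokens.push(if word == "let" { Token::Let } else { Token::Ident(word) });
        } else {
            tokens.push(match c {
                '.' => Token::Dot,
                '+' => Token::Plus,
                '=' => Token::Assign,
                ';' => Token::Semi,
                other => return Err(format!("unexpected character {other:?}")),
            });
            i += 1;
        }
    }
    Ok(tokens)
}
```

The parser stage would then consume this token stream to build the AST.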

Bytecode Format

Custom bytecode with opcodes specialized for JSON operations.

pub enum Instr {
    // Stack operations
    LoadConst(usize),      // Push constant from pool
    LoadLocal(usize),      // Push local variable
    StoreLocal(usize),     // Pop and store to local
 
    // Arithmetic
    Add, Sub, Mul, Div,
 
    // Comparison
    Eq, Lt, Gt,
 
    // JSON-specific operations
    GetPath(Vec<String>),  // event.http.status → optimized path lookup
    SetPath(Vec<String>),  // Modify nested field
    HasKey(String),        // Check field existence
 
    // Object construction
    MakeObject(usize),     // Pop N key-value pairs, create object
    MakeArray(usize),      // Pop N values, create array
 
    // Pattern matching
    MatchPattern(Pattern), // Match against pattern, branch
 
    // Control flow
    Jump(usize),           // Unconditional jump
    JumpIfFalse(usize),    // Conditional jump
    Call(FnId),            // Call function
    Return,
 
    // Event routing
    Emit(String),          // Emit event to port
    Drop,                  // Drop event
}

Example compilation:

let x = event.http.status + 100;

Compiles to:

LoadLocal(0)              // event
GetPath(["http", "status"])
LoadConst(0)              // 100 from constant pool
Add
StoreLocal(1)             // x
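The compilation step above can be sketched as a post-order walk over a small AST. The `Expr` type, the `Compiler` struct, and the reduced `Instr` set below are illustrative assumptions. They reproduce the five-instruction listing but are not the actual Tremor compiler.

```rust
#[derive(Debug, PartialEq)]
pub enum Instr {
    LoadConst(usize),
    LoadLocal(usize),
    StoreLocal(usize),
    GetPath(Vec<String>),
    Add,
}

/// Hypothetical AST for the subset of expressions used in the example.
pub enum Expr {
    Local(usize),
    Path(Box<Expr>, Vec<String>),
    Int(i64),
    Add(Box<Expr>, Box<Expr>),
}

#[derive(Default)]
pub struct Compiler {
    pub code: Vec<Instr>,
    pub constants: Vec<i64>,
}

impl Compiler {
    /// Intern a constant and return its pool index.
    fn constant(&mut self, value: i64) -> usize {
        if let Some(idx) = self.constants.iter().position(|c| *c == value) {
            return idx;
        }
        self.constants.push(value);
        self.constants.len() - 1
    }

    /// Emit operands first, then the operator (post-order).
    pub fn expr(&mut self, e: &Expr) {
        match e {
            Expr::Local(slot) => self.code.push(Instr::LoadLocal(*slot)),
            Expr::Path(base, segments) => {
                self.expr(base);
                self.code.push(Instr::GetPath(segments.clone()));
            }
            Expr::Int(n) => {
                let idx = self.constant(*n);
                self.code.push(Instr::LoadConst(idx));
            }
            Expr::Add(lhs, rhs) => {
                self.expr(lhs);
                self.expr(rhs);
                self.code.push(Instr::Add);
            }
        }
    }

    /// `let <slot> = <value>;`
    pub fn let_stmt(&mut self, slot: usize, value: &Expr) {
        self.expr(value);
        self.code.push(Instr::StoreLocal(slot));
    }
}
```

Compiling `let x = event.http.status + 100;` with `event` in slot 0 and `x` in slot 1 yields the same instruction sequence as the listing, with 100 stored at constant-pool index 0.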

The VM Executor

Stack-based interpreter:

pub struct VM {
    stack: Vec<Value>,           // Operand stack
    locals: Vec<Value>,          // Local variables
    constants: Vec<Value>,       // Constant pool
    instructions: Vec<Instr>,    // Bytecode
    pc: usize,                   // Program counter
    emitted_events: Vec<(String, Value)>,  // (port, event) pairs produced by Emit

    // Sandboxing limits
    max_stack_depth: usize,
    max_recursion: usize,
    ops_executed: usize,
    max_ops: usize,
}
 
impl VM {
    pub fn execute(&mut self, event: Value) -> Result<Vec<(String, Value)>> {
        self.locals[0] = event;  // 'event' is local 0
        self.pc = 0;
 
        loop {
            let instr = &self.instructions[self.pc];
            self.pc += 1;
            self.ops_executed += 1;
 
            if self.ops_executed > self.max_ops {
                return Err(Error::TooManyOperations);
            }
 
            match instr {
                Instr::LoadConst(idx) => {
                    let val = self.constants[*idx].clone();
                    self.stack.push(val);
                }
 
                Instr::Add => {
                    // Report malformed bytecode as an error instead of panicking
                    let b = self.stack.pop().ok_or(Error::StackUnderflow)?;
                    let a = self.stack.pop().ok_or(Error::StackUnderflow)?;
                    let result = self.add_values(a, b)?;
                    self.stack.push(result);
                }

                Instr::GetPath(path) => {
                    let obj = self.stack.pop().ok_or(Error::StackUnderflow)?;
                    let val = self.get_path(&obj, path)?;
                    self.stack.push(val);
                }
 
                Instr::Return => break,
                // ... other instructions
            }
        }
 
        Ok(self.emitted_events.drain(..).collect())
    }
}
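To make the dispatch loop concrete, here is a self-contained miniature of the same design that operates on plain `i64` values. The reduced instruction set, `VmError`, and `run` are assumptions for illustration only. It shows the fetch, increment, budget check, and dispatch cycle, with stack underflow reported as an error rather than a panic.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum Instr {
    Push(i64),
    Add,
    Mul,
    Jump(usize),
    Return,
}

#[derive(Debug, PartialEq)]
pub enum VmError {
    StackUnderflow,
    TooManyOps,
    PcOutOfBounds,
}

/// Run `code` until Return, executing at most `max_ops` instructions.
pub fn run(code: &[Instr], max_ops: usize) -> Result<i64, VmError> {
    let mut stack: Vec<i64> = Vec::new();
    let mut pc = 0;
    let mut ops = 0;
    loop {
        // Fetch, then advance the program counter
        let instr = code.get(pc).ok_or(VmError::PcOutOfBounds)?;
        pc += 1;
        ops += 1;
        if ops > max_ops {
            return Err(VmError::TooManyOps);
        }
        match instr {
            Instr::Push(n) => stack.push(*n),
            Instr::Add | Instr::Mul => {
                let b = stack.pop().ok_or(VmError::StackUnderflow)?;
                let a = stack.pop().ok_or(VmError::StackUnderflow)?;
                stack.push(if *instr == Instr::Add {
                    a.wrapping_add(b)
                } else {
                    a.wrapping_mul(b)
                });
            }
            Instr::Jump(target) => pc = *target,
            Instr::Return => return stack.pop().ok_or(VmError::StackUnderflow),
        }
    }
}
```

The program `Push(2), Push(3), Add, Push(4), Mul, Return` evaluates `(2 + 3) * 4`, while a lone `Jump(0)` is stopped by the operation budget instead of looping forever.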

Zero-Copy Semantics

The VM operates on Value directly with reference counting:

pub enum Value {
    Static(StaticNode),       // Inline: Null, Bool, I64, U64, F64
    String(Arc<str>),         // Reference-counted
    Array(Arc<Vec<Value>>),   // Reference-counted
    Object(Arc<HashMap<String, Value>>),  // Reference-counted
}

Key optimizations:

  1. Immutable values are Arc’d - Cloning = incrementing ref count
  2. Copy-on-write - Mutations create new Arc only if ref count > 1
  3. Path access - Direct pointer navigation, no serialization

// Accessing event.http.status - no copying
fn get_path(&self, value: &Value, path: &[String]) -> Result<Value> {
    let mut current = value;
    for segment in path {
        match current {
            Value::Object(map) => {
                current = map.get(segment).ok_or(Error::KeyNotFound)?;
            }
            _ => return Err(Error::NotAnObject),
        }
    }
    Ok(current.clone())  // Just clones Arc, not data
}
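A runnable version of the same lookup, under the assumption of a simplified `Value` enum (the real Tremor value type differs). The `PathError` type and the `sample_event` helper are hypothetical and exist only to exercise the function.

```rust
use std::collections::HashMap;
use std::sync::Arc;

#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Null,
    I64(i64),
    String(Arc<str>),
    Object(Arc<HashMap<String, Value>>),
}

#[derive(Debug, PartialEq)]
pub enum PathError {
    KeyNotFound,
    NotAnObject,
}

/// Walk `path` through nested objects by reference, cloning only the result.
pub fn get_path(value: &Value, path: &[&str]) -> Result<Value, PathError> {
    let mut current = value;
    for segment in path {
        match current {
            Value::Object(map) => {
                current = map.get(*segment).ok_or(PathError::KeyNotFound)?;
            }
            _ => return Err(PathError::NotAnObject),
        }
    }
    // For Object and String this bumps a reference count, it does not copy the data
    Ok(current.clone())
}

/// Builds {"http": {"status": 200}} for demonstration.
pub fn sample_event() -> Value {
    let http = HashMap::from([("status".to_string(), Value::I64(200))]);
    let event = HashMap::from([("http".to_string(), Value::Object(Arc::new(http)))]);
    Value::Object(Arc::new(event))
}
```

Two lookups of `["http"]` on the same event return handles to the same allocation, which can be confirmed with `Arc::ptr_eq`.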

When data IS copied (COW):

let event = patch event of
    insert new_field = "value"
end;

A sketch of the copy-on-write step in Rust:

fn patch(&mut self, modifications: Vec<Modification>) {
    // Arc::make_mut is copy-on-write: with a single reference it hands out
    // the inner data for in-place mutation; with multiple references it
    // clones the inner data first and points this handle at the copy.
    Arc::make_mut(&mut self.inner).apply(modifications);
}
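The copy-on-write behaviour can be observed directly with `Arc::make_mut` from the standard library. The `Object` alias and `insert_field` helper below are illustrative names, and the map holds `i64` values to keep the sketch short.

```rust
use std::collections::HashMap;
use std::sync::Arc;

pub type Object = Arc<HashMap<String, i64>>;

/// Insert a field, copying the map only if another handle still shares it.
pub fn insert_field(obj: &mut Object, key: &str, value: i64) {
    Arc::make_mut(obj).insert(key.to_string(), value);
}
```

With a single handle the allocation address is unchanged after the insert. Once a second handle exists, the insert leaves the shared map untouched and the mutated handle points at a fresh copy.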

Pattern Matching Compilation

Pattern matching is a first-class VM operation:

match event of
  case %{ http.status == 200 } => "success"
  case %{ http.status >= 400 } => "error"
  default => "unknown"
end

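Compiles to a chain of pattern tests and conditional jumps (labels and inline constants are shown symbolically; the actual operands are instruction offsets and constant-pool indices):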

MatchPattern(pattern_0)      // %{ http.status == 200 }
JumpIfFalse(label_1)
LoadConst("success")
Jump(label_end)

label_1:
MatchPattern(pattern_1)      // %{ http.status >= 400 }
JumpIfFalse(label_default)
LoadConst("error")
Jump(label_end)

label_default:
LoadConst("unknown")

label_end:

Pattern types:

pub enum Pattern {
    Present(Vec<String>),          // %{ present http.status }
    ValueEq(Vec<String>, Value),   // %{ http.status == 200 }
    ValueGte(Vec<String>, Value),  // %{ http.status >= 400 }
    RecordPattern(Vec<(String, Pattern)>),  // Nested
    Bind(String, Box<Pattern>),    // Extract and bind
}
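How such patterns might be evaluated against an event can be sketched as follows. The simplified `Value` and `Pattern` types, the `lookup` and `pattern_matches` helpers, and `classify` are assumptions for illustration. They mirror the three-way match above rather than Tremor's real matcher.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    I64(i64),
    Object(HashMap<String, Value>),
}

pub enum Pattern {
    Present(Vec<String>),
    ValueEq(Vec<String>, Value),
    ValueGte(Vec<String>, i64),
}

/// Follow `path` through nested objects, returning None on any miss.
fn lookup<'a>(value: &'a Value, path: &[String]) -> Option<&'a Value> {
    let mut current = value;
    for key in path {
        match current {
            Value::Object(map) => current = map.get(key)?,
            _ => return None,
        }
    }
    Some(current)
}

pub fn pattern_matches(pattern: &Pattern, event: &Value) -> bool {
    match pattern {
        Pattern::Present(path) => lookup(event, path).is_some(),
        Pattern::ValueEq(path, expected) => lookup(event, path) == Some(expected),
        Pattern::ValueGte(path, min) => {
            matches!(lookup(event, path), Some(Value::I64(n)) if n >= min)
        }
    }
}

/// Mirrors the match expression: 200 is success, >= 400 is error, else unknown.
pub fn classify(event: &Value) -> &'static str {
    let path = vec!["http".to_string(), "status".to_string()];
    if pattern_matches(&Pattern::ValueEq(path.clone(), Value::I64(200)), event) {
        "success"
    } else if pattern_matches(&Pattern::ValueGte(path, 400), event) {
        "error"
    } else {
        "unknown"
    }
}

/// Builds {"http": {"status": <status>}} for demonstration.
pub fn http_event(status: i64) -> Value {
    let http = HashMap::from([("status".to_string(), Value::I64(status))]);
    Value::Object(HashMap::from([("http".to_string(), Value::Object(http))]))
}
```

Cases are tried in order, so the first matching pattern wins, just as in the jump chain above.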

Custom Operators

The VM has operators optimized for JSON processing:

impl VM {
    // Specialized JSON merge (not standard +)
    fn merge_objects(&self, a: Value, b: Value) -> Result<Value>;
 
    // Array operations with COW
    fn array_push(&self, arr: Value, item: Value) -> Result<Value>;
 
    // Deep path operations
    fn set_deep_path(&self, obj: Value, path: &[String], val: Value) -> Result<Value>;
}
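As an illustration of what a specialized merge might do, the sketch below merges two objects recursively, with keys from the right-hand side winning on conflict. These merge semantics are an assumption made for the example; the source does not specify them. The `Value` enum and `obj` helper are likewise hypothetical.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    I64(i64),
    Object(HashMap<String, Value>),
}

/// Recursive merge: nested objects are merged key by key,
/// any other conflict is resolved in favour of `b`.
pub fn merge_objects(a: Value, b: Value) -> Value {
    match (a, b) {
        (Value::Object(mut left), Value::Object(right)) => {
            for (key, new) in right {
                let merged = match left.remove(&key) {
                    Some(old) => merge_objects(old, new),
                    None => new,
                };
                left.insert(key, merged);
            }
            Value::Object(left)
        }
        (_, b) => b,
    }
}

/// Convenience constructor for object literals.
pub fn obj(pairs: Vec<(&str, Value)>) -> Value {
    Value::Object(pairs.into_iter().map(|(k, v)| (k.to_string(), v)).collect())
}
```

Merging `{"x": 1, "n": {"p": 1}}` with `{"n": {"q": 2}, "y": 3}` keeps `x`, adds `y`, and combines both keys under `n`.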

Function Calls

User-defined and built-in functions:

fn my_transform(value) {
    value * 2 + 10
}
 
my_transform(event.count)

Compiled to call frames:

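pub struct CompiledFunction {
    id: FnId,
    params: Vec<String>,
    entry_point: usize,          // Offset of the function's first instruction
    local_count: usize,          // Number of local slots, including parameters
}

impl VM {
    fn call_function(&mut self, fn_id: FnId, args: Vec<Value>) -> Result<Value> {
        // Look up the callee (self.functions is assumed to map FnId -> CompiledFunction)
        let func = self.functions.get(&fn_id).ok_or(Error::UnknownFunction)?;
        let (entry_point, local_count) = (func.entry_point, func.local_count);
        if args.len() > local_count {
            return Err(Error::TooManyArguments);
        }

        // Save current frame and install a fresh set of locals
        let fresh = vec![Value::Static(StaticNode::Null); local_count];
        let saved_locals = std::mem::replace(&mut self.locals, fresh);
        let saved_pc = self.pc;

        // Arguments occupy the first local slots
        for (i, arg) in args.into_iter().enumerate() {
            self.locals[i] = arg;
        }

        // Execute function bytecode
        self.pc = entry_point;
        let result = self.execute_until_return();

        // Restore frame, even if the call failed
        self.locals = saved_locals;
        self.pc = saved_pc;

        result
    }
}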

Built-in Functions

Implemented in Rust, exposed to VM:

pub type BuiltinFn = fn(&mut VM, Vec<Value>) -> Result<Value>;
 
// Hundreds of built-ins
vm.register_builtin("string::uppercase", string_uppercase);
vm.register_builtin("array::len", array_len);
vm.register_builtin("json::decode", json_decode);
 
fn string_uppercase(_vm: &mut VM, args: Vec<Value>) -> Result<Value> {
    // args.first() avoids a panic when the function is called without arguments
    match args.first() {
        Some(Value::String(s)) => Ok(Value::from(s.to_uppercase())),
        _ => Err(Error::TypeMismatch)
    }
}
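A self-contained sketch of such a registry, assuming a plain `HashMap` from function name to function pointer. The `Registry` type, `CallError`, the simplified `Value`, and `default_registry` are illustrative and are not the actual Tremor registry API.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    I64(i64),
    Str(String),
    Array(Vec<Value>),
}

#[derive(Debug, PartialEq)]
pub enum CallError {
    UnknownFunction(String),
    TypeMismatch,
}

pub type BuiltinFn = fn(&[Value]) -> Result<Value, CallError>;

#[derive(Default)]
pub struct Registry {
    functions: HashMap<&'static str, BuiltinFn>,
}

impl Registry {
    pub fn register(&mut self, name: &'static str, f: BuiltinFn) {
        self.functions.insert(name, f);
    }

    /// Resolve `name` and invoke it; unknown names are an error, not a panic.
    pub fn call(&self, name: &str, args: &[Value]) -> Result<Value, CallError> {
        let f = self
            .functions
            .get(name)
            .ok_or_else(|| CallError::UnknownFunction(name.to_string()))?;
        f(args)
    }
}

fn string_uppercase(args: &[Value]) -> Result<Value, CallError> {
    match args {
        [Value::Str(s)] => Ok(Value::Str(s.to_uppercase())),
        _ => Err(CallError::TypeMismatch),
    }
}

fn array_len(args: &[Value]) -> Result<Value, CallError> {
    match args {
        [Value::Array(items)] => Ok(Value::I64(items.len() as i64)),
        _ => Err(CallError::TypeMismatch),
    }
}

pub fn default_registry() -> Registry {
    let mut registry = Registry::default();
    registry.register("string::uppercase", string_uppercase);
    registry.register("array::len", array_len);
    registry
}
```

Slice patterns such as `[Value::Str(s)]` check arity and argument type in one step, so a wrong argument count surfaces as `TypeMismatch` instead of an index panic.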

Sandboxing Implementation

The VM enforces resource limits:

pub struct VMConfig {
    pub max_stack_depth: usize,        // Default: 1024
    pub max_recursion_depth: usize,    // Default: 256
    pub max_operations: usize,         // Default: 1_000_000
    pub max_string_length: usize,      // Default: 1MB
    pub max_array_length: usize,       // Default: 10_000
}
 
impl VM {
    fn check_limits(&self) -> Result<()> {
        if self.stack.len() > self.config.max_stack_depth {
            return Err(Error::StackOverflow);
        }
        if self.ops_executed > self.config.max_operations {
            return Err(Error::TooManyOperations);
        }
        if self.call_depth > self.config.max_recursion_depth {
            return Err(Error::RecursionLimit);
        }
        Ok(())
    }
}

Prevents infinite loops, stack overflow, resource exhaustion.
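The effect of these limits can be demonstrated with a small guest function that recurses under a budget. `Sandbox`, `Limits`, `LimitError`, and the `sum_to` guest function are hypothetical names for this sketch. The recursion limit of 256 follows the default listed above.

```rust
#[derive(Debug, PartialEq)]
pub enum LimitError {
    RecursionLimit,
    TooManyOperations,
}

pub struct Limits {
    pub max_recursion_depth: usize,
    pub max_operations: usize,
}

pub struct Sandbox {
    limits: Limits,
    call_depth: usize,
    ops_executed: usize,
}

impl Sandbox {
    pub fn new(limits: Limits) -> Self {
        Self { limits, call_depth: 0, ops_executed: 0 }
    }

    /// Charge one operation against the budget.
    fn tick(&mut self) -> Result<(), LimitError> {
        self.ops_executed += 1;
        if self.ops_executed > self.limits.max_operations {
            return Err(LimitError::TooManyOperations);
        }
        Ok(())
    }

    /// Hypothetical guest function: sums 1..=n recursively under the limits.
    pub fn sum_to(&mut self, n: u64) -> Result<u64, LimitError> {
        self.tick()?;
        if self.call_depth >= self.limits.max_recursion_depth {
            return Err(LimitError::RecursionLimit);
        }
        if n == 0 {
            return Ok(0);
        }
        self.call_depth += 1;
        let rest = self.sum_to(n - 1);
        self.call_depth -= 1; // Unwind the depth counter on success and on error
        Ok(n + rest?)
    }
}
```

A shallow call completes normally, a deep one stops at the recursion limit long before the host stack is at risk, and a tight operation budget cuts off work even when the depth is acceptable.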

Performance Trade-offs

Why VM is fast enough:

  1. No serialization - Operates directly on native Value structures
  2. Reference counting - Cheap clones via Arc
  3. Specialized opcodes - GetPath is single instruction
  4. Compiled once - Scripts compile to bytecode, reused across events
  5. Written in Rust - No FFI overhead, same optimization level

Why still slower than native Rust:

  1. Interpretation overhead - Bytecode dispatch (~10-20ns per instruction)
  2. Dynamic dispatch - Type checks at runtime for every operation
  3. Stack operations - Push/pop overhead
  4. Bounds checking - Every instruction validated

Performance comparison (rough order-of-magnitude estimates, not measured benchmarks):

  • Native Rust: 1x (baseline)
  • tremor-script VM: 10-50x slower
  • Interpreted scripting languages such as Python: 100-1000x slower

Still fast enough - Can process 500K-1M events/sec with complex transformations.
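As a back-of-the-envelope check, the per-instruction dispatch cost quoted above can be related to the throughput figure. The sketch assumes a script that executes about 100 bytecode instructions per event on a single core and ignores all other costs. Both are assumptions for the arithmetic, not measurements.

```rust
/// Events per second one core can sustain if each event executes
/// `instrs_per_event` instructions at `ns_per_instr` nanoseconds each.
pub fn events_per_sec(ns_per_instr: f64, instrs_per_event: f64) -> f64 {
    1e9 / (ns_per_instr * instrs_per_event)
}
```

At 10 ns per instruction, 100 instructions cost 1 μs per event, or about 1M events/sec. At 20 ns they cost 2 μs, or about 500K events/sec. Under these assumptions the dispatch cost alone is consistent with the 500K-1M range.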

Comparison to Other VMs

VM         | Type                  | Use Case            | Key Feature
-----------|-----------------------|---------------------|--------------------------
Tremor VM  | Stack, bytecode       | JSON transformation | Zero-copy JSON operations
Lua VM     | Register, bytecode    | General purpose     | Small, fast
JVM        | Stack, bytecode + JIT | General purpose     | Mature, optimized
WASM       | Stack, bytecode       | Sandboxed execution | Portable
V8         | JIT to native         | JavaScript          | Very fast

Like the Lua VM, the Tremor VM is a small embeddable bytecode interpreter, but it is stack-based rather than register-based and specialized for JSON.

Using tremor-script and Troy Together

Troy Defines Topology

Troy (.troy files) is the wiring diagram:

define flow my_app
flow
  // 1. Define INPUT
  define connector http_in from http_server
  with codec = "json", config = { "port": 8080 }
  end;
 
  // 2. Define PIPELINE
  define pipeline process
  pipeline
    // Inline tremor-script in select
    select {
      "timestamp": system::ingest_ns(),
      "level": match event.severity of
        case x when x > 3 => "error"
        default => "info"
      end
    }
    from in into out;
  end;
 
  // 3. Define OUTPUT
  define connector kafka_out from kafka
  with codec = "json", config = { "topic": "logs" }
  end;
 
  // 4. CONNECT THEM
  connect /connector/http_in to /pipeline/process;
  connect /pipeline/process to /connector/kafka_out;
end;
 
deploy flow my_app;

tremor-script Defines Transformations

External script file (transform.tremor):

let enriched = patch event of
  insert environment = "production",
  insert processed_at = system::ingest_ns()
end;
 
match enriched of
  case %{ level == "error" } => emit enriched => "errors"
  case %{ level == "warning" } => emit enriched => "warnings"
  default => emit enriched => "info"
end

Load in troy:

define pipeline process
pipeline
  define script transform from "/path/to/transform.tremor";
 
  select event from in into transform;
  select event from transform/errors into error_output;
  select event from transform/warnings into warning_output;
  select event from transform/info into info_output;
end;

Key relationship:

  • Troy = deployment config - “What connects to what”
  • tremor-script = data transformation - “What to do with data”
  • Scripts execute inside pipelines defined in troy
  • Can inline simple scripts or load complex scripts from files

Rust Integration

Embedding Runtime

use tremor_runtime::{Runtime, RuntimeConfig};
 
#[tokio::main]
async fn main() -> Result<()> {
    let runtime = Runtime::new(RuntimeConfig::default()).await?;
    runtime.load_pipeline("config.troy").await?;
    runtime.start().await?;
    tokio::signal::ctrl_c().await?;
    runtime.stop().await?;
    Ok(())
}

Executing Scripts Directly

use tremor_script::{Script, Value, EventContext};
 
let script = Script::parse(r#"
    let event = patch event of insert processed = true end;
    event
"#)?;
 
let event = Value::from(json!({"message": "hello"}));
let ctx = EventContext::new(0, None);
let result = script.run(&ctx, event)?;

Custom Connector

use tremor_runtime::connector::ConnectorContext;
use async_trait::async_trait;

#[async_trait]
pub trait Connector: Send {
    async fn on_event(&mut self, event: Event, ctx: &ConnectorContext) -> Result<()>;

    // Default no-op lifecycle hooks, so simple connectors only implement on_event
    async fn connect(&mut self) -> Result<()> { Ok(()) }
    async fn disconnect(&mut self) -> Result<()> { Ok(()) }
}

Source connector example (receives external data, sends to pipeline):

pub struct HttpSourceConnector {
    server: HttpServer,
    codec: Box<dyn Codec>,
}
 
#[async_trait]
impl Connector for HttpSourceConnector {
    async fn on_event(&mut self, event: Event, ctx: &ConnectorContext) -> Result<()> {
        // Event contains raw HTTP request
        let decoded = self.codec.decode(&event.data)?;
 
        let processed = Event {
            data: decoded,
            ingest_ns: system_time(),
            ..event
        };
 
        // Forward into Tremor pipeline
        ctx.send(processed).await?;
 
        Ok(())
    }
}

Sink connector example (receives from pipeline, writes externally):

pub struct KafkaSinkConnector {
    producer: KafkaProducer,
    codec: Box<dyn Codec>,
}
 
#[async_trait]
impl Connector for KafkaSinkConnector {
    async fn on_event(&mut self, event: Event, ctx: &ConnectorContext) -> Result<()> {
        // Event arrived from pipeline
        let bytes = self.codec.encode(&event.data)?;
 
        // Write to Kafka (don't forward - this is a sink)
        self.producer.send("topic", bytes).await?;
 
        Ok(())
    }
}

Custom Operator

use tremor_pipeline::Event;
 
pub trait Operator: Send {
    fn on_event(&mut self, port: &str, event: Event)
        -> Result<Vec<(String, Event)>>;
}
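An operator with this shape could look like the sketch below. It redefines a local `Operator` trait and a simplified `Event` so that it compiles on its own. `SeverityRouter`, its fields, and the port names are hypothetical.

```rust
#[derive(Debug, Clone, PartialEq)]
pub struct Event {
    pub severity: i64,
    pub message: String,
}

/// Local stand-in for the operator trait shape, with String as the error type.
pub trait Operator: Send {
    fn on_event(&mut self, port: &str, event: Event) -> Result<Vec<(String, Event)>, String>;
}

/// Hypothetical operator: routes events above a severity threshold to "errors".
pub struct SeverityRouter {
    pub threshold: i64,
    pub seen: u64,
}

impl Operator for SeverityRouter {
    fn on_event(&mut self, port: &str, event: Event) -> Result<Vec<(String, Event)>, String> {
        if port != "in" {
            return Err(format!("unknown input port {port:?}"));
        }
        self.seen += 1;
        let out = if event.severity > self.threshold { "errors" } else { "out" };
        Ok(vec![(out.to_string(), event)])
    }
}
```

Returning a vector of (port, event) pairs lets one input produce zero, one, or several outputs, so the same trait covers filtering, routing, and fan-out.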

Custom Codec

use tremor_value::Value;
 
pub trait Codec: Send {
    fn decode(&mut self, data: &[u8]) -> Result<Value>;
    fn encode(&mut self, value: &Value) -> Result<Vec<u8>>;
}
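A minimal codec following this trait shape, shown with a local trait definition and a simplified `Value` so that it stands alone. `StringCodec` here is an illustrative UTF-8 codec, not the built-in string codec.

```rust
#[derive(Debug, Clone, PartialEq)]
pub enum Value {
    Str(String),
    I64(i64),
}

/// Local stand-in for the codec trait shape, with String as the error type.
pub trait Codec: Send {
    fn decode(&mut self, data: &[u8]) -> Result<Value, String>;
    fn encode(&mut self, value: &Value) -> Result<Vec<u8>, String>;
}

/// Treats the payload as UTF-8 text.
pub struct StringCodec;

impl Codec for StringCodec {
    fn decode(&mut self, data: &[u8]) -> Result<Value, String> {
        std::str::from_utf8(data)
            .map(|s| Value::Str(s.to_string()))
            .map_err(|e| e.to_string())
    }

    fn encode(&mut self, value: &Value) -> Result<Vec<u8>, String> {
        match value {
            Value::Str(s) => Ok(s.as_bytes().to_vec()),
            Value::I64(n) => Ok(n.to_string().into_bytes()),
        }
    }
}
```

Invalid UTF-8 input is reported as a decode error, so a malformed payload does not panic inside the connector.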

Extending VM with Custom Functions

use tremor_script::registry::Registry;
 
pub fn my_custom_function(args: &[Value]) -> Result<Value> {
    // Your logic
    Ok(Value::from("result"))
}
 
let mut registry = Registry::default();
registry.insert("my_function", my_custom_function);
 
let script = Script::parse_with_registry(source, &registry)?;

Architecture

┌─────────────────────────────────────────┐
│         Tremor Runtime (Rust)           │
│                                         │
│  Onramp → Pipeline → Script VM → Offramp│
│   (IO)    (graph)    (bytecode)   (IO) │
│                                         │
│  Thread Pool (Tokio)                    │
│  State Backend (RocksDB)                │
└─────────────────────────────────────────┘

Event flow:

  1. Onramp (connector): Decode bytes → Event
  2. Pipeline: Route through operators (compiled Rust)
  3. Script: Transform Event.data (VM executes bytecode)
  4. Offramp (connector): Encode Event → bytes

Concurrency:

  • Onramps/Offramps: Dedicated threads (blocking IO)
  • Pipelines: Async tasks (Tokio runtime)
  • Events: Arc<Event> for zero-copy routing
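The zero-copy routing in the last bullet can be shown with the standard library alone. The `Event` struct and `fan_out` function below are simplified stand-ins for illustration.

```rust
use std::sync::Arc;

#[derive(Debug)]
pub struct Event {
    pub id: u64,
    pub data: Vec<u8>,
}

/// Hand the same event to `n` downstream consumers without copying the payload.
pub fn fan_out(event: Event, n: usize) -> Vec<Arc<Event>> {
    let shared = Arc::new(event);
    (0..n).map(|_| Arc::clone(&shared)).collect()
}
```

Each route holds a pointer to the same allocation, so routing a 1 KB payload to three pipelines increments a counter three times instead of copying 3 KB.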

Core types:

pub struct Event {
    id: EventId,
    data: Value,              // Payload (scripts operate on this)
    ingest_ns: u64,
    metadata: Value,
    transactional: bool,
}
 
pub enum Value {
    Static(StaticNode),       // Null, Bool, I64, U64, F64
    String(Arc<str>),
    Array(Arc<Vec<Value>>),
    Object(Arc<HashMap<String, Value>>),
}

Performance

Throughput (single instance, 8 cores)

Workload              | Events/sec | Bottleneck
----------------------|------------|-----------
Simple routing        | 5-10M      | I/O
JSON decode/encode    | 1-2M       | Codec
Script transformation | 500K-1M    | VM
Windowed aggregation  | 100K-500K  | State

Latency

  • p50: <100μs
  • p99: <500μs
  • p999: 1-5ms
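For readers who want to reproduce such figures from their own measurements, percentiles can be computed with the nearest-rank method. The sketch below uses integer permille arguments (500 for p50, 990 for p99, 999 for p999) to avoid floating-point rounding at the rank boundary. The function name and signature are assumptions.

```rust
/// Nearest-rank percentile over latency samples.
/// `permille` is the percentile in tenths of a percent (0..=1000).
pub fn percentile_permille(samples: &[u64], permille: u64) -> Option<u64> {
    if samples.is_empty() || permille > 1000 {
        return None;
    }
    let mut sorted = samples.to_vec();
    sorted.sort_unstable();
    // rank = ceil(permille * n / 1000), computed in integers
    let rank = (permille * sorted.len() as u64 + 999) / 1000;
    let index = rank.saturating_sub(1) as usize;
    Some(sorted[index])
}
```

With 1000 samples, p999 depends on a single observation, so tail percentiles need far more samples than p50 to be stable.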

Memory

  • Baseline: ~100MB
  • Per event: ~1-2KB (varies by size)
  • Events are Arc<Event>, cheap to route

State Management

Windows (in-memory)

define window w from tumbling with size = 10s end;
 
select {
  "count": aggr::count(),
  "sum": aggr::sum(event.value)
}
from in[w]
group by event.host
into out;
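What the query above computes can be sketched in Rust as a tumbling window with per-host aggregates. The sketch assumes that window boundaries are driven by event timestamps and that a window is emitted when the first event of a later window arrives. `TumblingWindow` and `Agg` are hypothetical names, and Tremor's own windowing may differ.

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, PartialEq, Default)]
pub struct Agg {
    pub count: u64,
    pub sum: f64,
}

pub struct TumblingWindow {
    size_ns: u64,
    window_start: Option<u64>,
    groups: BTreeMap<String, Agg>,
}

impl TumblingWindow {
    pub fn new(size_ns: u64) -> Self {
        assert!(size_ns > 0, "window size must be non-zero");
        Self { size_ns, window_start: None, groups: BTreeMap::new() }
    }

    /// Feed one event. Returns the finished window's per-host aggregates
    /// when `ts_ns` falls into a later window than the current one.
    /// Late events are folded into the current window.
    pub fn on_event(&mut self, ts_ns: u64, host: &str, value: f64) -> Option<BTreeMap<String, Agg>> {
        let start = ts_ns - ts_ns % self.size_ns;
        let closed = match self.window_start {
            Some(current) if start > current => Some(std::mem::take(&mut self.groups)),
            _ => None,
        };
        if closed.is_some() || self.window_start.is_none() {
            self.window_start = Some(start);
        }
        let agg = self.groups.entry(host.to_string()).or_default();
        agg.count += 1;
        agg.sum += value;
        closed
    }
}
```

Because the state lives in an in-memory map, it is lost on restart and is not shared between instances.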

KV Store (RocksDB-backed)

use std::state;
 
let count = state::get("counter") ?? 0;
state::put("counter", count + 1);

Local only - not distributed across instances.

Available Components

Connectors: kafka, http, tcp, udp, file, websocket, unix-socket, elasticsearch, influxdb, postgres, clickhouse, stdin, stdout

Codecs: json, msgpack, bincode, yaml, influx (line protocol), statsd, syslog, binary, string

All extensible via Rust traits.

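Comparison with Kafka Streams and Flink

Feature      | Tremor        | Kafka Streams    | Flink
-------------|---------------|------------------|------------------
Architecture | Standalone    | Embedded library | Distributed
Language     | Rust          | Java             | Java/Scala/Python
Deployment   | Single binary | In your app      | Cluster
State        | Local only    | Kafka-backed     | Distributed
Scaling      | Manual        | Auto             | Auto
Latency      | Sub-ms        | Low ms           | Low ms
Memory       | ~100MB        | Medium           | High (JVM)
Complexity   | Low           | Medium           | High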

When to use Tremor:

  • Observability pipelines (logs, metrics, traces)
  • High-throughput routing/filtering
  • Single-node or manually partitioned workloads
  • Low-latency requirements (<1ms)
  • Operational simplicity
  • Rust ecosystem

When NOT to use:

  • Distributed stateful processing
  • Multi-stream joins with consistency
  • Exactly-once semantics required
  • Need managed service (none exists)

Deployment

# Binary
tremor server run config.troy
 
# Docker
docker run tremorproject/tremor:0.12 server run /config/main.troy
 
# Kubernetes
kubectl apply -f tremor-deployment.yaml

Scaling strategy: Run multiple instances, partition at source (Kafka consumer groups, HTTP load balancer). No coordination between instances.
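Where the source does not partition for you (for example a custom forwarder in front of several instances), a stable hash of a routing key is one way to partition input. The sketch below uses the FNV-1a hash so that the mapping does not depend on process or compiler version. `instance_for` and the choice of host name as key are assumptions.

```rust
/// 64-bit FNV-1a hash.
pub fn fnv1a(key: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf29ce484222325; // FNV offset basis
    for byte in key {
        hash ^= u64::from(*byte);
        hash = hash.wrapping_mul(0x100000001b3); // FNV prime
    }
    hash
}

/// Pick the instance index for a routing key, or None if there are no instances.
pub fn instance_for(key: &str, instances: usize) -> Option<usize> {
    if instances == 0 {
        return None;
    }
    Some((fnv1a(key.as_bytes()) % instances as u64) as usize)
}
```

All events with the same key land on the same instance, which keeps per-key state such as windows local. Changing the instance count remaps most keys, so local state does not survive a resize.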

Monitoring

Exposes Prometheus metrics on /metrics:

  • tremor_events_total - Events processed
  • tremor_latency_ns - Processing latency histogram
  • tremor_queue_depth - Channel queue depths
  • tremor_errors_total - Error counts

REST API endpoint: GET /v1/status for runtime health.

Limitations

  • Not distributed - No cluster coordination, no distributed state
  • Manual scaling - No automatic rebalancing
  • Basic windowing - No session windows, no event-time watermarks
  • No exactly-once - At-most-once or at-least-once only
  • No managed service - Self-hosted only

Dependencies

[dependencies]
tremor-runtime = "0.12"
tremor-script = "0.12"
tremor-pipeline = "0.12"
tremor-value = "0.12"
tremor-codec = "0.12"

Summary

Tremor executes tremor-script on a custom bytecode VM written in Rust. The VM is stack-based, with opcodes specialized for JSON operations. Because it is embedded in the same process as the runtime, it operates directly on native Value structs and shares data through Arc instead of copying it; there is no serialization boundary between “Rust” and “VM” because the VM is Rust code. This design provides hot reload and sandboxing at a cost of roughly 10-50x versus native Rust, while still processing 500K-1M events/sec. Troy defines topology (what connects to what), and tremor-script defines transformations (what to do with data). Tremor suits observability pipelines where operational simplicity and low latency matter more than distributed features.