Tremor is an event processing runtime written in Rust for high-throughput data pipelines. It was originally built at Wayfair for observability workloads (logs, metrics, traces).
Current version: 0.12.0 (as of 2024)
Not distributed: Single-node runtime. Scale horizontally by running multiple instances with partitioned input.
No vendor: Open-source project with no managed service; you deploy it yourself (EC2, Kubernetes, bare metal).
Two-Language System
1. tremor-script - Data transformation language
- Compiles to custom bytecode
- Runs in embedded VM written in Rust
- JSON-like syntax with pattern matching
- Used for: filtering, transforming, enriching events
2. Troy - Pipeline topology language (the deployment language, which also embeds tremor-query, a.k.a. trickle, for pipeline definitions)
- Compiles to Rust operator graph (no VM)
- Declarative flow definition
- Used for: connecting components, defining routing
Relationship: Troy defines what connects to what, tremor-script defines what to do with data.
The tremor-script VM Architecture
Why a VM?
Tremor needs to execute user-defined transformation logic with these requirements:
- Hot reload - Update scripts without restarting runtime
- Sandboxing - Limit resource usage (CPU, memory, recursion)
- Safety - User scripts can’t crash the runtime
- Dynamic typing - JSON data has no static schema
- Performance - Fast enough for high-throughput pipelines
Why not alternatives:
- Direct Rust compilation: No hot reload (recompiling means restarting), and a faulty script could crash the whole process
- Rhai/Lua: Too general-purpose, not optimized for JSON
- WASM: Serialization overhead, less mature in 2018
- Custom VM: Optimized for exact use case ✓
VM is Not a Separate Process
The VM is a Rust module within the same process. No separate process, no IPC.
┌─────────────────────────────────────────┐
│ Single Rust Process │
│ │
│ ┌─────────────────────────────────┐ │
│ │ Tremor Runtime (Rust) │ │
│ │ │ │
│ │ ┌─────────────────────────┐ │ │
│ │ │ tremor-script VM │ │ │
│ │ │ (Rust module) │ │ │
│ │ │ │ │ │
│ │ │ - Bytecode interpreter │ │ │
│ │ │ - Stack machine │ │ │
│ │ │ - Operates on Value │ │ │
│ │ └─────────────────────────┘ │ │
│ └─────────────────────────────────┘ │
└─────────────────────────────────────────┘
Zero data copying - VM operates directly on Value structs allocated in the same heap.
Execution Pipeline
tremor-script source
↓
Lexer (Rust)
↓
Parser (Rust) → AST
↓
Compiler (Rust) → Bytecode + Constant Pool
↓
VM Executor (Rust) → Result
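To illustrate the Compiler stage, the sketch below lowers a tiny expression AST to stack bytecode with a post-order walk. `Expr`, `Instr`, and `compile` are hypothetical simplifications, not Tremor's actual types. The point is that compilation happens once per script, not once per event.

```rust
// Hypothetical, simplified AST and bytecode; not Tremor's actual types.
#[derive(Debug)]
enum Expr {
    Int(i64),
    Path(Vec<String>), // event.http.status -> ["http", "status"]
    Add(Box<Expr>, Box<Expr>),
}

#[derive(Debug, PartialEq)]
enum Instr {
    LoadConst(usize),     // Push constant from pool
    LoadLocal(usize),     // Push local (slot 0 = event)
    GetPath(Vec<String>), // Replace top of stack with a nested field
    Add,
}

/// Post-order walk: operands are emitted before their operator,
/// which is exactly the order a stack machine consumes them.
fn compile(expr: &Expr, code: &mut Vec<Instr>, consts: &mut Vec<i64>) {
    match expr {
        Expr::Int(n) => {
            consts.push(*n);
            code.push(Instr::LoadConst(consts.len() - 1));
        }
        Expr::Path(path) => {
            code.push(Instr::LoadLocal(0));
            code.push(Instr::GetPath(path.clone()));
        }
        Expr::Add(a, b) => {
            compile(a, code, consts);
            compile(b, code, consts);
            code.push(Instr::Add);
        }
    }
}

fn main() {
    // event.http.status + 100
    let expr = Expr::Add(
        Box::new(Expr::Path(vec!["http".into(), "status".into()])),
        Box::new(Expr::Int(100)),
    );
    let (mut code, mut consts) = (Vec::new(), Vec::new());
    compile(&expr, &mut code, &mut consts); // Done once, then reused for every event
    println!("{code:?}\nconstants: {consts:?}");
}
```

The constant pool keeps literals out of the instruction stream, so `LoadConst` carries only a small index.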
Bytecode Format
Custom bytecode with opcodes specialized for JSON operations.
pub enum Instr {
// Stack operations
LoadConst(usize), // Push constant from pool
LoadLocal(usize), // Push local variable
StoreLocal(usize), // Pop and store to local
// Arithmetic
Add, Sub, Mul, Div,
// Comparison
Eq, Lt, Gt,
// JSON-specific operations
GetPath(Vec<String>), // event.http.status → optimized path lookup
SetPath(Vec<String>), // Modify nested field
HasKey(String), // Check field existence
// Object construction
MakeObject(usize), // Pop N key-value pairs, create object
MakeArray(usize), // Pop N values, create array
// Pattern matching
MatchPattern(Pattern), // Match against pattern, branch
// Control flow
Jump(usize), // Unconditional jump
JumpIfFalse(usize), // Conditional jump
Call(FnId), // Call function
Return,
// Event routing
Emit(String), // Emit event to port
Drop, // Drop event
}
Example compilation:
let x = event.http.status + 100;
Compiles to:
LoadLocal(0) // event
GetPath(["http", "status"])
LoadConst(0) // 100 from constant pool
Add
StoreLocal(1) // x
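To show how that sequence executes, here is a minimal, self-contained stack-machine sketch. The `Value`, `Instr`, `Error`, and `run` names are hypothetical and much smaller than a real instruction set. A trailing `Return` is added so the program terminates. Stack underflow or a missing key returns an `Err` instead of panicking, which matches the requirement that user scripts can't crash the runtime.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical minimal value model: objects are Arc-backed, so loading
// the event onto the stack clones a pointer, not the map.
#[derive(Debug, Clone, PartialEq)]
enum Value {
    Int(i64),
    Object(Arc<HashMap<String, Value>>),
}

#[derive(Debug, Clone)]
enum Instr {
    LoadConst(usize),
    LoadLocal(usize),
    StoreLocal(usize),
    GetPath(Vec<String>),
    Add,
    Return,
}

#[derive(Debug, PartialEq)]
enum Error {
    StackUnderflow,
    KeyNotFound,
    TypeMismatch,
    Overflow,
    PcOutOfBounds,
}

/// Executes `code` with `event` in local slot 0 and returns the final locals.
fn run(code: &[Instr], consts: &[Value], event: Value, n_locals: usize) -> Result<Vec<Value>, Error> {
    let mut locals = vec![Value::Int(0); n_locals.max(1)];
    locals[0] = event;
    let mut stack: Vec<Value> = Vec::new();
    let mut pc = 0;
    loop {
        let instr = code.get(pc).ok_or(Error::PcOutOfBounds)?;
        pc += 1;
        match instr {
            Instr::LoadConst(i) => stack.push(consts[*i].clone()),
            Instr::LoadLocal(i) => stack.push(locals[*i].clone()),
            Instr::StoreLocal(i) => locals[*i] = stack.pop().ok_or(Error::StackUnderflow)?,
            Instr::GetPath(path) => {
                // Walk nested objects one segment at a time
                let mut cur = stack.pop().ok_or(Error::StackUnderflow)?;
                for seg in path {
                    cur = match &cur {
                        Value::Object(map) => map.get(seg).cloned().ok_or(Error::KeyNotFound)?,
                        _ => return Err(Error::TypeMismatch),
                    };
                }
                stack.push(cur);
            }
            Instr::Add => {
                let b = stack.pop().ok_or(Error::StackUnderflow)?;
                let a = stack.pop().ok_or(Error::StackUnderflow)?;
                match (a, b) {
                    (Value::Int(x), Value::Int(y)) => {
                        stack.push(Value::Int(x.checked_add(y).ok_or(Error::Overflow)?))
                    }
                    _ => return Err(Error::TypeMismatch),
                }
            }
            Instr::Return => return Ok(locals),
        }
    }
}

fn main() {
    // event = {"http": {"status": 404}}
    let http = Value::Object(Arc::new(HashMap::from([("status".to_string(), Value::Int(404))])));
    let event = Value::Object(Arc::new(HashMap::from([("http".to_string(), http)])));
    // let x = event.http.status + 100;  (plus a trailing Return)
    let code = vec![
        Instr::LoadLocal(0),
        Instr::GetPath(vec!["http".to_string(), "status".to_string()]),
        Instr::LoadConst(0),
        Instr::Add,
        Instr::StoreLocal(1),
        Instr::Return,
    ];
    let locals = run(&code, &[Value::Int(100)], event, 2).unwrap();
    println!("x = {:?}", locals[1]); // prints "x = Int(504)"
}
```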
The VM Executor
Stack-based interpreter:
pub struct VM {
    stack: Vec<Value>,                    // Operand stack
    locals: Vec<Value>,                   // Local variables (slot 0 holds `event`)
    constants: Vec<Value>,                // Constant pool
    instructions: Vec<Instr>,             // Bytecode
    pc: usize,                            // Program counter
    emitted_events: Vec<(String, Value)>, // (port, event) pairs produced by Emit
    // Sandboxing limits
    max_stack_depth: usize,
    max_recursion: usize,
    ops_executed: usize,
    max_ops: usize,
}
impl VM {
    pub fn execute(&mut self, event: Value) -> Result<Vec<(String, Value)>> {
        // Reset per-event state so one event cannot use up the next one's budget
        self.stack.clear();
        self.emitted_events.clear();
        self.ops_executed = 0;
        self.pc = 0;
        self.locals[0] = event; // 'event' is local 0 (locals pre-sized by the compiler)
        loop {
            // Running past the end of the bytecode is an error, not a panic
            let instr = self.instructions.get(self.pc).ok_or(Error::PcOutOfBounds)?;
            self.pc += 1;
            self.ops_executed += 1;
            if self.ops_executed > self.max_ops {
                return Err(Error::TooManyOps);
            }
            match instr {
                Instr::LoadConst(idx) => {
                    let val = self.constants[*idx].clone();
                    self.stack.push(val);
                }
                Instr::Add => {
                    // Malformed bytecode surfaces as an error instead of a panic
                    let b = self.stack.pop().ok_or(Error::StackUnderflow)?;
                    let a = self.stack.pop().ok_or(Error::StackUnderflow)?;
                    let result = self.add_values(a, b)?;
                    self.stack.push(result);
                }
                Instr::GetPath(path) => {
                    let obj = self.stack.pop().ok_or(Error::StackUnderflow)?;
                    let val = self.get_path(&obj, path)?;
                    self.stack.push(val);
                }
                Instr::Return => break,
                // ... other instructions
            }
        }
        Ok(self.emitted_events.drain(..).collect())
    }
}
Zero-Copy Semantics
The VM operates on Value directly with reference counting:
pub enum Value {
Static(StaticNode), // Inline: Null, Bool, I64, U64, F64
String(Arc<str>), // Reference-counted
Array(Arc<Vec<Value>>), // Reference-counted
Object(Arc<HashMap<String, Value>>), // Reference-counted
}
Key optimizations:
- Immutable values are Arc’d - Cloning = incrementing ref count
- Copy-on-write - Mutations create new Arc only if ref count > 1
- Path access - Direct pointer navigation, no serialization
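The copy-on-write behaviour described above maps directly onto the Rust standard library's `Arc::make_mut`, which clones the inner data only when other strong references exist. The minimal sketch below uses a `HashMap<String, i64>` as a stand-in for a Tremor object; the `Object` alias and `insert_field` helper are hypothetical.

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Hypothetical stand-in: a map of i64 instead of Tremor's Value.
type Object = Arc<HashMap<String, i64>>;

/// Inserts a field with copy-on-write semantics: `Arc::make_mut` mutates in
/// place when this is the only strong reference, and clones the map first otherwise.
fn insert_field(mut obj: Object, key: &str, val: i64) -> Object {
    Arc::make_mut(&mut obj).insert(key.to_string(), val);
    obj
}

fn main() {
    let original: Object = Arc::new(HashMap::from([("status".to_string(), 200)]));

    // Cheap clone: bumps the reference count, no map copy
    let shared = Arc::clone(&original);
    assert_eq!(Arc::strong_count(&original), 2);

    // `shared` is not unique, so make_mut copies the map before inserting
    let patched = insert_field(shared, "retries", 1);
    assert!(!Arc::ptr_eq(&original, &patched));
    assert_eq!(original.len(), 1); // The original is untouched

    // Unique owner: mutated in place, same allocation
    let before = Arc::as_ptr(&patched);
    let patched = insert_field(patched, "env", 7);
    assert_eq!(Arc::as_ptr(&patched), before);
    println!("{:?}", patched);
}
```

In practice this means a pipeline step that patches an event it exclusively owns pays no copy. A copy happens only when the event is also held elsewhere, for example after fan-out.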
// Accessing event.http.status - no copying
fn get_path(&self, value: &Value, path: &[String]) -> Result<Value> {
let mut current = value;
for segment in path {
match current {
Value::Object(map) => {
current = map.get(segment).ok_or(Error::KeyNotFound)?;
}
_ => return Err(Error::NotAnObject),
}
}
Ok(current.clone()) // Just clones Arc, not data
}
When data IS copied (COW):
let event = patch event of
  insert "new_field" => "value"
end;
// Sketch: `self.inner` is an object's Arc<HashMap<String, Value>>
fn patch(mut self, modifications: Vec<Modification>) -> Value {
    // (Arc::make_mut performs this same uniqueness check internally)
    if Arc::strong_count(&self.inner) == 1 {
        // Only reference: modify in place, no copy
        Arc::make_mut(&mut self.inner).apply(modifications);
        Value::Object(self.inner)
    } else {
        // Multiple references: clone the map, then modify the copy
        let mut new = (*self.inner).clone();
        new.apply(modifications);
        Value::Object(Arc::new(new))
    }
}
Pattern Matching Compilation
Pattern matching is a first-class VM operation:
match event of
case %{ http.status == 200 } => "success"
case %{ http.status >= 400 } => "error"
default => "unknown"
end
Compiles to specialized bytecode, a chain of pattern tests with conditional jumps:
MatchPattern(pattern_0) // %{ http.status == 200 }
JumpIfFalse(label_1)
LoadConst("success")
Jump(label_end)
label_1:
MatchPattern(pattern_1) // %{ http.status >= 400 }
JumpIfFalse(label_default)
LoadConst("error")
Jump(label_end)
label_default:
LoadConst("unknown")
label_end:
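The listing above can be made executable with a small sketch that resolves the labels to instruction indices. All of the following are hypothetical simplifications: events are flattened to a `HashMap` keyed by dotted path, patterns compare integers only, and a single condition flag stands in for the operand stack.

```rust
use std::collections::HashMap;

// Hypothetical flat event: dotted path -> integer, to keep the sketch short
type Event = HashMap<String, i64>;

#[derive(Debug, Clone)]
enum Pattern {
    Present(String),       // %{ present http.status }
    ValueEq(String, i64),  // %{ http.status == 200 }
    ValueGte(String, i64), // %{ http.status >= 400 }
}

#[derive(Debug, Clone)]
enum Instr {
    MatchPattern(usize),     // Test patterns[idx] against the event, set the flag
    JumpIfFalse(usize),      // Jump to an absolute index if the flag is false
    Jump(usize),             // Unconditional jump
    LoadConst(&'static str), // Set the match result
    End,                     // Return the match result
}

fn matches(p: &Pattern, ev: &Event) -> bool {
    match p {
        Pattern::Present(k) => ev.contains_key(k),
        Pattern::ValueEq(k, v) => ev.get(k) == Some(v),
        Pattern::ValueGte(k, v) => ev.get(k).map_or(false, |x| x >= v),
    }
}

fn run(code: &[Instr], patterns: &[Pattern], ev: &Event) -> &'static str {
    let (mut pc, mut flag, mut result) = (0, false, "");
    loop {
        match &code[pc] {
            Instr::MatchPattern(i) => flag = matches(&patterns[*i], ev),
            Instr::JumpIfFalse(target) => {
                if !flag {
                    pc = *target;
                    continue;
                }
            }
            Instr::Jump(target) => {
                pc = *target;
                continue;
            }
            Instr::LoadConst(s) => result = *s,
            Instr::End => return result,
        }
        pc += 1;
    }
}

fn status_patterns() -> Vec<Pattern> {
    vec![
        Pattern::ValueEq("http.status".into(), 200),
        Pattern::ValueGte("http.status".into(), 400),
    ]
}

// The label layout from the listing, with labels resolved to indices
fn program() -> Vec<Instr> {
    use Instr::*;
    vec![
        MatchPattern(0),      // 0
        JumpIfFalse(4),       // 1: -> label_1
        LoadConst("success"), // 2
        Jump(9),              // 3: -> label_end
        MatchPattern(1),      // 4: label_1
        JumpIfFalse(8),       // 5: -> label_default
        LoadConst("error"),   // 6
        Jump(9),              // 7: -> label_end
        LoadConst("unknown"), // 8: label_default
        End,                  // 9: label_end
    ]
}

fn main() {
    let (code, pats) = (program(), status_patterns());
    for status in [200, 503, 302] {
        let ev = Event::from([("http.status".to_string(), status)]);
        println!("{status} -> {}", run(&code, &pats, &ev));
    }
}
```

Cases are tested in order, so a later case only runs when every earlier one failed. That ordering is exactly the semantics of `match ... of case ... default`.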
Pattern types:
pub enum Pattern {
    Present(Vec<String>),                  // %{ present http.status }
    ValueEq(Vec<String>, Value),           // %{ http.status == 200 }
    ValueGte(Vec<String>, Value),          // %{ http.status >= 400 }
    RecordPattern(Vec<(String, Pattern)>), // Nested
    Bind(String, Box<Pattern>),            // Extract and bind
}
Custom Operators
The VM has operators optimized for JSON processing:
impl VM {
    // Signatures only; bodies omitted
    // Specialized JSON merge (not standard +)
    fn merge_objects(&self, a: Value, b: Value) -> Result<Value>;
    // Array operations with COW
    fn array_push(&self, arr: Value, item: Value) -> Result<Value>;
    // Deep path operations
    fn set_deep_path(&self, obj: Value, path: &[String], val: Value) -> Result<Value>;
}
Function Calls
User-defined and built-in functions:
fn my_transform(value) with
  value * 2 + 10
end;
my_transform(event.count)
Compiled to call frames:
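pub struct CompiledFunction {
    id: FnId,
    params: Vec<String>,
    entry_point: usize, // Offset of the function body within the VM's bytecode
    local_count: usize, // Parameters plus locals declared in the body
}
impl VM {
    fn call_function(&mut self, fn_id: FnId, args: Vec<Value>) -> Result<Value> {
        // Enforce the recursion limit before pushing a new frame
        if self.call_depth >= self.max_recursion {
            return Err(Error::RecursionLimit);
        }
        let (entry_point, local_count) = {
            let func = &self.functions[fn_id];
            (func.entry_point, func.local_count)
        };
        // Save the caller's frame; the callee starts with Null-initialized locals
        let null = Value::Static(StaticNode::Null);
        let saved_locals = std::mem::replace(&mut self.locals, vec![null; local_count]);
        let saved_pc = self.pc;
        self.call_depth += 1;
        // Arguments occupy the first local slots
        for (i, arg) in args.into_iter().enumerate() {
            self.locals[i] = arg;
        }
        // Execute function bytecode
        self.pc = entry_point;
        let result = self.execute_until_return();
        // Restore the caller's frame even if the callee returned an error
        self.call_depth -= 1;
        self.locals = saved_locals;
        self.pc = saved_pc;
        result
    }
}
Built-in Functions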
Implemented in Rust, exposed to VM:
pub type BuiltinFn = fn(&mut VM, Vec<Value>) -> Result<Value>;
// Hundreds of built-ins
vm.register_builtin("string::uppercase", string_uppercase);
vm.register_builtin("array::len", array_len);
vm.register_builtin("json::decode", json_decode);
fn string_uppercase(_vm: &mut VM, args: Vec<Value>) -> Result<Value> {
    // A missing or non-string argument is a script error, not a panic
    match args.first() {
        Some(Value::String(s)) => Ok(Value::from(s.to_uppercase())),
        _ => Err(Error::TypeMismatch),
    }
}
Sandboxing Implementation
The VM enforces resource limits:
pub struct VMConfig {
pub max_stack_depth: usize, // Default: 1024
pub max_recursion_depth: usize, // Default: 256
pub max_operations: usize, // Default: 1_000_000
pub max_string_length: usize, // Default: 1MB
pub max_array_length: usize, // Default: 10_000
}
impl VM {
fn check_limits(&self) -> Result<()> {
if self.stack.len() > self.config.max_stack_depth {
return Err(Error::StackOverflow);
}
if self.ops_executed > self.config.max_operations {
return Err(Error::TooManyOperations);
}
if self.call_depth > self.config.max_recursion_depth {
return Err(Error::RecursionLimit);
}
Ok(())
}
}
These checks turn infinite loops, runaway recursion, and unbounded stack growth into recoverable errors rather than hangs or crashes.
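The operation budget is what turns an infinite loop into an error. The sketch below uses a hypothetical four-instruction set. It charges one unit per dispatched instruction and caps the stack depth, mirroring the `max_operations` and `max_stack_depth` checks above.

```rust
// Hypothetical instruction set and limits, for illustration only.
#[derive(Debug, PartialEq)]
enum Error {
    TooManyOperations,
    StackOverflow,
}

#[derive(Debug, Clone, Copy)]
enum Instr {
    Push(i64),
    Pop,
    Jump(usize),
    Halt,
}

struct Limits {
    max_operations: usize,
    max_stack_depth: usize,
}

/// Executes bytecode, charging one unit of the operation budget per instruction.
/// Returns the number of operations used when the program halts.
fn run(code: &[Instr], limits: &Limits) -> Result<usize, Error> {
    let mut stack: Vec<i64> = Vec::new();
    let (mut pc, mut ops) = (0usize, 0usize);
    loop {
        ops += 1;
        if ops > limits.max_operations {
            return Err(Error::TooManyOperations); // e.g. an infinite loop
        }
        // Running off the end of the bytecode is treated as Halt in this sketch
        match code.get(pc).copied().unwrap_or(Instr::Halt) {
            Instr::Push(v) => {
                if stack.len() >= limits.max_stack_depth {
                    return Err(Error::StackOverflow);
                }
                stack.push(v);
            }
            Instr::Pop => {
                stack.pop(); // Popping an empty stack is a no-op here
            }
            Instr::Jump(target) => {
                pc = target;
                continue;
            }
            Instr::Halt => return Ok(ops),
        }
        pc += 1;
    }
}

fn main() {
    let limits = Limits { max_operations: 1_000_000, max_stack_depth: 1024 };
    // `Jump(0)` alone never halts; the budget turns it into an error
    println!("{:?}", run(&[Instr::Jump(0)], &limits));
}
```

Checking on every dispatch costs one increment and compare per instruction. That is cheap next to the dispatch itself, and it bounds the worst-case time any single event can take.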
Performance Trade-offs
Why VM is fast enough:
- No serialization - Operates directly on native Value structures
- Reference counting - Cheap clones via Arc
- Specialized opcodes - GetPath is a single instruction
- Compiled once - Scripts compile to bytecode, reused across events
- Written in Rust - No FFI overhead, same optimization level
Why still slower than native Rust:
- Interpretation overhead - Bytecode dispatch (~10-20ns per instruction)
- Dynamic dispatch - Type checks at runtime for every operation
- Stack operations - Push/pop overhead
- Bounds checking - Every instruction validated
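Rough performance comparison (order-of-magnitude estimates):
- Native Rust: 1x (baseline)
- tremor-script VM: 10-50x slower
- Python (CPython): 100-1000x slower; JIT-compiled JavaScript (V8) is typically much closer to native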
Still fast enough - Can process 500K-1M events/sec with complex transformations.
Comparison to Other VMs
| VM | Type | Use Case | Key Feature |
|---|---|---|---|
| Tremor VM | Stack, bytecode | JSON transformation | Zero-copy JSON operations |
| Lua VM | Register, bytecode | General purpose | Small, fast |
| JVM | Stack, bytecode + JIT | General purpose | Mature, optimized |
| WASM | Stack, bytecode | Sandboxed execution | Portable |
| V8 | JIT to native | JavaScript | Very fast |
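Tremor's VM is closest in spirit to Lua's: a small, embeddable bytecode interpreter. Unlike Lua's register-based VM, it is stack-based, and its opcodes are specialized for JSON.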
Using tremor-script and Troy Together
Troy Defines Topology
Troy (.troy files) is the wiring diagram:
define flow my_app
flow
// 1. Define INPUT
define connector http_in from http_server
with codec = "json", config = { "port": 8080 }
end;
// 2. Define PIPELINE
define pipeline process
pipeline
// Inline tremor-script in select
select {
"timestamp": system::ingest_ns(),
"level": match event.severity of
case x when x > 3 => "error"
default => "info"
end
}
from in into out;
end;
// 3. Define OUTPUT
define connector kafka_out from kafka_producer
with codec = "json", config = { "topic": "logs" }
end;
// 4. CREATE instances and CONNECT them
create connector http_in;
create pipeline process;
create connector kafka_out;
connect /connector/http_in to /pipeline/process;
connect /pipeline/process to /connector/kafka_out;
end;
deploy flow my_app;
tremor-script Defines Transformations
External script file (transform.tremor):
let enriched = patch event of
insert "environment" => "production",
insert "processed_at" => system::ingest_ns()
end;
match enriched of
case %{ level == "error" } => emit enriched => "errors"
case %{ level == "warning" } => emit enriched => "warnings"
default => emit enriched => "info"
end
Load in troy:
define pipeline process
pipeline
define script transform from "/path/to/transform.tremor";
select event from in into transform;
select event from transform/errors into error_output;
select event from transform/warnings into warning_output;
select event from transform/info into info_output;
end;
Key relationship:
- Troy = deployment config - “What connects to what”
- tremor-script = data transformation - “What to do with data”
- Scripts execute inside pipelines defined in troy
- Can inline simple scripts or load complex scripts from files
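Port-based routing is what ties the two languages together. The script decides which port an event leaves on, and Troy decides where that port leads. The minimal Rust sketch below illustrates that split. `transform` is a hypothetical stand-in for `transform.tremor`, and `Graph` is a toy edge table, not Tremor's pipeline implementation.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
struct Event {
    level: String,
    message: String,
}

/// Hypothetical stand-in for transform.tremor: pick an output port from the level.
fn transform(event: Event) -> (String, Event) {
    let port = match event.level.as_str() {
        "error" => "errors",
        "warning" => "warnings",
        _ => "info",
    };
    (port.to_string(), event)
}

/// Toy edges mirroring `select event from transform/<port> into <node>`:
/// (source node, output port) -> destination node.
struct Graph {
    edges: HashMap<(String, String), String>,
}

impl Graph {
    fn route(&self, node: &str, port: &str) -> Option<&str> {
        self.edges.get(&(node.to_string(), port.to_string())).map(String::as_str)
    }
}

fn wire() -> Graph {
    let mut edges = HashMap::new();
    for (port, dest) in [("errors", "error_output"), ("warnings", "warning_output"), ("info", "info_output")] {
        edges.insert(("transform".to_string(), port.to_string()), dest.to_string());
    }
    Graph { edges }
}

/// Runs one event through the script and returns the node it is routed to.
fn dispatch(graph: &Graph, event: Event) -> Option<String> {
    let (port, _event) = transform(event);
    graph.route("transform", &port).map(String::from)
}

fn main() {
    let graph = wire();
    for level in ["error", "warning", "debug"] {
        let ev = Event { level: level.to_string(), message: "disk full".to_string() };
        println!("{level} -> {:?}", dispatch(&graph, ev));
    }
}
```

Because the script only names ports, the same script can be rewired to different outputs by editing the Troy file alone.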
Rust Integration
Embedding Runtime
use tremor_runtime::{Runtime, RuntimeConfig};
#[tokio::main]
async fn main() -> Result<()> {
let runtime = Runtime::new(RuntimeConfig::default()).await?;
runtime.load_pipeline("config.troy").await?;
runtime.start().await?;
tokio::signal::ctrl_c().await?;
runtime.stop().await?;
Ok(())
}
Executing Scripts Directly
use tremor_script::{Script, Value, EventContext};
let script = Script::parse(r#"
let event = patch event of insert processed = true end;
event
"#)?;
let event = Value::from(json!({"message": "hello"}));
let ctx = EventContext::new(0, None);
let result = script.run(&ctx, event)?;
Custom Connector
use tremor_runtime::connector::ConnectorContext;
use async_trait::async_trait;
#[async_trait]
pub trait Connector: Send {
    async fn on_event(&mut self, event: Event, ctx: &ConnectorContext) -> Result<()>;
    async fn connect(&mut self) -> Result<()>;
    async fn disconnect(&mut self) -> Result<()>;
}
Source connector example (receives external data, sends to pipeline):
pub struct HttpSourceConnector {
server: HttpServer,
codec: Box<dyn Codec>,
}
#[async_trait]
impl Connector for HttpSourceConnector {
async fn on_event(&mut self, event: Event, ctx: &ConnectorContext) -> Result<()> {
// Event contains raw HTTP request
let decoded = self.codec.decode(&event.data)?;
let processed = Event {
data: decoded,
ingest_ns: system_time(),
..event
};
// Forward into Tremor pipeline
ctx.send(processed).await?;
Ok(())
    }
    // connect() and disconnect() omitted for brevity
}
Sink connector example (receives from pipeline, writes externally):
pub struct KafkaSinkConnector {
producer: KafkaProducer,
codec: Box<dyn Codec>,
}
#[async_trait]
impl Connector for KafkaSinkConnector {
async fn on_event(&mut self, event: Event, ctx: &ConnectorContext) -> Result<()> {
// Event arrived from pipeline
let bytes = self.codec.encode(&event.data)?;
// Write to Kafka (don't forward - this is a sink)
self.producer.send("topic", bytes).await?;
Ok(())
    }
    // connect() and disconnect() omitted for brevity
}
Custom Operator
use tremor_pipeline::Event;
pub trait Operator: Send {
    fn on_event(&mut self, port: &str, event: Event)
        -> Result<Vec<(String, Event)>>;
}
Custom Codec
// Simplified shape of a codec: bytes <-> Value
pub trait Codec: Send {
    fn decode(&mut self, data: &[u8]) -> Result<Value>;
    fn encode(&mut self, value: &Value) -> Result<Vec<u8>>;
}
Extending VM with Custom Functions
use tremor_script::registry::Registry;
pub fn my_custom_function(args: &[Value]) -> Result<Value> {
// Your logic
Ok(Value::from("result"))
}
let mut registry = Registry::default();
registry.insert("my_function", my_custom_function);
let script = Script::parse_with_registry(source, &registry)?;
Architecture
┌─────────────────────────────────────────┐
│ Tremor Runtime (Rust) │
│ │
│ Onramp → Pipeline → Script VM → Offramp│
│ (IO) (graph) (bytecode) (IO) │
│ │
│ Thread Pool (Tokio) │
│ State Backend (RocksDB) │
└─────────────────────────────────────────┘
Event flow:
- Onramp (connector): Decode bytes → Event
- Pipeline: Route through operators (compiled Rust)
- Script: Transform Event.data (VM executes bytecode)
- Offramp (connector): Encode Event → bytes
Concurrency:
- Onramps/Offramps: Dedicated threads (blocking IO)
- Pipelines: Async tasks (Tokio runtime)
- Events: Arc<Event> for zero-copy routing
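The `Arc<Event>` routing claim can be illustrated with std threads and channels: fanning one event out to several consumers clones only the pointer. This is an analogy built on `std::sync::mpsc`, not Tremor's actual channel types or async runtime, and `fan_out` is a hypothetical helper.

```rust
use std::sync::{mpsc, Arc};
use std::thread;

// Hypothetical event type for the illustration.
#[derive(Debug)]
struct Event {
    id: u64,
    payload: String,
}

/// Sends the same Arc<Event> to every downstream channel: each send only
/// bumps the reference count, and the payload itself is never copied.
fn fan_out(event: Arc<Event>, outputs: &[mpsc::Sender<Arc<Event>>]) {
    for tx in outputs {
        // A disconnected receiver is ignored rather than treated as fatal
        let _ = tx.send(Arc::clone(&event));
    }
}

fn main() {
    let (tx_a, rx_a) = mpsc::channel();
    let (tx_b, rx_b) = mpsc::channel();
    let event = Arc::new(Event { id: 1, payload: "x".repeat(1024) });
    fan_out(Arc::clone(&event), &[tx_a, tx_b]);

    let consumers: Vec<_> = [rx_a, rx_b]
        .into_iter()
        .map(|rx| thread::spawn(move || rx.recv().unwrap()))
        .collect();
    for handle in consumers {
        let received = handle.join().unwrap();
        // Each consumer holds a handle to the producer's allocation
        assert!(Arc::ptr_eq(&received, &event));
        println!("event {} ({} bytes) shared", received.id, received.payload.len());
    }
}
```

The trade-off is that shared events are read-only. A consumer that needs to modify one falls back to the copy-on-write path described earlier.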
Core types:
pub struct Event {
id: EventId,
data: Value, // Payload (scripts operate on this)
ingest_ns: u64,
metadata: Value,
transactional: bool,
}
pub enum Value {
Static(StaticNode), // Null, Bool, I64, U64, F64
String(Arc<str>),
Array(Arc<Vec<Value>>),
Object(Arc<HashMap<String, Value>>),
}
Performance
Throughput (single instance, 8 cores)
| Workload | Events/sec | Bottleneck |
|---|---|---|
| Simple routing | 5-10M | I/O |
| JSON decode/encode | 1-2M | Codec |
| Script transformation | 500K-1M | VM |
| Windowed aggregation | 100K-500K | State |
Latency
- p50: <100μs
- p99: <500μs
- p999: 1-5ms
Memory
- Baseline: ~100MB
- Per event: ~1-2KB (varies by size)
- Events are Arc<Event>, cheap to route
State Management
Windows (in-memory)
define window w from tumbling with size = 10s end;
select {
"count": aggr::count(),
"sum": aggr::sum(event.value)
}
from in[w]
group by event.host
into out;
KV Store (RocksDB-backed)
use std::state;
let count = state::get("counter") ?? 0;
state::put("counter", count + 1);
Local only - not distributed across instances.
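The tumbling-window query earlier in this section can be approximated in a few lines of Rust. The hypothetical `Tumbling` aggregator below aligns timestamps to fixed window boundaries. It keeps a count and sum per host, like `aggr::count()` and `aggr::sum(event.value)` with `group by event.host`, and emits a window when the first event of a later window arrives. It assumes in-order timestamps and has no timer-based flushing, so it sketches the semantics rather than Tremor's implementation.

```rust
use std::collections::BTreeMap;

#[derive(Debug, Clone, PartialEq)]
struct Agg {
    count: u64,
    sum: i64,
}

/// Hypothetical tumbling-window aggregator grouped by host. Windows are
/// aligned to multiples of `size_ns`; a window is emitted when the first
/// event of a later window arrives (timestamps assumed in order, no timers).
struct Tumbling {
    size_ns: u64,
    current_start: Option<u64>,
    groups: BTreeMap<String, Agg>,
}

impl Tumbling {
    fn new(size_ns: u64) -> Self {
        assert!(size_ns > 0, "window size must be positive");
        Tumbling { size_ns, current_start: None, groups: BTreeMap::new() }
    }

    /// Adds one event; returns (window start, per-host aggregates) when a window closes.
    fn on_event(&mut self, ts_ns: u64, host: &str, value: i64) -> Option<(u64, BTreeMap<String, Agg>)> {
        let start = ts_ns - ts_ns % self.size_ns; // Align to the window boundary
        let mut closed = None;
        if let Some(cur) = self.current_start {
            if cur != start {
                // Hand over the finished window and start an empty one
                closed = Some((cur, std::mem::take(&mut self.groups)));
            }
        }
        self.current_start = Some(start);
        let agg = self.groups.entry(host.to_string()).or_insert(Agg { count: 0, sum: 0 });
        agg.count += 1; // aggr::count()
        agg.sum += value; // aggr::sum(event.value)
        closed
    }
}

fn main() {
    let mut w = Tumbling::new(10_000_000_000); // 10 s in nanoseconds
    w.on_event(1_000_000_000, "web-01", 5);
    w.on_event(2_000_000_000, "web-02", 3);
    if let Some((start, groups)) = w.on_event(11_000_000_000, "web-01", 1) {
        println!("window @{start}: {groups:?}");
    }
}
```

All of this state lives in one process's memory, which is why windows and the KV store are local to an instance.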
Available Components
Connectors: kafka, http, tcp, udp, file, websocket, unix-socket, elasticsearch, influxdb, postgres, clickhouse, stdin, stdout
Codecs: json, msgpack, bincode, yaml, influx (line protocol), statsd, syslog, binary, string
All extensible via Rust traits.
vs Kafka Streams vs Flink
| Feature | Tremor | Kafka Streams | Flink |
|---|---|---|---|
| Architecture | Standalone | Embedded library | Distributed |
| Language | Rust | Java | Java/Scala/Python |
| Deployment | Single binary | In your app | Cluster |
| State | Local only | Kafka-backed | Distributed |
| Scaling | Manual | Auto | Auto |
| Latency | Sub-ms | Low ms | Low ms |
| Memory | ~100MB | Medium | High (JVM) |
| Complexity | Low | Medium | High |
When to use Tremor:
- Observability pipelines (logs, metrics, traces)
- High-throughput routing/filtering
- Single-node or manually partitioned workloads
- Low-latency requirements (<1ms)
- Operational simplicity
- Rust ecosystem
When NOT to use:
- Distributed stateful processing
- Multi-stream joins with consistency
- Exactly-once semantics required
- Need managed service (none exists)
Deployment
# Binary
tremor server run config.troy
# Docker
docker run tremorproject/tremor:0.12 server run /config/main.troy
# Kubernetes
kubectl apply -f tremor-deployment.yaml
Scaling strategy: Run multiple instances, partition at source (Kafka consumer groups, HTTP load balancer). No coordination between instances.
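Because state is local, partitioning at the source should send every event for a given key (for example, a host) to the same instance. With Kafka, keyed partitions plus consumer groups achieve this. If you route events yourself, a stable hash works. The minimal sketch below uses a hypothetical `instance_for` helper with 64-bit FNV-1a, chosen because the standard library's `DefaultHasher` does not promise the same output across Rust releases.

```rust
/// 64-bit FNV-1a. Its output is fixed by the algorithm's definition, so every
/// router process maps a given key to the same partition.
fn fnv1a(bytes: &[u8]) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325; // FNV offset basis
    for &b in bytes {
        hash ^= u64::from(b);
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3); // FNV prime
    }
    hash
}

/// Hypothetical helper: picks which of `instances` Tremor nodes should
/// receive events for `key`.
fn instance_for(key: &str, instances: u64) -> u64 {
    assert!(instances > 0, "need at least one instance");
    fnv1a(key.as_bytes()) % instances
}

fn main() {
    for host in ["web-01", "web-02", "db-01"] {
        println!("{host} -> instance {}", instance_for(host, 4));
    }
}
```

Plain modulo hashing reshuffles most keys when the instance count changes. That fits the "manual scaling, no rebalancing" model, but resizing means per-key local state starts over on its new instance.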
Monitoring
Exposes Prometheus metrics on /metrics:
- tremor_events_total - Events processed
- tremor_latency_ns - Processing latency histogram
- tremor_queue_depth - Channel queue depths
- tremor_errors_total - Error counts
REST API endpoint: GET /v1/status for runtime health.
Limitations
- Not distributed - No cluster coordination, no distributed state
- Manual scaling - No automatic rebalancing
- Basic windowing - No session windows, no event-time watermarks
- No exactly-once - At-most-once or at-least-once only
- No managed service - Self-hosted only
Dependencies
[dependencies]
tremor-runtime = "0.12"
tremor-script = "0.12"
tremor-pipeline = "0.12"
tremor-value = "0.12"
tremor-codec = "0.12"
Summary
Tremor uses a custom bytecode VM written in Rust to execute tremor-script. The VM is stack-based, with opcodes optimized for JSON operations. Key insight: the VM is embedded in the same process and operates directly on native Value structs, with zero-copy semantics via Arc. There is no serialization boundary between “Rust” and “VM”; the VM IS Rust code. This design provides hot reload and sandboxing without sacrificing too much performance (~10-50x slower than native Rust, but still 500K-1M events/sec). Troy defines topology (what connects to what), and tremor-script defines transformations (what to do with data). Tremor is a good fit for observability pipelines where operational simplicity and low latency matter more than distributed features.