Handling Late-Arriving Data

Late-arriving data is common in streaming systems due to network latency, batching delays, or asynchronous processing. When an event arrives after the window it belongs to has already been processed, the system must decide how to handle it. The choice between dropping the event, reprocessing the affected results, or storing it separately depends on how the business weighs data completeness against processing latency.

Watermarking and Windowing

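A watermark is a point in event time at which the system assumes that no more events with earlier timestamps will arrive. A bounded-out-of-orderness strategy derives it by subtracting a maximum allowed delay from the highest event timestamp seen so far, so that bound determines how long the system waits for stragglers. An event whose timestamp is earlier than the current watermark is classified as late. Handling strategies generally include: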

  • Dropping late events, prioritizing low-latency processing over completeness.
  • Reprocessing late events by reopening windows and recomputing aggregates.
  • Storing late events separately for asynchronous reconciliation.
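To make the classification rule concrete, here is a minimal plain-Java sketch, assuming a bounded-out-of-orderness watermark computed as the highest timestamp seen so far minus a fixed bound. The classes BoundedWatermark and WatermarkDemo are hypothetical and not part of any framework. Real engines such as Flink emit watermarks periodically rather than after every event, so advancing the watermark per event is a simplification.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical bounded-out-of-orderness watermark (not a Flink API).
// Timestamps may use any consistent unit; the watermark trails the highest
// timestamp seen so far by a fixed bound.
final class BoundedWatermark {
    private final long maxOutOfOrderness;
    private long maxSeenTimestamp = Long.MIN_VALUE;

    BoundedWatermark(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    /** Current watermark, or Long.MIN_VALUE before any event has been observed. */
    long current() {
        return maxSeenTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxSeenTimestamp - maxOutOfOrderness;
    }

    /** Classifies the event against the current watermark, then advances the watermark. */
    boolean observeAndCheckLate(long timestamp) {
        boolean late = timestamp < current();
        maxSeenTimestamp = Math.max(maxSeenTimestamp, timestamp);
        return late;
    }
}

final class WatermarkDemo {
    /** Returns the timestamps classified as late, in arrival order. */
    static List<Long> lateTimestamps(long bound, long... arrivals) {
        BoundedWatermark watermark = new BoundedWatermark(bound);
        List<Long> late = new ArrayList<>();
        for (long ts : arrivals) {
            if (watermark.observeAndCheckLate(ts)) {
                late.add(ts);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        // Arrival order is not timestamp order: 98 and 115 arrive out of order.
        System.out.println(lateTimestamps(10, 100, 105, 98, 130, 115, 121)); // prints [115]
    }
}
```

With a bound of 10, the event at 98 is still on time because the watermark was only 95 when it arrived, while the event at 115 arrives after the watermark has advanced to 120 and is the only one flagged.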

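In Apache Flink, the second and third strategies map onto two window-operator settings: allowed lateness keeps a window's state after it fires so that late events can update its result, and a side output captures events that miss even that grace period.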

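import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

// Event and EventCountWindowFunction are application classes (not shown); the function is
// assumed to be a ProcessWindowFunction<Event, Long, String, TimeWindow> that counts events.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Define the OutputTag for late events (the anonymous subclass preserves the generic type)
final OutputTag<Event> lateEventsTag = new OutputTag<Event>("late-events"){};

DataStream<Event> mainStream = env.addSource(...);

// Watermarks trail the highest event timestamp seen so far by 10 seconds
WatermarkStrategy<Event> strategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withTimestampAssigner((event, timestamp) -> event.getEventTime());  // epoch millis

SingleOutputStreamOperator<Long> processedStream = mainStream
    .assignTimestampsAndWatermarks(strategy)
    .keyBy(Event::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.seconds(30))  // keep window state 30 s past the window end
    .sideOutputLateData(lateEventsTag)  // route anything later than that to the side output
    .process(new EventCountWindowFunction());

// Retrieve the side output stream of late events
DataStream<Event> lateEvents = processedStream.getSideOutput(lateEventsTag);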

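In this example:

  • allowedLateness(Time.seconds(30)) keeps each window's state for another 30 seconds of event time after the watermark passes the end of the window. A late event that arrives in that period is added to the window and, with the default event-time trigger, makes the window fire again with an updated count, so downstream consumers must accept more than one result per window.
  • sideOutputLateData(lateEventsTag) sends events that arrive after the allowed lateness has expired to the lateEvents stream instead of silently dropping them, so they can be stored or reconciled separately.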
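The decision the window operator makes for each element can be modeled in a few lines. The sketch below is plain Java, not Flink code: LatenessSimulator is a hypothetical class that assumes non-negative timestamps, a tumbling window whose last valid timestamp is its end minus one, a window that fires when the watermark reaches that timestamp, and a window that is treated as late (its state purged) once the watermark reaches that timestamp plus the allowed lateness. The numbers mirror the example above (5-minute windows, 30 seconds of allowed lateness), expressed in seconds.

```java
// Hypothetical model (plain Java, not Flink code) of how an event-time tumbling
// window with allowed lateness treats one element, given the current watermark.
final class LatenessSimulator {
    enum Outcome { ON_TIME, LATE_FIRING, SIDE_OUTPUT }

    private final long windowSize;
    private final long allowedLateness;

    LatenessSimulator(long windowSize, long allowedLateness) {
        this.windowSize = windowSize;
        this.allowedLateness = allowedLateness;
    }

    /** Start of the tumbling window containing the timestamp (non-negative timestamps assumed). */
    long windowStart(long timestamp) {
        return timestamp - (timestamp % windowSize);
    }

    Outcome classify(long timestamp, long watermark) {
        // Last timestamp that belongs to the window [start, start + size).
        long windowMaxTimestamp = windowStart(timestamp) + windowSize - 1;
        if (windowMaxTimestamp > watermark) {
            return Outcome.ON_TIME; // window has not fired yet
        }
        if (windowMaxTimestamp + allowedLateness > watermark) {
            return Outcome.LATE_FIRING; // state still kept: element added, window fires again
        }
        return Outcome.SIDE_OUTPUT; // state purged: element goes to the late-data output
    }

    public static void main(String[] args) {
        // 5-minute windows and 30 s allowed lateness, in seconds, as in the Flink example.
        LatenessSimulator sim = new LatenessSimulator(300, 30);
        System.out.println(sim.classify(250, 200)); // prints ON_TIME
        System.out.println(sim.classify(250, 310)); // prints LATE_FIRING
        System.out.println(sim.classify(250, 340)); // prints SIDE_OUTPUT
    }
}
```

An event at 250 s belongs to the window [0, 300). A watermark of 200 means that window has not fired yet, 310 falls inside the 30-second grace period, and 340 is past it, so the event goes to the side output.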

Spark Structured Streaming Example

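Spark Structured Streaming has no equivalent of side outputs: in stateful operations such as windowed aggregations, rows that fall behind the watermark may be dropped without being exposed to the application. Capturing late events therefore requires explicit filtering and storage logic, typically writing them to a separate sink (e.g., another Kafka topic or a dedicated Delta Lake table).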

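import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("late-events").getOrCreate()
import spark.implicits._  // enables the $"column" syntax

val events = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")  // placeholder broker address
  .option("subscribe", "events")
  .load()

// For brevity, the Kafka record timestamp stands in for the event time;
// a real pipeline would normally parse event_time from the message value.
val parsedEvents = events
  .withColumn("event_time", $"timestamp")
  .withWatermark("event_time", "10 minutes")

// withWatermark only affects stateful operators (aggregations, joins, dropDuplicates),
// so this split compares event time with the micro-batch's processing time instead.
val lateThreshold = current_timestamp() - expr("INTERVAL 10 MINUTES")
val onTimeEvents = parsedEvents.filter($"event_time" > lateThreshold)
val lateEvents = parsedEvents.filter($"event_time" <= lateThreshold)

onTimeEvents.writeStream.format("console").start()
lateEvents.writeStream
  .format("parquet")
  .option("path", "/late-events")
  .option("checkpointLocation", "/checkpoints/late-events")  // file sinks require one; placeholder path
  .start()

spark.streams.awaitAnyTermination()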

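In Spark, late data is filtered and routed to separate sinks by hand. This allows flexible handling, but the split above is based on processing time rather than on the watermark, so a consumer catching up on a backlog will classify perfectly in-order events as late. Each branch also runs as its own streaming query, with its own checkpoint to manage.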
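The sketch below contrasts the processing-time rule used by the filter pair in the Spark example with a watermark-style rule that trails the largest event time seen so far. LateRules is a hypothetical plain-Java class, not Spark code, and times are plain numbers in minutes. It replays an in-order backlog (event times 20 to 29) that a consumer reads at time 60 after an outage.

```java
import java.util.List;

// Hypothetical comparison of two "is this event late?" rules (plain Java, not Spark code).
final class LateRules {
    /** Rule used by the Spark filters: late if eventTime <= now - threshold. */
    static boolean lateByProcessingTime(long eventTime, long now, long threshold) {
        return eventTime <= now - threshold;
    }

    /** Watermark-style rule: late if eventTime <= maxEventTimeSeen - threshold. */
    static boolean lateByWatermark(long eventTime, long maxEventTimeSeen, long threshold) {
        return eventTime <= maxEventTimeSeen - threshold;
    }

    /** Returns {lateByProcessingTime, lateByWatermark} counts for a stream of event times. */
    static int[] countLate(List<Long> eventTimes, long now, long threshold) {
        int byProcessingTime = 0;
        int byWatermark = 0;
        long maxSeen = Long.MIN_VALUE;
        for (long eventTime : eventTimes) {
            if (lateByProcessingTime(eventTime, now, threshold)) {
                byProcessingTime++;
            }
            // Compare with the maximum seen *before* this event, as a watermark would.
            if (maxSeen != Long.MIN_VALUE && lateByWatermark(eventTime, maxSeen, threshold)) {
                byWatermark++;
            }
            maxSeen = Math.max(maxSeen, eventTime);
        }
        return new int[] {byProcessingTime, byWatermark};
    }

    public static void main(String[] args) {
        List<Long> backlog = List.of(20L, 21L, 22L, 23L, 24L, 25L, 26L, 27L, 28L, 29L);
        int[] counts = countLate(backlog, 60, 10);
        // prints 10 late by processing time, 0 by watermark
        System.out.println(counts[0] + " late by processing time, " + counts[1] + " by watermark");
    }
}
```

Every backlog event is flagged under the processing-time rule even though none arrived out of order, while the watermark-style rule flags none of them.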

Handling Duplicate Data

Duplicate events occur due to at-least-once delivery semantics, retries, or network failures. De-duplication strategies include primary key de-duplication, window-based de-duplication, and idempotent mutation.

Primary Key De-Duplication

When events carry a unique key (e.g., event_id), an upsert or MERGE keyed on it overwrites a redelivered event instead of inserting a second copy. This pattern is widely used with table formats such as Delta Lake, Apache Hudi, and Apache Iceberg.

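-- Upsert keyed on event_id: a redelivered event overwrites its earlier copy.
-- source_table should itself be free of duplicate event_ids; Delta Lake, for example,
-- rejects a MERGE in which several source rows match the same target row.
MERGE INTO target_table AS t
USING source_table AS s
ON t.event_id = s.event_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;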
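An in-memory analogue shows what the MERGE relies on. The sketch below is hypothetical plain Java (KeyedEvent and PrimaryKeyMerge are illustrative names): a map keyed by event ID plays the role of the target table, and the incoming batch is first collapsed to one row per ID, because MERGE implementations such as Delta Lake's reject a statement in which several source rows match the same target row. Keeping the most recently ingested copy is an assumption; for exact duplicates any deterministic choice works.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative event shape (hypothetical, not from any library).
record KeyedEvent(String eventId, String payload, long ingestedAt) {}

// Hypothetical in-memory analogue of MERGE ... WHEN MATCHED UPDATE / WHEN NOT MATCHED INSERT.
final class PrimaryKeyMerge {
    /** Collapses duplicates inside the incoming batch, keeping the most recently ingested copy. */
    static Map<String, KeyedEvent> dedupeSource(List<KeyedEvent> batch) {
        Map<String, KeyedEvent> latest = new LinkedHashMap<>();
        for (KeyedEvent e : batch) {
            latest.merge(e.eventId(), e,
                    (old, cur) -> cur.ingestedAt() >= old.ingestedAt() ? cur : old);
        }
        return latest;
    }

    /** Upserts the deduplicated batch into the target map, keyed by eventId. */
    static void merge(Map<String, KeyedEvent> target, List<KeyedEvent> batch) {
        // put semantics: a matched key is overwritten (UPDATE), an unmatched one is added (INSERT).
        target.putAll(dedupeSource(batch));
    }

    public static void main(String[] args) {
        Map<String, KeyedEvent> table = new LinkedHashMap<>();
        merge(table, List.of(new KeyedEvent("e1", "a", 1), new KeyedEvent("e2", "b", 2),
                new KeyedEvent("e1", "a", 3)));
        merge(table, List.of(new KeyedEvent("e2", "b", 4))); // redelivery of e2
        System.out.println(table.size()); // prints 2
    }
}
```

After both batches the table holds two rows: the second copy of e1 and the redelivered e2 overwrite their earlier versions instead of adding rows.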

Idempotent Mutation with PostgreSQL

PostgreSQL’s INSERT ... ON CONFLICT clause turns an insert into an upsert, which supports idempotent mutations: replaying an event that has already been applied leaves the row in the same state.

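-- Requires a unique constraint or index on (user_id, session_id)
INSERT INTO user_sessions (user_id, session_id, last_seen)
VALUES (123, 'session_abc', '2023-02-22')
ON CONFLICT (user_id, session_id)
DO UPDATE SET last_seen = EXCLUDED.last_seen;

  • EXCLUDED is a special table name that refers to the row proposed for insertion. On a conflict, the DO UPDATE branch runs instead of the insert, so a redelivered event updates the existing row rather than creating a duplicate.
  • Reapplying the same event writes the same last_seen value, so the logical state does not change, adhering to idempotent processing principles.
  • If events can also arrive out of order, adding WHERE user_sessions.last_seen < EXCLUDED.last_seen to the DO UPDATE clause prevents an older, late-arriving event from moving last_seen backwards.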
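The same semantics can be modeled in memory. SessionStore below is a hypothetical plain-Java sketch, not a PostgreSQL client: upsert either overwrites last_seen unconditionally, like the statement above, or keeps the larger value, which corresponds to a WHERE condition on DO UPDATE that ignores older rows. Timestamps are plain numbers standing in for dates.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory analogue of
//   INSERT ... ON CONFLICT (user_id, session_id) DO UPDATE SET last_seen = EXCLUDED.last_seen
// with an optional guard that keeps last_seen from moving backwards.
final class SessionStore {
    record SessionKey(long userId, String sessionId) {}

    private final Map<SessionKey, Long> lastSeenByKey = new HashMap<>();
    private final boolean monotonic;

    SessionStore(boolean monotonic) {
        this.monotonic = monotonic;
    }

    void upsert(long userId, String sessionId, long seenAt) {
        SessionKey key = new SessionKey(userId, sessionId);
        if (monotonic) {
            // Guarded update: keep whichever last_seen is newer.
            lastSeenByKey.merge(key, seenAt, Long::max);
        } else {
            // Plain DO UPDATE: the most recent write wins, whatever its timestamp.
            lastSeenByKey.put(key, seenAt);
        }
    }

    /** Returns null when no row exists for the key. */
    Long lastSeen(long userId, String sessionId) {
        return lastSeenByKey.get(new SessionKey(userId, sessionId));
    }

    public static void main(String[] args) {
        SessionStore plain = new SessionStore(false);
        SessionStore guarded = new SessionStore(true);
        for (SessionStore s : new SessionStore[] {plain, guarded}) {
            s.upsert(123, "session_abc", 200); // newer event
            s.upsert(123, "session_abc", 200); // exact replay: no change
            s.upsert(123, "session_abc", 100); // older event arriving late
        }
        // prints 100 vs 200
        System.out.println(plain.lastSeen(123, "session_abc") + " vs " + guarded.lastSeen(123, "session_abc"));
    }
}
```

Both stores are unaffected by the exact replay, which is what makes the upsert idempotent; only the guarded store also survives the older event arriving late.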

Window-Based De-Duplication

In streaming contexts, sliding windows of recently seen event IDs help filter duplicates.

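-- Flink SQL group-window syntax: HOP(time_attr, slide, size).
-- Grouping by event_id emits one row per ID per window, collapsing copies within it.
-- Do not filter on COUNT(*) = 1: that would discard every copy of a duplicated event.
SELECT
    event_id,
    HOP_END(event_time, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE) AS window_end,
    COUNT(*) AS occurrences
FROM events
GROUP BY
    event_id,
    HOP(event_time, INTERVAL '1' MINUTE, INTERVAL '10' MINUTE);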

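The HOP function creates overlapping sliding windows: with a 1-minute slide and a 10-minute size, each event belongs to ten windows. Copies of an event whose timestamps differ slightly (for example, because a retry re-stamps them) still share at least one window as long as they are no more than nine minutes apart (the window size minus the slide). The trade-off is that each event_id is emitted once for every window it falls into, so downstream consumers still see repeated IDs across overlapping windows. Exact duplicates that keep the original event_time always land in the same windows, so a tumbling window (TUMBLE), which emits each ID once per window, is often sufficient for them.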
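The idea of tracking recently seen event IDs can also be applied per event instead of per window: remember each ID for a fixed retention period and drop any repeat within it. Unlike a HOP-window query, which produces a row for every overlapping window an ID falls into, this emits each ID at most once per retention period. RecentIdDeduplicator is a hypothetical plain-Java class, not a Flink API; it assumes a non-decreasing clock such as ingestion time. In Flink, a comparable approach could keep a seen flag in keyed state with a state TTL.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical time-bounded deduplicator: an ID is dropped if it was first seen
// less than `retention` time units ago; older entries are evicted.
final class RecentIdDeduplicator {
    private final long retention;
    private final Map<String, Long> firstSeenAt = new HashMap<>();
    // IDs in insertion order; because `now` never decreases, this is also time order.
    private final Deque<String> idsInArrivalOrder = new ArrayDeque<>();

    RecentIdDeduplicator(long retention) {
        this.retention = retention;
    }

    /** Returns true if the ID was not seen within the retention period. `now` must not decrease. */
    boolean firstOccurrence(String eventId, long now) {
        evictAtOrBefore(now - retention);
        if (firstSeenAt.containsKey(eventId)) {
            return false; // duplicate within the retention period
        }
        firstSeenAt.put(eventId, now);
        idsInArrivalOrder.addLast(eventId);
        return true;
    }

    private void evictAtOrBefore(long cutoff) {
        while (!idsInArrivalOrder.isEmpty()
                && firstSeenAt.get(idsInArrivalOrder.peekFirst()) <= cutoff) {
            firstSeenAt.remove(idsInArrivalOrder.pollFirst());
        }
    }

    int trackedIds() {
        return firstSeenAt.size();
    }

    public static void main(String[] args) {
        RecentIdDeduplicator dedup = new RecentIdDeduplicator(10); // e.g. 10 minutes
        System.out.println(dedup.firstOccurrence("a", 0));  // prints true
        System.out.println(dedup.firstOccurrence("a", 4));  // prints false (duplicate)
        System.out.println(dedup.firstOccurrence("b", 5));  // prints true
        System.out.println(dedup.firstOccurrence("a", 12)); // prints true ("a" expired)
    }
}
```

Memory is bounded by the number of distinct IDs seen within one retention period, and a copy that arrives after its ID has expired is emitted again, so the retention should exceed the longest expected gap between redeliveries.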

Best Practices

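  • Set the watermark bound and allowed lateness from the delays your sources actually exhibit, trading result latency against completeness.
  • Use side outputs (Flink) or explicit filtering into a separate sink (Spark) to capture late data instead of silently dropping it.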
  • Prefer idempotent operations and primary key-based deduplication to ensure data integrity.
  • Test strategies for late-arriving and duplicate data in a staging environment before applying them in production.