Handling Late-Arriving Data

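Output tags in Apache Flink are particularly useful for handling late-arriving data in event-time windowed processing. By default, an event that arrives after its window's allowed lateness has expired is silently dropped. By defining an OutputTag and passing it to sideOutputLateData, you can redirect such events to a side output stream and handle them separately from the main data flow.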

Example: Handling Late Events

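// Imports assume a Flink 1.x release, where windowing.time.Time is available;
// newer releases accept java.time.Duration for window sizes instead.
import java.time.Duration;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.OutputTag;

// The trailing {} creates an anonymous subclass so Flink can recover the generic type.
final OutputTag<Event> lateEventsTag = new OutputTag<Event>("late-events"){};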
 
DataStream<Event> mainStream = env.addSource(...);
 
WatermarkStrategy<Event> strategy = WatermarkStrategy
    .<Event>forBoundedOutOfOrderness(Duration.ofSeconds(10))
    .withTimestampAssigner((event, timestamp) -> event.getEventTime());
 
SingleOutputStreamOperator<Event> processedStream = mainStream
    .assignTimestampsAndWatermarks(strategy)
    .keyBy(Event::getUserId)
    .window(TumblingEventTimeWindows.of(Time.minutes(5)))
    .allowedLateness(Time.seconds(30))
    .sideOutputLateData(lateEventsTag)
    .process(new EventCountWindowFunction());
 
DataStream<Event> lateEvents = processedStream.getSideOutput(lateEventsTag);
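  • Events that arrive after the watermark has passed the window end plus the 30-second allowed lateness are emitted to the lateEvents stream, where they can be processed separately, for example by writing them to a different sink or triggering alerts.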
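To make the lateness rule concrete, the following plain-Java sketch models the decision for the settings used above (5-minute tumbling windows, 30 seconds of allowed lateness, 10 seconds of bounded out-of-orderness). It does not use the Flink API: the class and method names (LatenessModel, windowStart, watermarkFor, isLate) are hypothetical, and the model assumes a single input and ignores the fact that Flink emits watermarks periodically rather than per event.

```java
import java.time.Duration;

/** Plain-Java model of the lateness check; illustrative only, not part of the Flink API. */
final class LatenessModel {
    static final long WINDOW_SIZE_MS = Duration.ofMinutes(5).toMillis();
    static final long ALLOWED_LATENESS_MS = Duration.ofSeconds(30).toMillis();
    static final long OUT_OF_ORDERNESS_MS = Duration.ofSeconds(10).toMillis();

    /** Start of the tumbling window (zero offset) that contains the timestamp. */
    static long windowStart(long timestampMs) {
        return timestampMs - Math.floorMod(timestampMs, WINDOW_SIZE_MS);
    }

    /** Watermark a bounded-out-of-orderness strategy derives from the highest timestamp seen. */
    static long watermarkFor(long maxSeenTimestampMs) {
        return maxSeenTimestampMs - OUT_OF_ORDERNESS_MS - 1;
    }

    /** True if the event's window state is already gone, so the event goes to the side output. */
    static boolean isLate(long eventTimestampMs, long currentWatermarkMs) {
        // A window [start, end) has maximum timestamp end - 1.
        long windowMaxTimestamp = windowStart(eventTimestampMs) + WINDOW_SIZE_MS - 1;
        return windowMaxTimestamp + ALLOWED_LATENESS_MS <= currentWatermarkMs;
    }

    public static void main(String[] args) {
        long eventTs = Duration.ofMinutes(4).toMillis(); // belongs to window [0:00, 5:00)

        // Highest timestamp seen is 5:20, so the watermark is 5:09.999:
        // the window has fired, but its state is kept and the event is still accepted.
        System.out.println(isLate(eventTs, watermarkFor(Duration.ofSeconds(320).toMillis())));

        // Highest timestamp seen is 5:45, so the watermark is 5:34.999:
        // the window state has been cleaned up and the event is late.
        System.out.println(isLate(eventTs, watermarkFor(Duration.ofSeconds(345).toMillis())));
    }
}
```

Under these assumptions, an event stamped 4:00 is still added to its window while the watermark is below 5:29.999 and is routed to the side output once the watermark reaches that value.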

Error Handling and Dead Letter Queue

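Parsing unstructured data or applying complex transformations can fail on individual records. Rather than failing the whole job, a ProcessFunction can catch the error and emit the offending record under an OutputTag, producing a side output that feeds a dead letter queue (DLQ).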

final OutputTag<String> errorTag = new OutputTag<String>("error-events"){};
 
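// mainStream is assumed to be a DataStream<String> in this example.
// The result must be a SingleOutputStreamOperator, because getSideOutput is not defined on DataStream.
SingleOutputStreamOperator<String> validEvents = mainStream.process(new ProcessFunction<String, String>() {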
    @Override
    public void processElement(String value, Context ctx, Collector<String> out) {
        try {
            // Placeholder validation rule; note that the string "invalid" also contains "valid".
            if (value.contains("valid")) {
                out.collect(value);
            } else {
                throw new IllegalArgumentException("Invalid data");
            }
        } catch (Exception e) {
            ctx.output(errorTag, value);
        }
    }
});
 
DataStream<String> errorEvents = validEvents.getSideOutput(errorTag);
  • Records that fail validation are emitted under errorTag and appear in the errorEvents stream, which can be written to a separate sink or reviewed manually.
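A dead letter record is usually more useful when it carries the reason for the rejection alongside the raw input. The sketch below illustrates that idea in plain Java, without a Flink dependency: DeadLetter and DlqRouter are hypothetical names, and the two lists merely stand in for the main output and the side output. It reuses the placeholder rule from the example above and additionally treats null as invalid, which is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical dead-letter record: the raw input plus the reason it was rejected. */
final class DeadLetter {
    final String payload;
    final String reason;

    DeadLetter(String payload, String reason) {
        this.payload = payload;
        this.reason = reason;
    }
}

/** Plain-Java stand-in for the routing logic of a ProcessFunction; illustrative only. */
final class DlqRouter {
    final List<String> valid = new ArrayList<>();           // plays the role of the main output
    final List<DeadLetter> deadLetters = new ArrayList<>(); // plays the role of the side output

    /** Placeholder rule from the example above, extended to reject null input. */
    static void validate(String value) {
        if (value == null || !value.contains("valid")) {
            throw new IllegalArgumentException("Invalid data");
        }
    }

    /** Routes one record to exactly one of the two outputs. */
    void process(String value) {
        try {
            validate(value);
            valid.add(value);
        } catch (IllegalArgumentException e) {
            // Keep the original payload and the error message together for later inspection.
            deadLetters.add(new DeadLetter(value, e.getMessage()));
        }
    }

    public static void main(String[] args) {
        DlqRouter router = new DlqRouter();
        router.process("valid:42");
        router.process("garbage");
        System.out.println(router.valid.size() + " valid, " + router.deadLetters.size() + " dead-lettered");
    }
}
```

In a Flink job, the same shape would correspond to declaring the tag as OutputTag<DeadLetter> and calling ctx.output(errorTag, new DeadLetter(value, e.getMessage())) in the catch block; the record type would also need to be serializable by Flink.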

Stream Branching for A/B Testing

A/B testing often requires splitting a stream based on experimental groups. Output tags allow you to dynamically route events to different branches.

final OutputTag<Event> groupATag = new OutputTag<Event>("group-a"){};
final OutputTag<Event> groupBTag = new OutputTag<Event>("group-b"){};
 
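// Declared as SingleOutputStreamOperator so that getSideOutput is available below.
SingleOutputStreamOperator<Event> processedStream = mainStream.process(new ProcessFunction<Event, Event>() {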
    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) {
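        // Constant-first equals avoids a NullPointerException when an event has no group;
        // such events fall through to the main output.
        if ("A".equals(event.getGroup())) {
            ctx.output(groupATag, event);
        } else if ("B".equals(event.getGroup())) {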
            ctx.output(groupBTag, event);
        } else {
            out.collect(event);
        }
    }
});
 
DataStream<Event> groupAEvents = processedStream.getSideOutput(groupATag);
DataStream<Event> groupBEvents = processedStream.getSideOutput(groupBTag);
  • This approach allows parallel experimentation, with different logic applied to each group.

Separating High and Low Priority Events

In real-time processing, some events may need immediate attention while others can be processed asynchronously. By tagging high-priority events, you can create a fast lane in your data pipeline.

final OutputTag<Event> highPriorityTag = new OutputTag<Event>("high-priority"){};
 
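// Declared as SingleOutputStreamOperator so that getSideOutput is available below.
SingleOutputStreamOperator<Event> processedStream = mainStream.process(new ProcessFunction<Event, Event>() {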
    @Override
    public void processElement(Event event, Context ctx, Collector<Event> out) {
        if (event.isHighPriority()) {
            ctx.output(highPriorityTag, event);
        } else {
            out.collect(event);
        }
    }
});
 
DataStream<Event> highPriorityEvents = processedStream.getSideOutput(highPriorityTag);
  • High-priority events can be routed to a low-latency sink, while normal events remain in the main processing flow.

Summary

Output tags in Apache Flink provide a powerful mechanism for branching data streams, enabling scenarios like handling late data, error management, A/B testing, and priority routing. Because a single operator can emit several typed side outputs alongside its main output, special cases are handled in their own branches rather than by filtering the same stream repeatedly, and they do not interfere with the primary processing logic.