Operators and Tasks

In Flink, operators (e.g., map, flatMap, keyBy, reduce) are the fundamental units of computation. The execution graph turns each operator into a set of parallel tasks (Flink may also chain several operators into a single task), and each task processes one partition of the data. Tasks execute in parallel across the cluster, which is what enables distributed computation. They operate independently, and exchange records only when a partitioning strategy (e.g., keyBy, rebalance) redistributes data between upstream and downstream tasks.

Partitions in Flink

Partitioning determines how data is distributed among operator tasks. The strategy depends on the transformation or data source. For example:

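  • Kafka Source: Partitions align with Kafka topic partitions. Each topic partition is read by exactly one source task; if the source parallelism is lower than the partition count, a task reads several partitions, and if it is higher, some tasks stay idle.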
  • File-Based Source: Partitions correspond to file splits or blocks, defined by the input format and storage system.
  • Custom Sources: Users can explicitly define partitioning logic when implementing custom sources.

Transformations like keyBy enforce keyed partitioning, which is critical for operations requiring state, such as aggregations or windowing. Other partitioning strategies, such as rebalancing or rescaling, are useful when key alignment is not necessary.
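To make the relationship between source partitions and source tasks concrete, here is a minimal sketch in plain Java. It is a simplified round-robin model, not Flink's actual assignment logic, and the class name SplitAssignmentSketch and its assign method are hypothetical. It only illustrates the cardinalities: every partition has exactly one reader, while a task may read several partitions or none.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of spreading source partitions over parallel source tasks.
// Assumption: plain round-robin. This is NOT Flink's assignment algorithm.
class SplitAssignmentSketch {

    // Returns, for each task index, the list of partition ids that task reads.
    static List<List<Integer>> assign(int numPartitions, int parallelism) {
        List<List<Integer>> perTask = new ArrayList<>();
        for (int t = 0; t < parallelism; t++) {
            perTask.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            // Each partition is handed to exactly one task.
            perTask.get(p % parallelism).add(p);
        }
        return perTask;
    }

    public static void main(String[] args) {
        System.out.println(assign(6, 3)); // [[0, 3], [1, 4], [2, 5]]
        System.out.println(assign(2, 4)); // [[0], [1], [], []]
    }
}
```

With six partitions and parallelism 3, every task reads two partitions; with two partitions and parallelism 4, two tasks receive no input at all.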

Partitioning Strategies

  • Keyed Partitioning (keyBy): Routes records with the same key to the same downstream task. This is essential for stateful transformations that depend on grouped data, such as reduce or aggregate.
  • Rebalancing: Evenly redistributes data across all downstream tasks to address data skew. For example, after filtering, rebalancing ensures that downstream tasks handle approximately equal loads:
    stream.filter(value -> value.isImportant()).rebalance().map(value -> process(value));
    Without rebalancing, skewed partitions can overload some tasks while leaving others underutilized.
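  • Rescaling: Distributes records round-robin from each upstream task to a fixed subset of downstream tasks rather than to all of them, which is useful when upstream and downstream parallelisms differ. For example, with upstream parallelism 2 and downstream parallelism 4, each upstream task feeds two downstream tasks. This avoids the all-to-all transfer of a full rebalance, but it does not group records by key, so it should only be used ahead of stateless transformations.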
  • Broadcasting: Sends each record to all downstream tasks. This is commonly used for small datasets like configuration data or lookup tables that every task needs.
  • Forward Partitioning: Preserves a 1:1 mapping between upstream and downstream tasks when parallelism remains unchanged. While it minimizes data movement, it can lead to inefficiencies if upstream data distribution is skewed.

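If no explicit partitioning strategy is applied, Flink uses forward partitioning between operators that have the same parallelism and falls back to rebalancing when the parallelism differs.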
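The strategies above differ only in how a sending task picks the downstream task for each record. The following plain-Java sketch models that choice under simplifying assumptions: the class RoutingSketch and its methods are hypothetical, keyed routing is approximated with hashCode() modulo the parallelism (Flink itself hashes keys into key groups), and rescaling assumes the downstream parallelism is a multiple of the upstream parallelism.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified channel-selection model; names are illustrative, not Flink API.
class RoutingSketch {

    // Keyed: the same key always maps to the same downstream task index.
    static int keyed(Object key, int downstream) {
        return Math.floorMod(key.hashCode(), downstream);
    }

    // Rebalance: round-robin over ALL downstream tasks, driven by a per-sender counter.
    static int rebalance(long recordCounter, int downstream) {
        return (int) (recordCounter % downstream);
    }

    // Rescale: round-robin over only this sender's share of the downstream tasks.
    static int rescale(int upstreamIndex, long recordCounter, int upstream, int downstream) {
        int share = downstream / upstream; // assumes downstream is a multiple of upstream
        return upstreamIndex * share + (int) (recordCounter % share);
    }

    // Broadcast: every downstream task receives the record.
    static List<Integer> broadcast(int downstream) {
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < downstream; i++) {
            targets.add(i);
        }
        return targets;
    }

    // Forward: task i sends only to task i (requires equal parallelism).
    static int forward(int upstreamIndex) {
        return upstreamIndex;
    }

    public static void main(String[] args) {
        System.out.println(keyed("user-42", 4) == keyed("user-42", 4)); // true
        System.out.println(rebalance(5, 4));                           // 1
        System.out.println(rescale(1, 3, 2, 4));                       // 3
        System.out.println(broadcast(3));                              // [0, 1, 2]
        System.out.println(forward(2));                                // 2
    }
}
```

Only the keyed variant looks at the record's content; the others depend solely on the sender's index or on a counter, which is why they cannot guarantee that records with the same key meet in the same task.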

State Management in Flink

State in Flink is managed locally within tasks using state backends such as RocksDB or in-memory storage. Tasks directly handle the state for their assigned partition, ensuring low-latency access:

  • Keyed State: Scoped to specific keys within a partition and used for aggregations or windowing.
  • Operator State: Scoped to the operator as a whole, typically for tasks like managing offsets in a source.

State is periodically checkpointed to external storage (e.g., HDFS, S3) to ensure durability and enable fault recovery.
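As a rough illustration of keyed state, the sketch below models a single task that keeps one running sum per key. It is plain Java with a HashMap standing in for the state backend; the class KeyedSumTask is hypothetical and is not how Flink exposes state to user code.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified model of keyed state inside ONE task: a separate value per key.
// Assumption: a HashMap stands in for the state backend (heap or RocksDB).
class KeyedSumTask {
    private final Map<String, Long> sumByKey = new HashMap<>();

    // Processes one record and returns the updated running sum for its key.
    long process(String key, long value) {
        return sumByKey.merge(key, value, Long::sum);
    }

    // Number of distinct keys this task currently holds state for.
    int keyCount() {
        return sumByKey.size();
    }

    public static void main(String[] args) {
        KeyedSumTask task = new KeyedSumTask();
        System.out.println(task.process("a", 2)); // 2
        System.out.println(task.process("b", 5)); // 5
        System.out.println(task.process("a", 3)); // 5
        System.out.println(task.keyCount());      // 2
    }
}
```

Because keyed partitioning routes all records for a key to the same task, the lookup never leaves the task, which is where the low-latency access comes from.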

Checkpointing and Fault Tolerance

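Flink ensures fault tolerance through checkpointing. During execution, each task periodically snapshots its state to external storage. If a failure occurs, the job restarts from the last completed checkpoint: tasks restore their local state from it, and sources rewind to the positions recorded in it, so processing resumes from a consistent point. Checkpoint barriers injected into the streams ensure that the snapshots of all tasks correspond to the same position in the input, which keeps state consistent across distributed tasks.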
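The recovery behavior can be sketched with a single-task model in plain Java. The class CheckpointSketch and its methods are hypothetical, two fields stand in for external storage, and the coordination across tasks that a real checkpoint requires is left out. The point it illustrates is that state and input position are saved together, so replaying after a failure does not double-count records.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified single-task model of checkpoint and recovery; not Flink API.
class CheckpointSketch {
    private Map<String, Long> counts = new HashMap<>(); // task-local state
    private int position = 0;                            // next input offset to read

    // These two fields stand in for durable external storage.
    private Map<String, Long> savedCounts = new HashMap<>();
    private int savedPosition = 0;

    void processNext(String[] input) {
        counts.merge(input[position], 1L, Long::sum);
        position++;
    }

    // Snapshot state and input position together.
    void checkpoint() {
        savedCounts = new HashMap<>(counts);
        savedPosition = position;
    }

    // Roll back to the last snapshot; records after it will be replayed.
    void restore() {
        counts = new HashMap<>(savedCounts);
        position = savedPosition;
    }

    long count(String key) {
        return counts.getOrDefault(key, 0L);
    }

    // Consumes the whole input, checkpointing every `checkpointEvery` records and
    // simulating one failure after `failAfter` processed records (-1 = no failure).
    static CheckpointSketch run(String[] input, int checkpointEvery, int failAfter) {
        CheckpointSketch task = new CheckpointSketch();
        boolean failed = false;
        int processed = 0;
        while (task.position < input.length) {
            if (!failed && processed == failAfter) {
                failed = true;
                task.restore(); // uncheckpointed progress is lost and replayed
                continue;
            }
            task.processNext(input);
            processed++;
            if (task.position % checkpointEvery == 0) {
                task.checkpoint();
            }
        }
        return task;
    }

    public static void main(String[] args) {
        String[] input = {"a", "b", "a", "c", "a"};
        System.out.println(run(input, 2, -1).count("a")); // 3
        System.out.println(run(input, 2, 3).count("a"));  // 3, despite the failure
    }
}
```

In the failing run, the third record is processed twice, but the first attempt is discarded by the restore, so the final counts match the failure-free run.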

Data Flow and Communication

Tasks communicate via partitioned streams, with the partitioning strategy defining data flow between upstream and downstream tasks. For example:

  • Keyed Partitioning aligns keys with partitions, ensuring correctness for stateful computations like aggregations and joins.
  • Rebalancing ensures balanced loads for stateless operations, avoiding bottlenecks from skewed data.

Shuffling occurs automatically when repartitioning is necessary. Proper selection of partitioning strategies minimizes unnecessary data movement, maintaining correctness and optimizing performance. For example:

  • After a keyBy, data is repartitioned to ensure all records for a key are processed by the same task.
  • If filtering results in uneven partitions, applying rebalance() redistributes data evenly across tasks, ensuring efficient use of resources.
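The effect of rebalancing after a skewed filter can be estimated with a small plain-Java model. The class SkewSketch, its methods, and the per-task record counts are hypothetical; the model also assumes every sender starts its round-robin at downstream task 0, which a real system need not do.

```java
import java.util.Arrays;

// Compares downstream load under forward and rebalance; illustrative only.
class SkewSketch {

    // Forward: downstream task i receives exactly what upstream task i emitted.
    static int[] forward(int[] upstreamCounts) {
        return upstreamCounts.clone();
    }

    // Rebalance: each upstream task deals its records round-robin over all downstream tasks.
    static int[] rebalance(int[] upstreamCounts, int downstream) {
        int[] load = new int[downstream];
        for (int count : upstreamCounts) {
            for (int r = 0; r < count; r++) {
                load[r % downstream]++;
            }
        }
        return load;
    }

    public static void main(String[] args) {
        // Hypothetical number of records that survive the filter in each upstream task.
        int[] afterFilter = {90, 6, 3, 1};
        System.out.println(Arrays.toString(forward(afterFilter)));      // [90, 6, 3, 1]
        System.out.println(Arrays.toString(rebalance(afterFilter, 4))); // [27, 26, 24, 23]
    }
}
```

Under forward partitioning one downstream task handles 90 of the 100 records; after rebalancing, no task handles more than 27. The price is that records now cross task boundaries, which is the data movement the text above advises keeping to a minimum.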