
Parallelization and Data Grouping

Scaling streaming systems horizontally by running multiple instances in parallel. Learn how data parallelism increases throughput, how grouping strategies affect event distribution, and why ordering matters when processing streams at scale.


In the previous article, we built a simple streaming job that processed clickstream events one at a time. But what happens when the event rate exceeds what a single operator instance can handle? The solution is parallelization: running multiple copies of operators to process events concurrently.


The Throughput Problem

Consider our clickstream analytics system again. During normal hours, it processes around 100 page views per second without issue. But during a product launch or marketing campaign, traffic can spike to 10,000 events per second. A single counter instance becomes overwhelmed, the queue backs up, latency increases, and events start timing out.

Problem

The fundamental bottleneck: a single operator instance can only process events sequentially. If each event takes 10 milliseconds to process, the maximum throughput is 100 events per second. When 10,000 events arrive per second, 99% of events wait in the queue.

Horizontal Scaling

Unlike vertical scaling (adding more CPU/memory to a single machine), horizontal scaling adds more machines running the same code. In streaming systems, this means running multiple instances of the same operator in parallel, each processing a subset of the event stream.

Solution

With parallelization, we can run N instances of an operator, multiplying throughput by N. Three instances processing at 100 events/second each can handle 300 events/second. Ten instances can handle 1,000 events/second. The system scales linearly with the number of instances.

[Figure: Throughput Scaling with Parallelism. Each operator instance adds 100 events/sec of capacity, so total throughput scales linearly with the instance count (1 × 100 = 100).]

[Figure: Parallelization. A ClickReader source feeds an event dispatcher, which distributes events to Counter instances; with a single instance, one counter processes all events.]

Data Parallelism vs Task Parallelism

Before diving into implementation, it's important to distinguish between two types of parallelism in streaming systems: data parallelism and task parallelism.

Data Parallelism

Data parallelism means running multiple instances of the same operator, each processing a different subset of the data. This is what we use to scale throughput. All instances execute identical logic; they're clones of the same code.

For example, running three instances of PageViewCounter means three separate processes, each maintaining its own count map, each processing roughly one-third of the incoming events. The instances don't coordinate or share state; they work independently on their portion of the stream.

Task Parallelism

Task parallelism means running different operators that perform different tasks simultaneously. This is inherent in streaming systems: sources, operators, and sinks all run concurrently in their own threads or processes.

For example, in a pipeline with Filter → Transform → Aggregate operators, all three run in parallel. While the filter processes event N, the transform processes event N-1, and the aggregator processes event N-2. This pipeline parallelism is automatic in streaming frameworks.
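
To see pipeline parallelism in isolation, here is a minimal standalone sketch (not StreamKit code): three stages connected by queues, each running in its own thread, with a None sentinel as an illustrative shutdown convention. While the first stage handles one event, the later stages can simultaneously handle earlier ones.

pipeline_demo.py

import queue
import threading

def run_stage(fn, in_q, out_q):
    """Pull events from in_q, apply fn, push non-None results to out_q."""
    while True:
        event = in_q.get()
        if event is None:              # sentinel: shut down, pass it downstream
            if out_q is not None:
                out_q.put(None)
            return
        result = fn(event)
        if result is not None and out_q is not None:
            out_q.put(result)

q1, q2, q3 = queue.Queue(), queue.Queue(), queue.Queue()
counts = {}

stages = [
    # Filter: keep only URL-like events
    threading.Thread(target=run_stage,
                     args=(lambda e: e if e.startswith("/") else None, q1, q2)),
    # Transform: normalize the URL
    threading.Thread(target=run_stage, args=(str.lower, q2, q3)),
    # Aggregate: count views per URL (terminal stage, no output queue)
    threading.Thread(target=run_stage,
                     args=(lambda e: counts.update({e: counts.get(e, 0) + 1}), q3, None)),
]
for t in stages:
    t.start()

for event in ["/Home", "ping", "/home", "/products"]:
    q1.put(event)
q1.put(None)                           # end-of-stream sentinel

for t in stages:
    t.join()
print(counts)                          # {'/home': 2, '/products': 1}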

[Figure: Data Parallelism. A source fans out to three PageViewCounter instances (instance 0, 1, 2): same logic, different data, which scales throughput.]

Why parallelism matters

Data parallelism is how we scale streaming systems to handle high throughput. Task parallelism is how streaming systems achieve low latency: events flow through the pipeline without waiting for upstream stages to finish all their work.


Implementing Parallelization

To parallelize an operator in the StreamKit framework, we need to specify how many instances should run. The framework handles creating multiple executor threads and distributing events among them.

Setting Parallelism

The StreamKit framework provides a simple API to set parallelism when adding operators to a stream. The parallelism parameter tells the framework how many concurrent instances to create.

clickstream_job.py

if __name__ == "__main__":
    job = Job()
    click_stream = job.add_source(ClickReader("click-reader", 9990))

    # Run 3 parallel instances of the counter
    click_stream.apply_operator(
        PageViewCounter("page-view-counter"),
        parallelism=3
    )

    starter = JobStarter(job)
    starter.start()

With this configuration, the framework creates three separate PageViewCounter instances. Each instance runs in its own thread with its own incoming queue. Events from the source are distributed across these three queues.

How the Framework Distributes Events

When parallelism is greater than 1, the framework needs a strategy to distribute events among operator instances. The simplest approach is round-robin distribution: send event 1 to instance 0, event 2 to instance 1, event 3 to instance 2, event 4 back to instance 0, and so on.

stream.py

import queue

class Stream:
    def __init__(self, source):
        self.source = source
        self.operators = []
        self.operator_queues = []

    def apply_operator(self, operator, parallelism=1):
        # Create one clone of the operator, with its own queue, per instance
        for i in range(parallelism):
            instance_name = f"{operator.name}-{i}"
            op_instance = operator.clone(instance_name)
            op_queue = queue.Queue()

            self.operators.append(op_instance)
            self.operator_queues.append(op_queue)
            op_instance.incoming_queue = op_queue

        # Store source output queue for event distribution
        self.source.outgoing_queue = queue.Queue()

        return self

The event dispatcher continuously pulls events from the source's outgoing queue and distributes them to operator instance queues using round-robin or another strategy.

event_dispatcher.py

import queue
import threading

class EventDispatcher(threading.Thread):
    def __init__(self, source_queue, operator_queues):
        super().__init__()
        self.source_queue = source_queue
        self.operator_queues = operator_queues
        self.current_index = 0

    def run(self):
        while True:
            try:
                event = self.source_queue.get(timeout=0.1)

                # Round-robin distribution
                target_queue = self.operator_queues[self.current_index]
                target_queue.put(event)

                # Move to next instance
                self.current_index = (self.current_index + 1) % len(self.operator_queues)

            except queue.Empty:
                continue

Event Grouping Strategies

Round-robin works for stateless operators, but what about stateful operators like our PageViewCounter? If events for /home are randomly distributed across instances, each instance maintains separate counts. We'd get three different counts for /home instead of one global count.

Problem

With random distribution, a stateful operator like PageViewCounter produces incorrect results. If instance 0 sees 10 views of /home, instance 1 sees 7 views, and instance 2 sees 5 views, none of them knows the true total is 22 views. The state is fragmented across instances.
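
A quick standalone simulation (not StreamKit code) makes the fragmentation concrete. It routes the same event mix as the figure below, eleven events across three simulated instances, picking targets at random; the fixed seed only makes the run reproducible.

grouping_demo.py

import random

random.seed(7)  # fixed so the output is reproducible

events = ["/home"] * 6 + ["/products"] * 2 + ["/about"] * 3
random.shuffle(events)

# Three "instances", each with its own private count map
instance_counts = [{}, {}, {}]

for url in events:
    target = random.randrange(3)  # shuffle grouping: any instance may get the event
    counts = instance_counts[target]
    counts[url] = counts.get(url, 0) + 1

for i, counts in enumerate(instance_counts):
    print(f"instance {i}: {counts}")
# The six /home views are scattered across instances; no one holds the true total.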

The solution is grouping: controlling which events go to which instances based on event properties. Streaming frameworks typically support two grouping strategies:

[Figure: State Fragmentation. After 11 events (/home ×6, /products ×2, /about ×3), shuffle grouping splits the /home count across three instances (3 + 2 + 1 = 6); no single instance knows the true total.]

Shuffle Grouping

Shuffle grouping (also called random grouping) distributes events randomly across instances. In terms of load distribution it's effectively equivalent to round-robin, and it works well for stateless operators where any instance can process any event.

Use shuffle grouping when:

  • The operator is stateless (doesn't maintain counts, aggregates, or other state)
  • Every event is independent
  • You want to balance load evenly across instances

Fields Grouping

Fields grouping (also called key-based grouping or partitioning) distributes events based on a key extracted from the event. Events with the same key always go to the same instance. This is implemented using a hash function.

Use fields grouping when:

  • The operator is stateful and needs to see all events for a particular key
  • You need to maintain per-key aggregates or counts
  • Event ordering matters (within a key)

[Figure: Event Grouping Strategies. Under shuffle grouping, events for /home, /products, and /about are routed randomly across instances 0-2; the same URL can land on different instances, fragmenting state.]

Implementing Fields Grouping

To implement fields grouping, events need to provide a key that the dispatcher can hash. The hash determines which instance receives the event.

click_event.py

class ClickEvent(Event):
    def __init__(self, url):
        self.url = url

    def get_data(self):
        return self.url

    def get_key(self):
        """Return the key for grouping (the URL)"""
        return self.url

[Figure: Consistent Routing. hash("/home") = 46613902 and 46613902 % 3 = 1, so every /home event goes to instance 1; the same key always produces the same hash, making routing deterministic.]

The event dispatcher uses this key to determine the target instance:

event_dispatcher.py

import queue
import threading

class EventDispatcher(threading.Thread):
    def __init__(self, source_queue, operator_queues, grouping='shuffle'):
        super().__init__()
        self.source_queue = source_queue
        self.operator_queues = operator_queues
        self.grouping = grouping
        self.current_index = 0

    def run(self):
        while True:
            try:
                event = self.source_queue.get(timeout=0.1)

                if self.grouping == 'shuffle':
                    # Round-robin distribution
                    target_idx = self.current_index
                    self.current_index = (self.current_index + 1) % len(self.operator_queues)
                else:
                    # Fields grouping: hash the key
                    key = event.get_key()
                    target_idx = hash(key) % len(self.operator_queues)

                self.operator_queues[target_idx].put(event)

            except queue.Empty:
                continue

[Figure: Event Dispatcher Flow. An upstream component pushes a /home event onto the dispatcher's incoming queue; the grouping logic selects one of the three outgoing queues.]

Now all events with url="/home" go to the same instance, which maintains an accurate count. Events with url="/products" go to a different instance (determined by the hash), which maintains a separate accurate count for that URL.

Hash functions and distribution

A good hash function distributes keys evenly across instances. Python's built-in hash() works well for strings and numbers within a single process, which is all our threaded example needs. Note, though, that Python salts string hashes per process, so hash() values differ across processes and restarts unless PYTHONHASHSEED is pinned; distributed systems therefore use a stable hash instead. Poor hash functions create "hot" instances that receive most events while other instances sit idle.
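
When instances run in separate processes, a deterministic hash is needed in place of hash(). Here is a minimal sketch using hashlib, which is stable across processes and restarts; the helper name route_to_instance is an illustrative choice.

stable_hash.py

import hashlib

def route_to_instance(key, num_instances):
    """Map a key to an instance index, stable across processes and restarts."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_instances

print(route_to_instance("/home", 3))      # same result on every run
print(route_to_instance("/products", 3))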


Event Ordering and Parallelization

Parallelization introduces a subtle but important issue: event ordering. When events flow through a single operator instance, they're processed in the order they arrive. With multiple instances, ordering guarantees change.

Ordering with Shuffle Grouping

With shuffle grouping, events are distributed randomly across instances. Even if the source sends events 1, 2, 3, 4 in order, they might be processed as 1, 3, 2, 4 if events 2 and 3 go to different instances that process at different speeds.

Shuffle grouping provides no ordering guarantees. This is acceptable for stateless operations where order doesn't matter, but problematic for operations that depend on sequence.

Ordering with Fields Grouping

Fields grouping provides a stronger guarantee: events with the same key are processed in order. If the source sends events with key="A" in order 1, 2, 3, they'll arrive at the same instance and be processed in that order.

However, events with different keys may still be reordered. An event with key="A" sent before an event with key="B" might be processed after it if they go to different instances.

[Figure: Event Ordering Guarantees. Four /home events sent at 10:00:01 through 10:00:04 are processed in that same order, because fields grouping sends every event for a key to the same instance.]

Why ordering matters

For stateful operations like counting, the order within a key matters. If we're tracking user sessions and events arrive out of order, we might process a "logout" event before the "login" event, leading to incorrect session duration calculations.

Timestamps and Event Time

To handle out-of-order events, streaming systems use event time (the timestamp when the event actually occurred) rather than processing time (when the system receives it). Events carry timestamps, and operators can buffer and reorder events based on these timestamps.

click_event.py

import time

class ClickEvent(Event):
    def __init__(self, url, timestamp=None):
        self.url = url
        self.timestamp = timestamp or time.time()

    def get_key(self):
        return self.url

    def get_timestamp(self):
        return self.timestamp

More advanced streaming systems provide windowing and watermarking mechanisms to handle late-arriving events and out-of-order processing. We'll explore these topics in future articles.


Parallelizing Sources

So far we've focused on parallelizing operators. But sources can also be parallelized to ingest data faster. Instead of one ClickReader listening on port 9990, we could run three ClickReaders on ports 9990, 9991, and 9992.

clickstream_job.py

if __name__ == "__main__":
    job = Job()

    # Add three parallel sources
    stream1 = job.add_source(ClickReader("click-reader-0", 9990))
    stream2 = job.add_source(ClickReader("click-reader-1", 9991))
    stream3 = job.add_source(ClickReader("click-reader-2", 9992))

    # Merge the streams
    merged_stream = job.merge_streams([stream1, stream2, stream3])

    # Apply operator with fields grouping
    merged_stream.apply_operator(
        PageViewCounter("page-view-counter"),
        parallelism=3,
        grouping='fields'
    )

    starter = JobStarter(job)
    starter.start()

This pattern is common when reading from partitioned data sources like Kafka topics (which have multiple partitions) or distributed file systems (which have multiple files). Each source instance reads from one partition or file.
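
As a standalone illustration of the pattern (not StreamKit API), the sketch below starts one reader thread per file, with each file standing in for a partition and a shared queue standing in for the merged stream. The log file names are placeholders.

partition_readers.py

import queue
import threading

merged = queue.Queue()  # stands in for the merged stream

def read_partition(path, out_q):
    """One source instance: read one file (one 'partition') line by line."""
    with open(path) as f:
        for line in f:
            out_q.put(line.strip())

# One reader per partition; placeholder file names
readers = [
    threading.Thread(target=read_partition, args=(path, merged))
    for path in ["clicks-0.log", "clicks-1.log", "clicks-2.log"]
]
for t in readers:
    t.start()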


Practical Considerations

Choosing Parallelism

How many instances should you run? Consider these factors:

  • Throughput requirements: If each instance processes 100 events/second and you need 1,000 events/second, you need at least 10 instances (see the sizing sketch after this list).
  • Available resources: Each instance consumes CPU and memory. Don't over-parallelize beyond your hardware capacity.
  • Key cardinality: With fields grouping, parallelism is limited by the number of unique keys. If you only have 5 unique URLs, running 100 instances means 95 sit idle.
  • Downstream capacity: If the next operator can only handle 500 events/second, there's no benefit to producing 1,000 events/second upstream.
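
The throughput factor reduces to simple arithmetic. A tiny illustrative helper, where the 20% headroom default is an assumed safety margin rather than anything prescribed:

sizing.py

import math

def required_parallelism(target_rate, per_instance_rate, headroom=1.2):
    """Instances needed to sustain target_rate, with a margin for spikes."""
    return math.ceil(target_rate * headroom / per_instance_rate)

print(required_parallelism(1_000, 100))  # 12: the minimum 10 plus 20% headroom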

State Management Challenges

Parallelization fragments state across instances. Each PageViewCounter instance maintains its own count map. To get global counts, you'd need to query all instances and merge results. This becomes complex in distributed systems.

Solutions include:

  • Reduce operators: Add a final single-instance operator that aggregates results from all parallel instances
  • External state stores: Store state in a distributed database (Redis, Cassandra) that all instances can access
  • Broadcast queries: When querying state, send the query to all instances and merge responses, as sketched below
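
For the broadcast-query approach, merging per-instance count maps is straightforward with collections.Counter. A minimal sketch, assuming each instance can report its local counts (the sample data mirrors the earlier fragmentation figure):

merge_counts.py

from collections import Counter

# Local count maps reported by three parallel instances (illustrative data)
per_instance = [
    {"/home": 3, "/products": 1},
    {"/home": 2, "/about": 2},
    {"/home": 1, "/products": 1, "/about": 1},
]

global_counts = sum((Counter(c) for c in per_instance), Counter())
print(dict(global_counts))  # {'/home': 6, '/products': 2, '/about': 3}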

Monitoring and Debugging

With multiple instances, monitoring becomes critical. You need visibility into:

  • Queue depths per instance (to detect bottlenecks)
  • Processing rate per instance (to detect imbalanced load)
  • Error rates per instance (to detect failing instances)
  • Resource usage (CPU, memory) per instance

Summary

Parallelization is how streaming systems achieve high throughput. By running multiple instances of operators, we can scale linearly with the number of instances. Key concepts:

  • Data parallelism - Multiple instances of the same operator processing different subsets of data
  • Task parallelism - Different operators running concurrently in a pipeline
  • Shuffle grouping - Random distribution for stateless operations
  • Fields grouping - Key-based distribution for stateful operations
  • Event ordering - Fields grouping preserves order within keys
  • Parallelizing sources - Multiple source instances for higher ingestion rates
What's next?

In real-world streaming systems, you'll encounter more advanced topics:

  • Windowing - Grouping events into time-based or count-based windows
  • Watermarks - Handling late-arriving events and triggering computations
  • Backpressure - Slowing down sources when operators can't keep up
  • Fault tolerance - Checkpointing state and recovering from failures
  • Exactly-once processing - Ensuring events are processed exactly once, not zero or multiple times

Production frameworks like Apache Flink, Apache Storm, and Apache Kafka Streams handle parallelization automatically. They distribute operators across cluster nodes, manage state, provide exactly-once semantics, and handle failures transparently.

Exercises

  1. Modify the PageViewCounter to track not just counts but also the instance ID that processed each event. Print both the count and which instances saw events for each URL.
  2. Implement a custom grouping strategy that uses the first character of the URL as the key (so all URLs starting with "/p" go to the same instance). How does this affect load balancing?
  3. Create a two-stage pipeline: parallel filters that pass through only certain URL patterns, followed by parallel counters. Experiment with different grouping strategies at each stage.
  4. Add timestamps to ClickEvent and modify PageViewCounter to detect and log out-of-order events (events whose timestamp is earlier than the previous event processed).
  5. Implement a simple reduce operator that runs as a single instance, receives events from all parallel counter instances, and maintains a global count across all URLs.
  6. Measure the throughput improvement from parallelization. Generate synthetic events at a high rate and compare processing time with 1, 2, 4, and 8 parallel instances. Does it scale linearly?