Parallelization and Data Grouping
Scaling streaming systems horizontally by running multiple instances in parallel. Learn how data parallelism increases throughput, how grouping strategies affect event distribution, and why ordering matters when processing streams at scale.
In the previous article, we built a simple streaming job that processed clickstream events one at a time. But what happens when the event rate exceeds what a single operator instance can handle? The solution is parallelization: running multiple copies of operators to process events concurrently.
The Throughput Problem
Consider our clickstream analytics system again. During normal hours, it processes around 100 page views per second without issue. But during a product launch or marketing campaign, traffic can spike to 10,000 events per second. A single counter instance becomes overwhelmed, the queue backs up, latency increases, and events start timing out.
The fundamental bottleneck: a single operator instance can only process events sequentially. If each event takes 10 milliseconds to process, the maximum throughput is 100 events per second. When 10,000 events arrive per second, 99% of events wait in the queue.
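The arithmetic behind that bottleneck is worth writing out. A quick sketch using the numbers above (10 ms per event, 10,000 events/second during a spike):

service_time = 0.010                 # seconds to process one event
max_throughput = 1 / service_time    # 100 events/second for a single instance
arrival_rate = 10_000                # events/second during a spike
backlog_growth = arrival_rate - max_throughput
print(backlog_growth)                # 9900 events added to the queue every second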
Horizontal Scaling
Unlike vertical scaling (adding more CPU/memory to a single machine), horizontal scaling adds more machines running the same code. In streaming systems, this means running multiple instances of the same operator in parallel, each processing a subset of the event stream.
With parallelization, we can run N instances of an operator, multiplying throughput by N. Three instances processing at 100 events/second each can handle 300 events/second. Ten instances can handle 1,000 events/second. The system scales linearly with the number of instances.
Data Parallelism vs Task Parallelism
Before diving into implementation, it's important to distinguish between two types of parallelism in streaming systems: data parallelism and task parallelism.
Data Parallelism
Data parallelism means running multiple instances of the same operator, each processing a different subset of the data. This is what we use to scale throughput. All instances execute identical logic; they're clones of the same code.
For example, running three instances of PageViewCounter means three separate copies of the operator, each maintaining its own count map and each processing roughly one-third of the incoming events. The instances don't coordinate or share state; they work independently on their portion of the stream.
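To make the per-instance state concrete, here is a minimal sketch of the counter (the real PageViewCounter from the previous article extends the framework's operator base class; the apply() method name here is illustrative). Each parallel instance owns its own counts dictionary, and nothing is shared:

class PageViewCounter:
    def __init__(self, name):
        self.name = name
        self.counts = {}                     # state local to this instance only

    def apply(self, event):
        url = event.get_data()
        self.counts[url] = self.counts.get(url, 0) + 1
        print(f"{self.name}: {self.counts}")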
Task Parallelism
Task parallelism means running different operators that perform different tasks simultaneously. This is inherent in streaming systems: sources, operators, and sinks all run concurrently in their own threads or processes.
For example, in a pipeline with Filter → Transform → Aggregate operators, all three run in parallel. While the filter processes event N, the transform processes event N-1, and the aggregator processes event N-2. This pipeline parallelism is automatic in streaming frameworks.
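The same idea can be shown with plain Python threads and queues (a standalone illustration, not StreamKit code; the Filter, Transform, and Aggregate stages here are hypothetical). Each stage runs in its own thread, so the filter can work on one event while the transform and aggregator work on earlier ones:

import queue
import threading
import time

raw, filtered, transformed = queue.Queue(), queue.Queue(), queue.Queue()

def filter_stage():
    while True:
        url = raw.get()
        if url.startswith("/"):              # drop anything that isn't a URL
            filtered.put(url)

def transform_stage():
    while True:
        url = filtered.get()
        transformed.put(url.lower())         # normalize the URL

def aggregate_stage():
    counts = {}
    while True:
        url = transformed.get()
        counts[url] = counts.get(url, 0) + 1
        print(counts)

for stage in (filter_stage, transform_stage, aggregate_stage):
    threading.Thread(target=stage, daemon=True).start()

for url in ["/Home", "/Products", "not-a-url", "/home"]:
    raw.put(url)
time.sleep(0.5)                              # give the pipeline time to drain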
Why parallelism matters
Data parallelism is how we scale streaming systems to handle high throughput. Task parallelism is how streaming systems achieve low latency: each event flows through the pipeline stage by stage as soon as it arrives, instead of waiting for earlier stages to finish processing the entire stream.
Implementing Parallelization
To parallelize an operator in the StreamKit framework, we need to specify how many instances should run. The framework handles creating multiple executor threads and distributing events among them.
Setting Parallelism
The StreamKit framework provides a simple API to set parallelism when adding operators to a stream. The parallelism parameter tells the framework how many concurrent instances to create.
if __name__ == "__main__":
    job = Job()
    click_stream = job.add_source(ClickReader("click-reader", 9990))

    # Run 3 parallel instances of the counter
    click_stream.apply_operator(
        PageViewCounter("page-view-counter"),
        parallelism=3
    )

    starter = JobStarter(job)
    starter.start()

With this configuration, the framework creates three separate PageViewCounter instances. Each instance runs in its own thread with its own incoming queue. Events from the source are distributed across these three queues.
How the Framework Distributes Events
When parallelism is greater than 1, the framework needs a strategy to distribute events among operator instances. The simplest approach is round-robin distribution: send event 1 to instance 0, event 2 to instance 1, event 3 to instance 2, event 4 back to instance 0, and so on.
import queue


class Stream:
    def __init__(self, source):
        self.source = source
        self.operators = []
        self.operator_queues = []

    def apply_operator(self, operator, parallelism=1):
        # Create one clone of the operator, and one incoming queue, per instance
        for i in range(parallelism):
            instance_name = f"{operator.name}-{i}"
            op_instance = operator.clone(instance_name)
            op_queue = queue.Queue()
            self.operators.append(op_instance)
            self.operator_queues.append(op_queue)
            op_instance.incoming_queue = op_queue
        # Store source output queue for event distribution
        self.source.outgoing_queue = queue.Queue()
        return self

The event dispatcher continuously pulls events from the source's outgoing queue and distributes them to operator instance queues using round-robin or another strategy.
import queue
import threading


class EventDispatcher(threading.Thread):
    def __init__(self, source_queue, operator_queues):
        super().__init__()
        self.source_queue = source_queue
        self.operator_queues = operator_queues
        self.current_index = 0

    def run(self):
        while True:
            try:
                event = self.source_queue.get(timeout=0.1)
                # Round-robin distribution
                target_queue = self.operator_queues[self.current_index]
                target_queue.put(event)
                # Move to next instance
                self.current_index = (self.current_index + 1) % len(self.operator_queues)
            except queue.Empty:
                continue

Event Grouping Strategies
Round-robin works for stateless operators, but what about stateful operators like our
PageViewCounter? If events for /home are randomly distributed across
instances, each instance maintains separate counts. We'd get three different counts
for /home instead of one global count.
With random distribution, a stateful operator like PageViewCounter produces incorrect
results. If instance 0 sees 10 views of /home, instance 1 sees 7 views, and
instance 2 sees 5 views, none of them knows the true total is 22 views. The state is
fragmented across instances.
The solution is grouping: controlling which events go to which instances based on event properties. Streaming frameworks typically support two grouping strategies:
Shuffle Grouping
Shuffle grouping (also called random grouping) distributes events randomly across instances. In terms of load balancing it is equivalent to round-robin, and it works well for stateless operators where any instance can process any event.
Use shuffle grouping when:
- The operator is stateless (doesn't maintain counts, aggregates, or other state)
- Every event is independent
- You want to balance load evenly across instances
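A shuffle-style dispatch step can be as simple as picking a random target queue. A minimal sketch (operator_queues is the same list of per-instance queues the round-robin dispatcher above maintains):

import random

def dispatch_shuffle(event, operator_queues):
    # Any instance may receive any event, so load spreads evenly over time
    random.choice(operator_queues).put(event)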
Fields Grouping
Fields grouping (also called key-based grouping or partitioning) distributes events based on a key extracted from the event. Events with the same key always go to the same instance. This is implemented using a hash function.
Use fields grouping when:
- The operator is stateful and needs to see all events for a particular key
- You need to maintain per-key aggregates or counts
- Event ordering matters (within a key)
Implementing Fields Grouping
To implement fields grouping, events need to provide a key that the dispatcher can hash. The hash determines which instance receives the event.
class ClickEvent(Event):
    def __init__(self, url):
        self.url = url

    def get_data(self):
        return self.url

    def get_key(self):
        """Return the key for grouping (the URL)"""
        return self.url

The event dispatcher uses this key to determine the target instance:
import queue
import threading


class EventDispatcher(threading.Thread):
    def __init__(self, source_queue, operator_queues, grouping='shuffle'):
        super().__init__()
        self.source_queue = source_queue
        self.operator_queues = operator_queues
        self.grouping = grouping
        self.current_index = 0

    def run(self):
        while True:
            try:
                event = self.source_queue.get(timeout=0.1)
                if self.grouping == 'shuffle':
                    # Round-robin distribution
                    target_idx = self.current_index
                    self.current_index = (self.current_index + 1) % len(self.operator_queues)
                else:
                    # Fields grouping: hash the key
                    key = event.get_key()
                    target_idx = hash(key) % len(self.operator_queues)
                self.operator_queues[target_idx].put(event)
            except queue.Empty:
                continue

Now all events with url="/home" go to the same instance, which maintains an
accurate count. Events with url="/products" go to a different instance
(determined by the hash), which maintains a separate accurate count for that URL.
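A quick way to see this mapping is to hash a few URLs by hand. Note that Python randomizes string hashes between process runs, so the exact indices differ from run to run, but within one run the mapping is stable:

num_instances = 3
for url in ["/home", "/products", "/checkout", "/home"]:
    # The same URL always yields the same index within a single run
    print(url, "-> instance", hash(url) % num_instances)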
Hash functions and distribution
A good hash function distributes keys evenly across instances. Python's built-in
hash() function works well for strings and numbers. Poor hash functions
create "hot" instances that receive most events while other instances sit idle.
Event Ordering and Parallelization
Parallelization introduces a subtle but important issue: event ordering. When events flow through a single operator instance, they're processed in the order they arrive. With multiple instances, ordering guarantees change.
Ordering with Shuffle Grouping
With shuffle grouping, events are distributed randomly across instances. Even if the source sends events 1, 2, 3, 4 in order, they might be processed as 1, 3, 2, 4 if events 2 and 3 go to different instances that process at different speeds.
Shuffle grouping provides no ordering guarantees. This is acceptable for stateless operations where order doesn't matter, but problematic for operations that depend on sequence.
Ordering with Fields Grouping
Fields grouping provides a stronger guarantee: events with the same key are processed in order. If the source sends events with key="A" in order 1, 2, 3, they'll arrive at the same instance and be processed in that order.
However, events with different keys may still be reordered. Event with key="A" sent before event with key="B" might be processed after it if they go to different instances.
Why ordering matters
For stateful operations like counting, the order within a key matters. If we're tracking user sessions and events arrive out of order, we might process a "logout" event before the "login" event, leading to incorrect session duration calculations.
Timestamps and Event Time
To handle out-of-order events, streaming systems use event time (the timestamp when the event actually occurred) rather than processing time (when the system receives it). Events carry timestamps, and operators can buffer and reorder events based on those timestamps.
import time


class ClickEvent(Event):
    def __init__(self, url, timestamp=None):
        self.url = url
        self.timestamp = timestamp or time.time()

    def get_key(self):
        return self.url

    def get_timestamp(self):
        return self.timestamp

More advanced streaming systems provide windowing and watermarking mechanisms to handle late-arriving events and out-of-order processing. We'll explore these topics in future articles.
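As a taste of what such buffering involves, here is a minimal standalone sketch (not StreamKit code) that holds events in a heap keyed by event time and only releases those older than a fixed delay:

import heapq

class ReorderBuffer:
    def __init__(self, max_delay=5.0):
        self.max_delay = max_delay
        self.heap = []     # (timestamp, sequence, event) entries
        self._seq = 0      # tiebreaker so events never get compared directly

    def add(self, event):
        heapq.heappush(self.heap, (event.get_timestamp(), self._seq, event))
        self._seq += 1

    def drain(self, now):
        """Emit events whose timestamp is older than now - max_delay, in timestamp order."""
        ready = []
        while self.heap and self.heap[0][0] <= now - self.max_delay:
            ready.append(heapq.heappop(self.heap)[2])
        return ready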
Parallelizing Sources
So far we've focused on parallelizing operators. But sources can also be parallelized to ingest data faster. Instead of one ClickReader listening on port 9990, we could run three ClickReaders on ports 9990, 9991, and 9992.
if __name__ == "__main__":
    job = Job()

    # Add three parallel sources
    stream1 = job.add_source(ClickReader("click-reader-0", 9990))
    stream2 = job.add_source(ClickReader("click-reader-1", 9991))
    stream3 = job.add_source(ClickReader("click-reader-2", 9992))

    # Merge the streams
    merged_stream = job.merge_streams([stream1, stream2, stream3])

    # Apply operator with fields grouping
    merged_stream.apply_operator(
        PageViewCounter("page-view-counter"),
        parallelism=3,
        grouping='fields'
    )

    starter = JobStarter(job)
    starter.start()

This pattern is common when reading from partitioned data sources like Kafka topics (which have multiple partitions) or distributed file systems (which have multiple files). Each source instance reads from one partition or file.
Practical Considerations
Choosing Parallelism
How many instances should you run? Consider these factors:
- Throughput requirements: If each instance processes 100 events/second and you need 1,000 events/second, you need at least 10 instances (see the sizing sketch after this list).
- Available resources: Each instance consumes CPU and memory. Don't over-parallelize beyond your hardware capacity.
- Key cardinality: With fields grouping, parallelism is limited by the number of unique keys. If you only have 5 unique URLs, running 100 instances means 95 sit idle.
- Downstream capacity: If the next operator can only handle 500 events/second, there's no benefit to producing 1,000 events/second upstream.
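A back-of-the-envelope sizing for the throughput bullet above might look like this (the 20% headroom factor is an assumption, not a framework default):

import math

target_rate = 1_000        # required events/second
per_instance_rate = 100    # measured events/second for one instance
headroom = 1.2             # spare capacity for spikes (assumed)
parallelism = math.ceil(target_rate * headroom / per_instance_rate)
print(parallelism)         # 12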
State Management Challenges
Parallelization fragments state across instances. Each PageViewCounter instance maintains its own count map. To get global counts, you'd need to query all instances and merge results. This becomes complex in distributed systems.
Solutions include:
- Reduce operators: Add a final single-instance operator that aggregates results from all parallel instances (sketched after this list)
- External state stores: Store state in a distributed database (Redis, Cassandra) that all instances can access
- Broadcast queries: When querying state, send the query to all instances and merge responses
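The first option can be sketched as a single-instance operator that folds partial counts into one global map (GlobalCountReducer and the (url, delta) event format are illustrative, not part of StreamKit):

from collections import defaultdict

class GlobalCountReducer:
    def __init__(self, name):
        self.name = name
        self.global_counts = defaultdict(int)

    def apply(self, event):
        # Each upstream counter instance emits (url, count_delta) pairs;
        # this single instance merges them into the global view.
        url, delta = event
        self.global_counts[url] += delta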
Monitoring and Debugging
With multiple instances, monitoring becomes critical. You need visibility into:
- Queue depths per instance (to detect bottlenecks)
- Processing rate per instance (to detect imbalanced load)
- Error rates per instance (to detect failing instances)
- Resource usage (CPU, memory) per instance
Summary
Parallelization is how streaming systems achieve high throughput. By running multiple instances of operators, we can scale linearly with the number of instances. Key concepts:
- Data parallelism - Multiple instances of the same operator processing different subsets of data
- Task parallelism - Different operators running concurrently in a pipeline
- Shuffle grouping - Random distribution for stateless operations
- Fields grouping - Key-based distribution for stateful operations
- Event ordering - Fields grouping preserves order within keys
- Parallelizing sources - Multiple source instances for higher ingestion rates
What's next?
In real-world streaming systems, you'll encounter more advanced topics:
- Windowing - Grouping events into time-based or count-based windows
- Watermarks - Handling late-arriving events and triggering computations
- Backpressure - Slowing down sources when operators can't keep up
- Fault tolerance - Checkpointing state and recovering from failures
- Exactly-once processing - Ensuring events are processed exactly once, not zero or multiple times
Production frameworks like Apache Flink, Apache Storm, and Apache Kafka Streams handle parallelization automatically. They distribute operators across cluster nodes, manage state, provide exactly-once semantics, and handle failures transparently.
Exercises
- Modify the PageViewCounter to track not just counts but also the instance ID that processed each event. Print both the count and which instances saw events for each URL.
- Implement a custom grouping strategy that uses the first character of the URL as the key (so all URLs starting with "/p" go to the same instance). How does this affect load balancing?
- Create a two-stage pipeline: parallel filters that pass through only certain URL patterns, followed by parallel counters. Experiment with different grouping strategies at each stage.
- Add timestamps to ClickEvent and modify PageViewCounter to detect and log out-of-order events (events whose timestamp is earlier than the previous event processed).
- Implement a simple reduce operator that runs as a single instance, receives events from all parallel counter instances, and maintains a global count across all URLs.
- Measure the throughput improvement from parallelization. Generate synthetic events at a high rate and compare processing time with 1, 2, 4, and 8 parallel instances. Does it scale linearly?