Overview
The Producer-Consumer pattern decouples the production of data from its consumption using a shared buffer (channel). Multiple producers can generate data and send it to the buffer, while multiple consumers read from the buffer and process the data. This pattern is essential for buffering bursts of work, decoupling work rates between producers and consumers, and load leveling.
NOTE: For other posts on concurrency patterns, check out the index post for this series.
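As a minimal sketch of the idea – a stripped-down, hypothetical version with a single producer and a single consumer (the full example below uses several of each):

package main

import "fmt"

func main() {
	buffer := make(chan int, 3) // shared buffer (channel) with capacity 3

	// Single producer: sends items and closes the buffer when done.
	go func() {
		for i := 0; i < 5; i++ {
			buffer <- i
		}
		close(buffer)
	}()

	// Single consumer: drains the buffer until it is closed.
	for item := range buffer {
		fmt.Println("consumed:", item)
	}
}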
Implementation Details
Structure
The producer-consumer implementation in examples/producer_consumer.go consists of three main components:
- Producers – Multiple goroutines that generate data
- Buffer – A shared channel that holds the data
- Consumers – Multiple goroutines that process the data
Code Analysis
Let’s break down the main function and understand how each component works:
func RunProducerConsumer() {
	bufferSize := 5
	numProducers := 2
	numConsumers := 3
	numItems := 10

	buffer := make(chan int, bufferSize)
	var wg sync.WaitGroup

	// Start producers
	for p := 1; p <= numProducers; p++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for i := 0; i < numItems; i++ {
				item := rand.Intn(100)
				buffer <- item
				fmt.Printf("Producer %d produced: %d\n", id, item)
				time.Sleep(time.Duration(rand.Intn(200)+100) * time.Millisecond)
			}
		}(p)
	}

	// Start consumers
	var consumerWg sync.WaitGroup
	for c := 1; c <= numConsumers; c++ {
		consumerWg.Add(1)
		go func(id int) {
			defer consumerWg.Done()
			for item := range buffer {
				fmt.Printf("Consumer %d consumed: %d\n", id, item)
				time.Sleep(time.Duration(rand.Intn(300)+100) * time.Millisecond)
			}
		}(c)
	}

	// Wait for all producers to finish, then close the buffer
	wg.Wait()
	close(buffer)

	// Wait for all consumers to finish
	consumerWg.Wait()
}
Step-by-step breakdown:
- Configuration Setup:
  - bufferSize := 5 defines the buffer capacity – it can hold up to 5 items
  - numProducers := 2 launches 2 producer goroutines
  - numConsumers := 3 launches 3 consumer goroutines
  - numItems := 10 specifies how many items each producer will create
  - These values create a scenario where producers generate more data than consumers can immediately process
- Buffer and Synchronization Setup:
  - buffer := make(chan int, bufferSize) creates a buffered channel with capacity 5
  - Buffering allows producers to continue working even when consumers are slow
  - var wg sync.WaitGroup tracks when all producers complete
  - var consumerWg sync.WaitGroup tracks when all consumers complete
- Producer Launch Loop:
  - Launches numProducers goroutines (2 in this case)
  - Each producer gets a unique ID (1, 2) for tracking
  - Uses the closure func(id int) { ... }(p) to capture the producer ID
  - wg.Add(1) increments the producer wait group before each goroutine starts
- Producer Goroutine Implementation:
  - defer wg.Done() ensures the producer signals completion when it exits
  - for i := 0; i < numItems; i++ creates exactly 10 items per producer
  - item := rand.Intn(100) generates random data (0-99) to simulate real data
  - buffer <- item sends the item to the shared buffer (may block if the buffer is full)
  - time.Sleep(time.Duration(rand.Intn(200)+100) * time.Millisecond) simulates variable production time (100-300ms)
- Consumer Launch Loop:
  - Launches numConsumers goroutines (3 in this case)
  - Each consumer gets a unique ID (1, 2, 3) for tracking
  - Uses a closure to capture the consumer ID
  - consumerWg.Add(1) increments the consumer wait group before each goroutine starts
- Consumer Goroutine Implementation:
  - defer consumerWg.Done() ensures the consumer signals completion when it exits
  - for item := range buffer continuously reads from the buffer until it is closed
  - time.Sleep(time.Duration(rand.Intn(300)+100) * time.Millisecond) simulates variable processing time (100-400ms)
  - Consumers compete for items from the shared buffer (automatic load balancing)
- Graceful Shutdown Process:
  - wg.Wait() waits for all producers to finish generating their items
  - close(buffer) signals to consumers that no more data is coming
  - consumerWg.Wait() waits for all consumers to finish processing the remaining items
  - This ensures clean termination without goroutine leaks (the ordering is annotated just below this breakdown)
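The ordering here is the part that prevents bugs: the channel must be closed exactly once, only after every producer has stopped sending (sending on a closed channel panics), and never from a consumer. Annotated, the shutdown sequence from the example is:

wg.Wait()         // 1. every producer has returned, so no further sends can happen
close(buffer)     // 2. now closing is safe; closing earlier would risk a "send on closed channel" panic
consumerWg.Wait() // 3. consumers drain the remaining buffered items, their range loops exit, and they signal done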
Key Design Patterns:
- Closure Pattern: func(id int) { ... }(p) captures the loop variable p by passing it into each goroutine's closure, ensuring each goroutine gets a unique ID (see the sketch after this list).
- Dual WaitGroup Strategy: Separate wait groups for producers and consumers allow a coordinated shutdown – producers finish first, then the buffer closes, then consumers finish.
- Buffered Channel as Queue: The buffer decouples production and consumption rates, allowing the system to handle temporary mismatches in speed.
- Range Loop for Consumers: for item := range buffer automatically exits when the buffer closes, providing natural termination.
- Simulated Work: Random delays make the concurrency visible and demonstrate how the pattern handles varying processing speeds.
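A note on the closure pattern: passing the ID as an argument matters because, before Go 1.22, the for-loop variable was shared across iterations, so goroutines that merely referenced it could all observe its final value. A minimal, self-contained sketch of the difference:

package main

import (
	"fmt"
	"sync"
)

func main() {
	const numProducers = 3
	var wg sync.WaitGroup

	// Risky on Go versions before 1.22: each goroutine references the shared loop
	// variable p, which may already have advanced by the time the goroutine runs.
	for p := 1; p <= numProducers; p++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			fmt.Println("shared variable, producer", p)
		}()
	}
	wg.Wait()

	// Safe on any Go version: p is passed by value, so each goroutine gets its own copy.
	for p := 1; p <= numProducers; p++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			fmt.Println("copied value, producer", id)
		}(p)
	}
	wg.Wait()
}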
How It Works
- Producer Initialization: Multiple producer goroutines start generating data
- Data Production: Producers create data and send it to the shared buffer channel
- Buffer Management: The buffered channel holds data until consumers are ready
- Consumer Processing: Multiple consumer goroutines read from the buffer and process data
- Graceful Shutdown: When all producers finish, the buffer is closed, signaling consumers to stop
The pattern enables asynchronous processing where producers and consumers can operate at different rates.
Why This Implementation?
Buffered Channel as Buffer
- Decoupling: Producers and consumers can operate independently
- Burst Handling: Buffer can absorb temporary spikes in production
- Flow Control: Natural backpressure when the buffer is full (see the sketch below)
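Backpressure here is just channel semantics: buffer <- item blocks once the buffer holds bufferSize items, which paces producers to the consumers' speed. If blocking is not acceptable, a producer can use a non-blocking send instead. A sketch – trySend and the drop policy are hypothetical, the repo example simply blocks:

package main

import "fmt"

// trySend attempts a non-blocking send and reports whether the item was enqueued.
// (Hypothetical helper – dropping is only one possible overload policy; retrying
// or recording a metric are equally valid.)
func trySend(buffer chan<- int, item int) bool {
	select {
	case buffer <- item:
		return true // enqueued; a consumer will pick it up
	default:
		return false // buffer full: caller decides how to shed or defer the work
	}
}

func main() {
	buffer := make(chan int, 2)
	for i := 0; i < 4; i++ {
		if !trySend(buffer, i) {
			fmt.Println("dropped item", i, "- buffer full")
		}
	}
}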
Multiple Producers
- Parallel Production: Multiple sources can generate data simultaneously
- Load Distribution: Work can be distributed across multiple producers
- Fault Tolerance: If one producer fails, others continue
Multiple Consumers
- Parallel Processing: Multiple consumers can process data simultaneously
- Load Balancing: Work is automatically distributed among consumers
- Scalability: Easy to add or remove consumers based on load (see the sketch below)
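Scaling the consumer side is mostly a matter of making the count a parameter. A sketch of one way to do that – startConsumers is a hypothetical helper, not part of the repo example:

package examples

import (
	"fmt"
	"sync"
)

// startConsumers (hypothetical helper) launches n consumers that drain the buffer
// until it is closed, and returns a WaitGroup the caller can wait on for shutdown.
func startConsumers(buffer <-chan int, n int) *sync.WaitGroup {
	var wg sync.WaitGroup
	for c := 1; c <= n; c++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for item := range buffer {
				fmt.Printf("Consumer %d consumed: %d\n", id, item)
			}
		}(c)
	}
	return &wg
}

The producer side can be factored the same way, which also makes it easy to drive both counts from configuration.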
Separate WaitGroups
- Producer Coordination: Ensures all producers finish before closing buffer
- Consumer Coordination: Ensures all consumers finish before program exits
- Clean Shutdown: Proper coordination prevents goroutine leaks
Channel Closing Strategy
- Signal Completion: Closing the buffer signals consumers that no more data is coming
- Graceful Termination: Consumers naturally exit when the buffer is closed (see the sketch below)
- Resource Cleanup: Prevents consumers from waiting indefinitely
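For reference, the range form used in the example and the explicit comma-ok form are equivalent ways for a consumer to detect the close. Both helpers below are hypothetical; process stands in for whatever per-item work is needed:

package examples

// consumeRange uses the form from the example: range exits automatically once
// the channel is closed and all buffered items have been received.
func consumeRange(buffer <-chan int, process func(int)) {
	for item := range buffer {
		process(item)
	}
}

// consumeExplicit is equivalent: the second return value ok becomes false only
// after the channel is closed and empty, so no buffered items are lost.
func consumeExplicit(buffer <-chan int, process func(int)) {
	for {
		item, ok := <-buffer
		if !ok {
			return
		}
		process(item)
	}
}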
Key Design Decisions
- Buffered Channel Size: The buffer size (5) determines how much data can be queued
- Producer Count: Multiple producers (2) demonstrate parallel data generation
- Consumer Count: Multiple consumers (3) demonstrate parallel data processing
- Simulated Work: Random delays simulate real processing time and make concurrency visible
- Structured Shutdown: Proper coordination ensures clean program termination
Performance Characteristics
Throughput
- Limited by the Slower Side: Overall throughput is bounded by whichever side is slower – aggregate production or aggregate consumption (see the timing sketch after the Resource Usage list)
- Buffer Impact: Larger buffers can handle bigger bursts but use more memory
- Parallel Processing: Multiple consumers can increase processing throughput
Latency
- Buffer Time: Data may wait in the buffer if consumers are slow
- Processing Time: Individual item processing time depends on consumer speed
- Fair Distribution: Items leave the buffer in FIFO order, though which consumer receives a given item is not deterministic
Resource Usage
- Memory: Buffer size determines memory usage
- CPU: Multiple producers and consumers can utilize multiple CPU cores
- Coordination Overhead: Minimal overhead from channel operations
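A small, self-contained timing sketch makes the throughput point concrete (the item count, buffer size, and 50ms delay are arbitrary): once the buffer fills, the producer is paced by the consumer, so end-to-end time is dominated by the slower side.

package main

import (
	"fmt"
	"time"
)

func main() {
	const items = 20
	buffer := make(chan int, 5)
	done := make(chan struct{})

	// Consumer: 50ms per item, so the whole run takes roughly items * 50ms,
	// no matter how fast the producer is or how large the buffer grows.
	go func() {
		for range buffer {
			time.Sleep(50 * time.Millisecond)
		}
		close(done)
	}()

	start := time.Now()
	for i := 0; i < items; i++ {
		buffer <- i // near-instant until the buffer fills, then paced by the consumer
	}
	close(buffer)
	<-done
	fmt.Printf("processed %d items in %v\n", items, time.Since(start))
}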
Common Use Cases
Data Processing Pipelines
- Log Processing: Multiple log sources → Buffer → Multiple log processors (see the sketch after this list)
- Image Processing: Multiple cameras → Buffer → Multiple image analyzers
- Sensor Data: Multiple sensors → Buffer → Multiple data processors
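In practice the items flowing through the buffer are usually structs rather than ints. A sketch of the log-processing case – the LogEntry type, field names, and source names are hypothetical:

package main

import (
	"fmt"
	"sync"
)

// LogEntry is a hypothetical work item flowing through the buffer.
type LogEntry struct {
	Source string
	Line   string
}

func main() {
	buffer := make(chan LogEntry, 100)
	var producers, consumers sync.WaitGroup

	// Multiple log sources (producers).
	for _, src := range []string{"api", "worker"} {
		producers.Add(1)
		go func(source string) {
			defer producers.Done()
			for i := 0; i < 3; i++ {
				buffer <- LogEntry{Source: source, Line: fmt.Sprintf("event %d", i)}
			}
		}(src)
	}

	// Multiple log processors (consumers).
	for c := 0; c < 2; c++ {
		consumers.Add(1)
		go func() {
			defer consumers.Done()
			for entry := range buffer {
				fmt.Printf("[%s] %s\n", entry.Source, entry.Line)
			}
		}()
	}

	producers.Wait()
	close(buffer)
	consumers.Wait()
}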
Message Queuing Systems
- Event Processing: Multiple event sources → Buffer → Multiple event handlers
- Notification Systems: Multiple notification sources → Buffer → Multiple delivery agents
- Webhook Processing: Multiple webhook sources → Buffer → Multiple webhook handlers
Batch Processing
- File Processing: Multiple file sources → Buffer → Multiple file processors
- Report Generation: Multiple data sources → Buffer → Multiple report generators
- Data Import: Multiple data sources → Buffer → Multiple import processors
Real-time Systems
- Trading Systems: Multiple market data feeds → Buffer → Multiple trading algorithms
- IoT Applications: Multiple device sensors → Buffer → Multiple data analyzers
- Monitoring Systems: Multiple metric sources → Buffer → Multiple alert processors
API Rate Limiting
- Request Processing: Multiple API clients → Buffer → Rate-limited API processors (see the sketch after this list)
- Data Synchronization: Multiple sync sources → Buffer → Controlled sync processors
- External Service Calls: Multiple call sources → Buffer → Rate-limited service callers
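For rate limiting, the consumer side can be paced with a ticker, so bursts queue up in the buffer while the downstream calls stay within budget. A sketch – callExternalService and the 100ms interval are hypothetical:

package main

import (
	"fmt"
	"time"
)

// callExternalService is a hypothetical stand-in for a rate-limited downstream call.
func callExternalService(item int) {
	fmt.Println("calling external service with item", item)
}

func main() {
	buffer := make(chan int, 10)

	// Producer: requests arrive in a burst, faster than the allowed call rate.
	go func() {
		for i := 0; i < 5; i++ {
			buffer <- i
		}
		close(buffer)
	}()

	// Consumer: at most one external call per tick (here, 10 calls per second).
	limiter := time.NewTicker(100 * time.Millisecond)
	defer limiter.Stop()
	for item := range buffer {
		<-limiter.C
		callExternalService(item)
	}
}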
Content Processing
- Video Processing: Multiple video sources → Buffer → Multiple video processors
- Document Processing: Multiple document sources → Buffer → Multiple document analyzers
- Audio Processing: Multiple audio sources → Buffer → Multiple audio processors
Database Operations
- Write Operations: Multiple write sources → Buffer → Database writers
- Read Operations: Multiple read requests → Buffer → Database readers
- Migration Jobs: Multiple migration sources → Buffer → Migration processors
The producer-consumer pattern is particularly effective when you have:
- Variable Production Rates: Producers that generate data at unpredictable rates
- Variable Consumption Rates: Consumers that process data at different speeds
- Bursty Workloads: Periods of high activity followed by low activity
- Resource Constraints: Need to limit resource usage while maintaining throughput
- Decoupled Systems: Components that should operate independently
This pattern provides a robust foundation for building scalable, responsive systems that can handle varying loads efficiently.
The final example implementation looks like this:
package examples

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// RunProducerConsumer demonstrates the producer-consumer pattern with multiple producers and consumers.
func RunProducerConsumer() {
	fmt.Println("=== Producer-Consumer Pattern Example ===")

	bufferSize := 5
	numProducers := 2
	numConsumers := 3
	numItems := 10

	buffer := make(chan int, bufferSize)
	var wg sync.WaitGroup

	// Start producers
	for p := 1; p <= numProducers; p++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for i := 0; i < numItems; i++ {
				item := rand.Intn(100)
				buffer <- item
				fmt.Printf("Producer %d produced: %d\n", id, item)
				time.Sleep(time.Duration(rand.Intn(200)+100) * time.Millisecond)
			}
		}(p)
	}

	// Start consumers
	var consumerWg sync.WaitGroup
	for c := 1; c <= numConsumers; c++ {
		consumerWg.Add(1)
		go func(id int) {
			defer consumerWg.Done()
			for item := range buffer {
				fmt.Printf("Consumer %d consumed: %d\n", id, item)
				time.Sleep(time.Duration(rand.Intn(300)+100) * time.Millisecond)
			}
		}(c)
	}

	// Wait for all producers to finish, then close the buffer
	wg.Wait()
	close(buffer)

	// Wait for all consumers to finish
	consumerWg.Wait()

	fmt.Println("Producer-Consumer example completed!")
}
To run this example and build the code yourself, check out this and other examples in the go-fluency-concurrency-model-patterns repo. That’s it for this topic; tomorrow I’ll post on the supervisor-restart pattern.