Overview
The Worker Pools pattern maintains a fixed number of workers that process jobs from a shared queue. This pattern is essential for controlling resource usage, handling bursty workloads, and providing predictable performance in concurrent systems.
NOTE: For the other posts in this series, check out the concurrency patterns index post.
Implementation Details
Structure
The worker pools implementation in examples/pools.go consists of three main components:
- Job Queue – A buffered channel that holds jobs to be processed
- Worker Pool – A fixed number of worker goroutines
- Result Collection – A channel for collecting processed results
Code Analysis
Let’s break down the main function and understand how each component works:
```go
func RunPools() {
    // Configuration
    numWorkers := 3
    numJobs := 15

    // Create job channel
    jobs := make(chan int, numJobs)

    // Create result channel
    results := make(chan string, numJobs)

    // Start the worker pool
    var wg sync.WaitGroup

    // Launch workers
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go workerPool(i, jobs, results, &wg)
    }

    // Send jobs to the pool
    go func() {
        defer close(jobs)
        for i := 1; i <= numJobs; i++ {
            fmt.Printf("Sending job %d to pool\n", i)
            jobs <- i
            time.Sleep(100 * time.Millisecond) // Simulate job generation time
        }
    }()

    // Close results channel when all workers are done
    go func() {
        wg.Wait()
        close(results)
    }()

    // Collect results
    for result := range results {
        fmt.Printf("Result: %s\n", result)
    }
}
```
Step-by-step breakdown:
- Configuration Setup:
  - `numWorkers := 3` defines the fixed pool size – only 3 workers will ever be active
  - `numJobs := 15` specifies how many jobs to process (more jobs than workers to demonstrate queuing)
  - These values control the concurrency level and workload
- Channel Creation:
  - `jobs := make(chan int, numJobs)` creates a buffered channel with capacity for all jobs
  - Buffering prevents job submission from blocking when workers are busy
  - `results := make(chan string, numJobs)` creates a buffered result channel
  - Both channels are buffered to handle the full workload without blocking
- Worker Pool Initialization:
  - `var wg sync.WaitGroup` tracks when all workers complete
  - The loop launches exactly `numWorkers` goroutines
  - Each worker gets a unique ID (1, 2, 3) for tracking and debugging
  - All workers share the same `jobs` and `results` channels
- Job Submission Goroutine:
  - Runs in the background to send jobs asynchronously
  - `defer close(jobs)` ensures the job channel closes after all jobs are sent
  - The loop sends jobs one by one with a 100ms delay between submissions
  - This simulates real-world job generation (e.g., incoming HTTP requests)
- Cleanup Goroutine:
  - Waits for all workers to finish using `wg.Wait()`
  - Closes the results channel when all workers are done
  - Prevents deadlocks and signals that no more results are coming
- Result Collection:
  - The main goroutine consumes results as they become available
  - Results may arrive in any order (not necessarily job submission order)
  - The loop exits when the results channel closes
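Two channel properties do the heavy lifting in this breakdown: a buffered channel accepts sends up to its capacity even with no receiver running, and a `range` loop over a channel exits once it is closed and drained. A minimal standalone sketch (not part of the example code):

```go
package main

import "fmt"

func main() {
	// Capacity 3: three sends succeed with no receiver running yet.
	// A fourth send would block until something received.
	results := make(chan string, 3)
	results <- "a"
	results <- "b"
	results <- "c"
	fmt.Println("buffered:", len(results), "of", cap(results))

	// Closing signals "no more values"; buffered values stay readable.
	close(results)

	// range drains the remaining values, then exits automatically.
	for r := range results {
		fmt.Println("got", r)
	}
	fmt.Println("done")
}
```

This is exactly why the example can submit jobs, close the channel, and still trust every job to be processed.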
Worker Implementation
```go
func workerPool(id int, jobs <-chan int, results chan<- string, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d started\n", id)
    for job := range jobs {
        // Simulate work processing
        processingTime := time.Duration(rand.Intn(300)+200) * time.Millisecond
        fmt.Printf("Worker %d processing job %d (will take %v)\n", id, job, processingTime)
        time.Sleep(processingTime)
        result := fmt.Sprintf("Job %d completed by worker %d in %v", job, id, processingTime)
        results <- result
    }
    fmt.Printf("Worker %d finished\n", id)
}
```
Worker function breakdown:
- Function Signature:
  - `id int`: unique worker identifier (1, 2, 3) for tracking and debugging
  - `jobs <-chan int`: read-only channel for receiving jobs
  - `results chan<- string`: write-only channel for sending results
  - `wg *sync.WaitGroup`: for signaling when this worker completes
- Resource Management:
  - `defer wg.Done()` ensures the worker signals completion even if it panics
  - Automatic cleanup when the function exits
  - Prevents goroutine leaks and ensures proper shutdown
- Worker Lifecycle:
  - Prints a startup message to track worker initialization
  - Enters the main processing loop
  - Prints a completion message when the job channel closes
- Job Processing Loop:
  - `for job := range jobs` continuously processes jobs until the channel closes
  - Each worker competes for jobs from the shared channel (automatic load balancing)
  - When the jobs channel closes, all workers naturally exit the loop
- Simulated Work Processing:
  - `processingTime := time.Duration(rand.Intn(300)+200) * time.Millisecond` creates a variable processing time
  - A random delay between 200 and 500ms simulates real-world processing variance
  - Makes concurrency visible – workers process at different speeds
  - Demonstrates how the pool handles varying job complexity
- Result Generation and Sending:
  - Creates a descriptive result string with job ID, worker ID, and processing time
  - `results <- result` sends the result to the shared result channel
  - Non-blocking because the result channel is buffered
  - Results can be sent in any order (not necessarily job submission order)
How It Works
- Pool Initialization: Creates a fixed number of worker goroutines that wait for jobs
- Job Distribution: Jobs are sent to the shared job channel
- Worker Processing: Workers compete for jobs from the channel, process them, and send results
- Result Collection: Results are collected from the result channel as they complete
- Graceful Shutdown: When the job channel is closed, workers finish processing remaining jobs and exit
The pattern ensures that only a fixed number of workers are active at any time, providing controlled concurrency and resource management.
Why This Implementation?
Fixed Worker Count
- Resource Control: Limits the number of concurrent operations
- Predictable Performance: Consistent resource usage regardless of job load
- Stability: Prevents resource exhaustion under high load
Buffered Job Channel
- Burst Handling: Can queue jobs when workers are busy
- Non-blocking Job Submission: Job senders don’t block when workers are busy
- Backpressure: Natural backpressure when queue is full
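If the submitter should shed load rather than block when the queue is full, a `select` with a `default` case makes that backpressure explicit. A minimal sketch; the `tryEnqueue` helper is hypothetical, not part of the example code:

```go
package main

import "fmt"

// tryEnqueue attempts a non-blocking send; it reports whether the
// job was accepted or the queue was full.
func tryEnqueue(jobs chan int, job int) bool {
	select {
	case jobs <- job:
		return true
	default:
		return false // queue full: caller can retry, drop, or shed load
	}
}

func main() {
	jobs := make(chan int, 2)
	for i := 1; i <= 3; i++ {
		// First two sends fit in the buffer; the third is rejected.
		fmt.Printf("job %d accepted: %v\n", i, tryEnqueue(jobs, i))
	}
}
```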
Shared Job Channel
- Automatic Load Balancing: Workers naturally consume jobs as they become available
- Fair Distribution: All workers have equal access to jobs
- Simple Coordination: No complex job distribution logic needed
WaitGroup Synchronization
- Proper Cleanup: Ensures all workers complete before closing result channel
- Resource Management: Prevents goroutine leaks
- Coordinated Shutdown: All workers finish before main function exits
Separate Result Channel
- Asynchronous Results: Results can be collected independently of job submission
- Order Independence: Results can be processed in completion order, not submission order
- Non-blocking Collection: Result collection doesn’t block job processing
Key Design Decisions
- Fixed Worker Count: The pool size is determined at creation time and remains constant
- Buffered Job Channel: Allows queuing of jobs when workers are busy
- Buffered Result Channel: Lets workers send results without waiting on the collector (in this example it is sized to hold all results)
- Simulated Processing Time: Random delays simulate real work and demonstrate concurrency
- Graceful Shutdown: Proper channel closing and WaitGroup coordination
Performance Characteristics
Throughput
- Limited by Worker Count: Maximum throughput is limited by the number of workers
- Consistent Performance: Predictable performance regardless of job load
- Optimal for I/O-bound Work: Workers can handle I/O operations efficiently
Latency
- Queue Time: Jobs may wait in the queue if all workers are busy
- Processing Time: Individual job processing time depends on the work being done
- Fair Scheduling: Jobs are processed in FIFO order within the queue
Resource Usage
- Memory: Fixed memory usage regardless of job load
- CPU: Controlled CPU usage through fixed worker count
- Network/File Handles: Limited resource usage through worker constraints
Common Use Cases
Web Servers
- Request Processing: Handle HTTP requests with a fixed pool of workers
- Database Operations: Process database queries with controlled concurrency
- File Operations: Handle file uploads/downloads with limited workers
Background Job Processing
- Email Sending: Process email sending with rate limiting
- Image Processing: Resize, compress, or transform images
- Report Generation: Generate reports with controlled resource usage
API Rate Limiting
- External API Calls: Make API calls with controlled concurrency
- Data Synchronization: Sync data with external systems
- Webhook Processing: Process incoming webhooks with rate limiting
Resource-Intensive Operations
- Machine Learning: Run ML models with limited GPU/CPU usage
- Data Processing: Process large datasets with controlled memory usage
- Encryption/Decryption: Handle cryptographic operations with limited CPU usage
System Administration
- Backup Operations: Run backups with controlled I/O usage
- Log Processing: Process logs with limited file handle usage
- Monitoring: Collect metrics with controlled network usage
Microservices
- Service Communication: Handle inter-service communication with rate limiting
- Event Processing: Process events with controlled concurrency
- Cache Management: Manage cache operations with limited memory usage
The worker pools pattern is particularly effective when you need to control resource usage, handle bursty workloads, or provide predictable performance in concurrent systems. It’s especially useful for I/O-bound operations where you want to limit the number of concurrent operations to prevent resource exhaustion.
The final example implementation looks like this:
```go
package examples

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// RunPools demonstrates the worker pools pattern
func RunPools() {
    fmt.Println("=== Worker Pools Pattern Example ===")

    // Configuration
    numWorkers := 3
    numJobs := 15

    // Create job channel
    jobs := make(chan int, numJobs)

    // Create result channel
    results := make(chan string, numJobs)

    // Start the worker pool
    var wg sync.WaitGroup

    // Launch workers
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go workerPool(i, jobs, results, &wg)
    }

    // Send jobs to the pool
    go func() {
        defer close(jobs)
        for i := 1; i <= numJobs; i++ {
            fmt.Printf("Sending job %d to pool\n", i)
            jobs <- i
            time.Sleep(100 * time.Millisecond) // Simulate job generation time
        }
    }()

    // Close results channel when all workers are done
    go func() {
        wg.Wait()
        close(results)
    }()

    // Collect results
    fmt.Printf("\nWorker pool with %d workers processing %d jobs:\n", numWorkers, numJobs)
    fmt.Println()
    count := 0
    for result := range results {
        fmt.Printf("Result: %s\n", result)
        count++
    }
    fmt.Printf("\nWorker pool completed! Processed %d jobs.\n", count)
}

// Worker function for the pool
func workerPool(id int, jobs <-chan int, results chan<- string, wg *sync.WaitGroup) {
    defer wg.Done()
    fmt.Printf("Worker %d started\n", id)
    for job := range jobs {
        // Simulate work processing
        processingTime := time.Duration(rand.Intn(300)+200) * time.Millisecond
        fmt.Printf("Worker %d processing job %d (will take %v)\n", id, job, processingTime)
        time.Sleep(processingTime)
        result := fmt.Sprintf("Job %d completed by worker %d in %v", job, id, processingTime)
        results <- result
    }
    fmt.Printf("Worker %d finished\n", id)
}
```
To run this example and build the code yourself, check out this and other examples in the go-fluency-concurrency-model-patterns repo. That's it for this topic; tomorrow I'll post on the producer-consumer pattern.