Go Concurrency Patterns (Worker Pools Pattern)

Overview

The Worker Pools pattern maintains a fixed number of workers that process jobs from a shared queue. This pattern is essential for controlling resource usage, handling bursty workloads, and providing predictable performance in concurrent systems.

NOTE: For other posts in this series, check out the index post on concurrency patterns.

Implementation Details

Structure

The worker pools implementation in examples/pools.go consists of three main components:

  1. Job Queue – A buffered channel that holds jobs to be processed
  2. Worker Pool – A fixed number of worker goroutines
  3. Result Collection – A channel for collecting processed results

Code Analysis

Let’s break down the RunPools function and understand how each component works:

func RunPools() {
    // Configuration
    numWorkers := 3
    numJobs := 15
    
    // Create job channel
    jobs := make(chan int, numJobs)
    
    // Create result channel
    results := make(chan string, numJobs)
    
    // Start the worker pool
    var wg sync.WaitGroup
    
    // Launch workers
    for i := 1; i <= numWorkers; i++ {
        wg.Add(1)
        go workerPool(i, jobs, results, &wg)
    }
    
    // Send jobs to the pool
    go func() {
        defer close(jobs)
        for i := 1; i <= numJobs; i++ {
            fmt.Printf("Sending job %d to pool\n", i)
            jobs <- i
            time.Sleep(100 * time.Millisecond) // Simulate job generation time
        }
    }()
    
    // Close results channel when all workers are done
    go func() {
        wg.Wait()
        close(results)
    }()
    
    // Collect results
    for result := range results {
        fmt.Printf("Result: %s\n", result)
    }
}

Step-by-step breakdown:

  1. Configuration Setup:
    • numWorkers := 3 defines the fixed pool size – only 3 workers will ever be active
    • numJobs := 15 specifies how many jobs to process (more jobs than workers to demonstrate queuing)
    • These values control the concurrency level and workload
  2. Channel Creation:
    • jobs := make(chan int, numJobs) creates a buffered channel with capacity for all jobs
    • Buffering prevents job submission from blocking when workers are busy
    • results := make(chan string, numJobs) creates a buffered result channel
    • Both channels are buffered to handle the full workload without blocking
  3. Worker Pool Initialization:
    • var wg sync.WaitGroup tracks when all workers complete
    • The loop launches exactly numWorkers goroutines
    • Each worker gets a unique ID (1, 2, 3) for tracking and debugging
    • All workers share the same jobs and results channels
  4. Job Submission Goroutine:
    • Runs in background to send jobs asynchronously
    • defer close(jobs) ensures the job channel closes after all jobs are sent
    • The loop sends jobs one by one with a 100ms delay between submissions
    • This simulates real-world job generation (e.g., incoming HTTP requests)
  5. Cleanup Goroutine:
    • Waits for all workers to finish using wg.Wait()
    • Closes the results channel when all workers are done
    • Prevents deadlocks and signals that no more results are coming
  6. Result Collection:
    • The main goroutine consumes results as they become available
    • Results may arrive in any order (not necessarily job submission order)
    • The loop exits when the results channel closes
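
The cleanup goroutine in step 5 is a reusable idiom: wait for all producers, then close the output channel. A minimal generic sketch (the closeWhenDone name is mine, not from the example code; requires Go 1.18+):

// closeWhenDone captures the step-5 idiom as a reusable helper: close an
// output channel once every producer registered on the WaitGroup has
// called Done.
func closeWhenDone[T any](wg *sync.WaitGroup, ch chan T) {
    go func() {
        wg.Wait()
        close(ch)
    }()
}

With this helper, the inline cleanup goroutine in RunPools becomes closeWhenDone(&wg, results).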

Worker Implementation

func workerPool(id int, jobs <-chan int, results chan<- string, wg *sync.WaitGroup) {
    defer wg.Done()
    
    fmt.Printf("Worker %d started\n", id)
    
    for job := range jobs {
        // Simulate work processing
        processingTime := time.Duration(rand.Intn(300)+200) * time.Millisecond
        fmt.Printf("Worker %d processing job %d (will take %v)\n", id, job, processingTime)
        
        time.Sleep(processingTime)
        
        result := fmt.Sprintf("Job %d completed by worker %d in %v", job, id, processingTime)
        results <- result
    }
    
    fmt.Printf("Worker %d finished\n", id)
}

Worker function breakdown:

  1. Function Signature:
    • id int: Unique worker identifier (1, 2, 3) for tracking and debugging
    • jobs <-chan int: Read-only channel for receiving jobs
    • results chan<- string: Write-only channel for sending results
    • wg *sync.WaitGroup: For signaling when this worker completes
  2. Resource Management:
    • defer wg.Done() ensures the worker signals completion even if it panics
    • Automatic cleanup when the function exits
    • Without it, wg.Wait() would never return, and the result collector would deadlock on the never-closed results channel
  3. Worker Lifecycle:
    • Prints startup message to track worker initialization
    • Enters the main processing loop
    • Prints completion message when the job channel closes
  4. Job Processing Loop:
    • for job := range jobs continuously processes jobs until the channel closes
    • Each worker competes for jobs from the shared channel (automatic load balancing)
    • When the jobs channel closes, all workers naturally exit the loop
  5. Simulated Work Processing:
    • processingTime := time.Duration(rand.Intn(300)+200) * time.Millisecond creates variable processing time
    • Random delay between 200 and 499ms (rand.Intn(300) yields 0–299) simulates real-world processing variance
    • Makes concurrency visible – workers process at different speeds
    • Demonstrates how the pool handles varying job complexity
  6. Result Generation and Sending:
    • Creates a descriptive result string with job ID, worker ID, and processing time
    • results <- result sends the result to the shared result channel
    • Non-blocking because the result channel is buffered
    • Results can be sent in any order (not necessarily job submission order)
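
Point 2 above notes that defer wg.Done() still runs if a worker panics, but the panic itself would crash the whole program. If one bad job should not take down the pool, the work can be wrapped in a recover. A hedged sketch (the recovery logic is an addition, not part of the example):

// safeWorker is a variant of workerPool: each job runs inside a closure
// with a recover, so a panic in one job is reported as a result instead
// of crashing the whole process.
func safeWorker(id int, jobs <-chan int, results chan<- string, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        func() {
            defer func() {
                if r := recover(); r != nil {
                    results <- fmt.Sprintf("Job %d failed in worker %d: %v", job, id, r)
                }
            }()
            // Real processing goes here; this stand-in always succeeds.
            results <- fmt.Sprintf("Job %d completed by worker %d", job, id)
        }()
    }
}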

How It Works

  1. Pool Initialization: Creates a fixed number of worker goroutines that wait for jobs
  2. Job Distribution: Jobs are sent to the shared job channel
  3. Worker Processing: Workers compete for jobs from the channel, process them, and send results
  4. Result Collection: Results are collected from the result channel as they complete
  5. Graceful Shutdown: When the job channel is closed, workers finish processing remaining jobs and exit

The pattern ensures that only a fixed number of workers are active at any time, providing controlled concurrency and resource management.
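
The example keeps jobs as int and results as string for readability. Real pools usually carry structured payloads and per-job errors; a sketch with hypothetical Job and Result types (the names and fields are illustrative, not from the example code):

// Job and Result are hypothetical types for illustration; the post's
// example uses plain int jobs and string results instead.
type Job struct {
    ID      int
    Payload string
}

type Result struct {
    JobID  int
    Output string
    Err    error
}

// typedWorker mirrors workerPool but carries a structured payload and a
// per-job error, so failures flow back through the same results channel.
func typedWorker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        out := fmt.Sprintf("worker %d handled %q", id, job.Payload) // stand-in work
        results <- Result{JobID: job.ID, Output: out}
    }
}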

Why This Implementation?

Fixed Worker Count

  • Resource Control: Limits the number of concurrent operations
  • Predictable Performance: Consistent resource usage regardless of job load
  • Stability: Prevents resource exhaustion under high load

Buffered Job Channel

  • Burst Handling: Can queue jobs when workers are busy
  • Non-blocking Job Submission: Job senders don’t block when workers are busy
  • Backpressure: Senders block naturally once the queue is full (see the non-blocking submission sketch below)
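
A plain jobs <- job send blocks once the buffer fills, which is exactly the backpressure described above. If the submitter would rather shed load than wait, a select with a default case makes submission non-blocking. A small sketch (trySubmit is my name, not from the example):

// trySubmit attempts a non-blocking send and returns false when the
// buffered jobs channel is full, letting the caller shed load instead
// of blocking.
func trySubmit(jobs chan<- int, job int) bool {
    select {
    case jobs <- job:
        return true
    default:
        return false // queue full: push backpressure upstream
    }
}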

Shared Job Channel

  • Automatic Load Balancing: Workers naturally consume jobs as they become available
  • Fair Distribution: All workers have equal access to jobs
  • Simple Coordination: No complex job distribution logic needed

WaitGroup Synchronization

  • Proper Cleanup: Ensures all workers complete before closing result channel
  • Resource Management: Prevents goroutine leaks
  • Coordinated Shutdown: All workers finish before main function exits

Separate Result Channel

  • Asynchronous Results: Results can be collected independently of job submission
  • Order Independence: Results can be processed in completion order, not submission order
  • Non-blocking Collection: Result collection doesn’t block job processing
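
When submission order matters to the consumer, it has to be restored on the collection side. One hedged approach, reusing the hypothetical Result type sketched earlier: buffer early arrivals in a map keyed by job ID and flush them in sequence:

// collectInOrder prints results in job-ID order even though they arrive
// in completion order, buffering early arrivals in a map. It assumes the
// hypothetical Result type sketched earlier and job IDs starting at 1.
func collectInOrder(results <-chan Result) {
    pending := make(map[int]Result)
    next := 1
    for r := range results {
        pending[r.JobID] = r
        // Flush the contiguous run that starts at the next expected ID.
        for {
            buffered, ok := pending[next]
            if !ok {
                break
            }
            fmt.Printf("Result: job %d -> %s\n", next, buffered.Output)
            delete(pending, next)
            next++
        }
    }
}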

Key Design Decisions

  1. Fixed Worker Count: The pool size is determined at creation time and remains constant
  2. Buffered Job Channel: Allows queuing of jobs when workers are busy
  3. Buffered Result Channel: Lets workers send results without waiting on the collector (the example buffers it with capacity for all jobs)
  4. Simulated Processing Time: Random delays simulate real work and demonstrate concurrency
  5. Graceful Shutdown: Proper channel closing and WaitGroup coordination
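
On point 5: closing the jobs channel handles normal shutdown, but it cannot interrupt workers once jobs are queued. In long-running services a context.Context is the usual addition; a sketch (the context plumbing is not in the example code):

// cancellableWorker is a sketch of context-aware shutdown; the "context"
// import is an addition to the example's imports. It exits when the jobs
// channel closes or when ctx is cancelled, whichever comes first.
func cancellableWorker(ctx context.Context, id int, jobs <-chan int, results chan<- string, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("Worker %d stopping: %v\n", id, ctx.Err())
            return
        case job, ok := <-jobs:
            if !ok {
                return // jobs channel closed: normal drain-and-exit
            }
            results <- fmt.Sprintf("Job %d completed by worker %d", job, id)
        }
    }
}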

Performance Characteristics

Throughput

  • Limited by Worker Count: Maximum throughput is limited by the number of workers
  • Consistent Performance: Predictable performance regardless of job load
  • Optimal for I/O-bound Work: Workers can handle I/O operations efficiently
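
A common sizing heuristic (an assumption on my part, not something the example encodes): start CPU-bound pools near runtime.NumCPU() and give I/O-bound pools a multiple of that, since those workers spend most of their time blocked:

// chooseWorkers is a sizing heuristic, not a rule: the 4x multiplier for
// I/O-bound work is an illustrative assumption. Benchmark with the real
// workload before settling on a number.
func chooseWorkers(ioBound bool) int {
    n := runtime.NumCPU()
    if ioBound {
        return n * 4 // I/O-bound workers mostly wait, so oversubscribe
    }
    return n
}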

Latency

  • Queue Time: Jobs may wait in the queue if all workers are busy
  • Processing Time: Individual job processing time depends on the work being done
  • Fair Scheduling: Jobs are processed in FIFO order within the queue

Resource Usage

  • Memory: Fixed memory usage regardless of job load
  • CPU: Controlled CPU usage through fixed worker count
  • Network/File Handles: Limited resource usage through worker constraints
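
A related technique worth knowing: when jobs are short-lived, a buffered channel used as a counting semaphore bounds concurrency without keeping long-lived workers around. This is an alternative to the pool, not the post's implementation:

// runWithSemaphore bounds concurrency with a buffered channel used as a
// counting semaphore, launching one short-lived goroutine per job.
func runWithSemaphore(jobsList []int, maxConcurrent int) {
    sem := make(chan struct{}, maxConcurrent)
    var wg sync.WaitGroup
    for _, job := range jobsList {
        wg.Add(1)
        sem <- struct{}{} // acquire: blocks once maxConcurrent goroutines are running
        go func(job int) {
            defer wg.Done()
            defer func() { <-sem }() // release the slot
            fmt.Printf("processing job %d\n", job)
        }(job)
    }
    wg.Wait()
}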

Common Use Cases

Web Servers

  • Request Processing: Handle HTTP requests with a fixed pool of workers
  • Database Operations: Process database queries with controlled concurrency
  • File Operations: Handle file uploads/downloads with limited workers
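
As a rough illustration of the request-processing case, a handler can enqueue work and shed load when the queue is full, reusing the trySubmit helper sketched earlier (the net/http and sync/atomic plumbing here is my addition, not the example's code):

// makeHandler sketches fronting the pool with HTTP: each request is
// enqueued via trySubmit, and a full queue is surfaced as a 503 rather
// than a blocked handler.
func makeHandler(jobs chan<- int) http.HandlerFunc {
    var seq int64
    return func(w http.ResponseWriter, r *http.Request) {
        id := int(atomic.AddInt64(&seq, 1))
        if !trySubmit(jobs, id) {
            http.Error(w, "server busy", http.StatusServiceUnavailable)
            return
        }
        fmt.Fprintf(w, "queued job %d\n", id)
    }
}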

Background Job Processing

  • Email Sending: Process email sending with rate limiting
  • Image Processing: Resize, compress, or transform images
  • Report Generation: Generate reports with controlled resource usage

API Rate Limiting

  • External API Calls: Make API calls with controlled concurrency
  • Data Synchronization: Sync data with external systems
  • Webhook Processing: Process incoming webhooks with rate limiting
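
For the external-API case, one simple approach is to share a single ticker channel across all workers, so the pool as a whole is capped at one call per tick; a sketch (the limiter parameter is my addition to the worker signature):

// rateLimitedWorker pairs the pool with a shared ticker channel: every
// job waits for a tick, so the pool as a whole is capped at one call per
// tick interval regardless of how many workers are running.
func rateLimitedWorker(id int, jobs <-chan int, results chan<- string, limiter <-chan time.Time, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        <-limiter // all workers share this channel, so the cap is pool-wide
        results <- fmt.Sprintf("Job %d completed by worker %d", job, id)
    }
}

Create the shared channel with limiter := time.Tick(100 * time.Millisecond) and hand it to every worker for an overall cap of ten calls per second; golang.org/x/time/rate offers a more flexible limiter if bursts need shaping.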

Resource-Intensive Operations

  • Machine Learning: Run ML models with limited GPU/CPU usage
  • Data Processing: Process large datasets with controlled memory usage
  • Encryption/Decryption: Handle cryptographic operations with limited CPU usage

System Administration

  • Backup Operations: Run backups with controlled I/O usage
  • Log Processing: Process logs with limited file handle usage
  • Monitoring: Collect metrics with controlled network usage

Microservices

  • Service Communication: Handle inter-service communication with rate limiting
  • Event Processing: Process events with controlled concurrency
  • Cache Management: Manage cache operations with limited memory usage

The worker pools pattern is particularly effective when you need to control resource usage, handle bursty workloads, or provide predictable performance in concurrent systems. It’s especially useful for I/O-bound operations where you want to limit the number of concurrent operations to prevent resource exhaustion.

The final example implementation looks like this:

package examples

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// RunPools demonstrates the worker pools pattern
func RunPools() {
	fmt.Println("=== Worker Pools Pattern Example ===")

	// Configuration
	numWorkers := 3
	numJobs := 15

	// Create job channel
	jobs := make(chan int, numJobs)

	// Create result channel
	results := make(chan string, numJobs)

	// Start the worker pool
	var wg sync.WaitGroup

	// Launch workers
	for i := 1; i <= numWorkers; i++ {
		wg.Add(1)
		go workerPool(i, jobs, results, &wg)
	}

	// Send jobs to the pool
	go func() {
		defer close(jobs)
		for i := 1; i <= numJobs; i++ {
			fmt.Printf("Sending job %d to pool\n", i)
			jobs <- i
			time.Sleep(100 * time.Millisecond) // Simulate job generation time
		}
	}()

	// Close results channel when all workers are done
	go func() {
		wg.Wait()
		close(results)
	}()

	// Collect results
	fmt.Printf("\nWorker pool with %d workers processing %d jobs:\n", numWorkers, numJobs)
	fmt.Println()

	count := 0
	for result := range results {
		fmt.Printf("Result: %s\n", result)
		count++
	}

	fmt.Printf("\nWorker pool completed! Processed %d jobs.\n", count)
}

// Worker function for the pool
func workerPool(id int, jobs <-chan int, results chan<- string, wg *sync.WaitGroup) {
	defer wg.Done()

	fmt.Printf("Worker %d started\n", id)

	for job := range jobs {
		// Simulate work processing
		processingTime := time.Duration(rand.Intn(300)+200) * time.Millisecond
		fmt.Printf("Worker %d processing job %d (will take %v)\n", id, job, processingTime)

		time.Sleep(processingTime)

		result := fmt.Sprintf("Job %d completed by worker %d in %v", job, id, processingTime)
		results <- result
	}

	fmt.Printf("Worker %d finished\n", id)
}

To run this example and build the code yourself, check out this and other examples in the go-fluency-concurrency-model-patterns repo. That’s it for this topic; tomorrow I’ll post on the producer-consumer pattern.
