Go Concurrency Patterns(Producer-Consumer Pattern)

Overview

The Producer-Consumer pattern decouples the production of data from its consumption using a shared buffer (channel). Multiple producers can generate data and send it to the buffer, while multiple consumers read from the buffer and process the data. This pattern is essential for buffering bursts of work, decoupling work rates between producers and consumers, and load leveling.

NOTE: For other posts on concurrency patterns, check out the index post to this series of concurrency patterns.

Implementation Details

Structure

The producer-consumer implementation in examples/producer_consumer.go consists of three main components:

  1. Producers – Multiple goroutines that generate data
  2. Buffer – A shared channel that holds the data
  3. Consumers – Multiple goroutines that process the data

Code Analysis

Let’s break down the main function and understand how each component works:

func RunProducerConsumer() {
    bufferSize := 5
    numProducers := 2
    numConsumers := 3
    numItems := 10

    buffer := make(chan int, bufferSize)
    var wg sync.WaitGroup

    // Start producers
    for p := 1; p <= numProducers; p++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for i := 0; i < numItems; i++ {
                item := rand.Intn(100)
                buffer <- item
                fmt.Printf("Producer %d produced: %d\n", id, item)
                time.Sleep(time.Duration(rand.Intn(200)+100) * time.Millisecond)
            }
        }(p)
    }

    // Start consumers
    var consumerWg sync.WaitGroup
    for c := 1; c <= numConsumers; c++ {
        consumerWg.Add(1)
        go func(id int) {
            defer consumerWg.Done()
            for item := range buffer {
                fmt.Printf("Consumer %d consumed: %d\n", id, item)
                time.Sleep(time.Duration(rand.Intn(300)+100) * time.Millisecond)
            }
        }(c)
    }

    // Wait for all producers to finish, then close the buffer
    wg.Wait()
    close(buffer)

    // Wait for all consumers to finish
    consumerWg.Wait()
}

Step-by-step breakdown:

  1. Configuration Setup:
    • bufferSize := 5 defines the buffer capacity – can hold up to 5 items
    • numProducers := 2 launches 2 producer goroutines
    • numConsumers := 3 launches 3 consumer goroutines
    • numItems := 10 specifies how many items each producer will create
    • These values create a scenario where producers generate more data than consumers can immediately process
  2. Buffer and Synchronization Setup:
    • buffer := make(chan int, bufferSize) creates a buffered channel with capacity 5
    • Buffering allows producers to continue working even when consumers are slow
    • var wg sync.WaitGroup tracks when all producers complete
    • var consumerWg sync.WaitGroup tracks when all consumers complete
  3. Producer Launch Loop:
    • Launches numProducers goroutines (2 in this case)
    • Each producer gets a unique ID (1, 2) for tracking
    • Uses closure func(id int) { ... }(p) to capture the producer ID
    • wg.Add(1) increments the producer wait group before each goroutine
  4. Producer Goroutine Implementation:
    • defer wg.Done() ensures the producer signals completion when it exits
    • for i := 0; i < numItems; i++ loop creates exactly 10 items per producer
    • item := rand.Intn(100) generates random data (0-99) to simulate real data
    • buffer <- item sends the item to the shared buffer (may block if buffer is full)
    • time.Sleep(time.Duration(rand.Intn(200)+100) * time.Millisecond) simulates variable production time (100-300ms)
  5. Consumer Launch Loop:
    • Launches numConsumers goroutines (3 in this case)
    • Each consumer gets a unique ID (1, 2, 3) for tracking
    • Uses closure to capture the consumer ID
    • consumerWg.Add(1) increments the consumer wait group before each goroutine
  6. Consumer Goroutine Implementation:
    • defer consumerWg.Done() ensures the consumer signals completion when it exits
    • for item := range buffer continuously reads from the buffer until it closes
    • time.Sleep(time.Duration(rand.Intn(300)+100) * time.Millisecond) simulates variable processing time (100-400ms)
    • Consumers compete for items from the shared buffer (automatic load balancing)
  7. Graceful Shutdown Process:
    • wg.Wait() waits for all producers to finish generating their items
    • close(buffer) signals to consumers that no more data is coming
    • consumerWg.Wait() waits for all consumers to finish processing remaining items
    • This ensures clean termination without goroutine leaks

Key Design Patterns:

  1. Closure Patternfunc(id int) { ... }(p) captures the loop variable p in each goroutine’s closure, ensuring each goroutine gets a unique ID.
  2. Dual WaitGroup Strategy: Separate wait groups for producers and consumers allow for coordinated shutdown – producers finish first, then buffer closes, then consumers finish.
  3. Buffered Channel as Queue: The buffer decouples production and consumption rates, allowing the system to handle temporary mismatches in speed.
  4. Range Loop for Consumersfor item := range buffer automatically exits when the buffer closes, providing natural termination.
  5. Simulated Work: Random delays make concurrency visible and demonstrate how the pattern handles varying processing speeds.

How It Works

  1. Producer Initialization: Multiple producer goroutines start generating data
  2. Data Production: Producers create data and send it to the shared buffer channel
  3. Buffer Management: The buffered channel holds data until consumers are ready
  4. Consumer Processing: Multiple consumer goroutines read from the buffer and process data
  5. Graceful Shutdown: When all producers finish, the buffer is closed, signaling consumers to stop

The pattern enables asynchronous processing where producers and consumers can operate at different rates.

Why This Implementation?

Buffered Channel as Buffer

  • Decoupling: Producers and consumers can operate independently
  • Burst Handling: Buffer can absorb temporary spikes in production
  • Flow Control: Natural backpressure when buffer is full

Multiple Producers

  • Parallel Production: Multiple sources can generate data simultaneously
  • Load Distribution: Work can be distributed across multiple producers
  • Fault Tolerance: If one producer fails, others continue

Multiple Consumers

  • Parallel Processing: Multiple consumers can process data simultaneously
  • Load Balancing: Work is automatically distributed among consumers
  • Scalability: Easy to add or remove consumers based on load

Separate WaitGroups

  • Producer Coordination: Ensures all producers finish before closing buffer
  • Consumer Coordination: Ensures all consumers finish before program exits
  • Clean Shutdown: Proper coordination prevents goroutine leaks

Channel Closing Strategy

  • Signal Completion: Closing the buffer signals consumers that no more data is coming
  • Graceful Termination: Consumers naturally exit when buffer is closed
  • Resource Cleanup: Prevents consumers from waiting indefinitely

Key Design Decisions

  1. Buffered Channel Size: The buffer size (5) determines how much data can be queued
  2. Producer Count: Multiple producers (2) demonstrate parallel data generation
  3. Consumer Count: Multiple consumers (3) demonstrate parallel data processing
  4. Simulated Work: Random delays simulate real processing time and make concurrency visible
  5. Structured Shutdown: Proper coordination ensures clean program termination

Performance Characteristics

Throughput

  • Limited by Slowest Component: Overall throughput is limited by the slowest producer or consumer
  • Buffer Impact: Larger buffers can handle bigger bursts but use more memory
  • Parallel Processing: Multiple consumers can increase processing throughput

Latency

  • Buffer Time: Data may wait in the buffer if consumers are slow
  • Processing Time: Individual item processing time depends on consumer speed
  • Fair Distribution: Items are consumed in FIFO order

Resource Usage

  • Memory: Buffer size determines memory usage
  • CPU: Multiple producers and consumers can utilize multiple CPU cores
  • Coordination Overhead: Minimal overhead from channel operations

Common Use Cases

Data Processing Pipelines

  • Log Processing: Multiple log sources → Buffer → Multiple log processors
  • Image Processing: Multiple cameras → Buffer → Multiple image analyzers
  • Sensor Data: Multiple sensors → Buffer → Multiple data processors

Message Queuing Systems

  • Event Processing: Multiple event sources → Buffer → Multiple event handlers
  • Notification Systems: Multiple notification sources → Buffer → Multiple delivery agents
  • Webhook Processing: Multiple webhook sources → Buffer → Multiple webhook handlers

Batch Processing

  • File Processing: Multiple file sources → Buffer → Multiple file processors
  • Report Generation: Multiple data sources → Buffer → Multiple report generators
  • Data Import: Multiple data sources → Buffer → Multiple import processors

Real-time Systems

  • Trading Systems: Multiple market data feeds → Buffer → Multiple trading algorithms
  • IoT Applications: Multiple device sensors → Buffer → Multiple data analyzers
  • Monitoring Systems: Multiple metric sources → Buffer → Multiple alert processors

API Rate Limiting

  • Request Processing: Multiple API clients → Buffer → Rate-limited API processors
  • Data Synchronization: Multiple sync sources → Buffer → Controlled sync processors
  • External Service Calls: Multiple call sources → Buffer → Rate-limited service callers

Content Processing

  • Video Processing: Multiple video sources → Buffer → Multiple video processors
  • Document Processing: Multiple document sources → Buffer → Multiple document analyzers
  • Audio Processing: Multiple audio sources → Buffer → Multiple audio processors

Database Operations

  • Write Operations: Multiple write sources → Buffer → Database writers
  • Read Operations: Multiple read requests → Buffer → Database readers
  • Migration Jobs: Multiple migration sources → Buffer → Migration processors

The producer-consumer pattern is particularly effective when you have:

  • Variable Production Rates: Producers that generate data at unpredictable rates
  • Variable Consumption Rates: Consumers that process data at different speeds
  • Bursty Workloads: Periods of high activity followed by low activity
  • Resource Constraints: Need to limit resource usage while maintaining throughput
  • Decoupled Systems: Components that should operate independently

This pattern provides a robust foundation for building scalable, responsive systems that can handle varying loads efficiently.

The final example implementation looks like this:

package examples

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// RunProducerConsumer demonstrates the producer-consumer pattern with multiple producers and consumers.
func RunProducerConsumer() {
	fmt.Println("=== Producer-Consumer Pattern Example ===")

	bufferSize := 5
	numProducers := 2
	numConsumers := 3
	numItems := 10

	buffer := make(chan int, bufferSize)
	var wg sync.WaitGroup

	// Start producers
	for p := 1; p <= numProducers; p++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for i := 0; i < numItems; i++ {
				item := rand.Intn(100)
				buffer <- item
				fmt.Printf("Producer %d produced: %d\n", id, item)
				time.Sleep(time.Duration(rand.Intn(200)+100) * time.Millisecond)
			}
		}(p)
	}

	// Start consumers
	var consumerWg sync.WaitGroup
	for c := 1; c <= numConsumers; c++ {
		consumerWg.Add(1)
		go func(id int) {
			defer consumerWg.Done()
			for item := range buffer {
				fmt.Printf("Consumer %d consumed: %d\n", id, item)
				time.Sleep(time.Duration(rand.Intn(300)+100) * time.Millisecond)
			}
		}(c)
	}

	// Wait for all producers to finish, then close the buffer
	wg.Wait()
	close(buffer)

	// Wait for all consumers to finish
	consumerWg.Wait()

	fmt.Println("Producer-Consumer example completed!")
}

To run this example, and build the code yourself, check out this and other examples in the go-fluency-concurrency-model-patterns repo. That’s it for this topic, tomorrow I’ll post on the supervisor-restart pattern.

One thought on “Go Concurrency Patterns(Producer-Consumer Pattern)

Comments are closed.