Site icon Adron's Composite Code

Go Concurrency Patterns (Resource Pooling Pattern)

Overview

The Resource Pooling pattern manages a pool of reusable resources (like database connections, HTTP clients, or file handles) to avoid the overhead of creating and destroying resources frequently. This pattern is essential for improving performance, managing resource limits, reducing overhead, and ensuring efficient resource utilization.

NOTE: For other posts on concurrency patterns, check out the index post to this series of concurrency patterns.

Implementation Details

Structure

The resource pooling implementation in examples/resource_pooling.go demonstrates two main examples:

  1. Database Connection Pool – Manages database connections with health checks
  2. HTTP Client Pool – Manages HTTP clients for making requests

Code Analysis

Let’s break down the main function and understand how each component works:

// RunResourcePooling demonstrates the resource pooling pattern with a
// database connection pool and an HTTP client pool, each shared by more
// workers than the pool has resources.
func RunResourcePooling() {
	// Example 1: Database connection pool
	fmt.Println("1. Database Connection Pool Example:")
	dbPool := newDBConnectionPool(3, 5*time.Second)
	defer dbPool.Close()

	var wg sync.WaitGroup
	for i := 1; i <= 5; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			conn := dbPool.Get()
			if conn == nil {
				// Get returns nil when no connection could be acquired;
				// skip the work instead of dereferencing a nil connection.
				fmt.Printf("Worker %d: No connection available, giving up\n", id)
				return
			}
			defer dbPool.Put(conn)

			fmt.Printf("Worker %d: Using connection %s\n", id, conn.ID)
			time.Sleep(time.Duration(rand.Intn(1000)+500) * time.Millisecond)
		}(i)
	}
	wg.Wait()

	// Example 2: HTTP client pool
	fmt.Println("\n2. HTTP Client Pool Example:")
	clientPool := newHTTPClientPool(2, 3*time.Second)
	defer clientPool.Close()

	for i := 1; i <= 4; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			client := clientPool.Get()
			if client == nil {
				// Same guard as above: a nil client means acquisition failed.
				fmt.Printf("Worker %d: No HTTP client available, giving up\n", id)
				return
			}
			defer clientPool.Put(client)

			fmt.Printf("Worker %d: Using HTTP client %s\n", id, client.ID)
			time.Sleep(time.Duration(rand.Intn(800)+300) * time.Millisecond)
		}(i)
	}
	wg.Wait()
}

Step-by-step breakdown:

  1. Database Connection Pool Setup:
    • dbPool := newDBConnectionPool(3, 5*time.Second) creates a pool with 3 connections and 5-second timeout
    • Pool size of 3 means only 3 workers can use connections simultaneously
    • 5-second timeout prevents workers from waiting indefinitely for connections
    • defer dbPool.Close() ensures proper cleanup when the function exits
  2. Database Pool Worker Launch:
    • var wg sync.WaitGroup tracks when all database workers complete
    • for i := 1; i <= 5; i++ launches 5 workers (more than pool size to demonstrate pooling)
    • Uses closure func(id int) { ... }(i) to capture the worker ID
    • wg.Add(1) increments the wait group before each worker
  3. Database Pool Worker Implementation:
    • defer wg.Done() ensures the worker signals completion when it exits
    • conn := dbPool.Get() acquires a connection from the pool (may block if none available)
    • defer dbPool.Put(conn) ensures the connection is returned to the pool
    • time.Sleep(time.Duration(rand.Intn(1000)+500) * time.Millisecond) simulates database work (500-1500ms)
  4. Database Pool Coordination:
    • wg.Wait() waits for all database workers to complete
    • Only 3 workers can use connections simultaneously due to pool size
    • Other workers will wait for connections to become available
  5. HTTP Client Pool Setup:
    • clientPool := newHTTPClientPool(2, 3*time.Second) creates a pool with 2 clients and 3-second timeout
    • Smaller pool size (2) demonstrates different resource constraints
    • Shorter timeout (3 seconds) shows different timeout requirements
    • defer clientPool.Close() ensures proper cleanup
  6. HTTP Client Pool Worker Launch:
    • for i := 1; i <= 4; i++ launches 4 workers (more than pool size)
    • Similar structure to database workers but with different resource type
    • wg.Add(1) tracks each HTTP client worker
  7. HTTP Client Pool Worker Implementation:
    • defer wg.Done() ensures proper cleanup
    • client := clientPool.Get() acquires an HTTP client from the pool
    • defer clientPool.Put(client) ensures the client is returned to the pool
    • time.Sleep(time.Duration(rand.Intn(800)+300) * time.Millisecond) simulates HTTP work (300-1100ms)
  8. HTTP Client Pool Coordination:
    • wg.Wait() waits for all HTTP client workers to complete
    • Only 2 workers can use clients simultaneously due to pool size
    • Demonstrates how pooling handles resource constraints

Database Connection Pool Implementation

// DBConnection is a simulated database connection managed by DBConnectionPool.
type DBConnection struct {
	ID        string    // label assigned by the pool, e.g. "db-conn-1"
	CreatedAt time.Time // when the pool created the connection
	LastUsed  time.Time // refreshed on every successful Get
}

// DBConnectionPool hands out a fixed set of pre-created connections through a
// buffered channel.
type DBConnectionPool struct {
	connections chan *DBConnection // idle connections; capacity is maxSize
	maxSize     int
	timeout     time.Duration // max wait in Get and staleness threshold in Put
	closed      bool          // guarded by mu
	mu          sync.Mutex
}

// newDBConnectionPool creates a pool holding maxSize connections labelled
// "db-conn-1" through "db-conn-<maxSize>". timeout bounds how long Get waits
// and how long a connection may be held before Put treats it as stale.
func newDBConnectionPool(maxSize int, timeout time.Duration) *DBConnectionPool {
	pool := &DBConnectionPool{
		connections: make(chan *DBConnection, maxSize),
		maxSize:     maxSize,
		timeout:     timeout,
	}

	// Pre-populate the pool
	for i := 0; i < maxSize; i++ {
		conn := &DBConnection{
			ID:        fmt.Sprintf("db-conn-%d", i+1),
			CreatedAt: time.Now(),
			LastUsed:  time.Now(),
		}
		pool.connections <- conn
	}

	return pool
}

// Get waits up to the pool timeout for an idle connection. It returns nil when
// the timeout elapses or when the pool has been closed, so callers must check
// the result before using it.
func (p *DBConnectionPool) Get() *DBConnection {
	select {
	case conn := <-p.connections:
		if conn == nil {
			// Put never enqueues nil, so a nil receive can only come from
			// the channel having been closed by Close.
			fmt.Println("  Pool closed, no connection available")
			return nil
		}
		conn.LastUsed = time.Now()
		fmt.Printf("  Acquired connection: %s\n", conn.ID)
		return conn
	case <-time.After(p.timeout):
		fmt.Println("  Timeout waiting for connection")
		return nil
	}
}

// Put returns conn to the pool. Nil connections, connections returned after
// Close, and connections held longer than the pool timeout are dropped.
func (p *DBConnectionPool) Put(conn *DBConnection) {
	if conn == nil {
		return
	}

	// The lock is held across the send so Close cannot close the channel
	// between the closed check and the send.
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.closed {
		return
	}

	// Check if connection is still healthy. LastUsed is set in Get, so this
	// measures how long the caller held the connection. A dropped connection
	// is not replaced, so the pool shrinks by one.
	if time.Since(conn.LastUsed) > p.timeout {
		fmt.Printf("  Discarding stale connection: %s\n", conn.ID)
		return
	}

	select {
	case p.connections <- conn:
		fmt.Printf("  Returned connection: %s\n", conn.ID)
	default:
		fmt.Printf("  Pool full, discarding connection: %s\n", conn.ID)
	}
}

// Close shuts the pool down. It is safe to call more than once; only the
// first call closes the channel. Idle connections are discarded so that a
// later Get reports the closed pool instead of handing one out.
func (p *DBConnectionPool) Close() {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.closed {
		// Closing an already-closed channel would panic.
		return
	}
	p.closed = true
	close(p.connections)

	for range p.connections {
		// Drain and drop whatever was still idle in the buffer.
	}
}

Database connection pool breakdown:

  1. Connection Structure Design:
    • DBConnection struct represents a database connection
    • ID string provides unique identification for tracking and debugging
    • CreatedAt time.Time tracks when the connection was created
    • LastUsed time.Time tracks when the connection was last used for health checking
  2. Pool Structure Design:
    • DBConnectionPool struct manages the connection pool
    • connections chan *DBConnection uses a buffered channel for thread-safe resource management
    • maxSize int defines the maximum number of connections in the pool
    • timeout time.Duration defines the timeout for connection acquisition and doubles as the staleness threshold checked in Put
    • closed bool tracks pool state for graceful shutdown
    • mu sync.Mutex protects pool state during concurrent operations
  3. Pool Constructor:
    • newDBConnectionPool(maxSize int, timeout time.Duration) creates a new pool
    • connections: make(chan *DBConnection, maxSize) creates a buffered channel with pool size
    • The channel buffer size matches the pool size so that every connection can sit idle in the channel at once and a Put never has to discard a healthy connection
  4. Pool Pre-population:
    • for i := 0; i < maxSize; i++ creates the initial set of connections
    • conn := &DBConnection{...} creates each connection with unique ID and timestamps
    • pool.connections <- conn adds each connection to the pool
    • Pre-population ensures connections are immediately available
  5. Connection Acquisition (Get Method):
    • select statement handles both connection acquisition and timeout
    • case conn := <-p.connections: acquires a connection from the pool
    • conn.LastUsed = time.Now() updates the last used timestamp
    • case <-time.After(p.timeout): handles timeout when no connections are available
    • Returns nil on timeout to indicate failure
  6. Connection Return (Put Method):
    • if conn == nil { return } handles nil connections gracefully
    • p.mu.Lock() and defer p.mu.Unlock() protect pool state during return
    • if p.closed { return } prevents returning connections to a closed pool
  7. Health Checking:
    • if time.Since(conn.LastUsed) > p.timeout checks if connection is stale
    • Stale connections are discarded rather than returned to the pool
    • This ensures only healthy connections are reused
  8. Connection Return Logic:
    • select { case p.connections <- conn: ... default: ... } attempts to return connection
    • If pool is full, connection is discarded (default case)
    • This prevents pool overflow and memory leaks
  9. Pool Cleanup (Close Method):
    • p.mu.Lock() and defer p.mu.Unlock() protect pool state during shutdown
    • p.closed = true marks the pool as closed
    • close(p.connections) closes the channel to signal shutdown

HTTP Client Pool Implementation

// HTTPClient is a simulated HTTP client managed by HTTPClientPool.
type HTTPClient struct {
	ID        string    // label assigned by the pool, e.g. "http-client-1"
	CreatedAt time.Time // when the pool created the client
	LastUsed  time.Time // refreshed on every successful Get
}

// HTTPClientPool hands out a fixed set of pre-created clients through a
// buffered channel.
type HTTPClientPool struct {
	clients chan *HTTPClient // idle clients; capacity is maxSize
	maxSize int
	timeout time.Duration // max wait in Get and staleness threshold in Put
	closed  bool          // guarded by mu
	mu      sync.Mutex
}

// newHTTPClientPool creates a pool holding maxSize clients labelled
// "http-client-1" through "http-client-<maxSize>". timeout bounds how long Get
// waits and how long a client may be held before Put treats it as stale.
func newHTTPClientPool(maxSize int, timeout time.Duration) *HTTPClientPool {
	pool := &HTTPClientPool{
		clients: make(chan *HTTPClient, maxSize),
		maxSize: maxSize,
		timeout: timeout,
	}

	// Pre-populate the pool
	for i := 0; i < maxSize; i++ {
		client := &HTTPClient{
			ID:        fmt.Sprintf("http-client-%d", i+1),
			CreatedAt: time.Now(),
			LastUsed:  time.Now(),
		}
		pool.clients <- client
	}

	return pool
}

// Get waits up to the pool timeout for an idle client. It returns nil when the
// timeout elapses or when the pool has been closed, so callers must check the
// result before using it.
func (p *HTTPClientPool) Get() *HTTPClient {
	select {
	case client := <-p.clients:
		if client == nil {
			// Put never enqueues nil, so a nil receive can only come from
			// the channel having been closed by Close.
			fmt.Println("  Pool closed, no HTTP client available")
			return nil
		}
		client.LastUsed = time.Now()
		fmt.Printf("  Acquired HTTP client: %s\n", client.ID)
		return client
	case <-time.After(p.timeout):
		fmt.Println("  Timeout waiting for HTTP client")
		return nil
	}
}

// Put returns client to the pool. Nil clients, clients returned after Close,
// and clients held longer than the pool timeout are dropped.
func (p *HTTPClientPool) Put(client *HTTPClient) {
	if client == nil {
		return
	}

	// The lock is held across the send so Close cannot close the channel
	// between the closed check and the send.
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.closed {
		return
	}

	// Check if client is still healthy. LastUsed is set in Get, so this
	// measures how long the caller held the client. A dropped client is not
	// replaced, so the pool shrinks by one.
	if time.Since(client.LastUsed) > p.timeout {
		fmt.Printf("  Discarding stale HTTP client: %s\n", client.ID)
		return
	}

	select {
	case p.clients <- client:
		fmt.Printf("  Returned HTTP client: %s\n", client.ID)
	default:
		fmt.Printf("  Pool full, discarding HTTP client: %s\n", client.ID)
	}
}

// Close shuts the pool down. It is safe to call more than once; only the
// first call closes the channel. Idle clients are discarded so that a later
// Get reports the closed pool instead of handing one out.
func (p *HTTPClientPool) Close() {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.closed {
		// Closing an already-closed channel would panic.
		return
	}
	p.closed = true
	close(p.clients)

	for range p.clients {
		// Drain and drop whatever was still idle in the buffer.
	}
}

HTTP client pool breakdown:

  1. Client Structure Design:
    • HTTPClient struct represents an HTTP client
    • Similar structure to database connections for consistency
    • ID string provides unique identification
    • CreatedAt and LastUsed timestamps for lifecycle tracking
  2. Pool Structure Design:
    • HTTPClientPool struct manages the HTTP client pool
    • Identical structure to database pool for consistency
    • Uses the same channel-based approach for resource management
  3. Pool Constructor and Pre-population:
    • newHTTPClientPool(maxSize int, timeout time.Duration) creates the pool
    • Pre-populates with maxSize HTTP clients
    • Each client gets a unique ID and initial timestamps
  4. Client Acquisition and Return:
    • Get() method identical to database pool with timeout handling
    • Put() method includes health checking and graceful handling
    • Same mutex protection and closed pool handling
  5. Health Checking and Cleanup:
    • Same health checking logic as database pool
    • Stale clients are discarded to maintain pool health
    • Close() method provides graceful shutdown

Key Design Patterns:

  1. Channel-based Resource Management: Uses buffered channels for thread-safe resource allocation and return.
  2. Pre-populated Pools: Creates all resources upfront for immediate availability and predictable performance.
  3. Health Checking: Validates resource health before reuse to ensure reliability.
  4. Timeout Handling: Prevents indefinite waiting with configurable timeouts.
  5. Graceful Shutdown: Proper cleanup prevents resource leaks during shutdown.
  6. Mutex Protection: Thread-safe state management for pool operations.
  7. Resource Tracking: Comprehensive tracking of resource lifecycle for monitoring and debugging.

How It Works

Database Connection Pool

  1. Pool Initialization: Creates a pool with pre-populated database connections
  2. Connection Acquisition: Workers request connections from the pool
  3. Connection Usage: Workers use connections for database operations
  4. Connection Return: Workers return connections to the pool when done
  5. Health Checking: Pool checks connection health and discards stale connections
  6. Resource Management: Pool manages connection lifecycle and cleanup

HTTP Client Pool

  1. Pool Initialization: Creates a pool with pre-populated HTTP clients
  2. Client Acquisition: Workers request HTTP clients from the pool
  3. Client Usage: Workers use clients for HTTP requests
  4. Client Return: Workers return clients to the pool when done
  5. Health Checking: Pool checks client health and discards stale clients
  6. Resource Management: Pool manages client lifecycle and cleanup

Why This Implementation?

Channel-based Pool Management

Pre-populated Pools

Health Checking

Timeout Handling

Mutex Protection

Key Design Decisions

  1. Two Different Pools: Demonstrates the pattern with different resource types
  2. Health Checking: Implements resource health validation
  3. Timeout Handling: Provides timeout mechanisms for resource acquisition
  4. Graceful Shutdown: Implements proper cleanup and shutdown
  5. Resource Tracking: Tracks resource usage and health metrics

Performance Characteristics

Throughput

Latency

Resource Usage

Common Use Cases

Database Operations

HTTP Operations

File Operations

Network Operations

Memory Management

Thread Management

Advanced Patterns

Dynamic Pool Sizing

Resource Health Monitoring

Priority-based Pooling

Distributed Pooling

Best Practices

Pool Configuration

Resource Management

Performance Optimization

Monitoring and Alerting

The resource pooling pattern is particularly effective when you have:

This pattern provides essential tools for building efficient, scalable systems that can handle high concurrency while optimizing resource usage and improving performance.

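The final example implementation looks like this. Note that it differs from the walkthrough above: the pool methods are unexported (getConnection/releaseConnection/close and getClient/releaseClient/close), the constructors take an initial size and a maximum size instead of a timeout, and resources are created on demand until the maximum is reached: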

package examples

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// RunResourcePooling demonstrates the resource pooling pattern.
func RunResourcePooling() {
	fmt.Println("=== Resource Pooling Pattern Example ===")

	// One WaitGroup is reused for both phases; it is back at zero after
	// each Wait returns.
	var pending sync.WaitGroup

	// Phase 1: eight workers share a database pool that starts with three
	// connections and is capped at five.
	fmt.Println("\n1. Database Connection Pool Example:")
	dbPool := newDBConnectionPool(3, 5)

	runQuery := func(worker int) {
		defer pending.Done()

		conn := dbPool.getConnection()
		fmt.Printf("Worker %d: Got DB connection %d\n", worker, conn.id)

		// Pretend to run a query for 200-699ms.
		pause := time.Duration(rand.Intn(500)+200) * time.Millisecond
		time.Sleep(pause)
		fmt.Printf("Worker %d: Executing query on connection %d\n", worker, conn.id)

		dbPool.releaseConnection(conn)
		fmt.Printf("Worker %d: Released DB connection %d\n", worker, conn.id)
	}

	for worker := 1; worker <= 8; worker++ {
		pending.Add(1)
		go runQuery(worker)
	}
	pending.Wait()
	dbPool.close()

	// Phase 2: six workers share an HTTP client pool that starts with two
	// clients and is capped at four.
	fmt.Println("\n2. HTTP Client Pool Example:")
	clientPool := newHTTPClientPool(2, 4)

	callAPI := func(worker int) {
		defer pending.Done()

		client := clientPool.getClient()
		fmt.Printf("Worker %d: Got HTTP client %d\n", worker, client.id)

		// Pretend to make a request for 100-399ms.
		pause := time.Duration(rand.Intn(300)+100) * time.Millisecond
		time.Sleep(pause)
		fmt.Printf("Worker %d: Making API request with client %d\n", worker, client.id)

		clientPool.releaseClient(client)
		fmt.Printf("Worker %d: Released HTTP client %d\n", worker, client.id)
	}

	for worker := 1; worker <= 6; worker++ {
		pending.Add(1)
		go callAPI(worker)
	}
	pending.Wait()
	clientPool.close()

	fmt.Println("\nResource Pooling example completed!")
}

// Database Connection Pool

// dbConnection is a simulated database connection handed out by
// dbConnectionPool.
type dbConnection struct {
	id       int       // sequence number assigned when the connection is created
	lastUsed time.Time // time of the most recent checkout
}

// dbConnectionPool keeps idle connections in a buffered channel and creates
// new ones on demand until maxSize connections exist.
type dbConnectionPool struct {
	connections chan *dbConnection // idle connections; capacity is maxSize
	maxSize     int
	created     int  // connections created so far; guarded by mu after construction
	closed      bool // guarded by mu
	mu          sync.Mutex
}

// newDBConnectionPool creates a pool pre-populated with initial connections
// and allowed to grow to maxSize. An initial value above maxSize is clamped to
// maxSize, because sending more than the channel can buffer would block the
// constructor forever.
func newDBConnectionPool(initial, maxSize int) *dbConnectionPool {
	if initial > maxSize {
		initial = maxSize
	}

	pool := &dbConnectionPool{
		connections: make(chan *dbConnection, maxSize),
		maxSize:     maxSize,
	}

	// Pre-populate with initial connections
	for i := 0; i < initial; i++ {
		pool.connections <- &dbConnection{
			id:       i + 1,
			lastUsed: time.Now(),
		}
		pool.created++
	}

	return pool
}

// getConnection returns an idle connection if one is available, creates a new
// one while fewer than maxSize exist, and otherwise blocks until a connection
// is released. It returns nil once the pool has been closed.
func (p *dbConnectionPool) getConnection() *dbConnection {
	// Fast path: take an idle connection without blocking.
	select {
	case conn, ok := <-p.connections:
		if !ok {
			return nil
		}
		conn.lastUsed = time.Now()
		return conn
	default:
	}

	// Create new connection if pool is empty and under max size. The id is
	// captured while the lock is held so concurrent callers cannot observe
	// each other's increments and end up with duplicate ids.
	p.mu.Lock()
	if !p.closed && p.created < p.maxSize {
		p.created++
		id := p.created
		p.mu.Unlock()
		return &dbConnection{
			id:       id,
			lastUsed: time.Now(),
		}
	}
	p.mu.Unlock()

	// Wait for available connection; a closed pool wakes the receive with
	// ok == false.
	conn, ok := <-p.connections
	if !ok {
		return nil
	}
	conn.lastUsed = time.Now()
	return conn
}

// releaseConnection returns conn to the pool. Nil connections and connections
// released after close are dropped; a connection that does not fit in the
// buffer is discarded with a message.
func (p *dbConnectionPool) releaseConnection(conn *dbConnection) {
	if conn == nil {
		return
	}

	// The lock is held across the send so close cannot close the channel
	// between the closed check and the send (which would panic).
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.closed {
		return
	}

	select {
	case p.connections <- conn:
		// Successfully returned to pool
	default:
		// Pool is full, discard connection
		fmt.Printf("Pool full, discarding connection %d\n", conn.id)
	}
}

// close shuts the pool down and reports how many connections were created.
// Only the first call has any effect. Idle connections are discarded so later
// getConnection calls return nil.
func (p *dbConnectionPool) close() {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.closed {
		// Closing an already-closed channel would panic.
		return
	}
	p.closed = true
	close(p.connections)

	for range p.connections {
		// Drain and drop whatever was still idle in the buffer.
	}

	fmt.Printf("DB pool closed. Total connections created: %d\n", p.created)
}

// HTTP Client Pool

// httpClient is a simulated HTTP client handed out by httpClientPool.
type httpClient struct {
	id       int       // sequence number assigned when the client is created
	lastUsed time.Time // time of the most recent checkout
}

// httpClientPool keeps idle clients in a buffered channel and creates new
// ones on demand until maxSize clients exist.
type httpClientPool struct {
	clients chan *httpClient // idle clients; capacity is maxSize
	maxSize int
	created int  // clients created so far; guarded by mu after construction
	closed  bool // guarded by mu
	mu      sync.Mutex
}

// newHTTPClientPool creates a pool pre-populated with initial clients and
// allowed to grow to maxSize. An initial value above maxSize is clamped to
// maxSize, because sending more than the channel can buffer would block the
// constructor forever.
func newHTTPClientPool(initial, maxSize int) *httpClientPool {
	if initial > maxSize {
		initial = maxSize
	}

	pool := &httpClientPool{
		clients: make(chan *httpClient, maxSize),
		maxSize: maxSize,
	}

	// Pre-populate with initial clients
	for i := 0; i < initial; i++ {
		pool.clients <- &httpClient{
			id:       i + 1,
			lastUsed: time.Now(),
		}
		pool.created++
	}

	return pool
}

// getClient returns an idle client if one is available, creates a new one
// while fewer than maxSize exist, and otherwise blocks until a client is
// released. It returns nil once the pool has been closed.
func (p *httpClientPool) getClient() *httpClient {
	// Fast path: take an idle client without blocking.
	select {
	case client, ok := <-p.clients:
		if !ok {
			return nil
		}
		client.lastUsed = time.Now()
		return client
	default:
	}

	// Create new client if pool is empty and under max size. The id is
	// captured while the lock is held so concurrent callers cannot observe
	// each other's increments and end up with duplicate ids.
	p.mu.Lock()
	if !p.closed && p.created < p.maxSize {
		p.created++
		id := p.created
		p.mu.Unlock()
		return &httpClient{
			id:       id,
			lastUsed: time.Now(),
		}
	}
	p.mu.Unlock()

	// Wait for available client; a closed pool wakes the receive with
	// ok == false.
	client, ok := <-p.clients
	if !ok {
		return nil
	}
	client.lastUsed = time.Now()
	return client
}

// releaseClient returns client to the pool. Nil clients and clients released
// after close are dropped; a client that does not fit in the buffer is
// discarded with a message.
func (p *httpClientPool) releaseClient(client *httpClient) {
	if client == nil {
		return
	}

	// The lock is held across the send so close cannot close the channel
	// between the closed check and the send (which would panic).
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.closed {
		return
	}

	select {
	case p.clients <- client:
		// Successfully returned to pool
	default:
		// Pool is full, discard client
		fmt.Printf("Pool full, discarding HTTP client %d\n", client.id)
	}
}

// close shuts the pool down and reports how many clients were created. Only
// the first call has any effect. Idle clients are discarded so later getClient
// calls return nil.
func (p *httpClientPool) close() {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.closed {
		// Closing an already-closed channel would panic.
		return
	}
	p.closed = true
	close(p.clients)

	for range p.clients {
		// Drain and drop whatever was still idle in the buffer.
	}

	fmt.Printf("HTTP client pool closed. Total clients created: %d\n", p.created)
}

To run this example, and build the code yourself, check out this and other examples in the go-fluency-concurrency-model-patterns repo. That’s it for this topic. A new series will begin in a day or few! 🤘🏻 Happy thrashing code!

Exit mobile version