Site icon Adron's Composite Code

Go Concurrency Patterns(Singleflight Pattern)

Overview

The Singleflight pattern ensures that only one execution of a function is in-flight for a given key at a time. Duplicate callers wait for the result instead of executing the function again. This pattern is essential for preventing duplicate expensive operations, caching with concurrent access, API call deduplication, and database query optimization.

NOTE: For other posts on concurrency patterns, check out the index post to this series of concurrency patterns.

Implementation Details

Structure

The singleflight implementation in examples/singleflight.go consists of three main components:

  1. Singleflight Group – Manages in-flight calls and their results
  2. Call Tracking – Tracks active calls for each key
  3. Result Sharing – Shares results among duplicate callers

Code Analysis

Let’s break down the main function and understand how each component works:

func RunSingleflight() {
    // Create a singleflight group
    sf := newSingleflight()

    // Simulate multiple concurrent requests for the same key
    key := "user:123"
    numRequests := 5

    var wg sync.WaitGroup
    results := make([]string, numRequests)

    fmt.Printf("Making %d concurrent requests for key: %s\n", numRequests, key)

    // Launch concurrent requests
    for i := 0; i < numRequests; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            fmt.Printf("Request %d: Starting...\n", id)
            
            result := sf.Do(key, func() (interface{}, error) {
                // Simulate expensive operation (e.g., database query, API call)
                fmt.Printf("Request %d: Executing expensive operation...\n", id)
                time.Sleep(2 * time.Second)
                return fmt.Sprintf("Data for %s (processed by request %d)", key, id), nil
            })
            
            results[id] = result.(string)
            fmt.Printf("Request %d: Completed with result: %s\n", id, result)
        }(i)
    }

    wg.Wait()

    // Show that all results are the same (same execution)
    fmt.Println("\nAll results should be identical:")
    for i, result := range results {
        fmt.Printf("  Request %d: %s\n", i, result)
    }
}

Step-by-step breakdown:

  1. Singleflight Group Creation:
    • sf := newSingleflight() creates a new singleflight group instance
    • The group manages all in-flight calls and their results
    • This is the central coordinator that prevents duplicate executions
  2. Request Configuration:
    • key := "user:123" defines the key that all requests will use
    • numRequests := 5 specifies how many concurrent requests to simulate
    • Using the same key ensures that the singleflight pattern will be triggered
  3. Result Storage Setup:
    • var wg sync.WaitGroup tracks when all request goroutines complete
    • results := make([]string, numRequests) creates a slice to store results
    • This allows us to verify that all requests get the same result
  4. Concurrent Request Launch:
    • for i := 0; i < numRequests; i++ launches 5 concurrent requests
    • Each request gets a unique ID (0-4) for tracking and debugging
    • Uses closure func(id int) { ... }(i) to capture the request ID
  5. Request Goroutine Implementation:
    • defer wg.Done() ensures the request signals completion when it exits
    • fmt.Printf("Request %d: Starting...\n", id) shows when each request starts
    • This helps demonstrate the concurrent nature of the requests
  6. Singleflight Call Execution:
    • result := sf.Do(key, func() (interface{}, error) { ... }) calls the singleflight group
    • The function parameter defines the expensive operation to deduplicate
    • Only one execution of this function will occur, regardless of how many requests call it
  7. Expensive Operation Simulation:
    • fmt.Printf("Request %d: Executing expensive operation...\n", id) shows which request actually executes
    • time.Sleep(2 * time.Second) simulates a 2-second expensive operation
    • Only the first request will see this message – others will wait for the result
  8. Result Processing:
    • results[id] = result.(string) stores the result for this request
    • fmt.Printf("Request %d: Completed with result: %s\n", id, result) shows completion
    • All requests should complete with the same result
  9. Verification and Display:
    • wg.Wait() waits for all requests to complete
    • The final loop displays all results to verify they’re identical
    • This demonstrates that only one execution occurred, but all requests got the result

Singleflight Implementation

type singleflight struct {
    mu    sync.Mutex
    calls map[string]*call
}

type call struct {
    wg    sync.WaitGroup
    val   interface{}
    err   error
    dups  int
}

func newSingleflight() *singleflight {
    return &singleflight{
        calls: make(map[string]*call),
    }
}

func (sf *singleflight) Do(key string, fn func() (interface{}, error)) interface{} {
    sf.mu.Lock()
    
    if c, exists := sf.calls[key]; exists {
        // Another call is in progress for this key
        c.dups++
        sf.mu.Unlock()
        fmt.Printf("Duplicate call for key %s, waiting for result...\n", key)
        c.wg.Wait()
        return c.val
    }

    // Create new call
    c := &call{}
    c.wg.Add(1)
    sf.calls[key] = c
    sf.mu.Unlock()

    // Execute the function
    c.val, c.err = fn()
    c.wg.Done()

    // Clean up
    sf.mu.Lock()
    delete(sf.calls, key)
    sf.mu.Unlock()

    return c.val
}

Singleflight implementation breakdown:

  1. Data Structure Design:
    • singleflight struct contains the main singleflight group
    • mu sync.Mutex protects concurrent access to the calls map
    • calls map[string]*call tracks active calls by key
    • Simple structure with minimal memory footprint
  2. Call Structure Design:
    • call struct represents an in-flight function call
    • wg sync.WaitGroup coordinates between executing and waiting requests
    • val interface{} stores the result of the function execution
    • err error stores any error from the function execution
    • dups int tracks the number of duplicate requests (for monitoring)
  3. Constructor Function:
    • newSingleflight() creates a new singleflight group
    • calls: make(map[string]*call) initializes the empty calls map
    • Returns a pointer to the singleflight group for method calls
  4. Do Method – Initial Lock and Duplicate Check:
    • sf.mu.Lock() acquires the mutex to protect the calls map
    • if c, exists := sf.calls[key]; exists checks if a call is already in progress
    • If a call exists, this is a duplicate request
  5. Duplicate Request Handling:
    • c.dups++ increments the duplicate counter for monitoring
    • sf.mu.Unlock() releases the mutex immediately (no need to hold it while waiting)
    • fmt.Printf("Duplicate call for key %s, waiting for result...\n", key) shows duplicate detection
    • c.wg.Wait() blocks until the original call completes
    • return c.val returns the shared result to the duplicate request
  6. New Call Creation:
    • c := &call{} creates a new call structure for this key
    • c.wg.Add(1) increments the WaitGroup (will be decremented when execution completes)
    • sf.calls[key] = c stores the call in the map to track it
    • sf.mu.Unlock() releases the mutex before executing the function
  7. Function Execution:
    • c.val, c.err = fn() executes the provided function
    • This is the actual expensive operation that we want to deduplicate
    • Only one request will reach this point for each key
  8. Result Sharing and Cleanup:
    • c.wg.Done() signals that execution is complete
    • This unblocks all waiting duplicate requests
    • sf.mu.Lock() acquires the mutex for cleanup
    • delete(sf.calls, key) removes the call from the map to free memory
    • sf.mu.Unlock() releases the mutex
    • return c.val returns the result to the original request

Key Design Patterns:

  1. Mutex Protectionsync.Mutex ensures thread-safe access to the shared calls map during concurrent operations.
  2. WaitGroup Coordinationsync.WaitGroup coordinates between the executing request and all waiting duplicate requests.
  3. Map-based Call Tracking: Uses a map to efficiently track active calls by key with O(1) lookup time.
  4. Function-based Interface: Generic interface func() (interface{}, error) supports any expensive operation.
  5. Automatic Cleanup: Call entries are automatically removed from the map after completion to prevent memory leaks.
  6. Duplicate Monitoring: Tracks the number of duplicate calls for observability and debugging.
  7. Closure Patternfunc(id int) { ... }(i) captures loop variables in request goroutines for proper ID tracking.

How It Works

  1. Request Arrival: Multiple requests arrive for the same key
  2. First Request: The first request creates a call entry and executes the function
  3. Duplicate Detection: Subsequent requests detect an existing call for the key
  4. Result Waiting: Duplicate requests wait for the first request to complete
  5. Result Sharing: All requests receive the same result from the single execution
  6. Cleanup: The call entry is removed after completion

The pattern ensures that expensive operations are executed only once, even under high concurrency.

Why This Implementation?

Mutex-based Synchronization

WaitGroup for Result Coordination

Map-based Call Tracking

Function-based Interface

Duplicate Counting

Key Design Decisions

  1. Map-based Storage: Uses a map to track active calls by key
  2. WaitGroup Coordination: Uses WaitGroup to synchronize result sharing
  3. Mutex Protection: Protects the calls map during concurrent access
  4. Automatic Cleanup: Removes call entries after completion
  5. Generic Interface: Supports any function with (interface{}, error) signature

Performance Characteristics

Throughput

Latency

Memory Usage

Common Use Cases

Caching Systems

Database Operations

API Calls

File Operations

Background Jobs

System Operations

User Interface

Advanced Patterns

Time-based Expiration

Error Handling

Distributed Singleflight

Metrics and Monitoring

Best Practices

Key Design

Function Design

Memory Management

Performance Optimization

The singleflight pattern is particularly effective when you have:

This pattern provides essential tools for building efficient, scalable systems that can handle high concurrency while minimizing resource usage and improving performance.

The final example implementation looks like this:

package examples

import (
	"fmt"
	"sync"
	"time"
)

// RunSingleflight demonstrates the singleflight (spaceflight) pattern.
func RunSingleflight() {
	fmt.Println("=== Singleflight (Spaceflight) Pattern Example ===")

	// Create a singleflight group
	sf := newSingleflight()

	// Simulate multiple concurrent requests for the same key
	key := "user:123"
	numRequests := 5

	var wg sync.WaitGroup
	results := make([]string, numRequests)

	fmt.Printf("Making %d concurrent requests for key: %s\n", numRequests, key)

	// Launch concurrent requests
	for i := 0; i < numRequests; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			fmt.Printf("Request %d: Starting...\n", id)

			result := sf.Do(key, func() (interface{}, error) {
				// Simulate expensive operation (e.g., database query, API call)
				fmt.Printf("Request %d: Executing expensive operation...\n", id)
				time.Sleep(2 * time.Second)
				return fmt.Sprintf("Data for %s (processed by request %d)", key, id), nil
			})

			results[id] = result.(string)
			fmt.Printf("Request %d: Completed with result: %s\n", id, result)
		}(i)
	}

	wg.Wait()

	// Show that all results are the same (same execution)
	fmt.Println("\nAll results should be identical:")
	for i, result := range results {
		fmt.Printf("  Request %d: %s\n", i, result)
	}

	// Test with different keys
	fmt.Println("\nTesting with different keys:")
	keys := []string{"user:123", "user:456", "user:123"}

	for i, key := range keys {
		wg.Add(1)
		go func(id int, k string) {
			defer wg.Done()
			result := sf.Do(k, func() (interface{}, error) {
				fmt.Printf("Request %d: Executing for key %s...\n", id, k)
				time.Sleep(1 * time.Second)
				return fmt.Sprintf("Data for %s", k), nil
			})
			fmt.Printf("Request %d: Key %s -> %s\n", id, k, result)
		}(i, key)
	}

	wg.Wait()
	fmt.Println("\nSingleflight example completed!")
}

// Singleflight ensures only one execution per key
type singleflight struct {
	mu    sync.Mutex
	calls map[string]*call
}

type call struct {
	wg   sync.WaitGroup
	val  interface{}
	err  error
	dups int
}

func newSingleflight() *singleflight {
	return &singleflight{
		calls: make(map[string]*call),
	}
}

func (sf *singleflight) Do(key string, fn func() (interface{}, error)) interface{} {
	sf.mu.Lock()

	if c, exists := sf.calls[key]; exists {
		// Another call is in progress for this key
		c.dups++
		sf.mu.Unlock()
		fmt.Printf("Duplicate call for key %s, waiting for result...\n", key)
		c.wg.Wait()
		return c.val
	}

	// Create new call
	c := &call{}
	c.wg.Add(1)
	sf.calls[key] = c
	sf.mu.Unlock()

	// Execute the function
	c.val, c.err = fn()
	c.wg.Done()

	// Clean up
	sf.mu.Lock()
	delete(sf.calls, key)
	sf.mu.Unlock()

	return c.val
}

To run this example, and build the code yourself, check out this and other examples in the go-fluency-concurrency-model-patterns repo. That’s it for this topic, tomorrow I’ll post on the resource pooling pattern.

Exit mobile version