Overview
The Singleflight pattern ensures that only one execution of a function is in flight for a given key at a time. Duplicate callers wait for the in-flight result instead of executing the function again. This makes it a key tool for deduplicating expensive work: concurrent cache fills, repeated external API calls, and redundant database queries.
NOTE: For other posts on concurrency patterns, check out the index post for this series.
Implementation Details
Structure
The singleflight implementation in examples/singleflight.go consists of three main components:
- Singleflight Group – Manages in-flight calls and their results
- Call Tracking – Tracks active calls for each key
- Result Sharing – Shares results among duplicate callers
Code Analysis
Let’s break down the main function and understand how each component works:
func RunSingleflight() {
	// Create a singleflight group
	sf := newSingleflight()

	// Simulate multiple concurrent requests for the same key
	key := "user:123"
	numRequests := 5

	var wg sync.WaitGroup
	results := make([]string, numRequests)

	fmt.Printf("Making %d concurrent requests for key: %s\n", numRequests, key)

	// Launch concurrent requests
	for i := 0; i < numRequests; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()

			fmt.Printf("Request %d: Starting...\n", id)

			result := sf.Do(key, func() (interface{}, error) {
				// Simulate expensive operation (e.g., database query, API call)
				fmt.Printf("Request %d: Executing expensive operation...\n", id)
				time.Sleep(2 * time.Second)
				return fmt.Sprintf("Data for %s (processed by request %d)", key, id), nil
			})

			results[id] = result.(string)
			fmt.Printf("Request %d: Completed with result: %s\n", id, result)
		}(i)
	}

	wg.Wait()

	// Show that all results are the same (same execution)
	fmt.Println("\nAll results should be identical:")
	for i, result := range results {
		fmt.Printf("  Request %d: %s\n", i, result)
	}
}
Step-by-step breakdown:
- Singleflight Group Creation:
  - sf := newSingleflight() creates a new singleflight group instance
  - The group manages all in-flight calls and their results
  - This is the central coordinator that prevents duplicate executions
- Request Configuration:
  - key := "user:123" defines the key that all requests will use
  - numRequests := 5 specifies how many concurrent requests to simulate
  - Using the same key ensures that the singleflight pattern will be triggered
- Result Storage Setup:
  - var wg sync.WaitGroup tracks when all request goroutines complete
  - results := make([]string, numRequests) creates a slice to store results
  - This allows us to verify that all requests get the same result
- Concurrent Request Launch:
  - for i := 0; i < numRequests; i++ launches 5 concurrent requests
  - Each request gets a unique ID (0-4) for tracking and debugging
  - Uses the closure func(id int) { ... }(i) to capture the request ID
- Request Goroutine Implementation:
  - defer wg.Done() ensures the request signals completion when it exits
  - fmt.Printf("Request %d: Starting...\n", id) shows when each request starts
  - This helps demonstrate the concurrent nature of the requests
- Singleflight Call Execution:
  - result := sf.Do(key, func() (interface{}, error) { ... }) calls the singleflight group
  - The function parameter defines the expensive operation to deduplicate
  - Only one execution of this function will occur, regardless of how many requests call it
- Expensive Operation Simulation:
  - fmt.Printf("Request %d: Executing expensive operation...\n", id) shows which request actually executes
  - time.Sleep(2 * time.Second) simulates a 2-second expensive operation
  - Only the first request will see this message – others will wait for the result
- Result Processing:
  - results[id] = result.(string) stores the result for this request
  - fmt.Printf("Request %d: Completed with result: %s\n", id, result) shows completion
  - All requests should complete with the same result
- Verification and Display:
  - wg.Wait() waits for all requests to complete
  - The final loop displays all results to verify they're identical
  - This demonstrates that only one execution occurred, but all requests got the result
Singleflight Implementation
type singleflight struct {
	mu    sync.Mutex
	calls map[string]*call
}

type call struct {
	wg   sync.WaitGroup
	val  interface{}
	err  error
	dups int
}

func newSingleflight() *singleflight {
	return &singleflight{
		calls: make(map[string]*call),
	}
}

func (sf *singleflight) Do(key string, fn func() (interface{}, error)) interface{} {
	sf.mu.Lock()
	if c, exists := sf.calls[key]; exists {
		// Another call is in progress for this key
		c.dups++
		sf.mu.Unlock()
		fmt.Printf("Duplicate call for key %s, waiting for result...\n", key)
		c.wg.Wait()
		return c.val
	}

	// Create new call
	c := &call{}
	c.wg.Add(1)
	sf.calls[key] = c
	sf.mu.Unlock()

	// Execute the function
	c.val, c.err = fn()
	c.wg.Done()

	// Clean up
	sf.mu.Lock()
	delete(sf.calls, key)
	sf.mu.Unlock()

	return c.val
}
Singleflight implementation breakdown:
- Data Structure Design:
  - The singleflight struct is the main singleflight group
  - mu sync.Mutex protects concurrent access to the calls map
  - calls map[string]*call tracks active calls by key
  - Simple structure with minimal memory footprint
- Call Structure Design:
  - The call struct represents an in-flight function call
  - wg sync.WaitGroup coordinates between executing and waiting requests
  - val interface{} stores the result of the function execution
  - err error stores any error from the function execution
  - dups int tracks the number of duplicate requests (for monitoring)
- Constructor Function:
  - newSingleflight() creates a new singleflight group
  - calls: make(map[string]*call) initializes the empty calls map
  - Returns a pointer to the singleflight group for method calls
- Do Method – Initial Lock and Duplicate Check:
  - sf.mu.Lock() acquires the mutex to protect the calls map
  - if c, exists := sf.calls[key]; exists checks if a call is already in progress
  - If a call exists, this is a duplicate request
- Duplicate Request Handling:
  - c.dups++ increments the duplicate counter for monitoring
  - sf.mu.Unlock() releases the mutex immediately (no need to hold it while waiting)
  - fmt.Printf("Duplicate call for key %s, waiting for result...\n", key) shows duplicate detection
  - c.wg.Wait() blocks until the original call completes
  - return c.val returns the shared result to the duplicate request
- New Call Creation:
  - c := &call{} creates a new call structure for this key
  - c.wg.Add(1) increments the WaitGroup (it will be decremented when execution completes)
  - sf.calls[key] = c stores the call in the map to track it
  - sf.mu.Unlock() releases the mutex before executing the function
- Function Execution:
  - c.val, c.err = fn() executes the provided function
  - This is the actual expensive operation that we want to deduplicate
  - Only one request will reach this point for each key
- Result Sharing and Cleanup:
  - c.wg.Done() signals that execution is complete
  - This unblocks all waiting duplicate requests
  - sf.mu.Lock() acquires the mutex for cleanup
  - delete(sf.calls, key) removes the call from the map to free memory
  - sf.mu.Unlock() releases the mutex
  - return c.val returns the result to the original request
Key Design Patterns:
- Mutex Protection: sync.Mutex ensures thread-safe access to the shared calls map during concurrent operations.
- WaitGroup Coordination: sync.WaitGroup coordinates between the executing request and all waiting duplicate requests.
- Map-based Call Tracking: Uses a map to efficiently track active calls by key with O(1) lookup time.
- Function-based Interface: The generic interface func() (interface{}, error) supports any expensive operation.
- Automatic Cleanup: Call entries are automatically removed from the map after completion to prevent memory leaks.
- Duplicate Monitoring: Tracks the number of duplicate calls for observability and debugging.
- Closure Pattern: func(id int) { ... }(i) captures loop variables in request goroutines for proper ID tracking.
How It Works
- Request Arrival: Multiple requests arrive for the same key
- First Request: The first request creates a call entry and executes the function
- Duplicate Detection: Subsequent requests detect an existing call for the key
- Result Waiting: Duplicate requests wait for the first request to complete
- Result Sharing: All requests receive the same result from the single execution
- Cleanup: The call entry is removed after completion
The pattern ensures that expensive operations are executed only once, even under high concurrency.
Why This Implementation?
Mutex-based Synchronization
- Thread Safety: Protects the calls map during concurrent access
- Simple Implementation: Straightforward synchronization for the call tracking
- Performance: Minimal overhead for the typical singleflight use case
- Reliability: Ensures data consistency during concurrent operations
WaitGroup for Result Coordination
- Synchronization: WaitGroup coordinates between the executing request and waiting requests
- Non-blocking: Executing request can complete without blocking
- Clean Coordination: All requests are properly synchronized
- Resource Efficiency: Minimal overhead for coordination
Map-based Call Tracking
- Key-based Lookup: Efficient lookup of active calls by key
- Memory Management: Calls are removed after completion to prevent memory leaks
- Scalability: Can handle multiple different keys simultaneously
- Flexibility: Easy to extend for additional metadata
Function-based Interface
- Generic Design: Can work with any function that returns (interface{}, error)
- Type Safety: Caller specifies the function to execute
- Flexibility: Supports different types of expensive operations
- Error Handling: Proper error propagation from the executed function
Duplicate Counting
- Monitoring: Tracks the number of duplicate calls for observability
- Debugging: Helps identify when singleflight is being used effectively
- Metrics: Can be used for performance monitoring
- Optimization: Helps identify opportunities for caching
Key Design Decisions
- Map-based Storage: Uses a map to track active calls by key
- WaitGroup Coordination: Uses WaitGroup to synchronize result sharing
- Mutex Protection: Protects the calls map during concurrent access
- Automatic Cleanup: Removes call entries after completion
- Generic Interface: Supports any function with (interface{}, error) signature
Performance Characteristics
Throughput
- Deduplication: Eliminates duplicate expensive operations
- Resource Efficiency: Reduces resource usage for duplicate requests
- Scalability: Scales well with multiple concurrent users
- Memory Usage: Minimal memory overhead for call tracking
Latency
- First Request: Latency is the same as the original operation
- Duplicate Requests: Latency is reduced to just coordination overhead
- Coordination Overhead: Minimal overhead for result sharing
- Predictable: Latency is predictable and consistent
Memory Usage
- Call Tracking: Memory usage proportional to number of active calls
- Automatic Cleanup: Memory is freed when calls complete
- Efficient Storage: Minimal memory overhead per call
- Scalability: Memory usage doesn’t grow with duplicate requests
Common Use Cases
Caching Systems
- Cache Miss Handling: Prevent multiple cache misses for the same key
- Cache Population: Ensure only one request populates the cache
- Cache Warming: Prevent duplicate cache warming operations
- Distributed Caching: Coordinate cache operations across multiple nodes
Database Operations
- Query Deduplication: Prevent duplicate database queries
- Connection Pooling: Ensure only one connection is established per key
- Transaction Management: Prevent duplicate transaction operations
- Data Loading: Prevent duplicate data loading operations
API Calls
- External API Calls: Prevent duplicate calls to external APIs
- Rate Limiting: Ensure rate limits are respected across concurrent requests
- Authentication: Prevent duplicate authentication requests
- Data Fetching: Prevent duplicate data fetching operations
File Operations
- File Reading: Prevent duplicate file read operations
- File Processing: Ensure only one process handles a file
- Configuration Loading: Prevent duplicate configuration loading
- Resource Loading: Prevent duplicate resource loading
Background Jobs
- Job Deduplication: Prevent duplicate background job execution
- Scheduled Tasks: Ensure scheduled tasks run only once
- Data Processing: Prevent duplicate data processing operations
- Report Generation: Prevent duplicate report generation
System Operations
- Service Discovery: Prevent duplicate service discovery requests
- Health Checks: Prevent duplicate health check operations
- Configuration Updates: Prevent duplicate configuration updates
- Maintenance Operations: Prevent duplicate maintenance operations
User Interface
- Data Loading: Prevent duplicate data loading in UI components
- Form Submission: Prevent duplicate form submissions
- Search Operations: Prevent duplicate search requests
- Navigation: Prevent duplicate navigation operations
Advanced Patterns
Time-based Expiration
- Call Expiration: Expire calls after a certain time
- Stale Results: Handle cases where results become stale
- Refresh Logic: Automatically refresh expired results
- TTL Management: Manage time-to-live for cached results
Error Handling
- Error Propagation: Properly propagate errors to all callers
- Retry Logic: Implement retry logic for failed operations
- Circuit Breaker: Integrate with circuit breaker patterns
- Fallback Mechanisms: Provide fallback mechanisms for failures
Distributed Singleflight
- Multi-node Coordination: Coordinate across multiple nodes
- Shared State: Use Redis or similar for shared state
- Consistency: Ensure consistent behavior across nodes
- Network Overhead: Handle additional network communication
Metrics and Monitoring
- Call Tracking: Track the number of calls and duplicates
- Performance Metrics: Monitor performance impact
- Hit Rates: Track cache hit rates and effectiveness
- Alerting: Alert on unusual patterns or failures
Best Practices
Key Design
- Appropriate Keys: Use keys that properly identify the operation
- Key Uniqueness: Ensure keys are unique for different operations
- Key Stability: Use stable keys that don’t change frequently
- Key Size: Keep keys reasonably sized for performance
Function Design
- Idempotent Functions: Ensure functions are idempotent
- Error Handling: Properly handle and propagate errors
- Resource Management: Ensure proper resource cleanup
- Timeout Handling: Implement appropriate timeouts
Memory Management
- Call Cleanup: Ensure calls are properly cleaned up
- Memory Monitoring: Monitor memory usage of call tracking
- Leak Prevention: Prevent memory leaks from abandoned calls
- Size Limits: Implement limits on the number of active calls
Performance Optimization
- Key Hashing: Use efficient key hashing for large key sets
- Call Pooling: Reuse call objects to reduce allocation overhead
- Lazy Cleanup: Implement lazy cleanup to reduce lock contention
- Metrics Collection: Collect metrics for performance tuning
The singleflight pattern is particularly effective when you have:
- Expensive Operations: Operations that are costly to execute
- High Concurrency: Multiple concurrent requests for the same data
- Cache Misses: Frequent cache misses for the same keys
- External Dependencies: Operations that depend on external services
- Resource Constraints: Limited resources that need to be conserved
This pattern provides essential tools for building efficient, scalable systems that can handle high concurrency while minimizing resource usage and improving performance.
The final example implementation looks like this:
package examples

import (
	"fmt"
	"sync"
	"time"
)

// RunSingleflight demonstrates the singleflight pattern.
func RunSingleflight() {
	fmt.Println("=== Singleflight Pattern Example ===")

	// Create a singleflight group
	sf := newSingleflight()

	// Simulate multiple concurrent requests for the same key
	key := "user:123"
	numRequests := 5

	var wg sync.WaitGroup
	results := make([]string, numRequests)

	fmt.Printf("Making %d concurrent requests for key: %s\n", numRequests, key)

	// Launch concurrent requests
	for i := 0; i < numRequests; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()

			fmt.Printf("Request %d: Starting...\n", id)

			result := sf.Do(key, func() (interface{}, error) {
				// Simulate expensive operation (e.g., database query, API call)
				fmt.Printf("Request %d: Executing expensive operation...\n", id)
				time.Sleep(2 * time.Second)
				return fmt.Sprintf("Data for %s (processed by request %d)", key, id), nil
			})

			results[id] = result.(string)
			fmt.Printf("Request %d: Completed with result: %s\n", id, result)
		}(i)
	}

	wg.Wait()

	// Show that all results are the same (same execution)
	fmt.Println("\nAll results should be identical:")
	for i, result := range results {
		fmt.Printf("  Request %d: %s\n", i, result)
	}

	// Test with different keys
	fmt.Println("\nTesting with different keys:")
	keys := []string{"user:123", "user:456", "user:123"}
	for i, key := range keys {
		wg.Add(1)
		go func(id int, k string) {
			defer wg.Done()
			result := sf.Do(k, func() (interface{}, error) {
				fmt.Printf("Request %d: Executing for key %s...\n", id, k)
				time.Sleep(1 * time.Second)
				return fmt.Sprintf("Data for %s", k), nil
			})
			fmt.Printf("Request %d: Key %s -> %s\n", id, k, result)
		}(i, key)
	}
	wg.Wait()

	fmt.Println("\nSingleflight example completed!")
}

// singleflight ensures only one execution is in flight per key.
type singleflight struct {
	mu    sync.Mutex
	calls map[string]*call
}

type call struct {
	wg   sync.WaitGroup
	val  interface{}
	err  error
	dups int
}

func newSingleflight() *singleflight {
	return &singleflight{
		calls: make(map[string]*call),
	}
}

func (sf *singleflight) Do(key string, fn func() (interface{}, error)) interface{} {
	sf.mu.Lock()
	if c, exists := sf.calls[key]; exists {
		// Another call is in progress for this key
		c.dups++
		sf.mu.Unlock()
		fmt.Printf("Duplicate call for key %s, waiting for result...\n", key)
		c.wg.Wait()
		return c.val
	}

	// Create new call
	c := &call{}
	c.wg.Add(1)
	sf.calls[key] = c
	sf.mu.Unlock()

	// Execute the function
	c.val, c.err = fn()
	c.wg.Done()

	// Clean up
	sf.mu.Lock()
	delete(sf.calls, key)
	sf.mu.Unlock()

	return c.val
}
To run this example and build the code yourself, check out this and the other examples in the go-fluency-concurrency-model-patterns repo. That's it for this topic; tomorrow I'll post on the resource pooling pattern.