Overview
The Publish-Subscribe (Pub/Sub) pattern decouples message producers (publishers) from consumers (subscribers). Publishers send messages to a topic or channel, and all subscribers to that topic receive the messages. This pattern is essential for event-driven architectures, broadcasting messages to multiple consumers, and decoupling senders and receivers.
NOTE: For other posts on concurrency patterns, check out the index post to this series of concurrency patterns.
Implementation Details
Structure
The publish-subscribe implementation in examples/pubsub.go consists of three main components:
- Broadcaster – Manages subscriptions and message distribution
- Publishers – Send messages to the broadcaster
- Subscribers – Receive messages from the broadcaster
Code Analysis
Let’s break down the main function and understand how each component works:
func RunPubSub() {
// Create a broadcaster
b := newBroadcaster()
numSubscribers := 3
var wg sync.WaitGroup
// Start subscribers
for i := 1; i <= numSubscribers; i++ {
ch := b.subscribe()
wg.Add(1)
go func(id int, ch <-chan string) {
defer wg.Done()
for msg := range ch {
fmt.Printf("Subscriber %d received: %s\n", id, msg)
}
fmt.Printf("Subscriber %d done.\n", id)
}(i, ch)
}
// Start publisher
go func() {
for i := 1; i <= 5; i++ {
msg := fmt.Sprintf("Message %d", i)
fmt.Printf("Publisher sending: %s\n", msg)
b.publish(msg)
time.Sleep(400 * time.Millisecond)
}
b.close()
}()
wg.Wait()
}
Step-by-step breakdown:
- Broadcaster Initialization:
b := newBroadcaster()creates a new broadcaster instance- The broadcaster manages all subscriptions and message distribution
- This is the central hub that decouples publishers from subscribers
- Subscriber Configuration:
numSubscribers := 3defines how many subscribers to createvar wg sync.WaitGrouptracks when all subscribers finish processing- This ensures the main function waits for all subscribers to complete
- Subscriber Launch Loop:
- Launches
numSubscribersgoroutines (3 in this case) ch := b.subscribe()creates a new subscription channel for each subscriber- Each subscriber gets a unique ID (1, 2, 3) for tracking and debugging
- Uses closure
func(id int, ch <-chan string) { ... }(i, ch)to capture the subscriber ID and channel
- Launches
- Subscriber Goroutine Implementation:
defer wg.Done()ensures the subscriber signals completion when it exitsfor msg := range chcontinuously reads messages until the channel closes- Each subscriber processes messages independently at its own pace
- Prints completion message when the channel closes
- Publisher Goroutine Launch:
- Runs in background to send messages asynchronously
- Publishes 5 messages with 400ms delays between them
- This simulates real-world message publishing with timing
- Message Publishing Loop:
for i := 1; i <= 5; i++sends exactly 5 messagesmsg := fmt.Sprintf("Message %d", i)creates numbered messagesb.publish(msg)broadcasts the message to all subscriberstime.Sleep(400 * time.Millisecond)simulates message generation time
- Graceful Shutdown:
b.close()closes the broadcaster and all subscriber channels- This signals all subscribers to stop processing and exit
wg.Wait()waits for all subscribers to finish before the main function exits
Broadcaster Implementation
type broadcaster struct {
subscribers []chan string
closed bool
mu sync.Mutex
}
func newBroadcaster() *broadcaster {
return &broadcaster{
subscribers: make([]chan string, 0),
}
}
func (b *broadcaster) subscribe() <-chan string {
b.mu.Lock()
defer b.mu.Unlock()
ch := make(chan string, 2)
b.subscribers = append(b.subscribers, ch)
return ch
}
func (b *broadcaster) publish(msg string) {
b.mu.Lock()
defer b.mu.Unlock()
if b.closed {
return
}
for _, ch := range b.subscribers {
ch <- msg
}
}
func (b *broadcaster) close() {
b.mu.Lock()
defer b.mu.Unlock()
if b.closed {
return
}
for _, ch := range b.subscribers {
close(ch)
}
b.closed = true
}
Broadcaster function breakdown:
- Data Structure Design:
subscribers []chan string: Slice of channels, one per subscriberclosed bool: Flag to prevent operations after shutdownmu sync.Mutex: Protects concurrent access to subscriber list- Simple slice-based design is efficient for typical use cases
- Constructor Function:
newBroadcaster()creates a new broadcaster instance- Initializes empty subscriber slice with
make([]chan string, 0) - Returns pointer to broadcaster for method calls
- Subscription Management:
subscribe()creates a new subscription channel for each subscriberb.mu.Lock()anddefer b.mu.Unlock()ensure thread-safe subscriber list accessch := make(chan string, 2)creates a buffered channel with capacity 2- Buffering prevents blocking if subscriber is temporarily slow
b.subscribers = append(b.subscribers, ch)adds the new channel to the list- Returns
<-chan string(read-only) for encapsulation
- Message Publishing Logic:
publish(msg string)broadcasts a message to all subscribers- Thread-safe access with mutex protection
if b.closed { return }prevents publishing after shutdownfor _, ch := range b.subscribersiterates through all subscriber channelsch <- msgsends the message to each subscriber- Non-blocking due to buffered channels
- Graceful Shutdown Process:
close()coordinates shutdown of all subscribers- Thread-safe access with mutex protection
if b.closed { return }prevents double-closingfor _, ch := range b.subscribers { close(ch) }closes all subscriber channelsb.closed = truemarks the broadcaster as closed- Closing channels signals subscribers to stop processing
Key Design Patterns:
- Slice-based Subscriber Management: Simple and efficient for typical pub/sub use cases, easy to add/remove subscribers.
- Mutex Protection: Ensures thread-safe access to the subscriber list during concurrent subscribe/publish operations.
- Buffered Channels: Each subscriber gets a buffered channel (capacity 2) to prevent blocking during message distribution.
- Read-only Channel Returns:
subscribe()returns<-chan stringto prevent subscribers from accidentally closing their channels. - Immediate Message Distribution: Messages are sent to all subscribers immediately upon publishing, providing real-time delivery.
- Graceful Shutdown: Proper channel closing ensures all subscribers exit cleanly without goroutine leaks.
- Closure Pattern:
func(id int, ch <-chan string) { ... }(i, ch)captures loop variables in each subscriber goroutine.
How It Works
- Broadcaster Creation: A broadcaster is created to manage subscriptions and message distribution
- Subscriber Registration: Subscribers call
subscribe()to get their own message channel - Message Publishing: Publishers call
publish()to send messages to all subscribers - Message Distribution: The broadcaster sends each message to all subscriber channels
- Graceful Shutdown: When the broadcaster is closed, all subscriber channels are closed
The pattern enables loose coupling between publishers and subscribers, allowing for flexible message distribution.
Why This Implementation?
Individual Subscriber Channels
- Isolation: Each subscriber has its own channel, preventing blocking
- Independent Processing: Subscribers can process messages at their own pace
- Scalability: Easy to add or remove subscribers without affecting others
Mutex-based Synchronization
- Thread Safety: Protects subscriber list during concurrent operations
- Simple Implementation: Straightforward synchronization for the subscriber list
- Performance: Minimal overhead for the typical pub/sub use case
Buffered Subscriber Channels
- Non-blocking Publishing: Publishers don’t block if subscribers are slow
- Message Buffering: Can handle temporary subscriber slowdowns
- Flow Control: Natural backpressure through channel buffering
Centralized Message Distribution
- Broadcast Semantics: All subscribers receive all messages
- Simple Coordination: Single point of message distribution
- Consistent Delivery: All subscribers receive messages in the same order
Graceful Shutdown
- Clean Termination: Properly closes all subscriber channels
- Resource Cleanup: Prevents goroutine leaks
- Coordinated Exit: All subscribers exit when broadcaster closes
Key Design Decisions
- Slice-based Subscriber Management: Simple and efficient for typical use cases
- Read-only Channel Returns: Subscribers receive read-only channels for safety
- Buffered Channels: Prevents blocking during message distribution
- Mutex Protection: Ensures thread-safe subscriber management
- Immediate Message Distribution: Messages are sent to all subscribers immediately
Performance Characteristics
Throughput
- Linear Scaling: Throughput scales linearly with the number of subscribers
- Channel Overhead: Each subscriber adds minimal overhead
- Publishing Performance: Publishing is O(n) where n is the number of subscribers
Latency
- Immediate Distribution: Messages are distributed immediately upon publishing
- Subscriber Processing: Latency depends on individual subscriber processing speed
- Channel Buffering: Buffered channels can absorb temporary processing delays
Memory Usage
- Per-Subscriber Overhead: Each subscriber requires a channel and goroutine
- Message Buffering: Buffered channels use additional memory
- Scalability: Memory usage grows linearly with subscriber count
Common Use Cases
Event-Driven Systems
- User Interface Events: Mouse clicks, keyboard input, window events
- System Events: File changes, network events, timer events
- Application Events: User actions, state changes, error conditions
Message Broadcasting
- Chat Applications: Broadcast messages to all connected users
- Notification Systems: Send notifications to multiple recipients
- Status Updates: Broadcast system status to all monitoring clients
Data Distribution
- Real-time Data: Distribute sensor data, market data, or metrics
- Configuration Changes: Broadcast configuration updates to all services
- Cache Invalidation: Notify all cache instances of data changes
Microservices Communication
- Service Discovery: Notify services of new service instances
- Health Updates: Broadcast health status changes
- Load Balancing: Distribute load information to all instances
Monitoring and Logging
- Metric Collection: Distribute metrics to multiple monitoring systems
- Log Aggregation: Send logs to multiple destinations
- Alert Distribution: Broadcast alerts to multiple notification channels
Gaming and Real-time Applications
- Game State Updates: Broadcast game state changes to all players
- Multiplayer Events: Distribute player actions to all participants
- Real-time Collaboration: Share changes across multiple users
IoT and Sensor Networks
- Sensor Data: Distribute sensor readings to multiple processors
- Device Status: Broadcast device status changes
- Control Commands: Distribute control commands to multiple devices
Financial Systems
- Market Data: Distribute real-time market data to multiple traders
- Trade Updates: Broadcast trade execution to all relevant systems
- Risk Alerts: Distribute risk alerts to multiple risk managers
Advanced Patterns
Topic-based Pub/Sub
- Multiple Topics: Support for different message categories
- Selective Subscription: Subscribers can choose which topics to receive
- Hierarchical Topics: Support for topic hierarchies (e.g., “sports.football.nfl”)
Filtered Pub/Sub
- Message Filtering: Subscribers can filter messages based on content
- Conditional Delivery: Only deliver messages that match subscriber criteria
- Performance Optimization: Reduce unnecessary message processing
Persistent Pub/Sub
- Message Persistence: Store messages for offline subscribers
- Reliable Delivery: Ensure messages are delivered even if subscribers are temporarily unavailable
- Message Ordering: Maintain message order across subscriber reconnections
Distributed Pub/Sub
- Multi-node Distribution: Distribute messages across multiple nodes
- Load Balancing: Balance message distribution across multiple brokers
- Fault Tolerance: Continue operation even if some nodes fail
The publish-subscribe pattern is particularly effective when you have:
- Loose Coupling: Components that should be independent of each other
- Multiple Consumers: Multiple components that need the same information
- Event-driven Architecture: Systems that respond to events rather than direct calls
- Scalable Communication: Need to add/remove consumers without affecting publishers
- Broadcast Requirements: Need to send the same message to multiple recipients
This pattern provides a flexible foundation for building decoupled, event-driven systems that can scale to handle multiple publishers and subscribers efficiently.
The final example implementation looks like this:
package examples
import (
"fmt"
"sync"
"time"
)
// RunPubSub demonstrates the publish-subscribe (pub/sub) pattern.
func RunPubSub() {
fmt.Println("=== Publish-Subscribe (Pub/Sub) Pattern Example ===")
// Create a broadcaster
b := newBroadcaster()
numSubscribers := 3
var wg sync.WaitGroup
// Start subscribers
for i := 1; i <= numSubscribers; i++ {
ch := b.subscribe()
wg.Add(1)
go func(id int, ch <-chan string) {
defer wg.Done()
for msg := range ch {
fmt.Printf("Subscriber %d received: %s\n", id, msg)
}
fmt.Printf("Subscriber %d done.\n", id)
}(i, ch)
}
// Start publisher
go func() {
for i := 1; i <= 5; i++ {
msg := fmt.Sprintf("Message %d", i)
fmt.Printf("Publisher sending: %s\n", msg)
b.publish(msg)
time.Sleep(400 * time.Millisecond)
}
b.close()
}()
wg.Wait()
fmt.Println("Pub/Sub example completed!")
}
// broadcaster manages subscriptions and publishing
// Not thread-safe for subscribe after close
type broadcaster struct {
subscribers []chan string
closed bool
mu sync.Mutex
}
func newBroadcaster() *broadcaster {
return &broadcaster{
subscribers: make([]chan string, 0),
}
}
func (b *broadcaster) subscribe() <-chan string {
b.mu.Lock()
defer b.mu.Unlock()
ch := make(chan string, 2)
b.subscribers = append(b.subscribers, ch)
return ch
}
func (b *broadcaster) publish(msg string) {
b.mu.Lock()
defer b.mu.Unlock()
if b.closed {
return
}
for _, ch := range b.subscribers {
ch <- msg
}
}
func (b *broadcaster) close() {
b.mu.Lock()
defer b.mu.Unlock()
if b.closed {
return
}
for _, ch := range b.subscribers {
close(ch)
}
b.closed = true
}
To run this example, and build the code yourself, check out this and other examples in the go-fluency-concurrency-model-patterns repo. That’s it for this topic, tomorrow I’ll post on the timeouts and cancellations pattern.
One thought on “Go Concurrency Patterns(Publish-Subscribe (Pub/Sub) Pattern)”
Comments are closed.