Adron's Composite Code

Go Concurrency Patterns (Publish-Subscribe (Pub/Sub) Pattern)

Overview

The Publish-Subscribe (Pub/Sub) pattern decouples message producers (publishers) from consumers (subscribers). Publishers send messages to a topic or channel, and all subscribers to that topic receive the messages. This pattern is essential for event-driven architectures, broadcasting messages to multiple consumers, and decoupling senders and receivers.

NOTE: For other posts on concurrency patterns, check out the index post to this series of concurrency patterns.

Implementation Details

Structure

The publish-subscribe implementation in examples/pubsub.go consists of three main components:

  1. Broadcaster – Manages subscriptions and message distribution
  2. Publishers – Send messages to the broadcaster
  3. Subscribers – Receive messages from the broadcaster

Code Analysis

Let’s break down the main function and understand how each component works:

func RunPubSub() {
    // Create a broadcaster
    b := newBroadcaster()

    numSubscribers := 3
    var wg sync.WaitGroup

    // Start subscribers
    for i := 1; i <= numSubscribers; i++ {
        ch := b.subscribe()
        wg.Add(1)
        go func(id int, ch <-chan string) {
            defer wg.Done()
            for msg := range ch {
                fmt.Printf("Subscriber %d received: %s\n", id, msg)
            }
            fmt.Printf("Subscriber %d done.\n", id)
        }(i, ch)
    }

    // Start publisher
    go func() {
        for i := 1; i <= 5; i++ {
            msg := fmt.Sprintf("Message %d", i)
            fmt.Printf("Publisher sending: %s\n", msg)
            b.publish(msg)
            time.Sleep(400 * time.Millisecond)
        }
        b.close()
    }()

    wg.Wait()
}

Step-by-step breakdown:

  1. Broadcaster Initialization:
    • b := newBroadcaster() creates a new broadcaster instance
    • The broadcaster manages all subscriptions and message distribution
    • This is the central hub that decouples publishers from subscribers
  2. Subscriber Configuration:
    • numSubscribers := 3 defines how many subscribers to create
    • var wg sync.WaitGroup tracks when all subscribers finish processing
    • This ensures the main function waits for all subscribers to complete
  3. Subscriber Launch Loop:
    • Launches numSubscribers goroutines (3 in this case)
    • ch := b.subscribe() creates a new subscription channel for each subscriber
    • Each subscriber gets a unique ID (1, 2, 3) for tracking and debugging
    • Passes i and ch as arguments to the function literal func(id int, ch <-chan string) { ... }(i, ch), so each goroutine works with its own copy of the subscriber ID and channel
  4. Subscriber Goroutine Implementation:
    • defer wg.Done() ensures the subscriber signals completion when it exits
    • for msg := range ch continuously reads messages until the channel closes
    • Each subscriber processes messages independently at its own pace
    • Prints completion message when the channel closes
  5. Publisher Goroutine Launch:
    • Runs in background to send messages asynchronously
    • Publishes 5 messages with 400ms delays between them
    • This simulates real-world message publishing with timing
  6. Message Publishing Loop:
    • for i := 1; i <= 5; i++ sends exactly 5 messages
    • msg := fmt.Sprintf("Message %d", i) creates numbered messages
    • b.publish(msg) broadcasts the message to all subscribers
    • time.Sleep(400 * time.Millisecond) simulates message generation time
  7. Graceful Shutdown:
    • b.close() closes the broadcaster and all subscriber channels
    • This signals all subscribers to stop processing and exit
    • wg.Wait() waits for all subscribers to finish before the main function exits
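The shutdown step relies on how range behaves on a closed channel: the loop first delivers whatever is still buffered and only then ends. A minimal sketch of that behavior in isolation, where drain is a hypothetical helper that is not part of the original example:

```go
package main

import "fmt"

// drain (hypothetical helper) collects every message readable from ch
// and returns once ch has been closed and emptied.
func drain(ch <-chan string) []string {
	var got []string
	for msg := range ch {
		got = append(got, msg)
	}
	return got
}

func main() {
	ch := make(chan string, 2)
	ch <- "Message 1"
	ch <- "Message 2"
	close(ch) // buffered values stay readable after close

	fmt.Println(drain(ch))
}
```

This is why closing the subscriber channels in b.close() does not discard messages a subscriber has not read yet.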

Broadcaster Implementation

type broadcaster struct {
    subscribers []chan string
    closed      bool
    mu          sync.Mutex
}

func newBroadcaster() *broadcaster {
    return &broadcaster{
        subscribers: make([]chan string, 0),
    }
}

func (b *broadcaster) subscribe() <-chan string {
    b.mu.Lock()
    defer b.mu.Unlock()
    ch := make(chan string, 2)
    b.subscribers = append(b.subscribers, ch)
    return ch
}

func (b *broadcaster) publish(msg string) {
    b.mu.Lock()
    defer b.mu.Unlock()
    if b.closed {
        return
    }
    for _, ch := range b.subscribers {
        ch <- msg
    }
}

func (b *broadcaster) close() {
    b.mu.Lock()
    defer b.mu.Unlock()
    if b.closed {
        return
    }
    for _, ch := range b.subscribers {
        close(ch)
    }
    b.closed = true
}

Broadcaster function breakdown:

  1. Data Structure Design:
    • subscribers []chan string: Slice of channels, one per subscriber
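    • closed bool: Flag that turns publish() and close() into no-ops after shutdown (subscribe() does not check it)
    • mu sync.Mutex: Protects concurrent access to subscriber list
    • Simple slice-based design: appending a subscriber is cheap, and publish() is one linear pass over the slice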
  2. Constructor Function:
    • newBroadcaster() creates a new broadcaster instance
    • Initializes empty subscriber slice with make([]chan string, 0)
    • Returns pointer to broadcaster for method calls
  3. Subscription Management:
    • subscribe() creates a new subscription channel for each subscriber
    • b.mu.Lock() and defer b.mu.Unlock() ensure thread-safe subscriber list access
    • ch := make(chan string, 2) creates a buffered channel with capacity 2
    • Buffering lets publish() continue while a subscriber is up to 2 messages behind; once the buffer is full, the send blocks
    • b.subscribers = append(b.subscribers, ch) adds the new channel to the list
    • Returns <-chan string (receive-only) for encapsulation
  4. Message Publishing Logic:
    • publish(msg string) broadcasts a message to all subscribers
    • Thread-safe access with mutex protection
    • if b.closed { return } prevents publishing after shutdown
    • for _, ch := range b.subscribers iterates through all subscriber channels
    • ch <- msg sends the message to each subscriber
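    • Blocks only when a subscriber's buffer is full; because the mutex is held for the whole loop, one stalled subscriber delays delivery to the others and also blocks subscribe() and close()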
  5. Graceful Shutdown Process:
    • close() coordinates shutdown of all subscribers
    • Thread-safe access with mutex protection
    • if b.closed { return } prevents double-closing
    • for _, ch := range b.subscribers { close(ch) } closes all subscriber channels
    • b.closed = true marks the broadcaster as closed
    • Closing channels signals subscribers to stop processing
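A send on a buffered channel still blocks once the buffer is full, and publish() holds the mutex while it waits. One common alternative is to drop the message for any subscriber that cannot take it right now. The sketch below assumes that trade-off is acceptable; tryPublish and its dropped count are hypothetical additions, not part of the original example.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcaster mirrors the struct from the article.
type broadcaster struct {
	subscribers []chan string
	closed      bool
	mu          sync.Mutex
}

func (b *broadcaster) subscribe() <-chan string {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan string, 2)
	b.subscribers = append(b.subscribers, ch)
	return ch
}

// tryPublish (hypothetical) never blocks. It returns the number of
// subscribers that missed msg because their buffer was full.
func (b *broadcaster) tryPublish(msg string) int {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return 0
	}
	dropped := 0
	for _, ch := range b.subscribers {
		select {
		case ch <- msg: // room in the buffer, message queued
		default: // buffer full, skip this subscriber
			dropped++
		}
	}
	return dropped
}

func main() {
	b := &broadcaster{}
	ch := b.subscribe() // nobody reads from ch in this demo

	for i := 1; i <= 3; i++ {
		dropped := b.tryPublish(fmt.Sprintf("Message %d", i))
		fmt.Printf("Message %d dropped for %d subscriber(s)\n", i, dropped)
	}
	fmt.Println("buffered:", len(ch))
}
```

With a capacity of 2 and no reader, the first two messages are queued and the third is dropped. Whether dropping is better than blocking depends on the application: blocking preserves every message but lets the slowest subscriber set the pace.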

Key Design Patterns:

  1. Slice-based Subscriber Management: A slice is simple and cheap to append to, and publish() is a single linear pass over it. The example only adds subscribers; it has no unsubscribe operation.
  2. Mutex Protection: Ensures thread-safe access to the subscriber list during concurrent subscribe/publish operations.
  3. Buffered Channels: Each subscriber gets a buffered channel (capacity 2), so a subscriber can fall up to two messages behind before publish() blocks on it.
  4. Read-only Channel Returns: subscribe() returns <-chan string (a receive-only channel), so subscribers cannot send on or close their channels.
  5. Immediate Message Distribution: Messages are sent to all subscribers immediately upon publishing, providing real-time delivery.
  6. Graceful Shutdown: Proper channel closing ensures all subscribers exit cleanly without goroutine leaks.
  7. Closure Pattern: func(id int, ch <-chan string) { ... }(i, ch) passes the loop variable and channel as arguments, so each subscriber goroutine gets its own copies.
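The broadcaster as written only ever adds subscribers. Removal could look like the sketch below, which finds the channel in the slice, cuts it out, and closes it so the subscriber's range loop ends. The unsubscribe method is a hypothetical addition and is not part of the original example.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcaster mirrors the struct from the article.
type broadcaster struct {
	subscribers []chan string
	closed      bool
	mu          sync.Mutex
}

func (b *broadcaster) subscribe() <-chan string {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan string, 2)
	b.subscribers = append(b.subscribers, ch)
	return ch
}

// unsubscribe (hypothetical) removes ch from the subscriber list and
// closes it. It reports whether ch was found.
func (b *broadcaster) unsubscribe(ch <-chan string) bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return false // channels were already closed by close()
	}
	for i, c := range b.subscribers {
		if (<-chan string)(c) == ch {
			b.subscribers = append(b.subscribers[:i], b.subscribers[i+1:]...)
			close(c)
			return true
		}
	}
	return false
}

func main() {
	b := &broadcaster{}
	first := b.subscribe()
	b.subscribe() // a second subscriber that stays registered

	fmt.Println("removed:", b.unsubscribe(first))
	_, open := <-first
	fmt.Println("first still open:", open)
	fmt.Println("remaining subscribers:", len(b.subscribers))
}
```

The linear search is fine for a handful of subscribers. With many subscribers and frequent removals, a map keyed by a subscriber ID would be the more usual choice.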

How It Works

  1. Broadcaster Creation: A broadcaster is created to manage subscriptions and message distribution
  2. Subscriber Registration: Subscribers call subscribe() to get their own message channel
  3. Message Publishing: Publishers call publish() to send messages to all subscribers
  4. Message Distribution: The broadcaster sends each message to all subscriber channels
  5. Graceful Shutdown: When the broadcaster is closed, all subscriber channels are closed

The pattern enables loose coupling between publishers and subscribers, allowing for flexible message distribution.

Why This Implementation?

Individual Subscriber Channels

Mutex-based Synchronization

Buffered Subscriber Channels

Centralized Message Distribution

Graceful Shutdown

Key Design Decisions

  1. Slice-based Subscriber Management: Simple and efficient for typical use cases
  2. Receive-only Channel Returns: Subscribers get <-chan string, so they cannot send on or close the channel
  3. Buffered Channels: Absorb short bursts so publish() blocks only when a subscriber's buffer is full
  4. Mutex Protection: Ensures thread-safe subscriber management
  5. Immediate Message Distribution: Messages are sent to all subscribers immediately

Performance Characteristics

Throughput

Latency

Memory Usage

Common Use Cases

Event-Driven Systems

Message Broadcasting

Data Distribution

Microservices Communication

Monitoring and Logging

Gaming and Real-time Applications

IoT and Sensor Networks

Financial Systems

Advanced Patterns

Topic-based Pub/Sub
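One way to read this variation is to key the subscriber list by topic name, so a message reaches only the subscribers of its topic. A minimal sketch under that assumption; topicBroadcaster, its methods, and the topic names are all hypothetical and do not come from the example repo.

```go
package main

import (
	"fmt"
	"sync"
)

// topicBroadcaster (hypothetical) keeps one subscriber list per topic.
type topicBroadcaster struct {
	mu     sync.Mutex
	topics map[string][]chan string
}

func newTopicBroadcaster() *topicBroadcaster {
	return &topicBroadcaster{topics: make(map[string][]chan string)}
}

// subscribe registers a new channel under topic.
func (b *topicBroadcaster) subscribe(topic string) <-chan string {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan string, 2)
	b.topics[topic] = append(b.topics[topic], ch)
	return ch
}

// publish sends msg to the subscribers of topic and returns how many
// subscribers that topic has.
func (b *topicBroadcaster) publish(topic, msg string) int {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, ch := range b.topics[topic] {
		ch <- msg // blocks if this subscriber's buffer is full
	}
	return len(b.topics[topic])
}

func main() {
	b := newTopicBroadcaster()
	orders := b.subscribe("orders")
	alerts := b.subscribe("alerts")

	b.publish("orders", "order created")

	fmt.Println(<-orders)
	fmt.Println("alerts pending:", len(alerts))
}
```

Publishing to a topic with no subscribers is a no-op here, since ranging over a missing map entry yields nothing.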

Filtered Pub/Sub
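A filtered variant can attach a predicate to each subscription and deliver a message only when the predicate accepts it. The sketch below is one possible shape, assuming string messages as in the example; subscription, filteredBroadcaster, and the ERROR prefix convention are hypothetical.

```go
package main

import (
	"fmt"
	"strings"
	"sync"
)

// subscription (hypothetical) pairs a channel with the predicate that
// decides which messages it receives.
type subscription struct {
	ch     chan string
	accept func(string) bool
}

// filteredBroadcaster (hypothetical) delivers only accepted messages.
type filteredBroadcaster struct {
	mu   sync.Mutex
	subs []subscription
}

func (b *filteredBroadcaster) subscribe(accept func(string) bool) <-chan string {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan string, 2)
	b.subs = append(b.subs, subscription{ch: ch, accept: accept})
	return ch
}

func (b *filteredBroadcaster) publish(msg string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	for _, s := range b.subs {
		if s.accept(msg) {
			s.ch <- msg // blocks if this subscriber's buffer is full
		}
	}
}

func main() {
	b := &filteredBroadcaster{}
	errorsOnly := b.subscribe(func(msg string) bool {
		return strings.HasPrefix(msg, "ERROR")
	})

	b.publish("INFO started")
	b.publish("ERROR disk full")

	fmt.Println("queued:", len(errorsOnly))
	fmt.Println(<-errorsOnly)
}
```

The predicate runs while the mutex is held, so it should be cheap and must not call back into the broadcaster.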

Persistent Pub/Sub

Distributed Pub/Sub

The publish-subscribe pattern is particularly effective when you have:

This pattern provides a flexible foundation for building decoupled, event-driven systems that can scale to handle multiple publishers and subscribers efficiently.

The final example implementation looks like this:

package examples

import (
	"fmt"
	"sync"
	"time"
)

// RunPubSub demonstrates the publish-subscribe (pub/sub) pattern.
func RunPubSub() {
	fmt.Println("=== Publish-Subscribe (Pub/Sub) Pattern Example ===")

	// Create a broadcaster
	b := newBroadcaster()

	numSubscribers := 3
	var wg sync.WaitGroup

	// Start subscribers
	for i := 1; i <= numSubscribers; i++ {
		ch := b.subscribe()
		wg.Add(1)
		go func(id int, ch <-chan string) {
			defer wg.Done()
			for msg := range ch {
				fmt.Printf("Subscriber %d received: %s\n", id, msg)
			}
			fmt.Printf("Subscriber %d done.\n", id)
		}(i, ch)
	}

	// Start publisher
	go func() {
		for i := 1; i <= 5; i++ {
			msg := fmt.Sprintf("Message %d", i)
			fmt.Printf("Publisher sending: %s\n", msg)
			b.publish(msg)
			time.Sleep(400 * time.Millisecond)
		}
		b.close()
	}()

	wg.Wait()
	fmt.Println("Pub/Sub example completed!")
}

// broadcaster manages subscriptions and publishing.
// subscribe does not check the closed flag, so a channel obtained
// after close is registered but never closed.
type broadcaster struct {
	subscribers []chan string
	closed      bool
	mu          sync.Mutex
}

func newBroadcaster() *broadcaster {
	return &broadcaster{
		subscribers: make([]chan string, 0),
	}
}

func (b *broadcaster) subscribe() <-chan string {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan string, 2)
	b.subscribers = append(b.subscribers, ch)
	return ch
}

func (b *broadcaster) publish(msg string) {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return
	}
	for _, ch := range b.subscribers {
		ch <- msg
	}
}

func (b *broadcaster) close() {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return
	}
	for _, ch := range b.subscribers {
		close(ch)
	}
	b.closed = true
}
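In the listing above, subscribe() never checks the closed flag, so a goroutine that subscribes after close() would range over a channel that nobody closes and would wait forever. One possible guard, sketched here as an assumption rather than as the repo's code, is to hand late subscribers a channel that is already closed.

```go
package main

import (
	"fmt"
	"sync"
)

// broadcaster mirrors the struct from the article.
type broadcaster struct {
	subscribers []chan string
	closed      bool
	mu          sync.Mutex
}

// subscribe is a variant of the article's method; the closed check is
// the only addition.
func (b *broadcaster) subscribe() <-chan string {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan string, 2)
	if b.closed {
		close(ch) // late subscribers get an already-closed channel
		return ch
	}
	b.subscribers = append(b.subscribers, ch)
	return ch
}

// close is unchanged from the article.
func (b *broadcaster) close() {
	b.mu.Lock()
	defer b.mu.Unlock()
	if b.closed {
		return
	}
	for _, ch := range b.subscribers {
		close(ch)
	}
	b.closed = true
}

func main() {
	b := &broadcaster{}
	b.close()

	ch := b.subscribe() // subscribing after shutdown
	for msg := range ch {
		fmt.Println("unexpected:", msg)
	}
	fmt.Println("late subscriber exited cleanly")
}
```

Returning an error from subscribe() would be an equally reasonable design; the closed-channel approach keeps the method signature the same as in the article.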

To run this example and build the code yourself, check out this and other examples in the go-fluency-concurrency-model-patterns repo. That’s it for this topic; tomorrow I’ll post on the timeouts and cancellations pattern.
