Adron's Composite Code

Go Concurrency Patterns (Supervisor/Restart Pattern)

Overview

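The Supervisor/Restart pattern uses a supervisor goroutine to monitor one or more worker goroutines. If a worker fails (panics or exits unexpectedly), the supervisor restarts it. The pattern is a building block for resilient, fault-tolerant systems: it recovers automatically from transient errors and keeps critical tasks running. One Go-specific caveat: a panic that is not recovered inside the goroutine where it occurs terminates the whole program, so a supervisor can only restart a panicking worker if that worker recovers and reports the failure itself.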

NOTE: For other posts on concurrency patterns, check out the index post for this series.

Implementation Details

Structure

The supervisor/restart implementation in examples/supervisor.go consists of three main components:

  1. Supervisor – A goroutine that monitors and manages workers
  2. Worker – A goroutine that performs work and may fail
  3. Coordination – Channels and synchronization for monitoring and restart

Code Analysis

Let’s break down RunSupervisor and see how each component works:

func RunSupervisor() {
    var restarts int32
    stop := make(chan struct{})
    done := make(chan struct{})

    // Supervisor goroutine
    go func() {
        defer close(done) // signal that supervision has fully ended
        for {
            workerDone := make(chan struct{})
            go workerWithFailure(workerDone, stop)
            select {
            case <-workerDone:
                // The worker sends the same signal on failure and on success
                fmt.Println("Supervisor: Worker exited, restarting...")
            case <-stop:
                fmt.Println("Supervisor: Stopping worker supervision.")
                // Receive the worker's final signal so its unbuffered send
                // does not block forever (which would leak the goroutine)
                <-workerDone
                return
            }
            // Restart after a short delay, unless a stop arrives first
            select {
            case <-time.After(500 * time.Millisecond):
                atomic.AddInt32(&restarts, 1)
            case <-stop:
                fmt.Println("Supervisor: Stopping worker supervision.")
                return
            }
        }
    }()

    // Let the supervisor run for a while
    time.Sleep(4 * time.Second)
    close(stop)
    <-done

    // The initial launch is never counted, so no adjustment is needed
    fmt.Printf("Supervisor example completed! Worker was restarted %d times.\n", atomic.LoadInt32(&restarts))
}

Step-by-step breakdown:

  1. State and Channel Setup:
    • var restarts int32 declares a restart counter that is updated through sync/atomic
    • stop := make(chan struct{}) creates a signal channel for graceful shutdown
    • done := make(chan struct{}) creates a completion channel that the supervisor closes when it has finished
    • These channels use struct{} as the element type because only the event matters, not a value; an empty struct occupies zero bytes
  2. Supervisor Goroutine Launch:
    • The supervisor runs in its own goroutine to monitor workers continuously
    • An infinite for loop starts a new worker each time the previous one exits
    • defer close(done) guarantees that done is closed on every return path
  3. Worker Lifecycle Management:
    • workerDone := make(chan struct{}) creates a fresh channel for each worker
    • Each worker gets its own done channel to signal that it has exited
    • go workerWithFailure(workerDone, stop) launches a new worker goroutine
    • The worker receives both the done channel (to signal back) and the stop channel (for shutdown)
  4. Supervisor Monitoring Loop:
    • A select statement waits for either the worker to exit or the stop signal
    • case <-workerDone: handles a worker exit; because the signal carries no value, failures and normal completions are handled the same way
    • case <-stop: handles a graceful shutdown request
    • The select blocks until one of these events occurs, so the supervisor does no work while waiting
  5. Failure Handling and Restart Logic:
    • A second select waits 500 ms before restarting, but returns early if stop is closed during the delay, so no new worker is launched once shutdown begins
    • atomic.AddInt32(&restarts, 1) increments the counter only when a restart actually happens
    • The atomic operation keeps the counter safe to share between goroutines without a mutex
    • The delay prevents a tight crash-restart loop, in which a worker that fails immediately would be relaunched many times per second
  6. Graceful Shutdown Process:
    • On stop, the supervisor first receives from workerDone, waiting for the current worker to acknowledge the stop signal
    • Without that receive, the worker's final send on the unbuffered workerDone channel would block forever and leak the goroutine
    • return exits the supervisor goroutine, and the deferred close(done) signals that supervision has ended
  7. Coordination in RunSupervisor:
    • time.Sleep(4 * time.Second) lets the supervisor run for a demonstration period
    • close(stop) signals both the supervisor and the current worker to stop
    • <-done waits for the supervisor to finish its shutdown process
    • atomic.LoadInt32(&restarts) reads the final count; the initial launch was never counted as a restart, so the value is reported as-is

Worker Implementation

func workerWithFailure(done chan<- struct{}, stop <-chan struct{}) {
    fmt.Println("Worker: Started")
    workTime := time.Duration(rand.Intn(1200)+400) * time.Millisecond
    select {
    case <-time.After(workTime):
        // Simulate random failure
        if rand.Float32() < 0.6 {
            fmt.Println("Worker: Simulated failure!")
            done <- struct{}{}
            return
        }
        fmt.Println("Worker: Completed work successfully.")
    case <-stop:
        fmt.Println("Worker: Received stop signal.")
    }
    // Signal normal exit
    done <- struct{}{}
}

Worker function breakdown:

  1. Function Signature:
    • done chan<- struct{}: Send-only channel for signaling the worker's exit to the supervisor
    • stop <-chan struct{}: Receive-only channel for shutdown signals
    • Directional channel types make the contract explicit: the compiler rejects a receive from done or a send on stop
  2. Worker Initialization:
    • Prints startup message for debugging and monitoring
    • workTime := time.Duration(rand.Intn(1200)+400) * time.Millisecond creates variable work duration
    • rand.Intn(1200) returns a value from 0 to 1199, so the work takes between 400 ms and just under 1600 ms, simulating real-world processing variability
  3. Simulated Work with Cancellation:
    • Uses select statement to handle both work completion and stop signals
    • case <-time.After(workTime): simulates work that takes a variable amount of time
    • case <-stop: handles graceful shutdown requests
  4. Failure Simulation Logic:
    • if rand.Float32() < 0.6 creates a 60% probability of failure
    • High failure rate (60%) demonstrates the restart mechanism effectively
    • done <- struct{}{} immediately signals failure to supervisor
    • return exits the worker without sending a second done signal
  5. Successful Completion Path:
    • If no failure occurs, worker prints success message
    • Continues to the final done <- struct{}{} to signal normal completion
    • Both paths send the same empty value, so the supervisor cannot tell a failure from a successful completion and restarts the worker either way
  6. Stop Signal Handling:
    • If stop signal is received, worker prints acknowledgment
    • Continues to final done <- struct{}{} to signal graceful shutdown
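    • Because workerDone is unbuffered, this send completes only when the supervisor receives it; if the supervisor stops without reading workerDone, the worker goroutine blocks here forever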
  7. Final Signal:
    • done <- struct{}{} signals completion for both success and graceful shutdown cases
    • Ensures the worker sends exactly one signal on every exit path
    • Prevents supervisor from waiting indefinitely

Key Design Patterns:

  1. Channel Direction Safety: Using directional channels (chan<- and <-chan) prevents accidental misuse and provides clear API contracts.
  2. Atomic Counter: Using atomic.AddInt32 for restart counting provides thread safety without mutex overhead.
  3. Select Statement: The select statement in both supervisor and worker blocks until whichever of several channel events happens first, without polling.
  4. Fresh Channel per Worker: Creating a new workerDone channel for each worker ensures clean communication and prevents signal confusion.
  5. Graceful Shutdown: Both supervisor and worker watch the stop channel, so neither starts new work once shutdown begins.
  6. Failure Simulation: High failure rate (60%) with variable work time demonstrates the restart mechanism effectively in a short demonstration period.

How It Works

  1. Supervisor Initialization: The supervisor starts and launches the first worker
  2. Worker Execution: The worker performs its task and may fail randomly
  3. Failure Detection: When a worker fails, it signals the supervisor via a channel
  4. Restart Logic: The supervisor waits briefly, then launches a new worker
  5. Continuous Monitoring: This cycle continues until the supervisor receives a stop signal
  6. Graceful Shutdown: The supervisor stops launching new workers and waits for current workers to finish

The pattern ensures that critical work continues even when individual workers fail.

Why This Implementation?

Channel-based Communication

Atomic Counter for Restarts

Separate Stop Channel

Worker Done Channel

Restart Delay

Key Design Decisions

  1. Simulated Failures: Random failures (60% probability) demonstrate the restart mechanism
  2. Variable Work Time: Random work duration simulates real-world variability
  3. Delayed Restart: The supervisor waits a fixed 500 ms after detecting a worker exit, then starts a new worker
  4. Stop Signal Handling: Both supervisor and workers respond to stop signals
  5. Restart Counting: Tracks the number of restarts for monitoring purposes

Failure Handling Strategies

Immediate Restart

Exponential Backoff

Circuit Breaker

Health Checks

Common Use Cases

System Services

Microservices

Monitoring Systems

Database Operations

File Processing

Network Services

IoT Applications

Financial Systems

Best Practices

Failure Detection
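In Go, a panic that is not recovered in the goroutine where it occurs crashes the entire program, so the supervisor cannot catch a worker's panic from its own goroutine. A minimal sketch of one way to detect panics as ordinary failures: the worker goroutine wraps its task in a deferred recover and reports the panic as an error. runProtected and superviseOnce are hypothetical helper names.

```go
package main

import (
	"fmt"
)

// runProtected runs task in the calling goroutine and converts a panic into
// an error. recover only stops a panic raised in the same goroutine, so this
// wrapper must run inside the worker goroutine itself.
func runProtected(task func()) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("worker panicked: %v", r)
		}
	}()
	task()
	return nil
}

// superviseOnce starts task in a new goroutine and returns the error (or nil)
// reported when that goroutine exits.
func superviseOnce(task func()) error {
	result := make(chan error, 1) // buffered so the worker never blocks on send
	go func() {
		result <- runProtected(task)
	}()
	return <-result
}

func main() {
	fmt.Println(superviseOnce(func() { panic("disk full") })) // worker panicked: disk full
	fmt.Println(superviseOnce(func() {}))                     // <nil>
}
```

A supervisor loop can then treat a non-nil error from runProtected as a failure and restart the worker, instead of letting the panic take the whole process down.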

Restart Strategy

Resource Management

Monitoring and Logging

The supervisor/restart pattern is particularly effective when you have:

This pattern provides a robust foundation for building resilient systems that can automatically recover from failures and maintain continuous operation.

The final example implementation looks like this:

package examples

import (
	"fmt"
	"math/rand"
	"sync/atomic"
	"time"
)

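// RunSupervisor demonstrates the supervisor/restart pattern.
func RunSupervisor() {
	fmt.Println("=== Supervisor/Restart Pattern Example ===")

	var restarts int32
	stop := make(chan struct{})
	done := make(chan struct{})

	// Supervisor goroutine
	go func() {
		defer close(done) // signal that supervision has fully ended
		for {
			workerDone := make(chan struct{})
			go workerWithFailure(workerDone, stop)
			select {
			case <-workerDone:
				// The worker sends the same signal on failure and on success
				fmt.Println("Supervisor: Worker exited, restarting...")
			case <-stop:
				fmt.Println("Supervisor: Stopping worker supervision.")
				// Receive the worker's final signal so its unbuffered send
				// does not block forever (which would leak the goroutine)
				<-workerDone
				return
			}
			// Restart after a short delay, unless a stop arrives first
			select {
			case <-time.After(500 * time.Millisecond):
				atomic.AddInt32(&restarts, 1)
			case <-stop:
				fmt.Println("Supervisor: Stopping worker supervision.")
				return
			}
		}
	}()

	// Let the supervisor run for a while
	time.Sleep(4 * time.Second)
	close(stop)
	<-done

	// The initial launch is never counted, so no adjustment is needed
	fmt.Printf("Supervisor example completed! Worker was restarted %d times.\n", atomic.LoadInt32(&restarts))
}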

// workerWithFailure simulates a worker that randomly fails
func workerWithFailure(done chan<- struct{}, stop <-chan struct{}) {
	fmt.Println("Worker: Started")
	workTime := time.Duration(rand.Intn(1200)+400) * time.Millisecond
	select {
	case <-time.After(workTime):
		// Simulate random failure
		if rand.Float32() < 0.6 {
			fmt.Println("Worker: Simulated failure!")
			done <- struct{}{}
			return
		}
		fmt.Println("Worker: Completed work successfully.")
	case <-stop:
		fmt.Println("Worker: Received stop signal.")
	}
	// Signal normal exit
	done <- struct{}{}
}

To run this example and build the code yourself, check out this and other examples in the go-fluency-concurrency-model-patterns repo. That’s it for this topic; tomorrow I’ll post on the pub/sub pattern.
