Go Concurrency Patterns(Event Loop Pattern)

Overview

The Event Loop pattern processes events from multiple sources in a single thread using a central event loop. This pattern is essential for handling multiple event sources efficiently, managing I/O operations, building reactive systems, and coordinating multiple concurrent operations in a controlled manner.

NOTE: For other posts on concurrency patterns, check out the index post to this series of concurrency patterns.

Implementation Details

Structure

The event loop implementation in examples/event_loop.go consists of three main components:

  1. Event Loop – Central coordinator that processes events from multiple channels
  2. Event Sources – Multiple goroutines that generate events
  3. Event Processing – Centralized event handling logic

Code Analysis

Let’s break down the main function and understand how each component works:

func RunEventLoop() {
    // Create event channels
    userEvents := make(chan string, 10)
    systemEvents := make(chan string, 10)
    timerEvents := make(chan string, 10)
    stop := make(chan struct{})

    // Start event sources
    go userEventSource(userEvents)
    go systemEventSource(systemEvents)
    go timerEventSource(timerEvents)

    // Start the event loop
    go eventLoop(userEvents, systemEvents, timerEvents, stop)

    // Let the event loop run for a while
    time.Sleep(5 * time.Second)
    close(stop)
    fmt.Println("Event loop stopped.")
}

Step-by-step breakdown:

  1. Event Channel Creation:
    • userEvents := make(chan string, 10) creates a buffered channel for user events
    • systemEvents := make(chan string, 10) creates a buffered channel for system events
    • timerEvents := make(chan string, 10) creates a buffered channel for timer events
    • stop := make(chan struct{}) creates a signal channel for graceful shutdown
    • Buffer size of 10 prevents blocking when event sources generate events faster than processing
  2. Event Source Launch:
    • go userEventSource(userEvents) launches the user event generator in background
    • go systemEventSource(systemEvents) launches the system event generator in background
    • go timerEventSource(timerEvents) launches the timer event generator in background
    • Each event source runs independently and continuously generates events
  3. Event Loop Launch:
    • go eventLoop(userEvents, systemEvents, timerEvents, stop) starts the central event loop
    • The event loop runs in its own goroutine to process events from all sources
    • This is the core coordinator that handles all events sequentially
  4. Demonstration Period:
    • time.Sleep(5 * time.Second) lets the event loop run for 5 seconds
    • This provides enough time to see the pattern in action
    • During this time, all event sources are generating events and the loop is processing them
  5. Graceful Shutdown:
    • close(stop) signals the event loop to stop processing
    • The event loop will receive this signal and exit gracefully
    • fmt.Println("Event loop stopped.") confirms the shutdown

Event Loop Implementation

func eventLoop(userEvents, systemEvents, timerEvents <-chan string, stop <-chan struct{}) {
    fmt.Println("Event loop started. Processing events...")
    
    for {
        select {
        case event := <-userEvents:
            fmt.Printf("Event Loop: Processing user event: %s\n", event)
            processUserEvent(event)
            
        case event := <-systemEvents:
            fmt.Printf("Event Loop: Processing system event: %s\n", event)
            processSystemEvent(event)
            
        case event := <-timerEvents:
            fmt.Printf("Event Loop: Processing timer event: %s\n", event)
            processTimerEvent(event)
            
        case <-stop:
            fmt.Println("Event Loop: Received stop signal, shutting down...")
            return
        }
    }
}

Event loop breakdown:

  1. Function Signature:
    • Takes read-only channels (<-chan string) for each event type
    • Takes a read-only stop channel (<-chan struct{}) for shutdown
    • Directional channels provide encapsulation and prevent accidental misuse
  2. Infinite Loop Structure:
    • for { ... } creates an infinite loop that continuously processes events
    • The loop will run until explicitly stopped by the stop signal
    • This ensures the event loop is always ready to handle events
  3. Select Statement:
    • select statement waits for events from any of the four channels
    • Non-blocking – if no events are available, it waits efficiently
    • Fair selection – all event sources have equal priority
    • Only one case executes at a time, ensuring sequential processing
  4. User Event Handling:
    • case event := <-userEvents: handles user events (clicks, keypresses, etc.)
    • fmt.Printf("Event Loop: Processing user event: %s\n", event) logs the event
    • processUserEvent(event) calls the appropriate handler function
    • User events typically need quick response for good UX
  5. System Event Handling:
    • case event := <-systemEvents: handles system events (file changes, network status, etc.)
    • fmt.Printf("Event Loop: Processing system event: %s\n", event) logs the event
    • processSystemEvent(event) calls the system event handler
    • System events may require more processing time
  6. Timer Event Handling:
    • case event := <-timerEvents: handles timer events (periodic ticks)
    • fmt.Printf("Event Loop: Processing timer event: %s\n", event) logs the event
    • processTimerEvent(event) calls the timer event handler
    • Timer events are typically lightweight and frequent
  7. Stop Signal Handling:
    • case <-stop: handles the shutdown signal
    • fmt.Println("Event Loop: Received stop signal, shutting down...") logs shutdown
    • return exits the event loop gracefully
    • This ensures clean termination without data loss

Event Source Implementations

func userEventSource(events chan<- string) {
    eventTypes := []string{"click", "keypress", "scroll", "hover", "submit"}
    for {
        event := eventTypes[rand.Intn(len(eventTypes))]
        events <- fmt.Sprintf("User %s", event)
        time.Sleep(time.Duration(rand.Intn(1000)+500) * time.Millisecond)
    }
}

func systemEventSource(events chan<- string) {
    eventTypes := []string{"file_change", "network_status", "memory_usage", "cpu_load", "disk_space"}
    for {
        event := eventTypes[rand.Intn(len(eventTypes))]
        events <- fmt.Sprintf("System %s", event)
        time.Sleep(time.Duration(rand.Intn(1500)+1000) * time.Millisecond)
    }
}

func timerEventSource(events chan<- string) {
    ticker := time.NewTicker(800 * time.Millisecond)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            events <- fmt.Sprintf("Timer tick at %v", time.Now().Format("15:04:05.000"))
        }
    }
}

Event source breakdown:

  1. User Event Source:
    • eventTypes := []string{...} defines realistic user interaction events
    • for { ... } creates an infinite loop for continuous event generation
    • event := eventTypes[rand.Intn(len(eventTypes))] randomly selects an event type
    • events <- fmt.Sprintf("User %s", event) sends the formatted event to the channel
    • time.Sleep(time.Duration(rand.Intn(1000)+500) * time.Millisecond) simulates variable user interaction timing (500-1500ms)
  2. System Event Source:
    • eventTypes := []string{...} defines realistic system monitoring events
    • Similar structure to user events but with system-specific event types
    • time.Sleep(time.Duration(rand.Intn(1500)+1000) * time.Millisecond) simulates slower system event generation (1000-2500ms)
    • System events typically occur less frequently than user events
  3. Timer Event Source:
    • ticker := time.NewTicker(800 * time.Millisecond) creates a ticker for regular events
    • defer ticker.Stop() ensures the ticker is cleaned up when the function exits
    • for { select { case <-ticker.C: ... } } waits for ticker events
    • events <- fmt.Sprintf("Timer tick at %v", time.Now().Format("15:04:05.000")) sends timestamped timer events
    • Timer events occur at regular 800ms intervals, providing predictable timing

Event Processing Functions

func processUserEvent(event string) {
    // Simulate processing time
    time.Sleep(50 * time.Millisecond)
    fmt.Printf("  Processed user event: %s\n", event)
}

func processSystemEvent(event string) {
    // Simulate processing time
    time.Sleep(100 * time.Millisecond)
    fmt.Printf("  Processed system event: %s\n", event)
}

func processTimerEvent(event string) {
    // Simulate processing time
    time.Sleep(30 * time.Millisecond)
    fmt.Printf("  Processed timer event: %s\n", event)
}

Event processing breakdown:

  1. User Event Processing:
    • time.Sleep(50 * time.Millisecond) simulates quick user event processing
    • User events need fast response for good user experience
    • fmt.Printf(" Processed user event: %s\n", event) confirms processing completion
  2. System Event Processing:
    • time.Sleep(100 * time.Millisecond) simulates longer system event processing
    • System events may require more complex processing (file I/O, network calls, etc.)
    • fmt.Printf(" Processed system event: %s\n", event) confirms processing completion
  3. Timer Event Processing:
    • time.Sleep(30 * time.Millisecond) simulates lightweight timer event processing
    • Timer events are typically simple and fast to process
    • fmt.Printf(" Processed timer event: %s\n", event) confirms processing completion

Key Design Patterns:

  1. Single-threaded Event Processing: The event loop processes events sequentially in a single goroutine, preventing race conditions.
  2. Channel-based Communication: All event sources communicate with the event loop through channels, providing natural synchronization.
  3. Select Statement: The select statement provides non-blocking, fair event selection from multiple sources.
  4. Buffered Channels: Buffer size of 10 prevents event sources from blocking when the event loop is busy processing.
  5. Graceful Shutdown: The stop channel provides clean shutdown without data loss or resource leaks.
  6. Structured Events: Each event type has a clear structure and meaning, making the system easy to understand and extend.
  7. Simulated Processing: Random delays simulate real-world processing times and make the pattern’s behavior visible.

How It Works

  1. Event Sources: Multiple goroutines generate events and send them to channels
  2. Event Loop: A single goroutine runs the event loop, listening to all event channels
  3. Event Selection: The select statement waits for events from any source
  4. Event Processing: When an event arrives, it’s processed by the appropriate handler
  5. Non-blocking: The loop continues to process events from other sources while one is being processed
  6. Graceful Shutdown: The loop stops when it receives a stop signal

The pattern ensures that all events are processed in a controlled, sequential manner while maintaining responsiveness to multiple event sources.

Why This Implementation?

Single-threaded Event Processing

  • Sequential Processing: Events are processed one at a time, preventing race conditions
  • Predictable Behavior: Event processing order is deterministic
  • Resource Efficiency: No need for complex synchronization between event handlers
  • Debugging: Easier to debug and reason about event processing

Channel-based Event Sources

  • Asynchronous Generation: Event sources can generate events independently
  • Non-blocking: Event sources don’t block when the event loop is busy
  • Buffering: Buffered channels prevent event loss during processing delays
  • Scalability: Easy to add or remove event sources

Select Statement

  • Non-blocking Selection: Waits for events from any source without blocking
  • Fair Selection: All event sources have equal priority
  • Efficient Waiting: Efficiently waits for multiple channels
  • Clean Code: Provides clear, readable event handling logic

Centralized Event Handling

  • Single Point of Control: All events flow through one place
  • Consistent Processing: All events are processed with the same logic
  • Easy Monitoring: Easy to monitor and log all events
  • State Management: Centralized state management for event processing

Graceful Shutdown

  • Controlled Termination: Event loop can be stopped gracefully
  • Resource Cleanup: Proper cleanup of resources when stopping
  • Coordinated Shutdown: All event sources can be coordinated during shutdown
  • No Data Loss: Ensures no events are lost during shutdown

Key Design Decisions

  1. Multiple Event Types: Demonstrates handling different types of events
  2. Simulated Processing: Random delays simulate real event processing time
  3. Buffered Channels: Prevent blocking of event sources
  4. Structured Events: Events have clear structure and meaning
  5. Proper Cleanup: Graceful shutdown with proper resource cleanup

Performance Characteristics

Throughput

  • Sequential Processing: Throughput limited by processing time of individual events
  • Event Queuing: Events can queue in channels during processing delays
  • Fair Processing: All event sources are processed fairly
  • Predictable Performance: Performance is predictable and consistent

Latency

  • Processing Time: Latency depends on event processing time
  • Queue Time: Events may wait in channels if event loop is busy
  • Fair Scheduling: All event sources experience similar latency
  • Responsiveness: System remains responsive to all event sources

Memory Usage

  • Channel Buffering: Memory usage depends on channel buffer sizes
  • Event Queuing: Queued events consume memory
  • Efficient Storage: Minimal overhead for event storage
  • Scalability: Memory usage scales with number of event sources

Common Use Cases

User Interface Systems

  • GUI Applications: Handle mouse, keyboard, and window events
  • Web Applications: Process user interactions and DOM events
  • Game Engines: Handle input events and game state updates
  • Mobile Apps: Process touch, gesture, and sensor events

Network Applications

  • Web Servers: Handle HTTP requests and responses
  • Chat Applications: Process messages and connection events
  • API Gateways: Handle API requests and routing
  • Load Balancers: Process connection and health check events

System Monitoring

  • Process Monitoring: Handle process lifecycle events
  • Resource Monitoring: Process CPU, memory, and disk events
  • Network Monitoring: Handle network status and traffic events
  • Log Processing: Process log file and system log events

IoT Applications

  • Sensor Data: Process data from multiple sensors
  • Device Control: Handle device status and control events
  • Protocol Processing: Process various IoT protocol events
  • Gateway Management: Handle device registration and communication events

Financial Systems

  • Trading Systems: Process market data and order events
  • Risk Management: Handle risk calculation and alert events
  • Settlement Systems: Process transaction and settlement events
  • Compliance Monitoring: Handle regulatory and compliance events

Real-time Systems

  • Streaming Applications: Process media stream events
  • Real-time Analytics: Handle analytics and reporting events
  • Collaboration Tools: Process user collaboration events
  • Live Broadcasting: Handle broadcast and viewer events

Embedded Systems

  • Hardware Control: Process hardware interrupt events
  • Sensor Networks: Handle sensor data and network events
  • Control Systems: Process control and feedback events
  • Automation Systems: Handle automation and scheduling events

Advanced Patterns

Priority-based Event Processing

  • Event Priority: Process events based on priority levels
  • Priority Queues: Use priority queues for event ordering
  • Urgent Events: Handle urgent events with higher priority
  • Fair Scheduling: Ensure fair processing across priority levels

Event Filtering and Routing

  • Event Filtering: Filter events based on criteria
  • Event Routing: Route events to appropriate handlers
  • Event Transformation: Transform events before processing
  • Event Aggregation: Aggregate multiple events into single events

Event Persistence

  • Event Logging: Log all events for audit and debugging
  • Event Replay: Replay events for testing and recovery
  • Event Storage: Store events for later processing
  • Event Archiving: Archive old events for compliance

Distributed Event Loops

  • Multi-node Coordination: Coordinate event loops across multiple nodes
  • Event Distribution: Distribute events across multiple event loops
  • Load Balancing: Balance event processing load across nodes
  • Fault Tolerance: Handle failures in distributed event processing

Best Practices

Event Design

  • Structured Events: Use structured event types with clear meaning
  • Event Size: Keep events reasonably sized for performance
  • Event Immutability: Make events immutable to prevent side effects
  • Event Validation: Validate events before processing

Performance Optimization

  • Event Batching: Batch similar events for efficient processing
  • Async Processing: Use async processing for long-running operations
  • Event Pooling: Reuse event objects to reduce allocation overhead
  • Memory Management: Monitor and manage memory usage

Error Handling

  • Event Error Handling: Handle errors in event processing gracefully
  • Error Recovery: Implement recovery mechanisms for failed events
  • Error Reporting: Report errors for monitoring and debugging
  • Circuit Breaker: Implement circuit breakers for external dependencies

Monitoring and Observability

  • Event Metrics: Track event processing metrics
  • Performance Monitoring: Monitor event loop performance
  • Event Tracing: Trace events through the system
  • Alerting: Alert on unusual event patterns or failures

The event loop pattern is particularly effective when you have:

  • Multiple Event Sources: Multiple sources generating events
  • Sequential Processing: Need for sequential, controlled event processing
  • Resource Constraints: Limited resources that need efficient management
  • Real-time Requirements: Need for responsive event handling
  • State Management: Complex state that needs centralized management

This pattern provides a robust foundation for building responsive, efficient systems that can handle multiple event sources while maintaining control and predictability.

The final example implementation looks like this:

package examples

import (
	"fmt"
	"math/rand"
	"time"
)

// RunEventLoop demonstrates the event loop pattern.
func RunEventLoop() {
	fmt.Println("=== Event Loop Pattern Example ===")

	// Create event channels
	userEvents := make(chan string, 10)
	systemEvents := make(chan string, 10)
	timerEvents := make(chan string, 10)
	shutdown := make(chan struct{})

	// Start event producers
	go userEventProducer(userEvents)
	go systemEventProducer(systemEvents)
	go timerEventProducer(timerEvents)

	// Start the event loop
	go eventLoop(userEvents, systemEvents, timerEvents, shutdown)

	// Let the system run for a while
	time.Sleep(5 * time.Second)

	// Shutdown
	fmt.Println("Shutting down event loop...")
	close(shutdown)

	// Wait a bit for cleanup
	time.Sleep(500 * time.Millisecond)
	fmt.Println("Event loop example completed!")
}

// Event loop that processes events from multiple sources
func eventLoop(userEvents, systemEvents, timerEvents <-chan string, shutdown <-chan struct{}) {
	fmt.Println("Event loop started...")

	for {
		select {
		case event := <-userEvents:
			fmt.Printf("Event Loop: Processing user event: %s\n", event)
			processUserEvent(event)

		case event := <-systemEvents:
			fmt.Printf("Event Loop: Processing system event: %s\n", event)
			processSystemEvent(event)

		case event := <-timerEvents:
			fmt.Printf("Event Loop: Processing timer event: %s\n", event)
			processTimerEvent(event)

		case <-shutdown:
			fmt.Println("Event Loop: Shutdown signal received, cleaning up...")
			return
		}
	}
}

// Event producers
func userEventProducer(events chan<- string) {
	userActions := []string{"login", "logout", "click", "scroll", "submit"}
	for i := 0; i < 8; i++ {
		time.Sleep(time.Duration(rand.Intn(800)+200) * time.Millisecond)
		action := userActions[rand.Intn(len(userActions))]
		events <- fmt.Sprintf("%s (user_%d)", action, i+1)
	}
}

func systemEventProducer(events chan<- string) {
	systemEvents := []string{"backup", "update", "maintenance", "alert", "sync"}
	for i := 0; i < 6; i++ {
		time.Sleep(time.Duration(rand.Intn(1000)+500) * time.Millisecond)
		event := systemEvents[rand.Intn(len(systemEvents))]
		events <- fmt.Sprintf("%s (system_%d)", event, i+1)
	}
}

func timerEventProducer(events chan<- string) {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	count := 0
	for range ticker.C {
		if count >= 5 {
			break
		}
		events <- fmt.Sprintf("heartbeat (timer_%d)", count+1)
		count++
	}
}

// Event processors
func processUserEvent(event string) {
	// Simulate processing time
	time.Sleep(100 * time.Millisecond)
	fmt.Printf("  -> User event processed: %s\n", event)
}

func processSystemEvent(event string) {
	// Simulate processing time
	time.Sleep(150 * time.Millisecond)
	fmt.Printf("  -> System event processed: %s\n", event)
}

func processTimerEvent(event string) {
	// Simulate processing time
	time.Sleep(50 * time.Millisecond)
	fmt.Printf("  -> Timer event processed: %s\n", event)
}

To run this example, and build the code yourself, check out this and other examples in the go-fluency-concurrency-model-patterns repo. That’s it for this topic, tomorrow I’ll post on the resource pooling pattern.