Adron's Composite Code

Go Concurrency Patterns (Event Loop Pattern)

Overview

The Event Loop pattern processes events from multiple sources in a single goroutine (the Go analogue of a single-threaded event loop) using a central select loop. This pattern is useful for handling multiple event sources efficiently, managing I/O operations, building reactive systems, and coordinating multiple concurrent operations in a controlled manner.

NOTE: For other posts on concurrency patterns, check out the index post to this series of concurrency patterns.

Implementation Details

Structure

The event loop implementation in examples/event_loop.go consists of three main components:

  1. Event Loop – Central coordinator that processes events from multiple channels
  2. Event Sources – Multiple goroutines that generate events
  3. Event Processing – Centralized event handling logic

Code Analysis

Let’s break down the main function and understand how each component works:

func RunEventLoop() {
    // Create event channels
    userEvents := make(chan string, 10)
    systemEvents := make(chan string, 10)
    timerEvents := make(chan string, 10)
    stop := make(chan struct{})

    // Start event sources
    go userEventSource(userEvents)
    go systemEventSource(systemEvents)
    go timerEventSource(timerEvents)

    // Start the event loop
    go eventLoop(userEvents, systemEvents, timerEvents, stop)

    // Let the event loop run for a while
    time.Sleep(5 * time.Second)
    close(stop)
    fmt.Println("Event loop stopped.")
}

Step-by-step breakdown:

  1. Event Channel Creation:
    • userEvents := make(chan string, 10) creates a buffered channel for user events
    • systemEvents := make(chan string, 10) creates a buffered channel for system events
    • timerEvents := make(chan string, 10) creates a buffered channel for timer events
    • stop := make(chan struct{}) creates a signal channel for graceful shutdown
    • Buffer size of 10 lets an event source get up to 10 events ahead of the loop; once the buffer is full, the source blocks until the loop catches up
  2. Event Source Launch:
    • go userEventSource(userEvents) launches the user event generator in background
    • go systemEventSource(systemEvents) launches the system event generator in background
    • go timerEventSource(timerEvents) launches the timer event generator in background
    • Each event source runs independently and continuously generates events
  3. Event Loop Launch:
    • go eventLoop(userEvents, systemEvents, timerEvents, stop) starts the central event loop
    • The event loop runs in its own goroutine to process events from all sources
    • This is the core coordinator that handles all events sequentially
  4. Demonstration Period:
    • time.Sleep(5 * time.Second) lets the event loop run for 5 seconds
    • This provides enough time to see the pattern in action
    • During this time, all event sources are generating events and the loop is processing them
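  5. Graceful Shutdown:
    • close(stop) signals the event loop to stop processing
    • The event loop sees the closed channel on its next pass through select and returns
    • fmt.Println("Event loop stopped.") runs right after close(stop); RunEventLoop does not wait for the loop goroutine, so the message reports that the stop signal was sent rather than confirming that the loop has exited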
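
If the caller needs to know that the loop has really returned, one option is to have the loop close a second channel on exit. The following is a minimal sketch under that assumption; runLoop, stopAndWait, and the done channel are hypothetical names and are not part of the example in the repo.

```go
package main

import (
	"fmt"
	"time"
)

// runLoop is a hypothetical, trimmed-down event loop: it handles events
// until stop is closed, then closes done to report that it has returned.
func runLoop(events <-chan string, stop <-chan struct{}, done chan<- struct{}) {
	defer close(done)
	for {
		select {
		case ev := <-events:
			fmt.Println("processing:", ev)
		case <-stop:
			fmt.Println("loop: stop received")
			return
		}
	}
}

// stopAndWait closes stop and blocks until the loop confirms its exit
// or the timeout elapses. It reports whether the loop exited in time.
func stopAndWait(stop chan struct{}, done <-chan struct{}, timeout time.Duration) bool {
	close(stop)
	select {
	case <-done:
		return true
	case <-time.After(timeout):
		return false
	}
}

func main() {
	events := make(chan string, 10)
	stop := make(chan struct{})
	done := make(chan struct{})

	go runLoop(events, stop, done)
	events <- "User click"
	time.Sleep(50 * time.Millisecond) // give the loop time to handle the event

	if stopAndWait(stop, done, time.Second) {
		fmt.Println("Event loop stopped.")
	}
}
```

With this shape, the final message is printed only after the loop goroutine has returned. A sync.WaitGroup would serve the same purpose when several goroutines have to be awaited.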

Event Loop Implementation

func eventLoop(userEvents, systemEvents, timerEvents <-chan string, stop <-chan struct{}) {
    fmt.Println("Event loop started. Processing events...")
    
    for {
        select {
        case event := <-userEvents:
            fmt.Printf("Event Loop: Processing user event: %s\n", event)
            processUserEvent(event)
            
        case event := <-systemEvents:
            fmt.Printf("Event Loop: Processing system event: %s\n", event)
            processSystemEvent(event)
            
        case event := <-timerEvents:
            fmt.Printf("Event Loop: Processing timer event: %s\n", event)
            processTimerEvent(event)
            
        case <-stop:
            fmt.Println("Event Loop: Received stop signal, shutting down...")
            return
        }
    }
}

Event loop breakdown:

  1. Function Signature:
    • Takes receive-only channels (<-chan string) for each event type
    • Takes a receive-only stop channel (<-chan struct{}) for shutdown
    • Directional channels provide encapsulation and prevent accidental misuse
  2. Infinite Loop Structure:
    • for { ... } creates an infinite loop that continuously processes events
    • The loop will run until explicitly stopped by the stop signal
    • This ensures the event loop is always ready to handle events
  3. Select Statement:
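    • select statement waits for events from any of the four channels (three event channels plus stop)
    • Blocking without busy-waiting – if no channel is ready, the goroutine is parked until one is
    • Uniform pseudo-random selection – when several channels are ready, select picks one of them at random, so no source (and not the stop signal either) has priority
    • Only one case executes at a time, ensuring sequential processing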
  4. User Event Handling:
    • case event := <-userEvents: handles user events (clicks, keypresses, etc.)
    • fmt.Printf("Event Loop: Processing user event: %s\n", event) logs the event
    • processUserEvent(event) calls the appropriate handler function
    • User events typically need quick response for good UX
  5. System Event Handling:
    • case event := <-systemEvents: handles system events (file changes, network status, etc.)
    • fmt.Printf("Event Loop: Processing system event: %s\n", event) logs the event
    • processSystemEvent(event) calls the system event handler
    • System events may require more processing time
  6. Timer Event Handling:
    • case event := <-timerEvents: handles timer events (periodic ticks)
    • fmt.Printf("Event Loop: Processing timer event: %s\n", event) logs the event
    • processTimerEvent(event) calls the timer event handler
    • Timer events are typically lightweight and frequent
  7. Stop Signal Handling:
    • case <-stop: handles the shutdown signal
    • fmt.Println("Event Loop: Received stop signal, shutting down...") logs shutdown
    • return exits the event loop gracefully
    • The loop exits as soon as the stop case is selected; any events still buffered in the channels at that point are left unprocessed
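
If events that are already buffered should not be lost at shutdown, the stop case can drain the channels before returning. This is a minimal sketch of that idea, not what the example in the repo does; drain is a hypothetical helper that uses a select with a default case so that it never blocks.

```go
package main

import "fmt"

// drain is a hypothetical helper: it handles whatever is already buffered in
// events without blocking and returns how many events it handled.
func drain(events <-chan string, handle func(string)) int {
	n := 0
	for {
		select {
		case ev := <-events:
			handle(ev)
			n++
		default:
			// Nothing is buffered right now, so stop draining.
			return n
		}
	}
}

func main() {
	userEvents := make(chan string, 10)
	userEvents <- "User click"
	userEvents <- "User scroll"

	// Imagine the stop case of the event loop has just fired.
	n := drain(userEvents, func(ev string) {
		fmt.Println("drained:", ev)
	})
	fmt.Println("events drained:", n)
}
```

Note that drain only sees what is buffered at that moment. If the event sources are still running they can keep adding events, so in practice the sources would need to be stopped first.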

Event Source Implementations

func userEventSource(events chan<- string) {
    eventTypes := []string{"click", "keypress", "scroll", "hover", "submit"}
    for {
        event := eventTypes[rand.Intn(len(eventTypes))]
        events <- fmt.Sprintf("User %s", event)
        time.Sleep(time.Duration(rand.Intn(1000)+500) * time.Millisecond)
    }
}

func systemEventSource(events chan<- string) {
    eventTypes := []string{"file_change", "network_status", "memory_usage", "cpu_load", "disk_space"}
    for {
        event := eventTypes[rand.Intn(len(eventTypes))]
        events <- fmt.Sprintf("System %s", event)
        time.Sleep(time.Duration(rand.Intn(1500)+1000) * time.Millisecond)
    }
}

func timerEventSource(events chan<- string) {
    ticker := time.NewTicker(800 * time.Millisecond)
    defer ticker.Stop()
    
    for {
        select {
        case <-ticker.C:
            events <- fmt.Sprintf("Timer tick at %v", time.Now().Format("15:04:05.000"))
        }
    }
}

Event source breakdown:

  1. User Event Source:
    • eventTypes := []string{...} defines realistic user interaction events
    • for { ... } creates an infinite loop for continuous event generation
    • event := eventTypes[rand.Intn(len(eventTypes))] randomly selects an event type
    • events <- fmt.Sprintf("User %s", event) sends the formatted event to the channel
    • time.Sleep(time.Duration(rand.Intn(1000)+500) * time.Millisecond) simulates variable user interaction timing (500–1499 ms, since rand.Intn(1000) returns 0–999)
  2. System Event Source:
    • eventTypes := []string{...} defines realistic system monitoring events
    • Similar structure to user events but with system-specific event types
    • time.Sleep(time.Duration(rand.Intn(1500)+1000) * time.Millisecond) simulates slower system event generation (1000–2499 ms)
    • System events typically occur less frequently than user events
  3. Timer Event Source:
    • ticker := time.NewTicker(800 * time.Millisecond) creates a ticker for regular events
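    • defer ticker.Stop() would release the ticker when the function returns; in this snippet the loop never exits, so the deferred call never runs
    • for { select { case <-ticker.C: ... } } waits for ticker events; with a single case this is equivalent to for range ticker.C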
    • events <- fmt.Sprintf("Timer tick at %v", time.Now().Format("15:04:05.000")) sends timestamped timer events
    • Timer events occur at regular 800ms intervals, providing predictable timing
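
The event sources above loop forever, so their goroutines outlive the event loop. A source that can be shut down might look like the following minimal sketch. stoppableSource and its done and interval parameters are hypothetical and are not taken from the repo.

```go
package main

import (
	"fmt"
	"time"
)

// stoppableSource is a hypothetical variant of the event sources above.
// It emits an event on every tick until done is closed, then returns so
// that the deferred ticker.Stop() actually runs.
func stoppableSource(events chan<- string, done <-chan struct{}, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for i := 1; ; i++ {
		select {
		case <-ticker.C:
			// Select on the send too, so a full buffer cannot block shutdown.
			select {
			case events <- fmt.Sprintf("tick %d", i):
			case <-done:
				return
			}
		case <-done:
			return
		}
	}
}

func main() {
	events := make(chan string, 10)
	done := make(chan struct{})
	finished := make(chan struct{})

	go func() {
		defer close(finished)
		stoppableSource(events, done, 10*time.Millisecond)
	}()

	fmt.Println(<-events) // first event from the source
	close(done)
	<-finished // the source goroutine has returned
	fmt.Println("source stopped")
}
```

The same done channel could be shared by all sources and the event loop, so that a single close stops everything.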

Event Processing Functions

func processUserEvent(event string) {
    // Simulate processing time
    time.Sleep(50 * time.Millisecond)
    fmt.Printf("  Processed user event: %s\n", event)
}

func processSystemEvent(event string) {
    // Simulate processing time
    time.Sleep(100 * time.Millisecond)
    fmt.Printf("  Processed system event: %s\n", event)
}

func processTimerEvent(event string) {
    // Simulate processing time
    time.Sleep(30 * time.Millisecond)
    fmt.Printf("  Processed timer event: %s\n", event)
}

Event processing breakdown:

  1. User Event Processing:
    • time.Sleep(50 * time.Millisecond) simulates quick user event processing
    • User events need fast response for good user experience
    • fmt.Printf("  Processed user event: %s\n", event) confirms processing completion
  2. System Event Processing:
    • time.Sleep(100 * time.Millisecond) simulates longer system event processing
    • System events may require more complex processing (file I/O, network calls, etc.)
    • fmt.Printf("  Processed system event: %s\n", event) confirms processing completion
  3. Timer Event Processing:
    • time.Sleep(30 * time.Millisecond) simulates lightweight timer event processing
    • Timer events are typically simple and fast to process
    • fmt.Printf("  Processed timer event: %s\n", event) confirms processing completion

Key Design Patterns:

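  1. Single-threaded Event Processing: The event loop processes events sequentially in a single goroutine, so the handlers never run concurrently with each other and state touched only by the handlers needs no locking.
  2. Channel-based Communication: All event sources communicate with the event loop through channels, providing natural synchronization.
  3. Select Statement: The select statement blocks until at least one channel is ready and picks uniformly at random among the ready ones.
  4. Buffered Channels: A buffer of 10 lets an event source get up to 10 events ahead of the loop; once the buffer is full, the source blocks until the loop catches up.
  5. Graceful Shutdown: Closing the stop channel ends the loop once the event it is currently handling is done. In these snippets, events still buffered at that point are dropped and the event source goroutines keep running until the program exits.
  6. Structured Events: Each event is a formatted string whose prefix ("User", "System", "Timer tick") identifies its source, which keeps the example easy to understand and extend.
  7. Simulated Processing: Random delays in the user and system sources, a fixed ticker, and fixed sleeps in the handlers simulate real-world timing and make the pattern’s behavior visible.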
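
To make the buffered-channel behavior concrete, here is a minimal sketch of what happens when a buffer fills up. trySend is a hypothetical helper, not part of the example code. It uses a select with a default case so that the producer drops the event instead of blocking, which is one possible policy when the loop falls behind.

```go
package main

import "fmt"

// trySend is a hypothetical helper: it attempts a non-blocking send and
// reports whether the event was queued. When the buffer is full it drops
// the event instead of blocking the producer.
func trySend(events chan<- string, ev string) bool {
	select {
	case events <- ev:
		return true
	default:
		return false
	}
}

func main() {
	events := make(chan string, 2) // small buffer to make the effect visible

	for i := 1; i <= 3; i++ {
		ok := trySend(events, fmt.Sprintf("event %d", i))
		fmt.Printf("event %d queued=%v len=%d cap=%d\n", i, ok, len(events), cap(events))
	}
	// Output:
	// event 1 queued=true len=1 cap=2
	// event 2 queued=true len=2 cap=2
	// event 3 queued=false len=2 cap=2
}
```

A plain events <- ev, as used in the event sources above, would block on the third send until the loop received an event.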

How It Works

  1. Event Sources: Multiple goroutines generate events and send them to channels
  2. Event Loop: A single goroutine runs the event loop, listening to all event channels
  3. Event Selection: The select statement waits for events from any source
  4. Event Processing: When an event arrives, it’s processed by the appropriate handler
  5. Sequential Handling: While one event is being handled, events from other sources wait in their channel buffers and are picked up on a later pass through select
  6. Graceful Shutdown: The loop stops when it receives a stop signal

The pattern ensures that all events are processed in a controlled, sequential manner while maintaining responsiveness to multiple event sources.

Why This Implementation?

Single-threaded Event Processing

Channel-based Event Sources

Select Statement

Centralized Event Handling

Graceful Shutdown

Key Design Decisions

  1. Multiple Event Types: Demonstrates handling different types of events
  2. Simulated Processing: Sleeps stand in for real event processing time
  3. Buffered Channels: Absorb short bursts so that event sources block only when a buffer is full
  4. Structured Events: Events have clear structure and meaning
  5. Proper Cleanup: Graceful shutdown with proper resource cleanup

Performance Characteristics

Throughput

Latency

Memory Usage

Common Use Cases

User Interface Systems

Network Applications

System Monitoring

IoT Applications

Financial Systems

Real-time Systems

Embedded Systems

Advanced Patterns

Priority-based Event Processing
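
Because a plain select picks at random among ready channels, giving one source priority takes an extra step. One common approach, sketched here under the assumption of just two priority levels, is to poll the high-priority channel first with a non-blocking select and only then wait on everything. nextEvent, high, and low are hypothetical names.

```go
package main

import "fmt"

// nextEvent is a hypothetical helper that prefers high-priority events.
// It first polls high without blocking; only when high is empty does it
// block on both channels. ok is false once stop has been closed.
func nextEvent(high, low <-chan string, stop <-chan struct{}) (ev string, ok bool) {
	select {
	case ev = <-high:
		return ev, true
	default:
	}
	select {
	case ev = <-high:
		return ev, true
	case ev = <-low:
		return ev, true
	case <-stop:
		return "", false
	}
}

func main() {
	high := make(chan string, 10)
	low := make(chan string, 10)
	stop := make(chan struct{})

	low <- "Timer tick"
	high <- "User click"
	low <- "System cpu_load"

	for i := 0; i < 3; i++ {
		ev, _ := nextEvent(high, low, stop)
		fmt.Println(ev)
	}
	// Output:
	// User click
	// Timer tick
	// System cpu_load
}
```

The user event is handled first even though a timer event was queued before it. A steady stream of high-priority events can starve the low-priority channel, so this trade-off needs to be chosen deliberately.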

Event Filtering and Routing

Event Persistence

Distributed Event Loops

Best Practices

Event Design

Performance Optimization

Error Handling

Monitoring and Observability

The event loop pattern is particularly effective when you have:

This pattern provides a robust foundation for building responsive, efficient systems that can handle multiple event sources while maintaining control and predictability.

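The final example implementation looks like this. It differs slightly from the snippets above: the event sources are named ...Producer and emit a fixed number of events, the stop channel is called shutdown, and RunEventLoop pauses briefly after closing it.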

package examples

import (
	"fmt"
	"math/rand"
	"time"
)

// RunEventLoop demonstrates the event loop pattern.
func RunEventLoop() {
	fmt.Println("=== Event Loop Pattern Example ===")

	// Create event channels
	userEvents := make(chan string, 10)
	systemEvents := make(chan string, 10)
	timerEvents := make(chan string, 10)
	shutdown := make(chan struct{})

	// Start event producers
	go userEventProducer(userEvents)
	go systemEventProducer(systemEvents)
	go timerEventProducer(timerEvents)

	// Start the event loop
	go eventLoop(userEvents, systemEvents, timerEvents, shutdown)

	// Let the system run for a while
	time.Sleep(5 * time.Second)

	// Shutdown
	fmt.Println("Shutting down event loop...")
	close(shutdown)

	// Wait a bit for cleanup
	time.Sleep(500 * time.Millisecond)
	fmt.Println("Event loop example completed!")
}

// Event loop that processes events from multiple sources
func eventLoop(userEvents, systemEvents, timerEvents <-chan string, shutdown <-chan struct{}) {
	fmt.Println("Event loop started...")

	for {
		select {
		case event := <-userEvents:
			fmt.Printf("Event Loop: Processing user event: %s\n", event)
			processUserEvent(event)

		case event := <-systemEvents:
			fmt.Printf("Event Loop: Processing system event: %s\n", event)
			processSystemEvent(event)

		case event := <-timerEvents:
			fmt.Printf("Event Loop: Processing timer event: %s\n", event)
			processTimerEvent(event)

		case <-shutdown:
			fmt.Println("Event Loop: Shutdown signal received, cleaning up...")
			return
		}
	}
}

// Event producers
func userEventProducer(events chan<- string) {
	userActions := []string{"login", "logout", "click", "scroll", "submit"}
	for i := 0; i < 8; i++ {
		time.Sleep(time.Duration(rand.Intn(800)+200) * time.Millisecond)
		action := userActions[rand.Intn(len(userActions))]
		events <- fmt.Sprintf("%s (user_%d)", action, i+1)
	}
}

func systemEventProducer(events chan<- string) {
	systemEvents := []string{"backup", "update", "maintenance", "alert", "sync"}
	for i := 0; i < 6; i++ {
		time.Sleep(time.Duration(rand.Intn(1000)+500) * time.Millisecond)
		event := systemEvents[rand.Intn(len(systemEvents))]
		events <- fmt.Sprintf("%s (system_%d)", event, i+1)
	}
}

func timerEventProducer(events chan<- string) {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	count := 0
	for range ticker.C {
		if count >= 5 {
			break
		}
		events <- fmt.Sprintf("heartbeat (timer_%d)", count+1)
		count++
	}
}

// Event processors
func processUserEvent(event string) {
	// Simulate processing time
	time.Sleep(100 * time.Millisecond)
	fmt.Printf("  -> User event processed: %s\n", event)
}

func processSystemEvent(event string) {
	// Simulate processing time
	time.Sleep(150 * time.Millisecond)
	fmt.Printf("  -> System event processed: %s\n", event)
}

func processTimerEvent(event string) {
	// Simulate processing time
	time.Sleep(50 * time.Millisecond)
	fmt.Printf("  -> Timer event processed: %s\n", event)
}

To build and run this example yourself, check out this and other examples in the go-fluency-concurrency-model-patterns repo. That’s it for this topic; tomorrow I’ll post on the resource pooling pattern.
