Overview
The Event Loop pattern processes events from multiple sources in a single thread using a central event loop. This pattern is essential for handling multiple event sources efficiently, managing I/O operations, building reactive systems, and coordinating multiple concurrent operations in a controlled manner.
NOTE: For other posts in this series on concurrency patterns, check out the index post.
Implementation Details
Structure
The event loop implementation in examples/event_loop.go consists of three main components:
- Event Loop – Central coordinator that processes events from multiple channels
- Event Sources – Multiple goroutines that generate events
- Event Processing – Centralized event handling logic
Code Analysis
Let’s break down the RunEventLoop function and understand how each component works:
func RunEventLoop() {
    // Create event channels
    userEvents := make(chan string, 10)
    systemEvents := make(chan string, 10)
    timerEvents := make(chan string, 10)
    stop := make(chan struct{})

    // Start event sources
    go userEventSource(userEvents)
    go systemEventSource(systemEvents)
    go timerEventSource(timerEvents)

    // Start the event loop
    go eventLoop(userEvents, systemEvents, timerEvents, stop)

    // Let the event loop run for a while
    time.Sleep(5 * time.Second)
    close(stop)
    fmt.Println("Event loop stopped.")
}
Step-by-step breakdown:
- Event Channel Creation:
  - `userEvents := make(chan string, 10)` creates a buffered channel for user events
  - `systemEvents := make(chan string, 10)` creates a buffered channel for system events
  - `timerEvents := make(chan string, 10)` creates a buffered channel for timer events
  - `stop := make(chan struct{})` creates a signal channel for graceful shutdown
  - A buffer size of 10 reduces the chance of a source blocking when it generates events faster than the loop can process them
- Event Source Launch:
  - `go userEventSource(userEvents)` launches the user event generator in the background
  - `go systemEventSource(systemEvents)` launches the system event generator in the background
  - `go timerEventSource(timerEvents)` launches the timer event generator in the background
  - Each event source runs independently and continuously generates events
- Event Loop Launch:
  - `go eventLoop(userEvents, systemEvents, timerEvents, stop)` starts the central event loop
  - The event loop runs in its own goroutine to process events from all sources
  - This is the core coordinator that handles all events sequentially
- Demonstration Period:
  - `time.Sleep(5 * time.Second)` lets the event loop run for 5 seconds
  - This provides enough time to see the pattern in action
  - During this time, all event sources are generating events and the loop is processing them
- Graceful Shutdown:
  - `close(stop)` signals the event loop to stop processing
  - The event loop will receive this signal and exit gracefully
  - `fmt.Println("Event loop stopped.")` confirms the shutdown
Event Loop Implementation
func eventLoop(userEvents, systemEvents, timerEvents <-chan string, stop <-chan struct{}) {
    fmt.Println("Event loop started. Processing events...")
    for {
        select {
        case event := <-userEvents:
            fmt.Printf("Event Loop: Processing user event: %s\n", event)
            processUserEvent(event)
        case event := <-systemEvents:
            fmt.Printf("Event Loop: Processing system event: %s\n", event)
            processSystemEvent(event)
        case event := <-timerEvents:
            fmt.Printf("Event Loop: Processing timer event: %s\n", event)
            processTimerEvent(event)
        case <-stop:
            fmt.Println("Event Loop: Received stop signal, shutting down...")
            return
        }
    }
}
Event loop breakdown:
- Function Signature:
  - Takes read-only channels (`<-chan string`) for each event type
  - Takes a read-only stop channel (`<-chan struct{}`) for shutdown
  - Directional channel types provide encapsulation and prevent accidental misuse
- Infinite Loop Structure:
  - `for { ... }` creates an infinite loop that continuously processes events
  - The loop will run until explicitly stopped by the stop signal
  - This ensures the event loop is always ready to handle events
- Select Statement:
  - The `select` statement waits for events from any of the four channels
  - Blocking wait – with no `default` case, the goroutine parks efficiently until a channel is ready instead of spinning
  - Fair selection – when several channels are ready, Go picks among them pseudo-randomly, so no source is starved
  - Only one case executes at a time, ensuring sequential processing
- User Event Handling:
  - `case event := <-userEvents:` handles user events (clicks, keypresses, etc.)
  - `fmt.Printf("Event Loop: Processing user event: %s\n", event)` logs the event
  - `processUserEvent(event)` calls the appropriate handler function
  - User events typically need quick responses for good UX
- System Event Handling:
  - `case event := <-systemEvents:` handles system events (file changes, network status, etc.)
  - `fmt.Printf("Event Loop: Processing system event: %s\n", event)` logs the event
  - `processSystemEvent(event)` calls the system event handler
  - System events may require more processing time
- Timer Event Handling:
  - `case event := <-timerEvents:` handles timer events (periodic ticks)
  - `fmt.Printf("Event Loop: Processing timer event: %s\n", event)` logs the event
  - `processTimerEvent(event)` calls the timer event handler
  - Timer events are typically lightweight and frequent
- Stop Signal Handling:
  - `case <-stop:` handles the shutdown signal
  - `fmt.Println("Event Loop: Received stop signal, shutting down...")` logs the shutdown
  - `return` exits the event loop gracefully
  - This gives the loop a clean exit point; any events still buffered when the signal arrives are simply left unprocessed in this basic version
Event Source Implementations
func userEventSource(events chan<- string) {
    eventTypes := []string{"click", "keypress", "scroll", "hover", "submit"}
    for {
        event := eventTypes[rand.Intn(len(eventTypes))]
        events <- fmt.Sprintf("User %s", event)
        time.Sleep(time.Duration(rand.Intn(1000)+500) * time.Millisecond)
    }
}

func systemEventSource(events chan<- string) {
    eventTypes := []string{"file_change", "network_status", "memory_usage", "cpu_load", "disk_space"}
    for {
        event := eventTypes[rand.Intn(len(eventTypes))]
        events <- fmt.Sprintf("System %s", event)
        time.Sleep(time.Duration(rand.Intn(1500)+1000) * time.Millisecond)
    }
}

func timerEventSource(events chan<- string) {
    ticker := time.NewTicker(800 * time.Millisecond)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            events <- fmt.Sprintf("Timer tick at %v", time.Now().Format("15:04:05.000"))
        }
    }
}
Event source breakdown:
- User Event Source:
  - `eventTypes := []string{...}` defines realistic user interaction events
  - `for { ... }` creates an infinite loop for continuous event generation
  - `event := eventTypes[rand.Intn(len(eventTypes))]` randomly selects an event type
  - `events <- fmt.Sprintf("User %s", event)` sends the formatted event to the channel
  - `time.Sleep(time.Duration(rand.Intn(1000)+500) * time.Millisecond)` simulates variable user interaction timing (500–1500ms)
- System Event Source:
  - `eventTypes := []string{...}` defines realistic system monitoring events
  - Similar structure to the user event source but with system-specific event types
  - `time.Sleep(time.Duration(rand.Intn(1500)+1000) * time.Millisecond)` simulates slower system event generation (1000–2500ms)
  - System events typically occur less frequently than user events
- Timer Event Source:
  - `ticker := time.NewTicker(800 * time.Millisecond)` creates a ticker for regular events
  - `defer ticker.Stop()` ensures the ticker is cleaned up when the function exits
  - `for { select { case <-ticker.C: ... } }` waits for ticker events
  - `events <- fmt.Sprintf("Timer tick at %v", time.Now().Format("15:04:05.000"))` sends timestamped timer events
  - Timer events occur at regular 800ms intervals, providing predictable timing
Event Processing Functions
func processUserEvent(event string) {
    // Simulate processing time
    time.Sleep(50 * time.Millisecond)
    fmt.Printf(" Processed user event: %s\n", event)
}

func processSystemEvent(event string) {
    // Simulate processing time
    time.Sleep(100 * time.Millisecond)
    fmt.Printf(" Processed system event: %s\n", event)
}

func processTimerEvent(event string) {
    // Simulate processing time
    time.Sleep(30 * time.Millisecond)
    fmt.Printf(" Processed timer event: %s\n", event)
}
Event processing breakdown:
- User Event Processing:
  - `time.Sleep(50 * time.Millisecond)` simulates quick user event processing
  - User events need fast responses for a good user experience
  - `fmt.Printf(" Processed user event: %s\n", event)` confirms processing completion
- System Event Processing:
  - `time.Sleep(100 * time.Millisecond)` simulates longer system event processing
  - System events may require more complex processing (file I/O, network calls, etc.)
  - `fmt.Printf(" Processed system event: %s\n", event)` confirms processing completion
- Timer Event Processing:
  - `time.Sleep(30 * time.Millisecond)` simulates lightweight timer event processing
  - Timer events are typically simple and fast to process
  - `fmt.Printf(" Processed timer event: %s\n", event)` confirms processing completion
Key Design Patterns:
- Single-threaded Event Processing: The event loop processes events sequentially in a single goroutine, preventing race conditions.
- Channel-based Communication: All event sources communicate with the event loop through channels, providing natural synchronization.
- Select Statement: The `select` statement provides fair event selection across multiple sources and blocks efficiently while no events are ready.
- Buffered Channels: A buffer size of 10 lets event sources keep producing while the event loop is busy, up to the buffer's capacity.
- Graceful Shutdown: The stop channel lets the loop terminate cleanly; a production version would also signal the event sources to stop so no goroutines are left behind.
- Structured Events: Each event type has a clear structure and meaning, making the system easy to understand and extend.
- Simulated Processing: Random delays simulate real-world processing times and make the pattern’s behavior visible.
How It Works
- Event Sources: Multiple goroutines generate events and send them to channels
- Event Loop: A single goroutine runs the event loop, listening to all event channels
- Event Selection: The `select` statement waits for events from any source
- Event Processing: When an event arrives, it’s processed by the appropriate handler
- Buffered Queuing: While one event is being processed, new events queue in their buffered channels and are picked up on the next iteration
- Graceful Shutdown: The loop stops when it receives a stop signal
The pattern ensures that all events are processed in a controlled, sequential manner while maintaining responsiveness to multiple event sources.
Why This Implementation?
Single-threaded Event Processing
- Sequential Processing: Events are processed one at a time, preventing race conditions
- Predictable Behavior: Each event is fully processed before the next one starts, so handlers never interleave
- Resource Efficiency: No need for complex synchronization between event handlers
- Debugging: Easier to debug and reason about event processing
Channel-based Event Sources
- Asynchronous Generation: Event sources can generate events independently
- Non-blocking (up to capacity): Event sources don’t block while their channel buffer has room, even if the event loop is busy
- Buffering: Buffered channels absorb short bursts and smooth over processing delays
- Scalability: Easy to add or remove event sources
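One thing the demo keeps simple is that the sources never stop: once the loop exits and a buffer fills, a source would block forever on its send. Below is a minimal sketch of a stop-aware source, assuming the same package and imports as the example (stopAwareSource is a hypothetical name, not part of the repo code):

// A source that watches the stop channel while sending, so it can never
// block forever on a full buffer after the event loop has exited.
func stopAwareSource(events chan<- string, stop <-chan struct{}) {
    for i := 0; ; i++ {
        select {
        case events <- fmt.Sprintf("event %d", i):
            time.Sleep(500 * time.Millisecond)
        case <-stop:
            return // shut down alongside the event loop
        }
    }
}

Launched with `go stopAwareSource(userEvents, stop)`, it exits as soon as `close(stop)` happens, whether or not it was mid-send.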
Select Statement
- Efficient Waiting: Waits for events from any source without busy-polling; the goroutine sleeps until a channel is ready
- Fair Selection: All event sources have equal priority
- Multiplexing: A single statement multiplexes over all of the event channels
- Clean Code: Provides clear, readable event handling logic
Centralized Event Handling
- Single Point of Control: All events flow through one place
- Consistent Processing: All events are processed with the same logic
- Easy Monitoring: Easy to monitor and log all events
- State Management: Centralized state management for event processing
Graceful Shutdown
- Controlled Termination: Event loop can be stopped gracefully
- Resource Cleanup: Proper cleanup of resources when stopping
- Coordinated Shutdown: All event sources can be coordinated during shutdown
- Minimal Data Loss: Events already received by the loop are fully processed before it returns; draining events still buffered at shutdown is a natural extension (see the sketch below)
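As a sketch of that extension, a variant of eventLoop could drain whatever is already buffered before returning. This is illustrative, not the repo's implementation; it reuses the processing functions shown earlier:

// On shutdown, keep receiving until all three buffers are empty, then exit.
// Sources may still be producing, so the drain is best-effort.
func eventLoopWithDrain(userEvents, systemEvents, timerEvents <-chan string, stop <-chan struct{}) {
    for {
        select {
        case event := <-userEvents:
            processUserEvent(event)
        case event := <-systemEvents:
            processSystemEvent(event)
        case event := <-timerEvents:
            processTimerEvent(event)
        case <-stop:
            for {
                select {
                case event := <-userEvents:
                    processUserEvent(event)
                case event := <-systemEvents:
                    processSystemEvent(event)
                case event := <-timerEvents:
                    processTimerEvent(event)
                default:
                    fmt.Println("Event Loop: buffers drained, shutting down")
                    return
                }
            }
        }
    }
}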
Key Design Decisions
- Multiple Event Types: Demonstrates handling different types of events
- Simulated Processing: Random delays simulate real event processing time
- Buffered Channels: Prevent blocking of event sources
- Structured Events: Events have clear structure and meaning
- Proper Cleanup: Graceful shutdown with proper resource cleanup
Performance Characteristics
Throughput
- Sequential Processing: Throughput limited by processing time of individual events
- Event Queuing: Events can queue in channels during processing delays
- Fair Processing: All event sources are processed fairly
- Predictable Performance: Performance is predictable and consistent
Latency
- Processing Time: Latency depends on event processing time
- Queue Time: Events may wait in channels if event loop is busy
- Fair Scheduling: All event sources experience similar latency
- Responsiveness: System remains responsive to all event sources
Memory Usage
- Channel Buffering: Memory usage depends on channel buffer sizes
- Event Queuing: Queued events consume memory
- Efficient Storage: Minimal overhead for event storage
- Scalability: Memory usage scales with number of event sources
Common Use Cases
User Interface Systems
- GUI Applications: Handle mouse, keyboard, and window events
- Web Applications: Process user interactions and DOM events
- Game Engines: Handle input events and game state updates
- Mobile Apps: Process touch, gesture, and sensor events
Network Applications
- Web Servers: Handle HTTP requests and responses
- Chat Applications: Process messages and connection events
- API Gateways: Handle API requests and routing
- Load Balancers: Process connection and health check events
System Monitoring
- Process Monitoring: Handle process lifecycle events
- Resource Monitoring: Process CPU, memory, and disk events
- Network Monitoring: Handle network status and traffic events
- Log Processing: Process log file and system log events
IoT Applications
- Sensor Data: Process data from multiple sensors
- Device Control: Handle device status and control events
- Protocol Processing: Process various IoT protocol events
- Gateway Management: Handle device registration and communication events
Financial Systems
- Trading Systems: Process market data and order events
- Risk Management: Handle risk calculation and alert events
- Settlement Systems: Process transaction and settlement events
- Compliance Monitoring: Handle regulatory and compliance events
Real-time Systems
- Streaming Applications: Process media stream events
- Real-time Analytics: Handle analytics and reporting events
- Collaboration Tools: Process user collaboration events
- Live Broadcasting: Handle broadcast and viewer events
Embedded Systems
- Hardware Control: Process hardware interrupt events
- Sensor Networks: Handle sensor data and network events
- Control Systems: Process control and feedback events
- Automation Systems: Handle automation and scheduling events
Advanced Patterns
Priority-based Event Processing
- Event Priority: Process events based on priority levels
- Priority Queues: Use priority queues for event ordering
- Urgent Events: Handle urgent events with higher priority
- Fair Scheduling: Ensure fair processing across priority levels
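The plain `select` used in the example treats all sources equally, so strict priority needs an extra step. A minimal sketch, assuming a hypothetical urgent channel alongside a normal one: check the urgent channel first with a non-blocking select, then fall back to waiting on everything.

func priorityLoop(urgent, normal <-chan string, stop <-chan struct{}) {
    for {
        // Handle any urgent event that is already waiting before anything else.
        select {
        case e := <-urgent:
            fmt.Println("urgent:", e)
            continue
        default:
        }
        // Otherwise wait for whichever source becomes ready first.
        select {
        case e := <-urgent:
            fmt.Println("urgent:", e)
        case e := <-normal:
            fmt.Println("normal:", e)
        case <-stop:
            return
        }
    }
}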
Event Filtering and Routing
- Event Filtering: Filter events based on criteria
- Event Routing: Route events to appropriate handlers
- Event Transformation: Transform events before processing
- Event Aggregation: Aggregate multiple events into single events
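Here is a sketch of filtering and routing layered on the same loop shape, using a hypothetical Event struct with a Kind field and a handler map (none of these names come from the example above):

type Event struct {
    Kind    string // used to route the event
    Payload string
}

func routeEvents(events <-chan Event, stop <-chan struct{}) {
    handlers := map[string]func(Event){
        "user":   func(e Event) { fmt.Println("user handler:", e.Payload) },
        "system": func(e Event) { fmt.Println("system handler:", e.Payload) },
    }
    for {
        select {
        case e := <-events:
            if e.Payload == "" {
                continue // filter: drop events with nothing to process
            }
            if h, ok := handlers[e.Kind]; ok {
                h(e) // route: dispatch to the handler registered for this kind
            }
        case <-stop:
            return
        }
    }
}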
Event Persistence
- Event Logging: Log all events for audit and debugging
- Event Replay: Replay events for testing and recovery
- Event Storage: Store events for later processing
- Event Archiving: Archive old events for compliance
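A sketch of the logging-and-replay idea; eventLog, recordAndProcess, and replay are hypothetical helpers, and a real system would persist to disk or a message log rather than a slice. Because the event loop is single-threaded, the slice needs no locking as long as only the loop touches it:

var eventLog []string // in-memory event log; only the event loop appends to it

func recordAndProcess(event string, handler func(string)) {
    eventLog = append(eventLog, event) // record before processing
    handler(event)
}

func replay(handler func(string)) {
    for _, e := range eventLog {
        handler(e) // re-run every recorded event, e.g. for testing or recovery
    }
}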
Distributed Event Loops
- Multi-node Coordination: Coordinate event loops across multiple nodes
- Event Distribution: Distribute events across multiple event loops
- Load Balancing: Balance event processing load across nodes
- Fault Tolerance: Handle failures in distributed event processing
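True multi-node coordination needs a message broker or similar infrastructure, but the load-balancing idea can be sketched in-process by running several loops that drain one shared channel (runShardedLoops is a hypothetical helper and also needs the sync package). Note that splitting work across loops gives up the single-threaded ordering guarantee discussed earlier:

func runShardedLoops(events <-chan string, stop <-chan struct{}, n int) {
    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            for {
                select {
                case e := <-events:
                    fmt.Printf("loop %d handled: %s\n", id, e)
                case <-stop:
                    return
                }
            }
        }(i)
    }
    wg.Wait() // block until every loop has seen the stop signal
}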
Best Practices
Event Design
- Structured Events: Use structured event types with clear meaning
- Event Size: Keep events reasonably sized for performance
- Event Immutability: Make events immutable to prevent side effects
- Event Validation: Validate events before processing
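As a sketch of a structured event, in contrast to the plain strings used in the example (UserEvent and its Validate method are hypothetical):

type UserEvent struct {
    Kind string    // e.g. "click", "submit"
    At   time.Time // when the event was generated
    Data string    // small payload, passed by value so handlers cannot mutate shared state
}

func (e UserEvent) Validate() error {
    if e.Kind == "" {
        return fmt.Errorf("event is missing a kind")
    }
    if e.At.IsZero() {
        return fmt.Errorf("event is missing a timestamp")
    }
    return nil
}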
Performance Optimization
- Event Batching: Batch similar events for efficient processing
- Async Processing: Use async processing for long-running operations
- Event Pooling: Reuse event objects to reduce allocation overhead
- Memory Management: Monitor and manage memory usage
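A sketch of the batching idea: after receiving one event, scoop up anything else already queued and hand the whole slice to a single handler. collectBatch is a hypothetical helper:

func collectBatch(events <-chan string, first string, max int) []string {
    batch := []string{first}
    for len(batch) < max {
        select {
        case e := <-events:
            batch = append(batch, e)
        default:
            return batch // nothing else is queued right now
        }
    }
    return batch
}

Inside the loop, `case event := <-userEvents:` could then become `batch := collectBatch(userEvents, event, 10)` followed by one call that processes the whole batch.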
Error Handling
- Event Error Handling: Handle errors in event processing gracefully
- Error Recovery: Implement recovery mechanisms for failed events
- Error Reporting: Report errors for monitoring and debugging
- Circuit Breaker: Implement circuit breakers for external dependencies
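One concrete risk is a panicking handler taking down the whole loop. A sketch of a hypothetical safeProcess wrapper around the processing functions shown earlier:

func safeProcess(name string, handler func(string), event string) {
    defer func() {
        if r := recover(); r != nil {
            // Report the failure and keep the event loop alive instead of crashing it.
            fmt.Printf("recovered from panic in %s handler (event %q): %v\n", name, event, r)
        }
    }()
    handler(event)
}

Each case in the loop would then call, for example, `safeProcess("user", processUserEvent, event)`.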
Monitoring and Observability
- Event Metrics: Track event processing metrics
- Performance Monitoring: Monitor event loop performance
- Event Tracing: Trace events through the system
- Alerting: Alert on unusual event patterns or failures
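A sketch of basic event metrics: counting processed events per source inside the loop and printing the totals at shutdown (monitoredLoop is hypothetical; a real system might export these counters to Prometheus or a logger):

func monitoredLoop(userEvents, systemEvents <-chan string, stop <-chan struct{}) {
    counts := map[string]int{} // events processed per source
    for {
        select {
        case e := <-userEvents:
            counts["user"]++
            processUserEvent(e)
        case e := <-systemEvents:
            counts["system"]++
            processSystemEvent(e)
        case <-stop:
            fmt.Printf("event counts at shutdown: %v\n", counts)
            return
        }
    }
}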
The event loop pattern is particularly effective when you have:
- Multiple Event Sources: Multiple sources generating events
- Sequential Processing: Need for sequential, controlled event processing
- Resource Constraints: Limited resources that need efficient management
- Real-time Requirements: Need for responsive event handling
- State Management: Complex state that needs centralized management
This pattern provides a robust foundation for building responsive, efficient systems that can handle multiple event sources while maintaining control and predictability.
The final example implementation looks like this:
package examples

import (
    "fmt"
    "math/rand"
    "time"
)

// RunEventLoop demonstrates the event loop pattern.
func RunEventLoop() {
    fmt.Println("=== Event Loop Pattern Example ===")

    // Create event channels
    userEvents := make(chan string, 10)
    systemEvents := make(chan string, 10)
    timerEvents := make(chan string, 10)
    shutdown := make(chan struct{})

    // Start event producers
    go userEventProducer(userEvents)
    go systemEventProducer(systemEvents)
    go timerEventProducer(timerEvents)

    // Start the event loop
    go eventLoop(userEvents, systemEvents, timerEvents, shutdown)

    // Let the system run for a while
    time.Sleep(5 * time.Second)

    // Shutdown
    fmt.Println("Shutting down event loop...")
    close(shutdown)

    // Wait a bit for cleanup
    time.Sleep(500 * time.Millisecond)
    fmt.Println("Event loop example completed!")
}

// Event loop that processes events from multiple sources
func eventLoop(userEvents, systemEvents, timerEvents <-chan string, shutdown <-chan struct{}) {
    fmt.Println("Event loop started...")
    for {
        select {
        case event := <-userEvents:
            fmt.Printf("Event Loop: Processing user event: %s\n", event)
            processUserEvent(event)
        case event := <-systemEvents:
            fmt.Printf("Event Loop: Processing system event: %s\n", event)
            processSystemEvent(event)
        case event := <-timerEvents:
            fmt.Printf("Event Loop: Processing timer event: %s\n", event)
            processTimerEvent(event)
        case <-shutdown:
            fmt.Println("Event Loop: Shutdown signal received, cleaning up...")
            return
        }
    }
}

// Event producers
func userEventProducer(events chan<- string) {
    userActions := []string{"login", "logout", "click", "scroll", "submit"}
    for i := 0; i < 8; i++ {
        time.Sleep(time.Duration(rand.Intn(800)+200) * time.Millisecond)
        action := userActions[rand.Intn(len(userActions))]
        events <- fmt.Sprintf("%s (user_%d)", action, i+1)
    }
}

func systemEventProducer(events chan<- string) {
    systemEvents := []string{"backup", "update", "maintenance", "alert", "sync"}
    for i := 0; i < 6; i++ {
        time.Sleep(time.Duration(rand.Intn(1000)+500) * time.Millisecond)
        event := systemEvents[rand.Intn(len(systemEvents))]
        events <- fmt.Sprintf("%s (system_%d)", event, i+1)
    }
}

func timerEventProducer(events chan<- string) {
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()
    count := 0
    for range ticker.C {
        if count >= 5 {
            break
        }
        events <- fmt.Sprintf("heartbeat (timer_%d)", count+1)
        count++
    }
}

// Event processors
func processUserEvent(event string) {
    // Simulate processing time
    time.Sleep(100 * time.Millisecond)
    fmt.Printf(" -> User event processed: %s\n", event)
}

func processSystemEvent(event string) {
    // Simulate processing time
    time.Sleep(150 * time.Millisecond)
    fmt.Printf(" -> System event processed: %s\n", event)
}

func processTimerEvent(event string) {
    // Simulate processing time
    time.Sleep(50 * time.Millisecond)
    fmt.Printf(" -> Timer event processed: %s\n", event)
}
To run this example and build the code yourself, check out this and other examples in the go-fluency-concurrency-model-patterns repo. That’s it for this topic; tomorrow I’ll post on the resource pooling pattern.