Overview
The Pipeline pattern is a fundamental concurrency pattern that connects multiple processing stages, where each stage transforms data and passes it to the next. It is particularly useful for data transformation workflows, stream processing, and multi-step computations.
NOTE: For other posts on concurrency patterns, check out the index post to this series of concurrency patterns.
Implementation Details
Structure
The pipeline implementation in examples/pipeline.go consists of three main stages:
- Generate Numbers – Creates random numbers
- Square Numbers – Squares each number
- Add Ten – Adds 10 to each squared number
Each of these stages simply illustrates what a workload might look like within the overall pipeline.
Code Analysis
```go
func RunPipeline() {
	// Stage 1: Generate numbers
	numbers := generateNumbers(10)

	// Stage 2: Square the numbers
	squared := square(numbers)

	// Stage 3: Add 10 to each number
	result := addTen(squared)

	// Collect and display results
	for num := range result {
		fmt.Printf("Result: %d\n", num)
	}
}
```
Step-by-step breakdown:
- Stage 1: Number Generation:
  - `numbers := generateNumbers(10)` creates the first stage of the pipeline
  - The function returns a channel that will receive 10 random numbers
  - This stage runs in its own goroutine and generates data asynchronously
  - The channel provides the connection to the next stage
- Stage 2: Number Squaring:
  - `squared := square(numbers)` creates the second stage of the pipeline
  - Takes the output channel from Stage 1 as input
  - Squares each number as it receives it from the previous stage
  - Returns a new channel with the squared results
- Stage 3: Adding Ten:
  - `result := addTen(squared)` creates the third stage of the pipeline
  - Takes the output channel from Stage 2 as input
  - Adds 10 to each squared number
  - Returns the final channel with the processed results
- Result Collection:
  - `for num := range result` consumes all results from the final stage
  - The range loop automatically exits when the channel is closed
  - `fmt.Printf("Result: %d\n", num)` displays each final result
  - This is the consumer that drives the entire pipeline
Stage Implementation
Each stage follows the same pattern:
```go
func generateNumbers(count int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < count; i++ {
			num := rand.Intn(10) + 1
			fmt.Printf("Generated: %d\n", num)
			out <- num
			time.Sleep(100 * time.Millisecond) // Simulate work
		}
	}()
	return out
}
```
Stage implementation breakdown:
- Function Signature:
  - `func generateNumbers(count int) <-chan int` defines a function that returns a read-only channel
  - `count int` specifies how many numbers to generate
  - `<-chan int` indicates the function returns a channel that can only be read from
  - This provides encapsulation – callers can only consume, not produce
- Channel Creation:
  - `out := make(chan int)` creates an unbuffered channel for output
  - Unbuffered channels provide natural synchronization between stages
  - Each stage waits for the next stage to be ready before sending data
  - This creates a natural flow control mechanism
- Goroutine Launch:
  - `go func() { ... }()` launches the stage processing in a background goroutine
  - This allows the function to return immediately while processing continues
  - The stage runs independently and can process data concurrently with other stages
  - This is the key to achieving true pipeline concurrency
- Resource Management:
  - `defer close(out)` ensures the output channel is closed when the goroutine exits
  - This signals to downstream stages that no more data is coming
  - Proper cleanup prevents goroutine leaks and channel deadlocks
  - The defer statement guarantees cleanup even if the goroutine panics
- Data Generation Loop:
  - `for i := 0; i < count; i++` generates exactly the specified number of items
  - `num := rand.Intn(10) + 1` creates random numbers between 1 and 10
  - `fmt.Printf("Generated: %d\n", num)` logs the generation for debugging
  - `out <- num` sends the number to the output channel
- Work Simulation:
  - `time.Sleep(100 * time.Millisecond)` simulates processing time
  - This makes the concurrent nature of the pipeline visible
  - Without this delay, the pipeline would run too fast to observe
  - The delay also demonstrates how stages can work at different speeds
Pipeline Flow
Let’s trace through a single number’s journey through the pipeline:
- Stage 1 (Generate): Creates number 7, sends to Stage 2
- Stage 2 (Square): Receives 7, squares it to 49, sends to Stage 3
- Stage 3 (Add Ten): Receives 49, adds 10 to get 59, sends to consumer
- Consumer: Receives 59 and displays “Result: 59”
While this is happening, Stage 1 can already be generating the next number, creating true concurrency.
Concurrent Processing Benefits
The pipeline pattern enables several key benefits:
- Overlapped Processing: While Stage 2 is squaring number 7, Stage 1 can be generating number 3
- Resource Utilization: Multiple CPU cores can be utilized simultaneously
- Throughput: Overall processing time is reduced compared to sequential processing
- Scalability: Each stage can be optimized or scaled independently
Channel-based Synchronization
The unbuffered channels provide natural synchronization:
- Stage 1 blocks when sending if Stage 2 isn’t ready to receive
- Stage 2 blocks when sending if Stage 3 isn’t ready to receive
- Stage 3 blocks when sending if the consumer isn’t ready to receive
- This creates a natural backpressure mechanism that prevents memory buildup
Error Handling and Cleanup
The pipeline design includes several safety features:
- Deferred Channel Closing: Each stage properly closes its output channel
- Goroutine Management: Each stage runs in its own goroutine for isolation
- Resource Cleanup: Channels are automatically cleaned up by Go’s garbage collector
- Flow Control: Unbuffered channels prevent memory leaks from unbounded buffering
Key Design Patterns:
- Channel-based Communication: Each stage communicates through channels, providing natural backpressure and synchronization.
- Goroutine-per-Stage: Each stage runs in its own goroutine, enabling true concurrent processing.
- Unbuffered Channels: Ensures each stage processes one item at a time and provides natural flow control.
- Deferred Channel Closing: Each stage properly closes its output channel when done, signaling completion to downstream stages.
- Read-only Channel Returns: Functions return `<-chan int` for encapsulation and safety.
- Simulated Work: The `time.Sleep()` calls simulate real processing time, making the concurrent nature more visible.
- Pipeline Composition: Stages can be easily composed and modified without affecting other stages.
How It Works
- Stage 1 starts generating random numbers and sending them to its output channel
- Stage 2 receives numbers from Stage 1, squares them, and sends results to its output channel
- Stage 3 receives squared numbers, adds 10, and sends final results
- Main function consumes the final results from Stage 3
The pipeline processes data concurrently – while Stage 2 is processing the first number, Stage 1 can be generating the second number, and so on.
Why This Implementation?
Channel-based Design
- Natural Flow Control: Channels provide built-in backpressure
- Thread Safety: Channels handle synchronization automatically
- Composability: Easy to add/remove stages or modify processing logic
Goroutine-per-Stage
- True Concurrency: Each stage can process independently
- Resource Efficiency: Better CPU utilization
- Scalability: Can easily scale individual stages
Unbuffered Channels
- Synchronization: Ensures proper coordination between stages
- Memory Efficiency: No buffering overhead
- Predictable Flow: Each stage processes one item at a time
Common Use Cases
I wrote these up via bullet points just so you, dear reader, could have a quick list to review and put to memory.
Data Processing Pipelines
- ETL (Extract, Transform, Load): Extract data from source, transform it, load to destination
- Image Processing: Resize → Filter → Compress → Save
- Text Processing: Parse → Clean → Analyze → Generate Report
Stream Processing
- Log Processing: Parse → Filter → Aggregate → Store
- Sensor Data: Collect → Validate → Process → Alert
- Financial Data: Receive → Validate → Calculate → Store
API Processing
- Request Pipeline: Authenticate → Validate → Process → Respond
- Data Pipeline: Fetch → Transform → Cache → Return
Real-time Systems
- IoT Data: Collect → Process → Analyze → Act
- Trading Systems: Receive → Validate → Execute → Confirm
- Monitoring: Collect → Analyze → Alert → Log
The pipeline pattern is particularly effective when you have a series of transformations that can be performed independently and when you want to maximize throughput through concurrent processing.
The final example implementation looks like this:
```go
package examples

import (
	"fmt"
	"math/rand"
	"time"
)

// RunPipeline demonstrates a multi-stage data processing pipeline
func RunPipeline() {
	fmt.Println("=== Pipeline Pattern Example ===")

	// Stage 1: Generate numbers
	numbers := generateNumbers(10)

	// Stage 2: Square the numbers
	squared := square(numbers)

	// Stage 3: Add 10 to each number
	result := addTen(squared)

	// Collect and display results
	fmt.Println("Pipeline stages:")
	fmt.Println("1. Generate numbers")
	fmt.Println("2. Square numbers")
	fmt.Println("3. Add 10")
	fmt.Println()

	for num := range result {
		fmt.Printf("Result: %d\n", num)
	}

	fmt.Println("Pipeline completed!")
}

// Stage 1: Generate random numbers
func generateNumbers(count int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < count; i++ {
			num := rand.Intn(10) + 1
			fmt.Printf("Generated: %d\n", num)
			out <- num
			time.Sleep(100 * time.Millisecond) // Simulate work
		}
	}()
	return out
}

// Stage 2: Square the numbers
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for num := range in {
			squared := num * num
			fmt.Printf("Squared %d -> %d\n", num, squared)
			out <- squared
			time.Sleep(150 * time.Millisecond) // Simulate work
		}
	}()
	return out
}

// Stage 3: Add 10 to each number
func addTen(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for num := range in {
			result := num + 10
			fmt.Printf("Added 10 to %d -> %d\n", num, result)
			out <- result
			time.Sleep(100 * time.Millisecond) // Simulate work
		}
	}()
	return out
}
```
To run this example and build the code yourself, check out this and other examples in the go-fluency-concurrency-model-patterns repo. That's it for this topic; tomorrow I'll post on the fan-in/fan-out pattern.

