Site icon Adron's Composite Code

Go Concurrency Patterns (Rate Limiting Pattern)

Overview

The Rate Limiting pattern controls the frequency of operations to prevent resource exhaustion and ensure fair usage. This pattern is essential for API request throttling, resource protection, preventing DoS attacks, and ensuring system stability under load.

NOTE: For other posts on concurrency patterns, check out the index post to this series of concurrency patterns.

Implementation Details

Structure

The rate limiting implementation in examples/rate_limiting.go demonstrates two main techniques:

  1. Fixed Rate Limiting – Using time.Ticker for consistent rate control
  2. Token Bucket Rate Limiting – Using a token bucket algorithm for burst handling

Code Analysis

// RunRateLimiting demonstrates two rate-limiting techniques: a fixed-rate
// limiter driven by a ticker, and a token bucket that permits bursts.
func RunRateLimiting() {
    // Example 1: Fixed rate limiting
    fmt.Println("\n1. Fixed rate limiting (2 requests per second):")
    limiter := newFixedRateLimiter(2, time.Second)
    // Stop the limiter when the demo finishes so its ticker doesn't leak.
    defer limiter.Stop()
    var wg sync.WaitGroup

    for i := 1; i <= 6; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            limiter.Wait() // each request blocks until the next tick
            fmt.Printf("Request %d processed at %v\n", id, time.Now().Format("15:04:05.000"))
        }(i)
    }

    wg.Wait()

    // Example 2: Token bucket rate limiting
    fmt.Println("\n2. Token bucket rate limiting (3 tokens per second, burst of 5):")
    tokenLimiter := newTokenBucketLimiter(3, 5)
    var wg2 sync.WaitGroup

    for i := 1; i <= 10; i++ {
        wg2.Add(1)
        go func(id int) {
            defer wg2.Done()
            // Allow is non-blocking: requests beyond the available tokens
            // are denied immediately rather than queued.
            if tokenLimiter.Allow() {
                fmt.Printf("Token request %d granted at %v\n", id, time.Now().Format("15:04:05.000"))
            } else {
                fmt.Printf("Token request %d denied at %v\n", id, time.Now().Format("15:04:05.000"))
            }
        }(i)
    }

    wg2.Wait()
}

Fixed Rate Limiter Implementation

// fixedRateLimiter releases one waiter per tick of an internal
// time.Ticker, spacing operations evenly at interval/rate.
type fixedRateLimiter struct {
    ticker *time.Ticker
    stop   chan struct{} // closed by Stop; unblocks any pending Wait
}

// newFixedRateLimiter returns a limiter that allows rate operations per
// interval (e.g. rate=2, interval=time.Second means one tick every
// 500ms). rate must be > 0 or time.NewTicker will panic.
func newFixedRateLimiter(rate int, interval time.Duration) *fixedRateLimiter {
    return &fixedRateLimiter{
        ticker: time.NewTicker(interval / time.Duration(rate)),
        stop:   make(chan struct{}),
    }
}

// Wait blocks until the next tick, or returns immediately once the
// limiter has been stopped. (The original version only received from
// ticker.C, so a Wait issued after Stop blocked forever on the stopped
// ticker's channel; selecting on stop fixes that.)
func (r *fixedRateLimiter) Wait() {
    select {
    case <-r.ticker.C:
    case <-r.stop:
    }
}

// Stop shuts down the ticker and releases any goroutines blocked in
// Wait. It must be called at most once.
func (r *fixedRateLimiter) Stop() {
    r.ticker.Stop()
    close(r.stop)
}

Token Bucket Limiter Implementation

// tokenBucketLimiter implements the token bucket algorithm: requests
// consume tokens, and tokens are replenished at a fixed rate up to a
// maximum burst size. A buffered channel doubles as the bucket, which
// makes Allow and Wait race-free without any explicit locking.
// (The unused mu and lastRefill fields from the original were removed.)
type tokenBucketLimiter struct {
    tokens chan struct{} // the bucket: holds up to burst tokens
    rate   time.Duration // interval between refills (one token per tick)
    burst  int           // bucket capacity
    stop   chan struct{} // closed by Stop to terminate the refill goroutine
}

// newTokenBucketLimiter creates a limiter that refills rate tokens per
// second and allows bursts of up to burst requests. The bucket starts
// full. Call Stop when the limiter is no longer needed so the background
// refill goroutine can exit; the original leaked that goroutine.
func newTokenBucketLimiter(rate int, burst int) *tokenBucketLimiter {
    limiter := &tokenBucketLimiter{
        tokens: make(chan struct{}, burst),
        rate:   time.Second / time.Duration(rate),
        burst:  burst,
        stop:   make(chan struct{}),
    }

    // Fill the bucket so the limiter can absorb an initial burst.
    for i := 0; i < burst; i++ {
        limiter.tokens <- struct{}{}
    }

    // Start refilling tokens in the background.
    go limiter.refill()

    return limiter
}

// refill adds one token per rate interval until Stop is called,
// discarding tokens when the bucket is already full.
func (t *tokenBucketLimiter) refill() {
    ticker := time.NewTicker(t.rate)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            select {
            case t.tokens <- struct{}{}:
                // Token added successfully.
            default:
                // Bucket is full; drop the token.
            }
        case <-t.stop:
            return
        }
    }
}

// Allow reports whether a token is available, consuming one if so.
// It never blocks.
func (t *tokenBucketLimiter) Allow() bool {
    select {
    case <-t.tokens:
        return true
    default:
        return false
    }
}

// Wait blocks until a token becomes available and consumes it.
func (t *tokenBucketLimiter) Wait() {
    <-t.tokens
}

// Stop terminates the background refill goroutine. The limiter must not
// be used after Stop.
func (t *tokenBucketLimiter) Stop() {
    close(t.stop)
}

How It Works

Fixed Rate Limiting

  1. Ticker Creation: Creates a ticker that fires at the desired rate
  2. Request Processing: Each request waits for the next tick
  3. Rate Control: Requests are processed at exactly the specified rate
  4. Synchronization: All requests are synchronized to the ticker

Token Bucket Rate Limiting

  1. Bucket Initialization: Creates a bucket filled with initial tokens
  2. Token Consumption: Requests consume tokens from the bucket
  3. Token Refilling: Tokens are refilled at a constant rate
  4. Burst Handling: Can handle bursts up to the bucket capacity

Why This Implementation?

Fixed Rate Limiter

Token Bucket Limiter

Channel-based Token Storage

Goroutine-based Refilling

Key Design Decisions

  1. Two Different Approaches: Demonstrates both fixed-rate and token-bucket techniques
  2. Simulated Requests: Multiple concurrent requests show rate limiting in action
  3. Timing Information: Output includes timestamps to show rate control
  4. Non-blocking Token Check: Token bucket allows immediate denial of requests
  5. Proper Cleanup: Limiters should expose a Stop method so tickers and background refill goroutines can be shut down when no longer needed

Performance Characteristics

Fixed Rate Limiter

Token Bucket Limiter

Memory Usage

Common Use Cases

API Rate Limiting

Resource Protection

User Experience

Security

System Stability

Microservices

Advanced Patterns

Sliding Window Rate Limiting

Leaky Bucket Rate Limiting

Distributed Rate Limiting

Adaptive Rate Limiting

Best Practices

Rate Limit Configuration

Error Handling

Monitoring and Alerting

The rate limiting pattern is particularly effective when you need to protect shared resources from bursty, concurrent demand while still serving well-behaved clients fairly.

This pattern provides essential tools for building robust, scalable systems that can handle varying loads while protecting system resources and ensuring fair usage.

The final example implementation looks like this:

package examples

import (
	"fmt"
	"sync"
	"time"
)

// RunRateLimiting demonstrates rate limiting patterns: a ticker-driven
// fixed-rate limiter and a burst-friendly token bucket.
func RunRateLimiting() {
	fmt.Println("=== Rate Limiting Pattern Example ===")

	// Example 1: Fixed rate limiting
	fmt.Println("\n1. Fixed rate limiting (2 requests per second):")
	limiter := newFixedRateLimiter(2, time.Second)
	// Stop the limiter when the demo finishes so its ticker doesn't leak.
	defer limiter.Stop()
	var wg sync.WaitGroup

	for i := 1; i <= 6; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			limiter.Wait() // each request blocks until the next tick
			fmt.Printf("Request %d processed at %v\n", id, time.Now().Format("15:04:05.000"))
		}(i)
	}

	wg.Wait()

	// Example 2: Token bucket rate limiting
	fmt.Println("\n2. Token bucket rate limiting (3 tokens per second, burst of 5):")
	tokenLimiter := newTokenBucketLimiter(3, 5)
	var wg2 sync.WaitGroup

	for i := 1; i <= 10; i++ {
		wg2.Add(1)
		go func(id int) {
			defer wg2.Done()
			// Allow is non-blocking: requests beyond the available tokens
			// are denied immediately rather than queued.
			if tokenLimiter.Allow() {
				fmt.Printf("Token request %d granted at %v\n", id, time.Now().Format("15:04:05.000"))
			} else {
				fmt.Printf("Token request %d denied at %v\n", id, time.Now().Format("15:04:05.000"))
			}
		}(i)
	}

	wg2.Wait()

	fmt.Println("\nRate Limiting example completed!")
}

// fixedRateLimiter releases one waiter per tick of an internal
// time.Ticker, spacing operations evenly at interval/rate.
type fixedRateLimiter struct {
	ticker *time.Ticker
	stop   chan struct{} // closed by Stop; unblocks any pending Wait
}

// newFixedRateLimiter returns a limiter that allows rate operations per
// interval (e.g. rate=2, interval=time.Second means one tick every
// 500ms). rate must be > 0 or time.NewTicker will panic.
func newFixedRateLimiter(rate int, interval time.Duration) *fixedRateLimiter {
	return &fixedRateLimiter{
		ticker: time.NewTicker(interval / time.Duration(rate)),
		stop:   make(chan struct{}),
	}
}

// Wait blocks until the next tick, or returns immediately once the
// limiter has been stopped. (The original version only received from
// ticker.C, so a Wait issued after Stop blocked forever on the stopped
// ticker's channel; selecting on stop fixes that.)
func (r *fixedRateLimiter) Wait() {
	select {
	case <-r.ticker.C:
	case <-r.stop:
	}
}

// Stop shuts down the ticker and releases any goroutines blocked in
// Wait. It must be called at most once.
func (r *fixedRateLimiter) Stop() {
	r.ticker.Stop()
	close(r.stop)
}

// tokenBucketLimiter implements the token bucket algorithm: requests
// consume tokens, and tokens are replenished at a fixed rate up to a
// maximum burst size. A buffered channel doubles as the bucket, which
// makes Allow and Wait race-free without any explicit locking.
// (The unused mu and lastRefill fields from the original were removed.)
type tokenBucketLimiter struct {
	tokens chan struct{} // the bucket: holds up to burst tokens
	rate   time.Duration // interval between refills (one token per tick)
	burst  int           // bucket capacity
	stop   chan struct{} // closed by Stop to terminate the refill goroutine
}

// newTokenBucketLimiter creates a limiter that refills rate tokens per
// second and allows bursts of up to burst requests. The bucket starts
// full. Call Stop when the limiter is no longer needed so the background
// refill goroutine can exit; the original leaked that goroutine.
func newTokenBucketLimiter(rate int, burst int) *tokenBucketLimiter {
	limiter := &tokenBucketLimiter{
		tokens: make(chan struct{}, burst),
		rate:   time.Second / time.Duration(rate),
		burst:  burst,
		stop:   make(chan struct{}),
	}

	// Fill the bucket so the limiter can absorb an initial burst.
	for i := 0; i < burst; i++ {
		limiter.tokens <- struct{}{}
	}

	// Start refilling tokens in the background.
	go limiter.refill()

	return limiter
}

// refill adds one token per rate interval until Stop is called,
// discarding tokens when the bucket is already full.
func (t *tokenBucketLimiter) refill() {
	ticker := time.NewTicker(t.rate)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			select {
			case t.tokens <- struct{}{}:
				// Token added successfully.
			default:
				// Bucket is full; drop the token.
			}
		case <-t.stop:
			return
		}
	}
}

// Allow reports whether a token is available, consuming one if so.
// It never blocks.
func (t *tokenBucketLimiter) Allow() bool {
	select {
	case <-t.tokens:
		return true
	default:
		return false
	}
}

// Wait blocks until a token becomes available and consumes it.
func (t *tokenBucketLimiter) Wait() {
	<-t.tokens
}

// Stop terminates the background refill goroutine. The limiter must not
// be used after Stop.
func (t *tokenBucketLimiter) Stop() {
	close(t.stop)
}

To run this example and build the code yourself, check out this and other examples in the go-fluency-concurrency-model-patterns repo. That’s it for this topic; tomorrow I’ll post on the MapReduce pattern.

Exit mobile version