Skip to content
Back

Go Concurrency Patterns: From Goroutines to Production Pipelines

Zhou Xunyou 8 min read backend
Share

Go’s concurrency model is based on CSP (Communicating Sequential Processes), with goroutines and channels as its core primitives. However, there’s a big gap between a demo and production-grade code — goroutine leaks, race conditions, and context cancellation are common pitfalls in production. This article provides a systematic guide to Go’s core concurrency patterns and production-grade practices.

Goroutine Basics and Lifecycle

Goroutine Overhead

// Goroutine initial stack is only 2KB (can grow up to 1GB)
// Compare: Java thread default stack is 1MB
// This means you can easily create millions of goroutines

func main() {
    for i := 0; i < 1_000_000; i++ {
        go func(id int) {
            time.Sleep(10 * time.Second)
        }(i)
    }
    // One million goroutines use only about 2GB of memory
    time.Sleep(15 * time.Second)
}

Goroutine Leaks: The Most Common Concurrency Bug

// Leak example: channel has no consumer, goroutine blocks forever
func leak() <-chan int {
    ch := make(chan int)
    go func() {
        result := expensiveComputation()
        ch <- result // If no consumer, blocks here forever
    }()
    return ch
}

// Fix: use buffered channel or context cancellation
func noLeak(ctx context.Context) <-chan int {
    ch := make(chan int, 1) // Buffered, write won't block
    go func() {
        select {
        case <-ctx.Done():
            return // Context cancelled, goroutine exits
        case ch <- expensiveComputation():
        }
    }()
    return ch
}

Channel Patterns

Fan-out / Fan-in

// Fan-out: distribute work across multiple goroutines
// Fan-in: merge results from multiple goroutines into one channel

func fanOutFanIn(ctx context.Context, items []Item) []Result {
    // Fan-out: launch multiple workers
    workers := make([]<-chan Result, 0, runtime.NumCPU())
    for i := 0; i < runtime.NumCPU(); i++ {
        ch := processItems(ctx, items, i)
        workers = append(workers, ch)
    }

    // Fan-in: merge all worker outputs
    merged := merge(ctx, workers...)

    var results []Result
    for r := range merged {
        results = append(results, r)
    }
    return results
}

func processItems(ctx context.Context, items []Item, workerID int) <-chan Result {
    out := make(chan Result)
    go func() {
        defer close(out)
        for _, item := range items {
            select {
            case <-ctx.Done():
                return
            case out <- process(item):
            }
        }
    }()
    return out
}

func merge(ctx context.Context, channels ...<-chan Result) <-chan Result {
    out := make(chan Result)
    var wg sync.WaitGroup

    // Launch one goroutine per input channel
    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan Result) {
            defer wg.Done()
            for r := range c {
                select {
                case <-ctx.Done():
                    return
                case out <- r:
                }
            }
        }(ch)
    }

    // Close output channel after all goroutines complete
    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

Pipeline Pattern

// Pipeline: data flows through multiple stages, each stage is a goroutine

// Stage 1: generate data
func generate(ctx context.Context, nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            select {
            case <-ctx.Done():
                return
            case out <- n:
            }
        }
    }()
    return out
}

// Stage 2: square computation
func square(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case <-ctx.Done():
                return
            case out <- n * n:
            }
        }
    }()
    return out
}

// Stage 3: print results
func print(ctx context.Context, in <-chan int) {
    for n := range in {
        fmt.Println(n)
    }
}

// Compose the pipeline
func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    // generate → square → square → print
    ch := generate(ctx, 1, 2, 3, 4, 5)
    ch = square(ctx, ch)
    ch = square(ctx, ch) // Fourth power
    print(ctx, ch)
}

Worker Pool

// Fixed number of workers pulling jobs from a jobs channel
func workerPool(ctx context.Context, jobs <-chan Job, results chan<- Result, numWorkers int) {
    var wg sync.WaitGroup

    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            for job := range jobs {
                select {
                case <-ctx.Done():
                    return
                case results <- processJob(job):
                }
            }
        }(i)
    }

    go func() {
        wg.Wait()
        close(results)
    }()
}

Context Cancellation and Timeouts

Context Propagation

// Context must be passed as the first parameter of a function
// Do not store context in a struct

// Correct usage
func FetchUser(ctx context.Context, id int) (*User, error) {
    req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
    if err != nil {
        return nil, err
    }
    resp, err := http.DefaultClient.Do(req)
    // ...
}

// Three ways to create context
ctx1 := context.Background()                        // Root context
ctx2, cancel2 := context.WithCancel(ctx1)            // Manual cancel
ctx3, cancel3 := context.WithTimeout(ctx1, 5*time.Second) // Timeout cancel
ctx4 := context.WithValue(ctx1, key, value)           // Pass values (use sparingly)

// Critical: call cancel to release resources
defer cancel2()
defer cancel3()

Using Context Across Multiple Goroutines

func fetchMultiple(ctx context.Context, urls []string) ([]string, error) {
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    type result struct {
        data string
        err  error
    }

    results := make(chan result, len(urls))

    for _, url := range urls {
        go func(u string) {
            req, _ := http.NewRequestWithContext(ctx, "GET", u, nil)
            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                results <- result{err: err}
                return
            }
            defer resp.Body.Close()
            body, _ := io.ReadAll(resp.Body)
            results <- result{data: string(body)}
        }(url)
    }

    var data []string
    for i := 0; i < len(urls); i++ {
        r := <-results
        if r.err != nil {
            cancel() // Cancel all requests on any failure
            return nil, r.err
        }
        data = append(data, r.data)
    }
    return data, nil
}

Production-Grade Patterns

errgroup: Goroutine Group with Error Handling

import "golang.org/x/sync/errgroup"

func fetchAll(ctx context.Context, urls []string) (map[string]string, error) {
    g, ctx := errgroup.WithContext(ctx)
    mu := sync.Mutex{}
    results := make(map[string]string)

    for _, url := range urls {
        url := url // Capture loop variable
        g.Go(func() error {
            req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
            if err != nil {
                return err
            }
            resp, err := http.DefaultClient.Do(req)
            if err != nil {
                return err
            }
            defer resp.Body.Close()
            body, err := io.ReadAll(resp.Body)
            if err != nil {
                return err
            }

            mu.Lock()
            results[url] = string(body)
            mu.Unlock()
            return nil
        })
    }

    // Wait for all goroutines, return first error
    if err := g.Wait(); err != nil {
        return nil, err
    }
    return results, nil
}

Semaphore: Limiting Concurrency

import "golang.org/x/sync/semaphore"

func processWithLimit(ctx context.Context, items []Item) error {
    // Maximum 10 concurrent operations
    sem := semaphore.NewWeighted(10)
    g, ctx := errgroup.WithContext(ctx)

    for _, item := range items {
        item := item
        if err := sem.Acquire(ctx, 1); err != nil {
            break // Context cancelled
        }

        g.Go(func() error {
            defer sem.Release(1)
            return processItem(ctx, item)
        })
    }

    return g.Wait()
}

Graceful Shutdown

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Start server
    server := startServer(ctx)

    // Listen for termination signals
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)

    <-sigCh
    fmt.Println("Shutting down...")

    // Give in-flight requests 30 seconds to complete
    shutdownCtx, shutdownCancel := context.WithTimeout(
        context.Background(), 30*time.Second,
    )
    defer shutdownCancel()

    if err := server.Shutdown(shutdownCtx); err != nil {
        fmt.Printf("Server shutdown error: %v\n", err)
    }
    fmt.Println("Server stopped")
}

Race Condition Detection

// Go has a built-in race detector — always enable it during development and testing
// go test -race ./...
// go run -race main.go

type Counter struct {
    mu    sync.Mutex
    value int
}

func (c *Counter) Inc() {
    c.mu.Lock()
    c.value++
    c.mu.Unlock()
}

func (c *Counter) Get() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

// sync/atomic is good for simple counters
type AtomicCounter struct {
    value atomic.Int64
}

func (c *AtomicCounter) Inc() {
    c.value.Add(1)
}

func (c *AtomicCounter) Get() int64 {
    return c.value.Load()
}

Common Race Patterns and Fixes

// Bug: concurrent map read/write
// fatal error: concurrent map writes

// Fix 1: sync.RWMutex
type SafeMap[K comparable, V any] struct {
    mu   sync.RWMutex
    data map[K]V
}

func (m *SafeMap[K, V]) Get(key K) (V, bool) {
    m.mu.RLock()
    defer m.mu.RUnlock()
    v, ok := m.data[key]
    return v, ok
}

func (m *SafeMap[K, V]) Set(key K, value V) {
    m.mu.Lock()
    defer m.mu.Unlock()
    m.data[key] = value
}

// Fix 2: sync.Map (better for specific patterns)
var m sync.Map
m.Store("key", "value")
v, ok := m.Load("key")

Common Pitfalls

1. Loop Variable Capture

// Bug: all goroutines use the same variable
for _, item := range items {
    go func() {
        process(item) // item is a reference to the loop variable!
    }()
}

// Fix 1: pass as parameter
for _, item := range items {
    go func(i Item) {
        process(i)
    }(item)
}

// Fix 2: local variable (Go 1.22+ fixes this automatically)
for _, item := range items {
    item := item // Create new variable
    go func() {
        process(item)
    }()
}

2. WaitGroup Add Timing

// Bug: Add inside goroutine — Wait may return before any Add
var wg sync.WaitGroup
for _, item := range items {
    go func() {
        wg.Add(1) // Too late!
        defer wg.Done()
        process(item)
    }()
}
wg.Wait() // May return before any Add is called

// Correct: Add before launching goroutine
var wg sync.WaitGroup
for _, item := range items {
    wg.Add(1)
    go func() {
        defer wg.Done()
        process(item)
    }()
}
wg.Wait()

3. Channel Closure Responsibility

// Principle: only the sender should close a channel; never close an already-closed channel

// Using defer + recover for unexpected cases (not recommended)
// Correct approach: clearly define closure responsibility

func producer(ctx context.Context, out chan<- int) {
    defer close(out) // Producer is responsible for closing
    for i := 0; ; i++ {
        select {
        case <-ctx.Done():
            return
        case out <- i:
        }
    }
}

Conclusion

The core of Go concurrency is “don’t communicate by sharing memory; share memory by communicating”, but production environments also require:

  • Always use context to control timeouts and cancellation, preventing goroutine leaks
  • Use errgroup instead of manual WaitGroup + error management
  • Use semaphore to limit concurrency and prevent resource exhaustion
  • Enable -race for race condition detection
  • Clarify goroutine lifecycles — whoever starts it is responsible for stopping it

Master these patterns, and you’ll write concurrent code that is both concise and production-ready.

Comments