
Go Part 10 — Concurrency Patterns

Go Series (10/12)
  1. Go Basics Part 1 — Variables, Constants, and Types
  2. Go Basics Part 2 — Conditionals and Loops
  3. Go Basics Part 3 — Functions
  4. Go Basics Part 4 — Error Handling
  5. Go Basics Part 5 — Arrays, Slices, and Maps
  6. Go Basics Part 6 — Structs and Methods
  7. Go Part 7 — Interfaces
  8. Go Part 8 — Pointers
  9. Go Part 9 — Goroutines and Channels
  10. Go Part 10 — Concurrency Patterns
  11. Go Part 11 — Packages and Modules
  12. Go Part 12 — Generics and Practical Patterns

From Basics to Practice

Part 9 covered the basics of goroutines and channels. This part uses those tools to build patterns that show up frequently in real-world code. Concurrent code can improve performance dramatically when done well, but done carelessly it leads to debugging hell. Knowing the established patterns makes a big difference.

Worker Pool

One of the most common concurrency patterns. When there are many tasks to process, you start a fixed number of worker goroutines and distribute tasks to them through a channel that acts as a queue. This gives you better resource control than spawning a new goroutine for every task with no upper bound.

Grasping the overall flow through a diagram first makes the code easier to follow.

flowchart LR
    Producer["Producer\n(create jobs)"]
    Jobs["jobs channel"]
    W1["Worker 1"]
    W2["Worker 2"]
    W3["Worker 3"]
    Results["results channel"]
    Consumer["Consumer\n(collect results)"]

    Producer -->|"submit jobs"| Jobs
    Jobs -->|"compete to\ndequeue"| W1
    Jobs --> W2
    Jobs --> W3
    W1 -->|"processed result"| Results
    W2 --> Results
    W3 --> Results
    Results -->|"range\niteration"| Consumer

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, jobs <-chan int, results chan<- string, wg *sync.WaitGroup) {
    defer wg.Done()
    for job := range jobs {
        // Process the job (simulation)
        time.Sleep(50 * time.Millisecond)
        results <- fmt.Sprintf("worker%d -> job%d done", id, job)
    }
}

func main() {
    const numWorkers = 3
    const numJobs = 10

    jobs := make(chan int, numJobs)
    results := make(chan string, numJobs)

    var wg sync.WaitGroup

    // Start workers
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go worker(w, jobs, results, &wg)
    }

    // Submit jobs
    for j := 1; j <= numJobs; j++ {
        jobs <- j
    }
    close(jobs) // No more jobs

    // Close results channel after all workers finish
    go func() {
        wg.Wait()
        close(results)
    }()

    // Collect results
    for result := range results {
        fmt.Println(result)
    }
}

Jobs are sent through the jobs channel, and 3 workers compete to dequeue them. Each job takes about 50 ms, so running 10 jobs sequentially takes about 500 ms. With 3 workers the busiest worker handles 4 jobs, so the pool finishes in about 200 ms, roughly 2.5 times faster. Adjusting the number of workers lets you cap CPU or network usage.

Here’s the core structure summarized:

  1. Create job and result channels
  2. Spin up a fixed number of workers
  3. Submit jobs to the channel, then close it when done
  4. Close the result channel once all workers finish
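
The four steps above can be folded into a reusable function. The following is a minimal sketch, not part of the original example: `runPool`, `sum`, and the `process` callback are hypothetical names chosen for illustration, and the sketch assumes jobs and results are plain `int` values.

```go
package main

import (
    "fmt"
    "sync"
)

// runPool is a hypothetical helper: it applies process to every job
// using a fixed number of worker goroutines and returns the results.
// The order of the results is not guaranteed.
func runPool(numWorkers int, jobList []int, process func(int) int) []int {
    // 1. Create job and result channels.
    jobs := make(chan int, len(jobList))
    results := make(chan int, len(jobList))

    // 2. Spin up a fixed number of workers.
    var wg sync.WaitGroup
    for w := 0; w < numWorkers; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for job := range jobs {
                results <- process(job)
            }
        }()
    }

    // 3. Submit jobs to the channel, then close it.
    for _, j := range jobList {
        jobs <- j
    }
    close(jobs)

    // 4. Close the result channel once all workers finish.
    go func() {
        wg.Wait()
        close(results)
    }()

    var out []int
    for r := range results {
        out = append(out, r)
    }
    return out
}

// sum adds up a slice; used here because result order varies between runs.
func sum(xs []int) int {
    total := 0
    for _, x := range xs {
        total += x
    }
    return total
}

func main() {
    doubled := runPool(3, []int{1, 2, 3, 4, 5}, func(n int) int { return n * 2 })
    fmt.Println(len(doubled), sum(doubled)) // 5 30
}
```

Since the workers finish in an unpredictable order, the example checks the count and the sum rather than the exact sequence of results.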

Fan-Out / Fan-In

Similar to Worker Pool but from a different perspective. Fan-Out distributes a single input across multiple goroutines, and Fan-In merges the outputs of multiple goroutines into one.

package main

import (
    "fmt"
    "sync"
)

// Source stage: emit the input numbers on a channel
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

// Fan-In: Merge multiple channels into one
func merge(channels ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for n := range c {
                out <- n
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

func main() {
    // Generate data
    nums := generate(1, 2, 3, 4, 5, 6, 7, 8)

    // Fan-Out: 3 workers share the same input channel
    sq1 := square(nums)
    sq2 := square(nums)
    sq3 := square(nums)

    // Fan-In: Merge 3 results into one
    for result := range merge(sq1, sq2, sq3) {
        fmt.Println(result)
    }
}

generate produces numbers, 3 square goroutines receive from the same channel and square them concurrently (Fan-Out), and merge combines their result channels into one (Fan-In). Because the workers run concurrently, the output order is not guaranteed. This pipeline-like structure, where data flows from stage to stage, is a very common pattern in Go’s concurrent programming.
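
The number of fan-out workers does not have to be hard-coded. Below is a small sketch built on the same `generate`, `square`, and `merge` functions. `fanOut` and `squaresSorted` are hypothetical helpers added for illustration, and the results are sorted only so that the output is repeatable.

```go
package main

import (
    "fmt"
    "sort"
    "sync"
)

func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

func merge(channels ...<-chan int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for n := range c {
                out <- n
            }
        }(ch)
    }
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

// fanOut (hypothetical) starts n square workers that all read from in.
func fanOut(in <-chan int, n int) []<-chan int {
    outs := make([]<-chan int, 0, n)
    for i := 0; i < n; i++ {
        outs = append(outs, square(in))
    }
    return outs
}

// squaresSorted (hypothetical) runs the pipeline and sorts the merged output.
func squaresSorted(workers int, nums ...int) []int {
    var got []int
    for v := range merge(fanOut(generate(nums...), workers)...) {
        got = append(got, v)
    }
    sort.Ints(got)
    return got
}

func main() {
    fmt.Println(squaresSorted(3, 1, 2, 3, 4)) // [1 4 9 16]
}
```

Without the sort, the same values would still arrive, but their order could differ from run to run.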

context.Context — Cancellation and Timeouts

When writing concurrent code in practice, there’s a problem you’ll inevitably face: “When should this work stop?” When a user cancels a request, when something takes too long, or when a parent task fails, child goroutines need to be cleaned up too. The context package handles this.

package main

import (
    "context"
    "fmt"
    "time"
)

func slowOperation(ctx context.Context) (string, error) {
    select {
    case <-time.After(5 * time.Second):
        return "operation complete", nil
    case <-ctx.Done():
        return "", ctx.Err()
    }
}

func main() {
    // 2-second timeout
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel() // Must be called for resource cleanup

    result, err := slowOperation(ctx)
    if err != nil {
        fmt.Println("failed:", err) // context deadline exceeded
        return
    }
    fmt.Println(result)
}

context.WithTimeout creates a context that automatically sends a cancellation signal after the specified time. ctx.Done() returns a channel that closes on cancellation, and using select to wait on both this channel and task completion handles timeouts cleanly.
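
Callers often need to know why a context ended. As a hedged sketch, the error returned by `ctx.Err()` can be compared against `context.DeadlineExceeded` and `context.Canceled` with `errors.Is`. The helpers `describe`, `outcome`, and `outcomeWhenCancelled` are hypothetical, and this version of `slowOperation` takes a duration and uses `time.NewTimer` so the timer can be stopped early.

```go
package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// slowOperation waits for d, or returns early if ctx is done first.
func slowOperation(ctx context.Context, d time.Duration) error {
    timer := time.NewTimer(d)
    defer timer.Stop() // release the timer if ctx finishes first
    select {
    case <-timer.C:
        return nil
    case <-ctx.Done():
        return ctx.Err()
    }
}

// describe (hypothetical) maps a context error to a short label.
func describe(err error) string {
    switch {
    case err == nil:
        return "ok"
    case errors.Is(err, context.DeadlineExceeded):
        return "timed out"
    case errors.Is(err, context.Canceled):
        return "cancelled"
    default:
        return "other error"
    }
}

// outcome (hypothetical) runs the operation with the given timeout and
// work duration, both in milliseconds.
func outcome(timeoutMs, workMs int) string {
    ctx, cancel := context.WithTimeout(context.Background(),
        time.Duration(timeoutMs)*time.Millisecond)
    defer cancel()
    return describe(slowOperation(ctx, time.Duration(workMs)*time.Millisecond))
}

// outcomeWhenCancelled (hypothetical) cancels the context before the work starts.
func outcomeWhenCancelled() string {
    ctx, cancel := context.WithCancel(context.Background())
    cancel()
    return describe(slowOperation(ctx, time.Second))
}

func main() {
    fmt.Println(outcome(20, 1000))      // timed out
    fmt.Println(outcome(1000, 1))       // ok
    fmt.Println(outcomeWhenCancelled()) // cancelled
}
```

Telling the two errors apart lets a caller, for example, retry after a timeout but stop quietly when the user cancelled.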

Manual cancellation is also possible.

package main

import (
    "context"
    "fmt"
    "time"
)

func longTask(ctx context.Context, name string) {
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("%s: cancelled (%v)\n", name, ctx.Err())
            return
        case <-time.After(500 * time.Millisecond):
            fmt.Printf("%s: working...\n", name)
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())

    go longTask(ctx, "worker1")
    go longTask(ctx, "worker2")

    time.Sleep(2 * time.Second)
    fmt.Println("--- sending cancel signal ---")
    cancel() // Propagate cancellation to all child goroutines

    time.Sleep(100 * time.Millisecond) // Demo only: give the goroutines time to print their exit message
}

Calling cancel() closes the context’s Done channel, so every goroutine waiting on ctx.Done() sees the signal, as does every context derived from this one. Because cancelling a parent context automatically cancels its children, even complex task trees can be shut down cleanly with a single call.
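
The parent-to-child direction of cancellation can be checked with a short experiment. This is a minimal sketch; `afterParentCancel` and `afterChildCancel` are hypothetical helper names, and each returns whether the parent and the child context ended up cancelled.

```go
package main

import (
    "context"
    "fmt"
)

// afterParentCancel cancels the parent and reports the state of both contexts.
func afterParentCancel() (parentCancelled, childCancelled bool) {
    parent, stopParent := context.WithCancel(context.Background())
    child, stopChild := context.WithCancel(parent)
    defer stopChild()

    stopParent()
    <-child.Done() // closes because the parent was cancelled
    return parent.Err() != nil, child.Err() != nil
}

// afterChildCancel cancels only the child and reports the state of both contexts.
func afterChildCancel() (parentCancelled, childCancelled bool) {
    parent, stopParent := context.WithCancel(context.Background())
    defer stopParent()
    child, stopChild := context.WithCancel(parent)

    stopChild()
    <-child.Done()
    return parent.Err() != nil, child.Err() != nil
}

func main() {
    p, c := afterParentCancel()
    fmt.Println(p, c) // true true

    p, c = afterChildCancel()
    fmt.Println(p, c) // false true
}
```

Cancellation flows down the tree only: cancelling a child leaves its parent, and any sibling contexts, untouched.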

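Make defer cancel() a habit. If you create a context with WithCancel, WithTimeout, or WithDeadline and never call cancel, the resources tied to it (such as internal timers) are not released until the parent context is cancelled or the deadline passes, if that ever happens.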
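
The manual cancellation example above ends with a short `time.Sleep` to let the goroutines exit. One possible alternative, sketched here under the assumption that the caller owns the goroutines it starts, pairs `defer cancel()` with a `sync.WaitGroup` so that shutdown does not depend on guessing a delay. `runAndStop` and the `exited` channel are hypothetical additions for illustration.

```go
package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// longTask works in small steps until ctx is cancelled, then records its exit.
func longTask(ctx context.Context, name string, wg *sync.WaitGroup, exited chan<- string) {
    defer wg.Done()
    ticker := time.NewTicker(10 * time.Millisecond)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            exited <- name
            return
        case <-ticker.C:
            // One unit of work would go here.
        }
    }
}

// runAndStop (hypothetical) starts n tasks, cancels them, waits for all of
// them to return, and reports how many exited.
func runAndStop(n int) int {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel() // safe to call again; releases the context on every path

    exited := make(chan string, n)
    var wg sync.WaitGroup
    for i := 1; i <= n; i++ {
        wg.Add(1)
        go longTask(ctx, fmt.Sprintf("worker%d", i), &wg, exited)
    }

    time.Sleep(30 * time.Millisecond) // let the tasks run briefly
    cancel()
    wg.Wait() // returns only after every task has exited

    return len(exited)
}

func main() {
    fmt.Println(runAndStop(2)) // 2
}
```

Calling cancel more than once is allowed, so the deferred call acts as a safety net even when cancel is also called explicitly.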

Race Conditions

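One of the most dangerous bugs in concurrent code is a race condition. The typical form in Go is a data race: two or more goroutines access the same variable at the same time without synchronization, and at least one of them writes to it.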

package main

import (
    "fmt"
    "sync"
)

func main() {
    counter := 0
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter++ // Race condition! Multiple goroutines modify simultaneously
        }()
    }

    wg.Wait()
    fmt.Println("counter:", counter) // Might not be 1000
}

With 1000 goroutines each incrementing counter by 1, you’d expect the result to be 1000, but it might be less. counter++ consists of three steps — “read, increment, write” — and when two goroutines read the same value simultaneously, one increment gets swallowed.
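
The lost update can be replayed step by step in ordinary sequential code. This sketch does not start any goroutines; it only simulates one unlucky interleaving of two increments, with the local variables `a` and `b` standing in for the values each goroutine read. `lostUpdate` is a hypothetical name.

```go
package main

import "fmt"

// lostUpdate replays one interleaving of two counter++ operations.
func lostUpdate() int {
    counter := 0

    a := counter // goroutine A reads 0
    b := counter // goroutine B reads 0 before A has written

    a++ // A increments its copy to 1
    b++ // B increments its copy to 1

    counter = a // A writes 1
    counter = b // B writes 1, overwriting A's update

    return counter
}

func main() {
    fmt.Println(lostUpdate()) // 1
}
```

Two increments ran, yet the counter ends at 1 because both started from the same value.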

Go has a built-in data race detector. Running with go run -race (or go test -race) reports races at runtime, but only those that actually occur during that run.

sync.Mutex — Mutual Exclusion

For shared state problems that are hard to solve with channels, use Mutex.

package main

import (
    "fmt"
    "sync"
)

type SafeCounter struct {
    mu    sync.Mutex
    value int
}

func (c *SafeCounter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.value++
}

func (c *SafeCounter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.value
}

func main() {
    counter := &SafeCounter{}
    var wg sync.WaitGroup

    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            counter.Increment()
        }()
    }

    wg.Wait()
    fmt.Println("counter:", counter.Value()) // Always 1000
}

Between Lock() and Unlock(), only one goroutine executes at a time. Using defer c.mu.Unlock() ensures the lock is always released regardless of how the function exits.
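
The benefit of `defer` shows most clearly in a method with more than one return path. The sketch below uses a hypothetical `Account` type, which is not part of the original example. `Withdraw` returns early on insufficient funds, and the deferred `Unlock` runs on both paths.

```go
package main

import (
    "errors"
    "fmt"
    "sync"
)

// ErrInsufficient is returned when a withdrawal exceeds the balance.
var ErrInsufficient = errors.New("insufficient funds")

// Account is a hypothetical type guarded by a mutex.
type Account struct {
    mu      sync.Mutex
    balance int
}

func (a *Account) Withdraw(amount int) error {
    a.mu.Lock()
    defer a.mu.Unlock() // runs on the early return and on the normal return
    if amount > a.balance {
        return ErrInsufficient
    }
    a.balance -= amount
    return nil
}

func (a *Account) Balance() int {
    a.mu.Lock()
    defer a.mu.Unlock()
    return a.balance
}

// withdrawConcurrently makes the given number of concurrent withdrawal
// attempts and returns the final balance.
func withdrawConcurrently(balance, attempts, amount int) int {
    acct := &Account{balance: balance}
    var wg sync.WaitGroup
    for i := 0; i < attempts; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            _ = acct.Withdraw(amount) // some attempts fail once funds run out
        }()
    }
    wg.Wait()
    return acct.Balance()
}

func main() {
    // 15 attempts of 10 against a balance of 100: 10 succeed, 5 fail.
    fmt.Println(withdrawConcurrently(100, 15, 10)) // 0
}
```

If the early return skipped the unlock, the next caller would block forever on `Lock()`.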

For read-heavy workloads with few writes, sync.RWMutex can be more efficient.

type Cache struct {
    mu   sync.RWMutex
    data map[string]string
}

func (c *Cache) Get(key string) (string, bool) {
    c.mu.RLock()         // Read lock — multiple goroutines can read simultaneously
    defer c.mu.RUnlock()
    val, ok := c.data[key]
    return val, ok
}

func (c *Cache) Set(key, value string) {
    c.mu.Lock()          // Write lock — only one can write
    defer c.mu.Unlock()
    c.data[key] = value
}

RLock is a read lock that multiple goroutines can hold simultaneously. When a write lock (Lock) is held, both reads and writes must wait. It’s well-suited for data structures like caches where reads are frequent.
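
Here is one way the Cache might be used in a complete program. This is a sketch with two assumptions beyond the original snippet: a hypothetical `NewCache` constructor, needed because writing to the nil map of a zero-value Cache would panic, and a hypothetical `Len` method used to check the result.

```go
package main

import (
    "fmt"
    "sync"
)

type Cache struct {
    mu   sync.RWMutex
    data map[string]string
}

// NewCache (hypothetical) initializes the map so Set does not panic.
func NewCache() *Cache {
    return &Cache{data: make(map[string]string)}
}

func (c *Cache) Get(key string) (string, bool) {
    c.mu.RLock() // shared lock: readers do not block each other
    defer c.mu.RUnlock()
    val, ok := c.data[key]
    return val, ok
}

func (c *Cache) Set(key, value string) {
    c.mu.Lock() // exclusive lock: blocks readers and other writers
    defer c.mu.Unlock()
    c.data[key] = value
}

// Len (hypothetical) returns the number of entries.
func (c *Cache) Len() int {
    c.mu.RLock()
    defer c.mu.RUnlock()
    return len(c.data)
}

func main() {
    c := NewCache()
    c.Set("lang", "go")

    var wg sync.WaitGroup

    // One writer adds 100 new keys.
    wg.Add(1)
    go func() {
        defer wg.Done()
        for i := 0; i < 100; i++ {
            c.Set(fmt.Sprintf("k%d", i), "v")
        }
    }()

    // Eight readers look up the same key repeatedly.
    for r := 0; r < 8; r++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for i := 0; i < 100; i++ {
                c.Get("lang")
            }
        }()
    }

    wg.Wait()
    val, ok := c.Get("lang")
    fmt.Println(val, ok, c.Len()) // go true 101
}
```

Running this with the race detector enabled is a quick way to confirm that the locks cover every access to the map.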

Channels vs Mutex — Which Should You Use?

Both solve concurrency problems, but their use cases differ.

| Situation | Recommended |
| --- | --- |
| Passing data between goroutines | Channels |
| Task pipelines, event streams | Channels |
| Synchronizing shared state (counters, caches) | Mutex |
| Simple completion signals | Channels or WaitGroup |

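The Go wiki’s guidance on choosing between a mutex and a channel suggests channels for passing ownership of data, distributing units of work, and communicating asynchronous results, and a mutex for caches and other shared state. If data flows from one goroutine to another, channels are natural; if multiple goroutines need to access the same memory, Mutex is the right choice. When in doubt, use “Is ownership being transferred?” as your deciding criterion.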
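
For comparison, the counter from the race condition example can also be written in the channel style, where a single goroutine owns the value and everyone else sends it messages. This is a sketch of that alternative, not a recommendation over SafeCounter; `countViaChannel` is a hypothetical name.

```go
package main

import (
    "fmt"
    "sync"
)

// countViaChannel lets one goroutine own the counter; the other goroutines
// never touch it directly and only send increments over a channel.
func countViaChannel(n int) int {
    increments := make(chan int)
    total := make(chan int)

    // Owner goroutine: the only code that reads or writes counter.
    go func() {
        counter := 0
        for delta := range increments {
            counter += delta
        }
        total <- counter
    }()

    var wg sync.WaitGroup
    for i := 0; i < n; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            increments <- 1
        }()
    }

    wg.Wait()
    close(increments) // no more senders, so the owner can report the total
    return <-total
}

func main() {
    fmt.Println(countViaChannel(1000)) // 1000
}
```

For a plain counter the mutex version is shorter, which matches the table above: the channel style pays off when the values being passed carry real work or data.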


Concurrency patterns ultimately boil down to two questions: “How do we distribute work?” and “When do we stop?” Worker Pool and Fan-Out/Fan-In answer the first question, and context answers the second. Race conditions must always be guarded against, and Mutex is the last line of defense for shared state problems that channels can’t solve.

The next part covers the package and module system. We’ll explore how to structure code and manage external dependencies.

-> Part 11: Packages and Modules

