Go Advanced #1 Concurrency Patterns — fan-out, fan-in, pipeline

This is the first post in the Go Advanced series. The Intermediate series introduced goroutines, channels, select, and context; this post covers the standard patterns built by composing those tools.

A 7-post series.

  1. Concurrency patterns (this post) — pipeline, fan-out/fan-in
  2. Memory model and the sync package — Mutex, atomic, Once
  3. Generics — type parameters, constraints
  4. The reflect package — handling dynamic types
  5. unsafe and cgo — one step beyond the boundary
  6. Profiling — pprof, benchmark, race
  7. Code generation — go generate

Most concurrency code can be written with just a few of the patterns from this post.

Pipeline: chaining channels

The most basic shape of Go concurrency. A goroutine per stage, with data flowing through channels.

3-stage pipeline
import "fmt"

// gen emits each number on a new channel, then closes it.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

func main() {
	for v := range square(gen(1, 2, 3, 4)) {
		fmt.Println(v)    // 1, 4, 9, 16
	}
}

Each stage is a function that takes a channel and returns a channel. Compose them and you get a flow shaped like a Unix pipe.

Two key points:

  • Each stage closes its own channel — so the next stage’s for range ends naturally
  • Each stage is an independent goroutine — even if one stage is slow, earlier stages progress as far as they can
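
Because every stage has the same shape (channel in, channel out), stages can be repeated or reordered freely. The sketch below reuses gen and square from above and applies square twice; collect is a hypothetical helper added here only to gather the output into a slice.

```go
package main

import "fmt"

// gen emits the given numbers on a channel, then closes it.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// square squares each value from in and closes out once in is drained.
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

// collect (hypothetical helper) drains a channel into a slice.
func collect(in <-chan int) []int {
	var vals []int
	for v := range in {
		vals = append(vals, v)
	}
	return vals
}

func main() {
	// gen -> square -> square: each value is raised to the 4th power.
	fmt.Println(collect(square(square(gen(1, 2, 3))))) // [1 16 81]
}
```

Order is preserved here because each stage is a single goroutine reading and writing sequentially.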

Fan-out: one input to many workers

A channel’s work is divided among multiple goroutines.

fan-out
func worker(id int, in <-chan int, out chan<- int) {
	for n := range in {
		// heavy work
		out <- n * n
	}
}

func main() {
	in := gen(1, 2, 3, 4, 5, 6, 7, 8)
	out := make(chan int)

	// 3 workers share the same in channel
	for i := 0; i < 3; i++ {
		go worker(i, in, out)
	}

	// closing out — needs separate synchronization (next section)
}

Multiple workers receive from the same input channel — Go distributes work among them automatically. Which worker receives more is not guaranteed, but each piece of work goes to exactly one worker (a channel receive is exclusive).

Pitfall: who closes the out channel?

When multiple workers send on out, no single worker can safely close it: the others may still be sending, and a send on a closed channel panics.

The solution is a separate stage that tracks when all workers are done — which comes up naturally in fan-in, covered next.
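
As a minimal sketch of that idea applied directly to the shared out channel: a WaitGroup counts the workers, and one extra goroutine closes out after all of them have returned. The fanOut name and its signature are assumptions made for this example, and worker drops the unused id parameter.

```go
package main

import (
	"fmt"
	"sync"
)

// gen emits the given numbers on a channel, then closes it.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// worker squares every value it receives from the shared in channel.
func worker(in <-chan int, out chan<- int) {
	for n := range in {
		out <- n * n
	}
}

// fanOut (hypothetical name) starts n workers on one input channel and
// closes the shared output only after every worker has returned.
func fanOut(in <-chan int, n int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup
	wg.Add(n)
	for i := 0; i < n; i++ {
		go func() {
			defer wg.Done()
			worker(in, out)
		}()
	}
	go func() {
		wg.Wait()  // all workers have stopped sending
		close(out) // so this single close cannot race with a send
	}()
	return out
}

func main() {
	sum := 0
	for v := range fanOut(gen(1, 2, 3, 4, 5, 6, 7, 8), 3) {
		sum += v
	}
	fmt.Println(sum) // 204
}
```

The sum is deterministic even though the arrival order is not, because each input value is received by exactly one worker.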

Fan-in: gathering many inputs into one channel

Combine results from many channels into one channel.

fan-in (close synchronized via WaitGroup)
import "sync"

func merge(channels ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	output := func(c <-chan int) {
		defer wg.Done()
		for n := range c {
			out <- n
		}
	}

	wg.Add(len(channels))
	for _, c := range channels {
		go output(c)
	}

	// close out once every input is done
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

Two things to note:

  • One goroutine per input channel forwards values to out
  • close(out) runs right after wg.Wait(), in its own goroutine, so merge can return out immediately instead of blocking until every input is drained

Combining these two patterns (fan-out + fan-in) gives the standard worker pool shape.

Worker pool: fan-out + fan-in combined

full worker pool
func process(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

func main() {
	in := gen(1, 2, 3, 4, 5, 6, 7, 8)

	// fan-out — 3 workers
	c1 := process(in)
	c2 := process(in)
	c3 := process(in)

	// fan-in — 3 into 1
	for v := range merge(c1, c2, c3) {
		fmt.Println(v)
	}
}

This is the most common shape for Go concurrency code. Distribute jobs, gather results.

Always pair with cancellation: the done pattern

The code above terminates naturally when the input ends — but mid-flight cancellation needs a tool.

stage that takes a done channel
func square(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case out <- n * n:
			case <-done:
				return
			}
		}
	}()
	return out
}

Each stage takes a done channel and listens for it in the same select as the send. When done closes, the goroutine returns immediately and the deferred close(out) closes out, so the cancellation propagates downstream.

This pattern is the same as Intermediate #5 context. In practice you usually take a context.Context instead of done.

rewritten with context
func square(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case out <- n * n:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}
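
To see the cancellation at work, the sketch below feeds square from an endless source and stops after a few results. count and firstSquares are hypothetical names introduced for this example; the point is that cancel() lets both goroutines exit even though the source never ends by itself.

```go
package main

import (
	"context"
	"fmt"
)

// count (hypothetical source stage) emits 1, 2, 3, ... until ctx is cancelled.
func count(ctx context.Context) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := 1; ; n++ {
			select {
			case out <- n:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// square is the context-aware stage from above.
func square(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case out <- n * n:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

// firstSquares (hypothetical helper) reads k results, then cancels the pipeline.
func firstSquares(k int) []int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // unblocks count and square once we stop reading

	out := square(ctx, count(ctx))
	var got []int
	for len(got) < k {
		got = append(got, <-out)
	}
	return got
}

func main() {
	fmt.Println(firstSquares(3)) // [1 4 9]
}
```

Without the ctx.Done() cases, both goroutines would stay blocked on their sends forever after firstSquares returns.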

Semaphore: limit concurrent execution

Use this when you want one goroutine per job but need to cap how many run at the same time, rather than running a fixed pool of N workers.

channel as semaphore
func main() {
	urls := []string{"a", "b", "c", "d", "e"}
	sem := make(chan struct{}, 3)    // up to 3

	var wg sync.WaitGroup
	for _, url := range urls {
		wg.Add(1)
		go func(u string) {
			defer wg.Done()

			sem <- struct{}{}            // acquire
			defer func() { <-sem }()      // release

			fetch(u)
		}(url)
	}
	wg.Wait()
}

A buffered channel with capacity 3 — used as a kind of semaphore. The 4th goroutine blocks at sem <- struct{}{} until someone releases.

If you need more than this, such as weighted acquisition or an acquire that respects a context, the golang.org/x/sync/semaphore package exists, but channels often suffice as above.
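
One way to convince yourself that the limit holds is to count how many goroutines are inside the guarded section at once. The sketch below does that with atomic counters; runLimited is a hypothetical helper, and the sleep merely stands in for real work such as fetch.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runLimited (hypothetical helper) starts one goroutine per job but lets at
// most limit of them run the guarded section at once. It returns the highest
// concurrency it observed.
func runLimited(jobs, limit int) int32 {
	sem := make(chan struct{}, limit)
	var running, peak int32
	var wg sync.WaitGroup

	for i := 0; i < jobs; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot
			defer func() { <-sem }() // release it on return

			cur := atomic.AddInt32(&running, 1)
			// Record a new peak if this goroutine raised it.
			for {
				old := atomic.LoadInt32(&peak)
				if cur <= old || atomic.CompareAndSwapInt32(&peak, old, cur) {
					break
				}
			}
			time.Sleep(5 * time.Millisecond) // stand-in for real work
			atomic.AddInt32(&running, -1)
		}()
	}
	wg.Wait()
	return atomic.LoadInt32(&peak)
}

func main() {
	peak := runLimited(10, 3)
	fmt.Println(peak <= 3) // true
}
```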

Or-channel: terminate when any one closes

When you want to terminate as soon as any of multiple done channels closes.

or-channel
func or(channels ...<-chan struct{}) <-chan struct{} {
	switch len(channels) {
	case 0:
		return nil
	case 1:
		return channels[0]
	}

	out := make(chan struct{})
	go func() {
		defer close(out)
		switch len(channels) {
		case 2:
			select {
			case <-channels[0]:
			case <-channels[1]:
			}
		default:
			select {
			case <-channels[0]:
			case <-channels[1]:
			case <-channels[2]:
			case <-or(append(channels[3:], out)...):
			}
		}
	}()
	return out
}

A recursive trick that merges any number of channels. Not used often, but a useful pattern to know.

Tee: duplicate one input into two

Like the Unix tee command, this copies every value from one channel to two output channels.

tee
func tee(in <-chan int) (<-chan int, <-chan int) {
	out1 := make(chan int)
	out2 := make(chan int)

	go func() {
		defer close(out1)
		defer close(out2)
		for v := range in {
			a, b := out1, out2
			for i := 0; i < 2; i++ {
				select {
				case a <- v:
					a = nil
				case b <- v:
					b = nil
				}
			}
		}
	}()
	return out1, out2
}

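a and b are set to nil one at a time, which is the nil-channel trick from Intermediate #4: a send on a nil channel blocks forever, so that select case is disabled once its side has received the value. The inner loop runs twice, so each value reaches both outputs before the next one is read.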
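
A short usage sketch with the gen stage from earlier. One consequence of the design is worth seeing in code: both outputs have to be consumed, because tee does not read the next input until the current value has been delivered to both sides.

```go
package main

import "fmt"

// gen emits the given numbers, then closes its channel.
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

// tee copies every value from in to both returned channels (same as above).
func tee(in <-chan int) (<-chan int, <-chan int) {
	out1 := make(chan int)
	out2 := make(chan int)

	go func() {
		defer close(out1)
		defer close(out2)
		for v := range in {
			a, b := out1, out2
			for i := 0; i < 2; i++ {
				select {
				case a <- v:
					a = nil // delivered to out1: disable this case
				case b <- v:
					b = nil // delivered to out2: disable this case
				}
			}
		}
	}()
	return out1, out2
}

func main() {
	out1, out2 := tee(gen(1, 2, 3))
	// Read one value from each side per iteration so neither side stalls.
	for v1 := range out1 {
		v2 := <-out2
		fmt.Println(v1, v2)
	}
	// Prints:
	// 1 1
	// 2 2
	// 3 3
}
```

If only out1 were read, tee would block on the send to out2 after the first value and the loop would hang.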

Bridge: flatten a channel of channels

Flatten a channel of channels (<-chan <-chan T) into a single channel.

bridge
func bridge(chanStream <-chan <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		// Take inner channels one at a time and drain each completely
		// before moving on to the next.
		for stream := range chanStream {
			for v := range stream {
				out <- v
			}
		}
	}()
	return out
}

Not used often, but useful when you need to consume different data sources dynamically, one at a time.

Which pattern fits?

Situation                                              | Pattern
------------------------------------------------------ | ----------------
Clear-staged transformation (parse → validate → save)  | pipeline
Run the same work N times concurrently                 | fan-out + fan-in
Limit concurrency only                                 | semaphore
Terminate when any one of many sources finishes        | or-channel
Consume one source from two places                     | tee

Most code can be solved with just pipeline and fan-out/fan-in. Keep the rest in the toolbox for when needed.

Common pitfalls

1) A channel that should close doesn’t → leak

A for range over a channel blocks until the channel is closed. If any stage forgets to close its output, every stage downstream of it waits forever and its goroutine leaks.

2) Sending to a closed channel → panic

Often happens when multiple workers send on the same out. Solution — WaitGroup + close from one place.
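
The panic is easy to reproduce in isolation. In the sketch below, trySend is a hypothetical helper that uses recover only to report the panic for demonstration purposes; it is not a suggested fix, since the real remedy is to close from one place.

```go
package main

import "fmt"

// trySend (hypothetical helper) reports whether sending v on ch panicked.
func trySend(ch chan int, v int) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	ch <- v
	return false
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // false: the channel is open and has buffer space
	close(ch)
	fmt.Println(trySend(ch, 2)) // true: a send on a closed channel panics
}
```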

3) Ignoring ctx

If a send is written as a bare out <- v, or as a select with no case <-ctx.Done():, the goroutine cannot observe cancellation: once the receiver is gone it blocks forever and leaks. Pair every blocking send and receive with ctx.Done().

Practical example: concurrent URL download

concurrent download + result collection
type Result struct {
	URL  string
	Body []byte
	Err  error
}

func downloadAll(ctx context.Context, urls []string, concurrency int) []Result {
	// fan-out — only `concurrency` running at once
	in := make(chan string)
	out := make(chan Result)

	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for url := range in {
				body, err := fetch(ctx, url)
				select {
				case out <- Result{url, body, err}:
				case <-ctx.Done():
					return
				}
			}
		}()
	}

	// send work
	go func() {
		defer close(in)
		for _, u := range urls {
			select {
			case in <- u:
			case <-ctx.Done():
				return
			}
		}
	}()

	// synchronize close of results
	go func() {
		wg.Wait()
		close(out)
	}()

	var results []Result
	for r := range out {
		results = append(results, r)
	}
	return results
}

This one function combines fan-out, bounded concurrency (the fixed worker count given by the concurrency parameter plays the role of a semaphore), context cancellation, and fan-in. It is a shape you’ll encounter in real-world code.

Wrap-up

What we covered:

  • pipeline — channel per stage, close your own channel
  • fan-out — many workers share one channel
  • fan-in (merge) — many channels into one, close synchronized via WaitGroup
  • worker pool = fan-out + fan-in
  • semaphore — limit concurrency with a buffered channel
  • or-channel / tee / bridge — toolbox
  • Always listen to ctx/done — prevent leaks
  • Sending to a closed channel panics — close from one place only

The next post (#2 Memory Model and the sync Package) covers the tools you need when you use shared memory instead of channels: how Mutex, atomic, and Once work and when each fits.
