Go Advanced #1 Concurrency Patterns — fan-out, fan-in, pipeline
The first post in the Go Advanced series. The Intermediate series introduced goroutines, channels, select, and context; this post covers the standard patterns built by composing those tools.
This is a seven-post series:
- Concurrency patterns (this post) — pipeline, fan-out/fan-in
- Memory model and the sync package — Mutex, atomic, Once
- Generics — type parameters, constraints
- The reflect package — handling dynamic types
- unsafe and cgo — one step beyond the boundary
- Profiling — pprof, benchmark, race
- Code generation — go generate
Most concurrency code can be written with just a few of the patterns from this post.
Pipeline: chaining channels
The most basic shape of Go concurrency: one goroutine per stage, with data flowing through channels.

    import "fmt"

    // gen emits each of nums on a channel, then closes it.
    func gen(nums ...int) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            for _, n := range nums {
                out <- n
            }
        }()
        return out
    }

    // square forwards n*n for every n received, then closes its output.
    func square(in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            for n := range in {
                out <- n * n
            }
        }()
        return out
    }

    func main() {
        for v := range square(gen(1, 2, 3, 4)) {
            fmt.Println(v) // 1, 4, 9, 16
        }
    }

Each stage is a function that takes a channel and returns a channel. Compose them and you get a flow shaped like a Unix pipe.
Two key points:
- Each stage closes its own channel, so the next stage's for range loop ends naturally.
- Each stage is an independent goroutine, so even if one stage is slow, earlier stages keep working until they block on a send.
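To illustrate how stages compose, here is a minimal sketch that adds a third stage to the pipeline. The keepEven stage and the collect sink are hypothetical helpers introduced for this example; gen and square follow the definitions in this post.

```go
package main

import "fmt"

// gen emits each of nums on a channel, then closes it (as in the post).
func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// square forwards n*n for every n it receives (as in the post).
func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            out <- n * n
        }
    }()
    return out
}

// keepEven is a hypothetical extra stage: it forwards only even values.
func keepEven(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            if n%2 == 0 {
                out <- n
            }
        }
    }()
    return out
}

// collect is a hypothetical sink: it drains a channel into a slice.
func collect(in <-chan int) []int {
    var got []int
    for n := range in {
        got = append(got, n)
    }
    return got
}

func main() {
    fmt.Println(collect(keepEven(square(gen(1, 2, 3, 4))))) // [4 16]
}
```

Because every stage has the same shape (channel in, channel out), a new stage slots in without changing its neighbours.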
Fan-out: one input to many workers
The work arriving on one channel is divided among multiple goroutines.

    func worker(id int, in <-chan int, out chan<- int) {
        for n := range in {
            // heavy work
            out <- n * n
        }
    }

    func main() {
        in := gen(1, 2, 3, 4, 5, 6, 7, 8)
        out := make(chan int)
        // 3 workers share the same in channel
        for i := 0; i < 3; i++ {
            go worker(i, in, out)
        }
        // Incomplete on purpose: nothing reads or closes out yet.
        // Closing out needs separate synchronization (next section).
    }

Multiple workers receive from the same input channel, and the runtime hands each value to whichever worker is ready to receive. How the work is split between workers is not guaranteed, but each value goes to exactly one worker, because a value sent on a channel is received exactly once.
Pitfall: who closes the out channel?
When multiple workers send on out, no single worker can safely close it: the others may still be sending, and sending on a closed channel panics.
The solution is a separate goroutine that waits until all workers are done and only then closes out. This comes up naturally in fan-in, covered next.
Fan-in: gathering many inputs into one channel
Combine results from many channels into one channel.

    import "sync"

    // merge forwards every value from every input channel to one output
    // channel, and closes the output once all inputs are drained.
    func merge(channels ...<-chan int) <-chan int {
        var wg sync.WaitGroup
        out := make(chan int)
        output := func(c <-chan int) {
            defer wg.Done()
            for n := range c {
                out <- n
            }
        }
        wg.Add(len(channels))
        for _, c := range channels {
            go output(c)
        }
        // close out once every input is done
        go func() {
            wg.Wait()
            close(out)
        }()
        return out
    }

The key lines:
- One goroutine per input channel forwards values to out.
- close(out) runs right after wg.Wait() returns. Both live in a separate goroutine, so merge can return out immediately instead of blocking until all inputs are drained.
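The same WaitGroup-then-close idea also answers the earlier question of who closes a shared out channel in fan-out. The following is a minimal sketch under that assumption; fanOut is a hypothetical helper name, and worker is simplified from the fan-out section (the unused id parameter is dropped).

```go
package main

import (
    "fmt"
    "sync"
)

// gen emits each of nums on a channel, then closes it (as in the post).
func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// worker squares every value it receives and sends it on the shared out.
func worker(in <-chan int, out chan<- int) {
    for n := range in {
        out <- n * n
    }
}

// fanOut (hypothetical helper) starts the given number of workers on one
// input channel and closes the shared output after all of them return.
func fanOut(in <-chan int, workers int) <-chan int {
    out := make(chan int)
    var wg sync.WaitGroup
    wg.Add(workers)
    for i := 0; i < workers; i++ {
        go func() {
            defer wg.Done()
            worker(in, out)
        }()
    }
    // The only place out is closed: after every sender has finished.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

func main() {
    sum := 0
    for v := range fanOut(gen(1, 2, 3, 4, 5, 6, 7, 8), 3) {
        sum += v // arrival order varies, the sum does not
    }
    fmt.Println(sum) // 204
}
```

The example sums the results because the order in which workers deliver them is not deterministic.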
Combining these two patterns (fan-out + fan-in) gives the standard worker pool shape.
Worker pool: fan-out + fan-in combined

    // process is one worker: it squares every value it receives.
    func process(in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            for n := range in {
                out <- n * n
            }
        }()
        return out
    }

    func main() {
        in := gen(1, 2, 3, 4, 5, 6, 7, 8)
        // fan-out: 3 workers read from the same in channel
        c1 := process(in)
        c2 := process(in)
        c3 := process(in)
        // fan-in: 3 channels into 1
        for v := range merge(c1, c2, c3) {
            fmt.Println(v) // order varies between runs
        }
    }

This is the most common shape for Go concurrency code: distribute jobs, then gather results.
Always pair with cancellation: the done pattern
The code above terminates naturally when the input ends, but cancelling it mid-flight needs an extra signal.

    func square(done <-chan struct{}, in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            for n := range in {
                select {
                case out <- n * n:
                case <-done:
                    return
                }
            }
        }()
        return out
    }

Each stage takes a done channel and listens for it inside select. When done is closed, the goroutine returns immediately, and the deferred close(out) closes out as well.
This is the same idea as context from Intermediate #5. In practice you usually take a context.Context instead of done.
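For the done pattern to release the whole pipeline, every stage has to honor done, including the source. Below is a minimal sketch assuming a done-aware variant of gen; firstN is a hypothetical helper that reads only the first n results and then cancels the rest.

```go
package main

import "fmt"

// gen emits nums until they run out or done is closed.
func gen(done <-chan struct{}, nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }()
    return out
}

// square is the done-aware stage from the post.
func square(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

// firstN (hypothetical helper) returns at most n squared values and then
// cancels the pipeline so that no stage stays blocked on a send.
func firstN(n int, nums ...int) []int {
    done := make(chan struct{})
    defer close(done) // releases both stages however we leave
    out := square(done, gen(done, nums...))
    var got []int
    for len(got) < n {
        v, ok := <-out
        if !ok {
            break // input ended before n values arrived
        }
        got = append(got, v)
    }
    return got
}

func main() {
    fmt.Println(firstN(2, 1, 2, 3, 4, 5)) // [1 4]
}
```

Without the select in gen, the source goroutine would stay blocked on its next send after the consumer stops reading.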

    func square(ctx context.Context, in <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            for n := range in {
                select {
                case out <- n * n:
                case <-ctx.Done():
                    return
                }
            }
        }()
        return out
    }

Semaphore: limit concurrent execution
Use this when you want to cap how many tasks run at once rather than start a fixed pool of N workers.

    func main() {
        urls := []string{"a", "b", "c", "d", "e"}
        sem := make(chan struct{}, 3) // at most 3 fetches at once
        var wg sync.WaitGroup
        for _, url := range urls {
            wg.Add(1)
            go func(u string) {
                defer wg.Done()
                sem <- struct{}{}        // acquire
                defer func() { <-sem }() // release
                fetch(u) // fetch is assumed to be defined elsewhere
            }(url)
        }
        wg.Wait()
    }

A buffered channel with capacity 3 acts as a counting semaphore. All five goroutines start, but the fourth blocks at sem <- struct{}{} until one of the first three releases its slot.
If you need more than this, such as weighted acquisition, the golang.org/x/sync/semaphore package exists, but a buffered channel often suffices.
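To check that the buffered channel really caps concurrency, the sketch below records the peak number of tasks running at once. runLimited and its peak counter are hypothetical names introduced for this illustration; the acquire and release lines are the same as in the example from this section.

```go
package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

// runLimited (hypothetical helper) runs every task with at most limit
// running at once and returns the highest concurrency it observed.
func runLimited(tasks []func(), limit int) int64 {
    sem := make(chan struct{}, limit)
    var wg sync.WaitGroup
    var running, peak int64
    for _, task := range tasks {
        wg.Add(1)
        go func(task func()) {
            defer wg.Done()
            sem <- struct{}{}        // acquire
            defer func() { <-sem }() // release
            n := atomic.AddInt64(&running, 1)
            // Raise peak to n if n is a new maximum.
            for {
                p := atomic.LoadInt64(&peak)
                if n <= p || atomic.CompareAndSwapInt64(&peak, p, n) {
                    break
                }
            }
            task()
            atomic.AddInt64(&running, -1)
        }(task)
    }
    wg.Wait()
    return atomic.LoadInt64(&peak)
}

func main() {
    tasks := make([]func(), 10)
    for i := range tasks {
        tasks[i] = func() { time.Sleep(10 * time.Millisecond) }
    }
    peak := runLimited(tasks, 3)
    fmt.Println(peak <= 3) // true
}
```

The running counter is decremented before the deferred release runs, so it can never exceed the channel's capacity.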
Or-channel: terminate when any one closes
Use this when you want to stop as soon as any one of several done channels closes.

    // or returns a channel that is closed as soon as any input is closed.
    func or(channels ...<-chan struct{}) <-chan struct{} {
        switch len(channels) {
        case 0:
            return nil
        case 1:
            return channels[0]
        }
        out := make(chan struct{})
        go func() {
            defer close(out)
            switch len(channels) {
            case 2:
                select {
                case <-channels[0]:
                case <-channels[1]:
                }
            default:
                select {
                case <-channels[0]:
                case <-channels[1]:
                case <-channels[2]:
                // Recurse on the rest. Passing out along lets the nested
                // goroutines exit once this level has closed out.
                case <-or(append(channels[3:], out)...):
                }
            }
        }()
        return out
    }

A recursive trick that merges any number of done channels into one. It is not used often, but it is a useful pattern to know.
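A short usage sketch for the or function from this section: five done channels are combined, and closing any single one of them closes the combined channel. The channel names in main are made up for the example.

```go
package main

import "fmt"

// or returns a channel that is closed as soon as any input is closed
// (same logic as in the post).
func or(channels ...<-chan struct{}) <-chan struct{} {
    switch len(channels) {
    case 0:
        return nil
    case 1:
        return channels[0]
    }
    out := make(chan struct{})
    go func() {
        defer close(out)
        switch len(channels) {
        case 2:
            select {
            case <-channels[0]:
            case <-channels[1]:
            }
        default:
            select {
            case <-channels[0]:
            case <-channels[1]:
            case <-channels[2]:
            case <-or(append(channels[3:], out)...):
            }
        }
    }()
    return out
}

func main() {
    // Five independent signals; five is enough to exercise the recursion.
    a, b, c := make(chan struct{}), make(chan struct{}), make(chan struct{})
    d, e := make(chan struct{}), make(chan struct{})
    done := or(a, b, c, d, e)

    close(e) // any one of them is enough
    <-done
    fmt.Println("done closed")
}
```

With no arguments, or returns nil, and receiving from a nil channel blocks forever, so callers should pass at least one channel.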
Tee: duplicate one input into two
Like the Unix tee command, this splits one channel into two.

    func tee(in <-chan int) (<-chan int, <-chan int) {
        out1 := make(chan int)
        out2 := make(chan int)
        go func() {
            defer close(out1)
            defer close(out2)
            for v := range in {
                // Local copies, so each can be disabled after its send.
                a, b := out1, out2
                for i := 0; i < 2; i++ {
                    select {
                    case a <- v:
                        a = nil
                    case b <- v:
                        b = nil
                    }
                }
            }
        }()
        return out1, out2
    }

Setting a and b to nil one at a time is the nil-channel trick from Intermediate #4: a send on a nil channel blocks forever, so that case is never selected again. Each value is delivered to both outputs before the next one is read, so both outputs must be consumed or tee stalls.
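Because tee waits for both receivers on every value, the two outputs have to be read concurrently. The sketch below shows one way to do that; the second consumer goroutine and the sum it computes are made up for the example.

```go
package main

import (
    "fmt"
    "sync"
)

// gen emits each of nums on a channel, then closes it (as in the post).
func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// tee duplicates every value from in onto two outputs (as in the post).
func tee(in <-chan int) (<-chan int, <-chan int) {
    out1 := make(chan int)
    out2 := make(chan int)
    go func() {
        defer close(out1)
        defer close(out2)
        for v := range in {
            a, b := out1, out2
            for i := 0; i < 2; i++ {
                select {
                case a <- v:
                    a = nil
                case b <- v:
                    b = nil
                }
            }
        }
    }()
    return out1, out2
}

func main() {
    printed, summed := tee(gen(1, 2, 3))

    // Second consumer runs in its own goroutine so tee never stalls.
    var wg sync.WaitGroup
    sum := 0
    wg.Add(1)
    go func() {
        defer wg.Done()
        for v := range summed {
            sum += v
        }
    }()

    for v := range printed {
        fmt.Println(v) // 1, 2, 3
    }
    wg.Wait()
    fmt.Println("sum:", sum) // sum: 6
}
```

Reading the two outputs one after the other in the same goroutine would deadlock on the first value.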
Bridge: flatten a channel of channels
Flattens a channel of channels (<-chan <-chan T) into a single channel.

    func bridge(chanStream <-chan <-chan int) <-chan int {
        out := make(chan int)
        go func() {
            defer close(out)
            // Drain the inner channels one at a time, in arrival order.
            for stream := range chanStream {
                for v := range stream {
                    out <- v
                }
            }
        }()
        return out
    }

It is not used often, but it is useful when you need to consume a dynamic sequence of data sources, one at a time.
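A small usage sketch for bridge: a producer hands over three separate channels, and the consumer sees one ordered stream. The streams variable and the values are made up for the example, and bridge is written here as a plain nested range loop.

```go
package main

import "fmt"

// gen emits each of nums on a channel, then closes it (as in the post).
func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for _, n := range nums {
            out <- n
        }
    }()
    return out
}

// bridge drains each inner channel in turn onto a single output.
func bridge(chanStream <-chan <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for stream := range chanStream {
            for v := range stream {
                out <- v
            }
        }
    }()
    return out
}

func main() {
    streams := make(chan (<-chan int))
    go func() {
        defer close(streams)
        streams <- gen(1, 2)
        streams <- gen(3)
        streams <- gen(4, 5)
    }()

    for v := range bridge(streams) {
        fmt.Println(v) // 1, 2, 3, 4, 5
    }
}
```

The output order is deterministic here because bridge finishes one inner channel before receiving the next.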
Which pattern fits?
| Situation | Pattern |
|---|---|
| Transformation with clear stages (parse → validate → save) | pipeline |
| Run the same work N times concurrently | fan-out + fan-in |
| Limit concurrency only | semaphore |
| Terminate when any one of many sources finishes | or-channel |
| Consume one source from two places | tee |
| Consume a sequence of channels as one stream | bridge |
Most problems can be solved with just pipeline and fan-out/fan-in. Keep the rest in the toolbox for when they are needed.
Common pitfalls
1) A channel that should close doesn't → leak
A for range loop over a channel that is never closed waits forever. If the sending side never closes its channel, the next stage cannot terminate and its goroutine leaks.
2) Sending to a closed channel → panic
This often happens when multiple workers send on the same out channel. The fix is a WaitGroup plus a single close, performed in one place after every sender has finished.
3) Ignoring ctx
If a select contains only case out <- v: and omits case <-ctx.Done():, a cancellation is never noticed: the goroutine either runs to completion or blocks forever on the send and leaks. Listen for cancellation on every send and receive.
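One way to make the third pitfall harder to hit is to wrap the cancellable send in a small helper, so each stage cannot forget the ctx.Done() case. This is a minimal sketch; send, count, and firstSquares are hypothetical names introduced for the example.

```go
package main

import (
    "context"
    "fmt"
)

// send (hypothetical helper) delivers v on out unless ctx is cancelled
// first. It reports whether the value was sent.
func send(ctx context.Context, out chan<- int, v int) bool {
    select {
    case out <- v:
        return true
    case <-ctx.Done():
        return false
    }
}

// count emits 0, 1, 2, ... until ctx is cancelled.
func count(ctx context.Context) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for i := 0; ; i++ {
            if !send(ctx, out, i) {
                return
            }
        }
    }()
    return out
}

// square forwards n*n until its input ends or ctx is cancelled.
func square(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            if !send(ctx, out, n*n) {
                return
            }
        }
    }()
    return out
}

// firstSquares reads n values from an endless pipeline, cancels it, and
// waits until the last stage has closed its output.
func firstSquares(n int) []int {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()
    results := square(ctx, count(ctx))
    got := make([]int, 0, n)
    for len(got) < n {
        got = append(got, <-results)
    }
    cancel()
    for range results {
        // Discard values sent before the stages noticed the cancellation.
    }
    return got
}

func main() {
    fmt.Println(firstSquares(3)) // [0 1 4]
}
```

The final drain loop only ends because every send in the pipeline also listens on ctx.Done(); with a plain out <- v in count, it would never return.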
Practical example: concurrent URL download

    type Result struct {
        URL  string
        Body []byte
        Err  error
    }

    // downloadAll fetches every URL with at most `concurrency` requests in
    // flight. Results arrive in completion order, not input order.
    // fetch(ctx, url) is assumed to be defined elsewhere.
    func downloadAll(ctx context.Context, urls []string, concurrency int) []Result {
        // fan-out: only `concurrency` workers run at once
        in := make(chan string)
        out := make(chan Result)
        var wg sync.WaitGroup
        for i := 0; i < concurrency; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for url := range in {
                    body, err := fetch(ctx, url)
                    select {
                    case out <- Result{URL: url, Body: body, Err: err}:
                    case <-ctx.Done():
                        return
                    }
                }
            }()
        }
        // send work
        go func() {
            defer close(in)
            for _, u := range urls {
                select {
                case in <- u:
                case <-ctx.Done():
                    return
                }
            }
        }()
        // synchronize close of results
        go func() {
            wg.Wait()
            close(out)
        }()
        var results []Result
        for r := range out {
            results = append(results, r)
        }
        return results
    }

This single function combines fan-out, a concurrency limit (the concurrency parameter plays the semaphore's role), context cancellation, and fan-in. It is a shape you will encounter in real-world code.
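Since downloadAll returns results in completion order, callers that need results aligned with the input may prefer a variant that sends indexes as jobs and writes each result into its own slot. The sketch below is one possible shape under stated assumptions, not the post's method: downloadOrdered, fetchFunc, fakeFetch, and demo are hypothetical names, the fetch function is passed in as a parameter, and concurrency is assumed to be at least 1.

```go
package main

import (
    "context"
    "fmt"
    "sync"
)

type Result struct {
    URL  string
    Body []byte
    Err  error
}

// fetchFunc is an assumed signature for the post's fetch function.
type fetchFunc func(ctx context.Context, url string) ([]byte, error)

// downloadOrdered (hypothetical variant) guarantees that results[i]
// belongs to urls[i]. It assumes concurrency >= 1.
func downloadOrdered(ctx context.Context, urls []string, concurrency int, fetch fetchFunc) []Result {
    results := make([]Result, len(urls))
    jobs := make(chan int) // indexes into urls
    var wg sync.WaitGroup
    for w := 0; w < concurrency; w++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for i := range jobs {
                body, err := fetch(ctx, urls[i])
                // Each index is handed to exactly one worker, so no two
                // goroutines write the same slot.
                results[i] = Result{URL: urls[i], Body: body, Err: err}
            }
        }()
    }
feed:
    for i := range urls {
        select {
        case jobs <- i:
        case <-ctx.Done():
            // Jobs from i onward were never sent; mark them cancelled.
            for j := i; j < len(urls); j++ {
                results[j] = Result{URL: urls[j], Err: ctx.Err()}
            }
            break feed
        }
    }
    close(jobs)
    wg.Wait()
    return results
}

// fakeFetch stands in for a real HTTP fetch in this example.
func fakeFetch(ctx context.Context, url string) ([]byte, error) {
    return []byte("body of " + url), nil
}

// demo runs the variant on five made-up URLs with three workers.
func demo() []Result {
    urls := []string{"a", "b", "c", "d", "e"}
    return downloadOrdered(context.Background(), urls, 3, fakeFetch)
}

func main() {
    for _, r := range demo() {
        fmt.Printf("%s: %s\n", r.URL, r.Body)
    }
}
```

No result channel or closer goroutine is needed in this shape, because wg.Wait() already marks the point at which every slot has been written.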
Wrap-up
What we covered:
- pipeline: one channel per stage, and each stage closes its own channel
- fan-out: many workers share one channel
- fan-in (merge): many channels into one, with the close synchronized via WaitGroup
- worker pool = fan-out + fan-in
- semaphore: limit concurrency with a buffered channel
- or-channel / tee / bridge: the toolbox
- Always listen to ctx/done to prevent leaks
- Sending to a closed channel panics, so close from one place only
The next post (#2 Memory Model and the sync Package) covers the tools you need when using shared memory instead of channels: how Mutex, atomic, and Once work and when each fits.