Go上級 #1 並行性パターン — fan-out・fan-in・pipeline

Go 上級シリーズの最初の記事です。中級でゴルーチン/チャネル/select/context まで見たなら — 今回は それらの道具を組み立てて作る標準パターン

全7編で構成します。

  1. 並行性パターン(この記事) — pipeline、fan-out/fan-in
  2. メモリモデルと sync パッケージ — Mutex、atomic、Once
  3. ジェネリクス — type parameter、constraint
  4. reflect パッケージ — 動的に型を扱う
  5. unsafe と cgo — 境界の外へ一歩
  6. プロファイリング — pprof、benchmark、race
  7. コード生成 — go generate

ほとんどの並行コードは、この記事のパターンをいくつか知っているだけで十分書けます。

Pipeline — チャネルをつなぐ #

Go の最も基本的な並行性の形。段階ごとにゴルーチンがあり、チャネルでデータが流れていきます。

3 段階 pipeline
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

func main() {
	for v := range square(gen(1, 2, 3, 4)) {
		fmt.Println(v)    // 1, 4, 9, 16
	}
}

各段階は — チャネルを入力に取り、チャネルを返す関数。合わせると unix pipe のような流れができます。

要点二つ:

  • 段階ごとに 自分のチャネルを close — 次の段階の for range が自然に終了
  • 各段階が独立したゴルーチン — 一つの段階が遅くても前の段階は可能な分だけ進む

Fan-out — 一つの入力を複数のワーカーへ #

一つのチャネルの仕事を複数のゴルーチンが分担して処理します。

fan-out
func worker(id int, in <-chan int, out chan<- int) {
	for n := range in {
		// 重い作業
		out <- n * n
	}
}

func main() {
	in := gen(1, 2, 3, 4, 5, 6, 7, 8)
	out := make(chan int)

	// worker 3つが同じ in チャネルを共有
	for i := 0; i < 3; i++ {
		go worker(i, in, out)
	}

	// out をクローズ — 別途同期が必要 (次の節)
}

複数の worker が同じ入力チャネルから受け取るので — Go がうまく分配します。誰がより多く受け取るかは保証されませんが、一つの作業は一つの worker にだけ行きます(チャネルの receive は単一)。

落とし穴 — out チャネルは誰が閉じるのか? #

複数の worker が out に送っているとき — 誰も勝手に close できません。閉じたチャネルに送ると panic。

解決は — 全員が終わったかを追跡する分離した段階 が必要です。次の節の fan-in で自然に登場します。

Fan-in — 複数の入力を一つのチャネルにまとめる #

複数のチャネルの結果を一つのチャネルに合わせます。

fan-in (WaitGroup で close 同期)
import "sync"

func merge(channels ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	output := func(c <-chan int) {
		defer wg.Done()
		for n := range c {
			out <- n
		}
	}

	wg.Add(len(channels))
	for _, c := range channels {
		go output(c)
	}

	// すべての入力が終わったら out をクローズ
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

一行ずつ見ると:

  • 各入力チャネルごとにゴルーチン 一つが — out に流す
  • wg.Wait() が終わった直後に close(out) — 別のゴルーチンで処理

この二つのパターン(fan-out + fan-in)を合わせると — worker pool の標準的な形です。

Worker pool — fan-out + fan-in を合わせる #

worker pool フルセット
func process(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

func main() {
	in := gen(1, 2, 3, 4, 5, 6, 7, 8)

	// fan-out — ワーカー 3つ
	c1 := process(in)
	c2 := process(in)
	c3 := process(in)

	// fan-in — 三つを一つに
	for v := range merge(c1, c2, c3) {
		fmt.Println(v)
	}
}

これが最もよくある Go 並行コードの形。仕事を分配して、結果を集める。

キャンセルを常に一緒に — done パターン #

上のコードは入力が終われば自然に終了しますが — 途中でキャンセル するには道具が必要です。

done チャネルを受け取る段階
func square(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case out <- n * n:
			case <-done:
				return
			}
		}
	}()
	return out
}

各段階が done チャネルを受け取ってselect の中で一緒に聞く形。done が閉じれば即座に return → out も close。

このパターンが 中級 #5 context と同じ形です。実践では普通 done の代わりに context.Context を受け取ります。

context で書き直す
func square(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case out <- n * n:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

Semaphore — 同時実行数の制限 #

ワーカー N 個を作りたくなく、一度に何個までだけ実行 したいとき。

チャネルを semaphore に
func main() {
	urls := []string{"a", "b", "c", "d", "e"}
	sem := make(chan struct{}, 3)    // 最大 3 つ

	var wg sync.WaitGroup
	for _, url := range urls {
		wg.Add(1)
		go func(u string) {
			defer wg.Done()

			sem <- struct{}{}            // acquire
			defer func() { <-sem }()      // release

			fetch(u)
		}(url)
	}
	wg.Wait()
}

容量 3 のバッファチャネルを — 一種の セマフォ として活用します。4 番目のゴルーチンは sem <- struct{}{} でブロックされて、誰かが release すれば進みます。

セマフォが本当に必要なら golang.org/x/sync/semaphore パッケージもありますが、上のようにチャネルだけで十分な場面が多いです。

Or-channel — どれか一つでも閉じれば終了 #

複数の done チャネルのうち どれか一つでも 閉じれば終了したいとき。

or-channel
func or(channels ...<-chan struct{}) <-chan struct{} {
	switch len(channels) {
	case 0:
		return nil
	case 1:
		return channels[0]
	}

	out := make(chan struct{})
	go func() {
		defer close(out)
		switch len(channels) {
		case 2:
			select {
			case <-channels[0]:
			case <-channels[1]:
			}
		default:
			select {
			case <-channels[0]:
			case <-channels[1]:
			case <-channels[2]:
			case <-or(append(channels[3:], out)...):
			}
		}
	}()
	return out
}

再帰で無限個のチャネルを合わせるトリック。よく使われはしませんが — 知っておくと便利な道具箱。

Tee — 一つの入力を二つに複製 #

Linux の tee のように一つのチャネルを二つに分岐。

tee
func tee(in <-chan int) (<-chan int, <-chan int) {
	out1 := make(chan int)
	out2 := make(chan int)

	go func() {
		defer close(out1)
		defer close(out2)
		for v := range in {
			a, b := out1, out2
			for i := 0; i < 2; i++ {
				select {
				case a <- v:
					a = nil
				case b <- v:
					b = nil
				}
			}
		}
	}()
	return out1, out2
}

ab を一つずつ nil にする — 中級 #4 の nil チャネルトリック。両方が受け取るまで待ちます。

パターン — Bridge channel #

チャネルのチャネル(<-chan <-chan T)のような多段階を平坦化。

bridge
func bridge(chanStream <-chan <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			var stream <-chan int
			select {
			case maybeStream, ok := <-chanStream:
				if !ok {
					return
				}
				stream = maybeStream
			}
			for v := range stream {
				select {
				case out <- v:
				}
			}
		}
	}()
	return out
}

使うことは多くありませんが — 動的に異なるデータソースをつないで受け取るときに便利です。

どのパターンが合うか? #

状況パターン
段階が明確な変換(parse → validate → save)pipeline
同じ仕事を N 個同時実行fan-out + fan-in
同時実行数だけ制限semaphore
複数のソースのうちどれか一つでも終わればor-channel
一つのソースを二箇所で消費tee

ほとんどのコードは pipeline と fan-out/fan-in だけで解けます。他は道具箱に置いて必要なときに取り出す程度。

よく出会う落とし穴 #

1) 閉じるべきチャネルが閉じない → リーク #

for range が永遠に待つ。すべての送信者が close しなければ次の段階が終了できません。

2) 閉じたチャネルに send → panic #

複数の worker が同じ out に send するときによく発生。解決は WaitGroup + 一箇所だけで close

3) ctx 無視 #

select { case out <- v: }case <-ctx.Done(): を抜かすと — キャンセル信号が来ても最後まで進もうとしてリーク。すべてのチャネル送受信に一緒に聞く。

実践例 — URL 同時ダウンロード #

同時ダウンロード + 結果収集
type Result struct {
	URL  string
	Body []byte
	Err  error
}

func downloadAll(ctx context.Context, urls []string, concurrency int) []Result {
	// fan-out — concurrency 分だけ同時実行
	in := make(chan string)
	out := make(chan Result)

	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for url := range in {
				body, err := fetch(ctx, url)
				select {
				case out <- Result{url, body, err}:
				case <-ctx.Done():
					return
				}
			}
		}()
	}

	// 仕事を送る
	go func() {
		defer close(in)
		for _, u := range urls {
			select {
			case in <- u:
			case <-ctx.Done():
				return
			}
		}
	}()

	// 結果 close 同期
	go func() {
		wg.Wait()
		close(out)
	}()

	var results []Result
	for r := range out {
		results = append(results, r)
	}
	return results
}

この一つの関数に — fan-out、semaphore(concurrency がその役割)、context キャンセル、fan-in まですべて入っています。実践でよく出会う形。

まとめ #

今回の記事で整理した内容:

  • pipeline — 段階ごとにチャネル、自分のチャネルを close
  • fan-out — 一つのチャネルを複数のワーカーが共有
  • fan-in (merge) — 複数のチャネルを一つに、WaitGroup で close 同期
  • worker pool = fan-out + fan-in
  • semaphore — バッファチャネルで同時実行数を制限
  • or-channel / tee / bridge — 道具箱
  • 常に ctx/done を一緒に聞く — リーク防止
  • 閉じたチャネルに send は panic — 一箇所だけで close

次の記事(#2 メモリモデルと sync パッケージ)では — チャネルの代わりに共有メモリを使うときに必要な道具。Mutex、atomic、Once がどう動作してどんな場面に合うかを整理します。

X