Go上級 #1 並行性パターン — fan-out・fan-in・pipeline
Go 上級シリーズの最初の記事です。中級でゴルーチン/チャネル/select/context まで見たなら — 今回は それらの道具を組み立てて作る標準パターン。
全7編で構成します。
- 並行性パターン(この記事) — pipeline、fan-out/fan-in
- メモリモデルと sync パッケージ — Mutex、atomic、Once
- ジェネリクス — type parameter、constraint
- reflect パッケージ — 動的に型を扱う
- unsafe と cgo — 境界の外へ一歩
- プロファイリング — pprof、benchmark、race
- コード生成 — go generate
ほとんどの並行コードは、この記事のパターンをいくつか知っているだけで十分書けます。
Pipeline — チャネルをつなぐ #
Go の最も基本的な並行性の形。段階ごとにゴルーチンがあり、チャネルでデータが流れていきます。
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
func main() {
for v := range square(gen(1, 2, 3, 4)) {
fmt.Println(v) // 1, 4, 9, 16
}
}各段階は — チャネルを入力に取り、チャネルを返す関数。合わせると unix pipe のような流れができます。
要点二つ:
- 段階ごとに 自分のチャネルを close — 次の段階の
for rangeが自然に終了 - 各段階が独立したゴルーチン — 一つの段階が遅くても前の段階は可能な分だけ進む
Fan-out — 一つの入力を複数のワーカーへ #
一つのチャネルの仕事を複数のゴルーチンが分担して処理します。
func worker(id int, in <-chan int, out chan<- int) {
for n := range in {
// 重い作業
out <- n * n
}
}
func main() {
in := gen(1, 2, 3, 4, 5, 6, 7, 8)
out := make(chan int)
// worker 3つが同じ in チャネルを共有
for i := 0; i < 3; i++ {
go worker(i, in, out)
}
// out をクローズ — 別途同期が必要 (次の節)
}複数の worker が同じ入力チャネルから受け取るので — Go がうまく分配します。誰がより多く受け取るかは保証されませんが、一つの作業は一つの worker にだけ行きます(チャネルの receive は単一)。
落とし穴 — out チャネルは誰が閉じるのか? #
複数の worker が out に送っているとき — 誰も勝手に close できません。閉じたチャネルに送ると panic。
解決は — 全員が終わったかを追跡する分離した段階 が必要です。次の節の fan-in で自然に登場します。
Fan-in — 複数の入力を一つのチャネルにまとめる #
複数のチャネルの結果を一つのチャネルに合わせます。
import "sync"
func merge(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
out <- n
}
}
wg.Add(len(channels))
for _, c := range channels {
go output(c)
}
// すべての入力が終わったら out をクローズ
go func() {
wg.Wait()
close(out)
}()
return out
}一行ずつ見ると:
- 各入力チャネルごとにゴルーチン 一つが —
outに流す wg.Wait()が終わった直後に close(out) — 別のゴルーチンで処理
この二つのパターン(fan-out + fan-in)を合わせると — worker pool の標準的な形です。
Worker pool — fan-out + fan-in を合わせる #
func process(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
func main() {
in := gen(1, 2, 3, 4, 5, 6, 7, 8)
// fan-out — ワーカー 3つ
c1 := process(in)
c2 := process(in)
c3 := process(in)
// fan-in — 三つを一つに
for v := range merge(c1, c2, c3) {
fmt.Println(v)
}
}これが最もよくある Go 並行コードの形。仕事を分配して、結果を集める。
キャンセルを常に一緒に — done パターン #
上のコードは入力が終われば自然に終了しますが — 途中でキャンセル するには道具が必要です。
func square(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-done:
return
}
}
}()
return out
}各段階が done チャネルを受け取って — select の中で一緒に聞く形。done が閉じれば即座に return → out も close。
このパターンが 中級 #5 context と同じ形です。実践では普通 done の代わりに context.Context を受け取ります。
func square(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-ctx.Done():
return
}
}
}()
return out
}Semaphore — 同時実行数の制限 #
ワーカー N 個を作りたくなく、一度に何個までだけ実行 したいとき。
func main() {
urls := []string{"a", "b", "c", "d", "e"}
sem := make(chan struct{}, 3) // 最大 3 つ
var wg sync.WaitGroup
for _, url := range urls {
wg.Add(1)
go func(u string) {
defer wg.Done()
sem <- struct{}{} // acquire
defer func() { <-sem }() // release
fetch(u)
}(url)
}
wg.Wait()
}容量 3 のバッファチャネルを — 一種の セマフォ として活用します。4 番目のゴルーチンは sem <- struct{}{} でブロックされて、誰かが release すれば進みます。
セマフォが本当に必要なら golang.org/x/sync/semaphore パッケージもありますが、上のようにチャネルだけで十分な場面が多いです。
Or-channel — どれか一つでも閉じれば終了 #
複数の done チャネルのうち どれか一つでも 閉じれば終了したいとき。
func or(channels ...<-chan struct{}) <-chan struct{} {
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}
out := make(chan struct{})
go func() {
defer close(out)
switch len(channels) {
case 2:
select {
case <-channels[0]:
case <-channels[1]:
}
default:
select {
case <-channels[0]:
case <-channels[1]:
case <-channels[2]:
case <-or(append(channels[3:], out)...):
}
}
}()
return out
}再帰で無限個のチャネルを合わせるトリック。よく使われはしませんが — 知っておくと便利な道具箱。
Tee — 一つの入力を二つに複製 #
Linux の tee のように一つのチャネルを二つに分岐。
func tee(in <-chan int) (<-chan int, <-chan int) {
out1 := make(chan int)
out2 := make(chan int)
go func() {
defer close(out1)
defer close(out2)
for v := range in {
a, b := out1, out2
for i := 0; i < 2; i++ {
select {
case a <- v:
a = nil
case b <- v:
b = nil
}
}
}
}()
return out1, out2
}a と b を一つずつ nil にする — 中級 #4 の nil チャネルトリック。両方が受け取るまで待ちます。
パターン — Bridge channel #
チャネルのチャネル(<-chan <-chan T)のような多段階を平坦化。
func bridge(chanStream <-chan <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
var stream <-chan int
select {
case maybeStream, ok := <-chanStream:
if !ok {
return
}
stream = maybeStream
}
for v := range stream {
select {
case out <- v:
}
}
}
}()
return out
}使うことは多くありませんが — 動的に異なるデータソースをつないで受け取るときに便利です。
どのパターンが合うか? #
| 状況 | パターン |
|---|---|
| 段階が明確な変換(parse → validate → save) | pipeline |
| 同じ仕事を N 個同時実行 | fan-out + fan-in |
| 同時実行数だけ制限 | semaphore |
| 複数のソースのうちどれか一つでも終われば | or-channel |
| 一つのソースを二箇所で消費 | tee |
ほとんどのコードは pipeline と fan-out/fan-in だけで解けます。他は道具箱に置いて必要なときに取り出す程度。
よく出会う落とし穴 #
1) 閉じるべきチャネルが閉じない → リーク #
for range が永遠に待つ。すべての送信者が close しなければ次の段階が終了できません。
2) 閉じたチャネルに send → panic #
複数の worker が同じ out に send するときによく発生。解決は WaitGroup + 一箇所だけで close。
3) ctx 無視 #
select { case out <- v: } に case <-ctx.Done(): を抜かすと — キャンセル信号が来ても最後まで進もうとしてリーク。すべてのチャネル送受信に一緒に聞く。
実践例 — URL 同時ダウンロード #
type Result struct {
URL string
Body []byte
Err error
}
func downloadAll(ctx context.Context, urls []string, concurrency int) []Result {
// fan-out — concurrency 分だけ同時実行
in := make(chan string)
out := make(chan Result)
var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for url := range in {
body, err := fetch(ctx, url)
select {
case out <- Result{url, body, err}:
case <-ctx.Done():
return
}
}
}()
}
// 仕事を送る
go func() {
defer close(in)
for _, u := range urls {
select {
case in <- u:
case <-ctx.Done():
return
}
}
}()
// 結果 close 同期
go func() {
wg.Wait()
close(out)
}()
var results []Result
for r := range out {
results = append(results, r)
}
return results
}この一つの関数に — fan-out、semaphore(concurrency がその役割)、context キャンセル、fan-in まですべて入っています。実践でよく出会う形。
まとめ #
今回の記事で整理した内容:
- pipeline — 段階ごとにチャネル、自分のチャネルを close
- fan-out — 一つのチャネルを複数のワーカーが共有
- fan-in (merge) — 複数のチャネルを一つに、WaitGroup で close 同期
- worker pool = fan-out + fan-in
- semaphore — バッファチャネルで同時実行数を制限
- or-channel / tee / bridge — 道具箱
- 常に ctx/done を一緒に聞く — リーク防止
- 閉じたチャネルに send は panic — 一箇所だけで close
次の記事(#2 メモリモデルと sync パッケージ)では — チャネルの代わりに共有メモリを使うときに必要な道具。Mutex、atomic、Once がどう動作してどんな場面に合うかを整理します。