고 고급 #1 동시성 패턴 — fan-out,fan-in,pipeline

6 분 소요

Go 고급 시리즈의 첫 글입니다. 중급에서 고루틴/채널/select/context까지 봤다면 — 이번에는 그 도구들을 조립해서 만드는 표준 패턴들.

총 7편으로 구성합니다.

  1. 동시성 패턴 (이 글) — pipeline, fan-out/fan-in
  2. 메모리 모델과 sync 패키지 — Mutex, atomic, Once
  3. 제네릭 — type parameter, constraint
  4. reflect 패키지 — 동적 타입 다루기
  5. unsafe와 cgo — 경계 밖으로 한 발
  6. 프로파일링 — pprof, benchmark, race
  7. 코드 생성 — go generate

대부분의 동시성 코드는 이 글의 패턴 몇 개만 알면 충분히 짤 수 있습니다.

Pipeline — 채널을 이어 붙이기 #

Go의 가장 기본적인 동시성 형태. 단계마다 고루틴이 있고, 채널로 데이터가 흘러갑니다.

3 단계 pipeline
func gen(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			out <- n
		}
	}()
	return out
}

func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

func main() {
	for v := range square(gen(1, 2, 3, 4)) {
		fmt.Println(v)    // 1, 4, 9, 16
	}
}

각 단계는 — 채널을 입력받고 채널을 돌려주는 함수. 합치면 unix pipe처럼 흐름이 만들어집니다.

요점 두 가지:

  • 단계마다 자기 채널을 close — 다음 단계의 for range가 자연스럽게 종료
  • 각 단계가 독립적인 고루틴 — 한 단계가 느려도 앞 단계는 가능한 만큼 진행

Fan-out — 한 입력을 여러 워커로 #

한 채널의 일을 여러 고루틴이 나눠 처리합니다.

fan-out
func worker(id int, in <-chan int, out chan<- int) {
	for n := range in {
		// 무거운 작업
		out <- n * n
	}
}

func main() {
	in := gen(1, 2, 3, 4, 5, 6, 7, 8)
	out := make(chan int)

	// worker 3개가 같은 in 채널을 공유
	for i := 0; i < 3; i++ {
		go worker(i, in, out)
	}

	// out 닫기 — 별도 동기화 필요 (다음 절)
}

여러 worker가 같은 입력 채널에서 받기 때문에 — Go가 알아서 분배합니다. 누가 더 많이 받을지는 보장 안 되지만, 하나의 작업이 한 worker에만 갑니다(채널의 receive는 단일).

함정 — out 채널은 누가 닫나? #

여러 worker가 out에 보내고 있을 때 — 누구도 마음대로 close 할 수 없습니다. 닫힌 채널에 보내면 panic.

해결은 — 모두가 끝났는지를 추적하는 분리된 단계가 필요합니다. 다음 절의 fan-in에서 자연스럽게 다룹니다.

Fan-in — 여러 입력을 한 채널로 모음 #

여러 채널의 결과를 한 채널로 합칩니다.

fan-in (WaitGroup으로 close 동기화)
import "sync"

func merge(channels ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	output := func(c <-chan int) {
		defer wg.Done()
		for n := range c {
			out <- n
		}
	}

	wg.Add(len(channels))
	for _, c := range channels {
		go output(c)
	}

	// 모든 입력이 끝나면 out 닫기
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

한 줄씩 보면:

  • 각 입력 채널마다 고루틴 하나가 — out으로 흘려보냄
  • wg.Wait()가 끝난 직후 close(out) — 별도 고루틴에서 처리

이 두 패턴(fan-out + fan-in)을 합치면 — worker pool의 표준 모양입니다.

Worker pool — fan-out + fan-in 합치기 #

worker pool 풀세트
func process(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			out <- n * n
		}
	}()
	return out
}

func main() {
	in := gen(1, 2, 3, 4, 5, 6, 7, 8)

	// fan-out — 워커 3개
	c1 := process(in)
	c2 := process(in)
	c3 := process(in)

	// fan-in — 셋을 하나로
	for v := range merge(c1, c2, c3) {
		fmt.Println(v)
	}
}

이게 가장 흔한 Go 동시성 코드의 형태입니다. 일감을 분배하고, 결과를 모읍니다.

취소를 항상 같이 — done 패턴 #

위 코드는 입력이 끝나면 자연스럽게 종료하지만 — 중간에 취소를 하려면 도구가 필요합니다.

done 채널을 받는 단계
func square(done <-chan struct{}, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case out <- n * n:
			case <-done:
				return
			}
		}
	}()
	return out
}

각 단계가 done 채널을 받아서select 안에서 같이 듣습니다. done이 닫히면 즉시 return → out도 close.

이 패턴이 중급 #5 context와 같은 모양입니다. 실전에서는 보통 done 대신 context.Context를 받습니다.

context로 다시 쓰기
func square(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case out <- n * n:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}

Semaphore — 동시 실행 수 제한 #

워커 N 개를 만들고 싶지 않고, 한 번에 몇 개까지만 실행하고 싶을 때 사용합니다.

채널을 semaphore로
func main() {
	urls := []string{"a", "b", "c", "d", "e"}
	sem := make(chan struct{}, 3)    // 최대 3개

	var wg sync.WaitGroup
	for _, url := range urls {
		wg.Add(1)
		go func(u string) {
			defer wg.Done()

			sem <- struct{}{}            // acquire
			defer func() { <-sem }()      // release

			fetch(u)
		}(url)
	}
	wg.Wait()
}

용량 3 인 버퍼 채널을 — 일종의 세마포어로 활용합니다. 4 번째 고루틴은 sem <- struct{}{}에서 블록되다가 누군가 release 하면 진행.

세마포어가 정말 필요하면 golang.org/x/sync/semaphore 패키지도 있는데, 위처럼 채널만으로 충분한 경우가 많습니다.

Or-channel — 어느 하나라도 닫히면 종료 #

여러 done 채널 중 하나라도 닫히면 종료하고 싶을 때 사용합니다.

or-channel
func or(channels ...<-chan struct{}) <-chan struct{} {
	switch len(channels) {
	case 0:
		return nil
	case 1:
		return channels[0]
	}

	out := make(chan struct{})
	go func() {
		defer close(out)
		switch len(channels) {
		case 2:
			select {
			case <-channels[0]:
			case <-channels[1]:
			}
		default:
			select {
			case <-channels[0]:
			case <-channels[1]:
			case <-channels[2]:
			case <-or(append(channels[3:], out)...):
			}
		}
	}()
	return out
}

재귀로 무한 개의 채널을 합치는 트릭입니다. 자주 쓰이지는 않지만 — 알아 두면 유용합니다.

Tee — 한 입력을 둘로 복제 #

리눅스 tee처럼 한 채널을 둘로 분기.

tee
func tee(in <-chan int) (<-chan int, <-chan int) {
	out1 := make(chan int)
	out2 := make(chan int)

	go func() {
		defer close(out1)
		defer close(out2)
		for v := range in {
			a, b := out1, out2
			for i := 0; i < 2; i++ {
				select {
				case a <- v:
					a = nil
				case b <- v:
					b = nil
				}
			}
		}
	}()
	return out1, out2
}

ab를 하나씩 nil로 만드는 — 중급 #4의 nil 채널 트릭. 양쪽이 모두 받아갈 때까지 기다립니다.

패턴 — Bridge channel #

채널의 채널 (<-chan <-chan T) 같은 다중 단계를 평탄화.

bridge
func bridge(chanStream <-chan <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for {
			var stream <-chan int
			select {
			case maybeStream, ok := <-chanStream:
				if !ok {
					return
				}
				stream = maybeStream
			}
			for v := range stream {
				select {
				case out <- v:
				}
			}
		}
	}()
	return out
}

쓸 일이 자주는 없지만 — 동적으로 다른 데이터 소스를 이어 받을 때 유용합니다.

어느 패턴이 어울리나? #

상황패턴
단계가 명확한 변환 (parse → validate → save)pipeline
같은 일을 N 개 동시 실행fan-out + fan-in
동시 실행 수만 제한semaphore
여러 source 중 하나라도 끝나면or-channel
한 source를 두 곳에서 소비tee

대부분의 코드는 pipeline과 fan-out/fan-in 만으로 풀 수 있습니다. 다른 건 도구상자에 두고 필요할 때 꺼내는 정도.

자주 만나는 함정 #

1) 닫혀야 할 채널이 안 닫힘 → 누수 #

for range가 영원히 기다립니다. 모든 송신자가 close 하지 않으면 다음 단계가 종료를 못 합니다.

2) 닫힌 채널에 send → panic #

여러 worker가 같은 out에 send 할 때 자주 발생. 해결은 WaitGroup + 한 곳에서만 close.

3) ctx 무시 #

select { case out <- v: }case <-ctx.Done():을 빠뜨리면 — 취소 신호가 와도 끝까지 진행하려다 누수. 모든 채널 송수신에 같이 듣기.

실전 예시 — URL 동시 다운로드 #

동시 다운로드 + 결과 모음
type Result struct {
	URL  string
	Body []byte
	Err  error
}

func downloadAll(ctx context.Context, urls []string, concurrency int) []Result {
	// fan-out — concurrency만큼만 동시 실행
	in := make(chan string)
	out := make(chan Result)

	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for url := range in {
				body, err := fetch(ctx, url)
				select {
				case out <- Result{url, body, err}:
				case <-ctx.Done():
					return
				}
			}
		}()
	}

	// 일감 보내기
	go func() {
		defer close(in)
		for _, u := range urls {
			select {
			case in <- u:
			case <-ctx.Done():
				return
			}
		}
	}()

	// 결과 close 동기화
	go func() {
		wg.Wait()
		close(out)
	}()

	var results []Result
	for r := range out {
		results = append(results, r)
	}
	return results
}

이 한 함수에 — fan-out, semaphore (concurrency가 그 역할), context 취소, fan-in까지 다 들어 있습니다. 실전에서 자주 보이는 형태입니다.

마무리 #

이번 글에서 정리한 내용:

  • pipeline — 단계마다 채널, 자기 채널을 close
  • fan-out — 한 채널을 여러 워커가 공유
  • fan-in (merge) — 여러 채널을 하나로, WaitGroup으로 close 동기화
  • worker pool = fan-out + fan-in
  • semaphore — 버퍼 채널로 동시 실행 수 제한
  • or-channel / tee / bridge — 도구상자
  • 항상 ctx/done같이 듣기 — 누수 방지
  • 닫힌 채널에 send는 panic — 한 곳에서만 close

다음 글(#2 메모리 모델과 sync 패키지)에서는 — 채널 대신 공유 메모리를 쓸 때 필요한 도구들. Mutex, atomic, Once가 어떻게 동작하고 언제 어울리는지 정리합니다.

X