고 고급 #1 동시성 패턴 — fan-out,fan-in,pipeline
Go 고급 시리즈의 첫 글입니다. 중급에서 고루틴/채널/select/context까지 봤다면 — 이번에는 그 도구들을 조립해서 만드는 표준 패턴들.
총 7편으로 구성합니다.
- 동시성 패턴 (이 글) — pipeline, fan-out/fan-in
- 메모리 모델과 sync 패키지 — Mutex, atomic, Once
- 제네릭 — type parameter, constraint
- reflect 패키지 — 동적 타입 다루기
- unsafe와 cgo — 경계 밖으로 한 발
- 프로파일링 — pprof, benchmark, race
- 코드 생성 — go generate
대부분의 동시성 코드는 이 글의 패턴 몇 개만 알면 충분히 짤 수 있습니다.
Pipeline — 채널을 이어 붙이기 #
Go의 가장 기본적인 동시성 형태. 단계마다 고루틴이 있고, 채널로 데이터가 흘러갑니다.
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for _, n := range nums {
out <- n
}
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
func main() {
for v := range square(gen(1, 2, 3, 4)) {
fmt.Println(v) // 1, 4, 9, 16
}
}각 단계는 — 채널을 입력받고 채널을 돌려주는 함수. 합치면 unix pipe처럼 흐름이 만들어집니다.
요점 두 가지:
- 단계마다 자기 채널을 close — 다음 단계의
for range가 자연스럽게 종료 - 각 단계가 독립적인 고루틴 — 한 단계가 느려도 앞 단계는 가능한 만큼 진행
Fan-out — 한 입력을 여러 워커로 #
한 채널의 일을 여러 고루틴이 나눠 처리합니다.
func worker(id int, in <-chan int, out chan<- int) {
for n := range in {
// 무거운 작업
out <- n * n
}
}
func main() {
in := gen(1, 2, 3, 4, 5, 6, 7, 8)
out := make(chan int)
// worker 3개가 같은 in 채널을 공유
for i := 0; i < 3; i++ {
go worker(i, in, out)
}
// out 닫기 — 별도 동기화 필요 (다음 절)
}여러 worker가 같은 입력 채널에서 받기 때문에 — Go가 알아서 분배합니다. 누가 더 많이 받을지는 보장 안 되지만, 하나의 작업이 한 worker에만 갑니다(채널의 receive는 단일).
함정 — out 채널은 누가 닫나? #
여러 worker가 out에 보내고 있을 때 — 누구도 마음대로 close 할 수 없습니다. 닫힌 채널에 보내면 panic.
해결은 — 모두가 끝났는지를 추적하는 분리된 단계가 필요합니다. 다음 절의 fan-in에서 자연스럽게 다룹니다.
Fan-in — 여러 입력을 한 채널로 모음 #
여러 채널의 결과를 한 채널로 합칩니다.
import "sync"
func merge(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
out <- n
}
}
wg.Add(len(channels))
for _, c := range channels {
go output(c)
}
// 모든 입력이 끝나면 out 닫기
go func() {
wg.Wait()
close(out)
}()
return out
}한 줄씩 보면:
- 각 입력 채널마다 고루틴 하나가 —
out으로 흘려보냄 wg.Wait()가 끝난 직후 close(out) — 별도 고루틴에서 처리
이 두 패턴(fan-out + fan-in)을 합치면 — worker pool의 표준 모양입니다.
Worker pool — fan-out + fan-in 합치기 #
func process(in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
out <- n * n
}
}()
return out
}
func main() {
in := gen(1, 2, 3, 4, 5, 6, 7, 8)
// fan-out — 워커 3개
c1 := process(in)
c2 := process(in)
c3 := process(in)
// fan-in — 셋을 하나로
for v := range merge(c1, c2, c3) {
fmt.Println(v)
}
}이게 가장 흔한 Go 동시성 코드의 형태입니다. 일감을 분배하고, 결과를 모읍니다.
취소를 항상 같이 — done 패턴 #
위 코드는 입력이 끝나면 자연스럽게 종료하지만 — 중간에 취소를 하려면 도구가 필요합니다.
func square(done <-chan struct{}, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-done:
return
}
}
}()
return out
}각 단계가 done 채널을 받아서 — select 안에서 같이 듣습니다. done이 닫히면 즉시 return → out도 close.
이 패턴이 중급 #5 context와 같은 모양입니다. 실전에서는 보통 done 대신 context.Context를 받습니다.
func square(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n:
case <-ctx.Done():
return
}
}
}()
return out
}Semaphore — 동시 실행 수 제한 #
워커 N 개를 만들고 싶지 않고, 한 번에 몇 개까지만 실행하고 싶을 때 사용합니다.
func main() {
urls := []string{"a", "b", "c", "d", "e"}
sem := make(chan struct{}, 3) // 최대 3개
var wg sync.WaitGroup
for _, url := range urls {
wg.Add(1)
go func(u string) {
defer wg.Done()
sem <- struct{}{} // acquire
defer func() { <-sem }() // release
fetch(u)
}(url)
}
wg.Wait()
}용량 3 인 버퍼 채널을 — 일종의 세마포어로 활용합니다. 4 번째 고루틴은 sem <- struct{}{}에서 블록되다가 누군가 release 하면 진행.
세마포어가 정말 필요하면 golang.org/x/sync/semaphore 패키지도 있는데, 위처럼 채널만으로 충분한 경우가 많습니다.
Or-channel — 어느 하나라도 닫히면 종료 #
여러 done 채널 중 하나라도 닫히면 종료하고 싶을 때 사용합니다.
func or(channels ...<-chan struct{}) <-chan struct{} {
switch len(channels) {
case 0:
return nil
case 1:
return channels[0]
}
out := make(chan struct{})
go func() {
defer close(out)
switch len(channels) {
case 2:
select {
case <-channels[0]:
case <-channels[1]:
}
default:
select {
case <-channels[0]:
case <-channels[1]:
case <-channels[2]:
case <-or(append(channels[3:], out)...):
}
}
}()
return out
}재귀로 무한 개의 채널을 합치는 트릭입니다. 자주 쓰이지는 않지만 — 알아 두면 유용합니다.
Tee — 한 입력을 둘로 복제 #
리눅스 tee처럼 한 채널을 둘로 분기.
func tee(in <-chan int) (<-chan int, <-chan int) {
out1 := make(chan int)
out2 := make(chan int)
go func() {
defer close(out1)
defer close(out2)
for v := range in {
a, b := out1, out2
for i := 0; i < 2; i++ {
select {
case a <- v:
a = nil
case b <- v:
b = nil
}
}
}
}()
return out1, out2
}a와 b를 하나씩 nil로 만드는 — 중급 #4의 nil 채널 트릭. 양쪽이 모두 받아갈 때까지 기다립니다.
패턴 — Bridge channel #
채널의 채널 (<-chan <-chan T) 같은 다중 단계를 평탄화.
func bridge(chanStream <-chan <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
var stream <-chan int
select {
case maybeStream, ok := <-chanStream:
if !ok {
return
}
stream = maybeStream
}
for v := range stream {
select {
case out <- v:
}
}
}
}()
return out
}쓸 일이 자주는 없지만 — 동적으로 다른 데이터 소스를 이어 받을 때 유용합니다.
어느 패턴이 어울리나? #
| 상황 | 패턴 |
|---|---|
| 단계가 명확한 변환 (parse → validate → save) | pipeline |
| 같은 일을 N 개 동시 실행 | fan-out + fan-in |
| 동시 실행 수만 제한 | semaphore |
| 여러 source 중 하나라도 끝나면 | or-channel |
| 한 source를 두 곳에서 소비 | tee |
대부분의 코드는 pipeline과 fan-out/fan-in 만으로 풀 수 있습니다. 다른 건 도구상자에 두고 필요할 때 꺼내는 정도.
자주 만나는 함정 #
1) 닫혀야 할 채널이 안 닫힘 → 누수 #
for range가 영원히 기다립니다. 모든 송신자가 close 하지 않으면 다음 단계가 종료를 못 합니다.
2) 닫힌 채널에 send → panic #
여러 worker가 같은 out에 send 할 때 자주 발생. 해결은 WaitGroup + 한 곳에서만 close.
3) ctx 무시 #
select { case out <- v: }에 case <-ctx.Done():을 빠뜨리면 — 취소 신호가 와도 끝까지 진행하려다 누수. 모든 채널 송수신에 같이 듣기.
실전 예시 — URL 동시 다운로드 #
type Result struct {
URL string
Body []byte
Err error
}
func downloadAll(ctx context.Context, urls []string, concurrency int) []Result {
// fan-out — concurrency만큼만 동시 실행
in := make(chan string)
out := make(chan Result)
var wg sync.WaitGroup
for i := 0; i < concurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for url := range in {
body, err := fetch(ctx, url)
select {
case out <- Result{url, body, err}:
case <-ctx.Done():
return
}
}
}()
}
// 일감 보내기
go func() {
defer close(in)
for _, u := range urls {
select {
case in <- u:
case <-ctx.Done():
return
}
}
}()
// 결과 close 동기화
go func() {
wg.Wait()
close(out)
}()
var results []Result
for r := range out {
results = append(results, r)
}
return results
}이 한 함수에 — fan-out, semaphore (concurrency가 그 역할), context 취소, fan-in까지 다 들어 있습니다. 실전에서 자주 보이는 형태입니다.
마무리 #
이번 글에서 정리한 내용:
- pipeline — 단계마다 채널, 자기 채널을 close
- fan-out — 한 채널을 여러 워커가 공유
- fan-in (merge) — 여러 채널을 하나로, WaitGroup으로 close 동기화
- worker pool = fan-out + fan-in
- semaphore — 버퍼 채널로 동시 실행 수 제한
- or-channel / tee / bridge — 도구상자
- 항상 ctx/done같이 듣기 — 누수 방지
- 닫힌 채널에 send는 panic — 한 곳에서만 close
다음 글(#2 메모리 모델과 sync 패키지)에서는 — 채널 대신 공유 메모리를 쓸 때 필요한 도구들. Mutex, atomic, Once가 어떻게 동작하고 언제 어울리는지 정리합니다.