After watching talks (namely Robs Pikes talk) and reading more about golang's approach to concurrency and the intended architecture of highly concurrent systems. I wanted to see if I could abstract it a little to manage the channels and go routines in a more central way.
I wrote this pipeline abstraction and wanted to know if it has any memory leak / deadlock or inherent performance tradeoffs compared to starting and managing the channels and goroutines yourself.
package main import ( "context" "fmt" "math/rand" "strconv" "time" ) var ( rounds = 500 ) func main() { ctx, cancel := context.WithCancel(context.Background()) defer cancel() steps := []Step{ {name: "download asset", factor: 20, depth: 1000, Transformer: Transformer{do: func(ctx context.Context, data interface{}, err error) (interface{}, error) { strData, ok := data.(string) if !ok { return nil, fmt.Errorf("expected string but got %T", data) } time.Sleep(time.Millisecond * time.Duration(rand.Int()%1000)) return strData, nil }}}, {name: "build assets", factor: 20, depth: 1000, Transformer: Transformer{do: func(ctx context.Context, data interface{}, err error) (interface{}, error) { strData, ok := data.(string) if !ok { return nil, fmt.Errorf("expected string but got %T", data) } time.Sleep(time.Millisecond * time.Duration(rand.Int()%1000)) return strData, nil }}}, {name: "save to dropbox", factor: 20, depth: 1000, Transformer: Transformer{do: func(ctx context.Context, data interface{}, err error) (interface{}, error) { strData, ok := data.(string) if !ok { return nil, fmt.Errorf("expected string but got %T", data) } time.Sleep(time.Millisecond * time.Duration(rand.Int()%1000)) return strData, nil }}}, } pipeline := NewPipeline(ctx, steps) timer := utils.Start() pipe := pipeline.Start() defer pipe.Terminate() for i := 0; i < rounds; i++ { go func(j int) { pipe.Input <- strconv.Itoa(j) }(i) } for i := 0; i < rounds; i++ { <-pipe.Output } timer.StopNPrint("pipeline") } type Step struct { name string factor int depth int Transformer } type Transformer struct { do func(context.Context, interface{}, error) (interface{}, error) } func NewPipeline(ctx context.Context, steps []Step) Pipeline { var p Pipeline ctx, cancel := context.WithCancel(ctx) p.ctx = ctx p.cancel = cancel p.Steps = steps p.Errors = make(chan PipelineError) for i := 0; i < len(steps); i++ { p.Intermediary = append(p.Intermediary, make(chan interface{}, steps[i].depth)) } p.Intermediary = append(p.Intermediary, make(chan interface{}, 1)) p.Input = p.Intermediary[0] p.Output = p.Intermediary[len(p.Intermediary)-1] return p } func (p *Pipeline) Start() Pipeline { go p.terminationListener() for i := range p.Steps { for j := 0; j < p.Steps[i].factor; j++ { go func(s Step, i int) { for { select { // try to read val from chan case <-p.ctx.Done(): return // the channel is closed, return case read, ok := <-p.Intermediary[i]: if !ok { return // channel is closed, return } response, err := s.do(p.ctx, read, nil) if err != nil { p.Errors <- PipelineError{Err: err, From: s.name, Input: read} } select { // try to write to channel until it accepts, or the channel is closed case p.Intermediary[i+1] <- response: // successfully sent the response case <-p.ctx.Done(): // the context is cancelled, stop processing return } } } }(p.Steps[i], i) } } return *p } func (p *Pipeline) Terminate() { p.cancel() } func (p *Pipeline) terminationListener() { <-p.ctx.Done() for i := range p.Intermediary { close(p.Intermediary[i]) // close them from input -> output to avoid deadlocks } } type PipelineError struct { Err error From string Input interface{} } type Pipeline struct { Input chan interface{} Output chan interface{} Intermediary []chan interface{} Errors chan PipelineError Steps []Step ctx context.Context cancel context.CancelFunc }