I have code that concurrently reads data from a stream, processes elements, and writes the data to another stream. Each of these individual operations may fail, in which case I want processing to halt and all resources to be released. I also want the caller to have access to the specific error which caused the process to halt.
To do this now, I use context.Context
and cancel()
using a pattern like the one below. I'm fairly certain that I am using context.Context
in an unintended way: it seems strange to pass both the Context
and its cancellation function to every function. I need the Context
so that each function knows when to halt work, and I need the cancellation function so any subprocess may choose to halt work. If I create the child context and cancellation inside the Read
/DoWork
/Write
functions, the error is not propagated to the caller.
Note: The IllegalNumber
and IllegalString
constants are meant to allow exploring various failure states of the logic to understand the intended behavior of the code. For instance, changing IllegalString
to "1"
results in WriteObjects
failing.
package main import ( "context" "errors" "fmt" "os" "strconv" "sync" ) const ( IllegalNumber = 7 IllegalString = "0" ) func ReadObjects( ctx context.Context, cancel context.CancelCauseFunc, ) <-chan int { // Mimic input data. readChannel := make(chan int) go func() { for i := range 5 { readChannel <- i } close(readChannel) }() result := make(chan int) go func() { defer close(result) for { select { case <-ctx.Done(): return default: i, ok := <-readChannel if !ok { return } if i == IllegalNumber { cancel(errors.New("read illegal number")) return } result <- i } } }() return result } func DoWork( ctx context.Context, cancel context.CancelCauseFunc, objs <-chan int, ) <-chan int { result := make(chan int) go func() { defer close(result) for { select { case <-ctx.Done(): return default: i, ok := <-objs if !ok { return } newI := i + 1 if newI == IllegalNumber { cancel(errors.New("created illegal number")) return } result <- newI } } }() return result } func WriteObjects( ctx context.Context, cancel context.CancelCauseFunc, objs <-chan int, ) *sync.WaitGroup { wg := &sync.WaitGroup{} wg.Add(1) go func() { defer wg.Done() for { select { case <-ctx.Done(): return default: i, ok := <-objs if !ok { return } strI := strconv.Itoa(i) if strI == IllegalString { cancel(errors.New("tried to write illegal string")) return } fmt.Println(strI) } } }() return wg } func main() { ctx := context.Background() ctx, cancel := context.WithCancelCause(ctx) inObjects := ReadObjects(ctx, cancel) outObjects := DoWork(ctx, cancel, inObjects) writeWg := WriteObjects(ctx, cancel, outObjects) writeWg.Wait() if ctx.Err() != nil { fmt.Println(context.Cause(ctx)) os.Exit(1) } }
In reality these operations are multi-threaded, channels are properly buffered, but that should be irrelevant to the anti-pattern I'm sure I'm using.