\$\begingroup\$

I have code that concurrently reads data from a stream, processes elements, and writes the data to another stream. Each of these individual operations may fail, in which case I want processing to halt and all resources to be released. I also want the caller to have access to the specific error which caused the process to halt.

To do this now, I use context.Context and cancel() using a pattern like the one below. I'm fairly certain that I am using context.Context in an unintended way: it seems strange to pass both the Context and its cancellation function to every function. I need the Context so that each function knows when to halt work, and I need the cancellation function so any subprocess may choose to halt work. If I create the child context and cancellation inside the Read/DoWork/Write functions, the error is not propagated to the caller.

Note: The IllegalNumber and IllegalString constants are meant to allow exploring various failure states of the logic to understand the intended behavior of the code. For instance, changing IllegalString to "1" results in WriteObjects failing.

    package main

    import (
        "context"
        "errors"
        "fmt"
        "os"
        "strconv"
        "sync"
    )

    const (
        IllegalNumber = 7
        IllegalString = "0"
    )

    func ReadObjects(
        ctx context.Context,
        cancel context.CancelCauseFunc,
    ) <-chan int {
        // Mimic input data.
        readChannel := make(chan int)
        go func() {
            for i := range 5 {
                readChannel <- i
            }
            close(readChannel)
        }()

        result := make(chan int)
        go func() {
            defer close(result)
            for {
                select {
                case <-ctx.Done():
                    return
                default:
                    i, ok := <-readChannel
                    if !ok {
                        return
                    }
                    if i == IllegalNumber {
                        cancel(errors.New("read illegal number"))
                        return
                    }
                    result <- i
                }
            }
        }()
        return result
    }

    func DoWork(
        ctx context.Context,
        cancel context.CancelCauseFunc,
        objs <-chan int,
    ) <-chan int {
        result := make(chan int)
        go func() {
            defer close(result)
            for {
                select {
                case <-ctx.Done():
                    return
                default:
                    i, ok := <-objs
                    if !ok {
                        return
                    }
                    newI := i + 1
                    if newI == IllegalNumber {
                        cancel(errors.New("created illegal number"))
                        return
                    }
                    result <- newI
                }
            }
        }()
        return result
    }

    func WriteObjects(
        ctx context.Context,
        cancel context.CancelCauseFunc,
        objs <-chan int,
    ) *sync.WaitGroup {
        wg := &sync.WaitGroup{}
        wg.Add(1)
        go func() {
            defer wg.Done()
            for {
                select {
                case <-ctx.Done():
                    return
                default:
                    i, ok := <-objs
                    if !ok {
                        return
                    }
                    strI := strconv.Itoa(i)
                    if strI == IllegalString {
                        cancel(errors.New("tried to write illegal string"))
                        return
                    }
                    fmt.Println(strI)
                }
            }
        }()
        return wg
    }

    func main() {
        ctx := context.Background()
        ctx, cancel := context.WithCancelCause(ctx)
        inObjects := ReadObjects(ctx, cancel)
        outObjects := DoWork(ctx, cancel, inObjects)
        writeWg := WriteObjects(ctx, cancel, outObjects)
        writeWg.Wait()
        if ctx.Err() != nil {
            fmt.Println(context.Cause(ctx))
            os.Exit(1)
        }
    }

In reality these operations are multi-threaded and the channels are properly buffered, but that should be irrelevant to the anti-pattern I'm sure I'm using.

\$\endgroup\$

    2 Answers

    \$\begingroup\$

    You are correct that you should not use context this way. Context is meant for carrying request-scoped values and for cancellation.

    I have encountered different approaches to handling errors in concurrent pipelines. One is to carry the error through the stages and log it at the end, so someone can check it later. What you want is different: you want to stop the whole pipeline when an error happens. Since this is a concurrent pipeline, multiple errors may also happen at the same time.
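For comparison, the first approach (carrying the error along with the data) could look roughly like the sketch below. The `Result` type and the `parse`, `double`, and `collect` functions are hypothetical names invented for this illustration; they are not part of the question's code.

```go
package main

import (
    "errors"
    "fmt"
    "strconv"
)

// Result is a hypothetical envelope: each item carries either a value or the
// error that occurred while producing it.
type Result struct {
    Value int
    Err   error
}

// parse converts strings to ints and records failures instead of stopping.
func parse(in []string) <-chan Result {
    out := make(chan Result)
    go func() {
        defer close(out)
        for _, s := range in {
            n, err := strconv.Atoi(s)
            out <- Result{Value: n, Err: err}
        }
    }()
    return out
}

// double passes failed items through untouched and doubles the rest.
func double(in <-chan Result) <-chan Result {
    out := make(chan Result)
    go func() {
        defer close(out)
        for r := range in {
            if r.Err == nil {
                r.Value *= 2
            }
            out <- r
        }
    }()
    return out
}

// collect drains the pipeline and returns the good values plus all errors
// joined into one (nil if there were none).
func collect(in <-chan Result) ([]int, error) {
    var vals []int
    var errs []error
    for r := range in {
        if r.Err != nil {
            errs = append(errs, r.Err)
            continue
        }
        vals = append(vals, r.Value)
    }
    return vals, errors.Join(errs...)
}

func main() {
    vals, err := collect(double(parse([]string{"1", "x", "3"})))
    fmt.Println(vals, err)
}
```

Every item still travels the whole pipeline here, so nothing stops early. That is why this style does not fit a requirement to halt on the first failure.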

    One way to deal with this is to use an error channel. The sketch of the idea is something like this:

        ctx, cancel := context.WithCancel(context.Background())
        defer cancel()

        errCh := make(chan error)
        inputCh := make(chan Payload)

        // Setup your pipeline
        stage1Output := NewPipelineStage1(ctx, inputCh, errCh)
        stage2Output := NewPipelineStage2(ctx, stage1Output, errCh)
        ...

        // Listen to error channel
        go func() {
            for err := range errCh {
                // Record error
                // Cancel context. This will stop pipeline. You can call cancel
                // multiple times.
                cancel()
            }
        }()

        // Feed data
        for _, data := range dataSource {
            select {
            case inputCh <- data:
            case <-ctx.Done():
                // Stop feeding in case of error
                return
            }
        }

    Pipeline stages look like this:

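        func NewPipelineStage1(ctx context.Context, input <-chan Payload, errCh chan<- error) <-chan Payload {
            outputCh := make(chan Payload)
            go func() {
                // Close the output when the goroutine exits, not when the
                // constructor returns.
                defer close(outputCh)
                for {
                    select {
                    case data, ok := <-input:
                        if !ok {
                            return
                        }
                        // Work with data. process is a placeholder for this
                        // stage's logic.
                        result, err := process(data)
                        if err != nil {
                            errCh <- err
                            continue
                        }
                        select {
                        case outputCh <- result:
                        case <-ctx.Done():
                            return
                        }
                    case <-ctx.Done():
                        return
                    }
                }
            }()
            return outputCh
        }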
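To see the pieces working together, here is a minimal end-to-end sketch of the error-channel idea. It assumes a single stage over `int` values; `increment`, `stage`, and `run` are hypothetical names, and keeping only the first error is a simplification chosen for this example. Note that `errCh` is closed only after every sender has finished.

```go
package main

import (
    "context"
    "errors"
    "fmt"
    "sync"
)

// increment is a stand-in for real stage logic; it rejects negative input.
func increment(v int) (int, error) {
    if v < 0 {
        return 0, errors.New("negative input")
    }
    return v + 1, nil
}

// stage applies fn to every input. Failures go to errCh, results downstream.
func stage(ctx context.Context, wg *sync.WaitGroup, in <-chan int,
    errCh chan<- error, fn func(int) (int, error)) <-chan int {
    out := make(chan int)
    wg.Add(1)
    go func() {
        defer wg.Done()
        defer close(out)
        for {
            select {
            case <-ctx.Done():
                return
            case v, ok := <-in:
                if !ok {
                    return
                }
                res, err := fn(v)
                if err != nil {
                    select {
                    case errCh <- err:
                    case <-ctx.Done():
                        return
                    }
                    continue
                }
                select {
                case out <- res:
                case <-ctx.Done():
                    return
                }
            }
        }
    }()
    return out
}

// run feeds inputs through one stage, sums the results, and returns the
// first error reported on the error channel, if any.
func run(inputs []int) (sum int, firstErr error) {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    errCh := make(chan error)
    listenerDone := make(chan struct{})
    go func() {
        defer close(listenerDone)
        for err := range errCh {
            if firstErr == nil {
                firstErr = err
            }
            cancel() // safe to call more than once
        }
    }()

    var wg sync.WaitGroup
    inputCh := make(chan int)
    results := stage(ctx, &wg, inputCh, errCh, increment)

    wg.Add(1)
    go func() { // consumer
        defer wg.Done()
        for v := range results {
            sum += v
        }
    }()

feed:
    for _, v := range inputs {
        select {
        case inputCh <- v:
        case <-ctx.Done():
            break feed // stop feeding after a failure
        }
    }
    close(inputCh)
    wg.Wait()     // all senders on errCh have exited
    close(errCh)  // lets the listener's range loop end
    <-listenerDone
    return sum, firstErr
}

func main() {
    fmt.Println(run([]int{1, 2, 3}))
    fmt.Println(run([]int{1, 2, -1, 4}))
}
```

In the failing run the sum depends on how far the pipeline got before cancellation was observed, so only the returned error is deterministic.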
    \$\endgroup\$
    • \$\begingroup\$ Sweet, exactly the kind of thing I was looking for. This pattern also will let me run pipelines in a debug mode where errors are logged instead of always resulting in cancellation. \$\endgroup\$ Commented Aug 23, 2024 at 17:07
    • \$\begingroup\$ In my experience, it is almost never worth tracking the errors after the first one because the cancellation itself will trigger "canceled" errors all over the pipeline, cluttering up logs with unimportant information. It's simpler to just pass a CancelCauseFunc to the subcomponents. \$\endgroup\$ Commented Aug 24, 2024 at 3:37
    • \$\begingroup\$ @Rufflewind that can be easily dealt with by only sending pipeline logic related errors through the channel \$\endgroup\$ Commented Aug 24, 2024 at 4:06
    • \$\begingroup\$ That is not necessarily obvious or easy to do with a large enough pipeline given that "canceled" errors can crop up from anywhere. It is much simpler to just ignore all but the first error. \$\endgroup\$ Commented Aug 24, 2024 at 4:47
    \$\begingroup\$

    Use of func(error) to report errors

    I don't see anything intrinsically wrong with passing a func(error) to each callee:

    1. Context exists to track whether the current work is needed. Whether the context is cancellable or not is not the callee's concern.

    2. A func(error) is a reasonable API for reporting errors asynchronously. Whether that signal triggers cancellation or some other activity (logging, fatal abort, etc) is not the callee's concern.

    They are orthogonal responsibilities and not in conflict with each other.
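A small sketch of that separation, under the assumption of a made-up worker named `sum`: the worker only knows it has a `func(error)` to call. The hypothetical `strict` and `lenient` callers decide whether a report cancels the work or is merely recorded.

```go
package main

import (
    "context"
    "errors"
    "fmt"
)

// sum is a hypothetical worker: it adds up items and reports negative ones
// through the callback. It does not know what the callback does.
func sum(ctx context.Context, items []int, report func(error)) int {
    total := 0
    for _, n := range items {
        if ctx.Err() != nil {
            break // the work is no longer needed
        }
        if n < 0 {
            report(fmt.Errorf("negative item %d", n))
            continue
        }
        total += n
    }
    return total
}

// strict wires the callback to cancellation: the first error stops the work.
func strict(items []int) (int, error) {
    ctx, cancel := context.WithCancelCause(context.Background())
    defer cancel(nil)
    total := sum(ctx, items, cancel)
    // Cause is nil if nothing canceled the context before this point.
    return total, context.Cause(ctx)
}

// lenient wires the same callback to a collector: errors are recorded and
// the work continues.
func lenient(items []int) (int, error) {
    var errs []error
    total := sum(context.Background(), items, func(err error) {
        errs = append(errs, err)
    })
    return total, errors.Join(errs...)
}

func main() {
    items := []int{1, -2, 3}
    fmt.Println(strict(items))
    fmt.Println(lenient(items))
}
```

The same `sum` function serves both modes, which is the kind of "debug mode" switch mentioned in the comments on the other answer.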

    Single select case with default

    It rarely makes sense to call select { case: ... default: ... }:

        select {
        case <-ctx.Done():
            return
        default:
            i, ok := <-readChannel
            ...
        }

    It would be no different than calling:

        if ctx.Err() != nil {
            return
        }
        i, ok := <-readChannel

    The ctx.Done() / ctx.Err() check will only happen for a brief instant, and the rest of the time it will be stuck, blocked on the readChannel. This renders context-based cancellation somewhat redundant since one could just as well have canceled by closing the readChannel!

    It would be more idiomatic to write:

        select {
        case <-ctx.Done():
            return
        case i, ok := <-readChannel:
            ...
        }

    This will block on both ctx and readChannel until something happens to either of them.
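The difference is easy to observe with a channel that nobody ever sends on. In the sketch below, `receive` and `demo` are hypothetical helpers written for this illustration; the 50 ms timeout is arbitrary.

```go
package main

import (
    "context"
    "fmt"
    "time"
)

// receive waits for either a value or cancellation, whichever comes first.
func receive(ctx context.Context, ch <-chan int) (int, bool) {
    select {
    case <-ctx.Done():
        return 0, false
    case v, ok := <-ch:
        return v, ok
    }
}

// demo shows that cancellation unblocks a receive on a silent channel.
func demo() (ok bool, reason string) {
    ch := make(chan int) // nobody ever sends on or closes this channel
    ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
    defer cancel()
    _, ok = receive(ctx, ch)
    // receive can only have returned through ctx.Done(), so Err is non-nil.
    return ok, ctx.Err().Error()
}

func main() {
    fmt.Println(demo()) // prints "false context deadline exceeded"
}
```

With the `default:` variant, the same call would block on the channel receive indefinitely, because the context is only checked once before the receive starts.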

    Cleaning up background goroutines

    If cleanup hygiene matters, verify that all background goroutines finish before returning control to the caller, or in this case, before main exits. This ensures that all deferred cleanup work (e.g. deleting a temporary directory) is completed before exit.

    Cancellation tends to cause problems here. For example, it's possible that ReadObjects fails and triggers an early exit in WriteObjects. That then calls wg.Done(), which can cause main to exit before DoWork completes its cleanup.

    In this case, one can avoid the issue by adding every goroutine to the WaitGroup, not just the last one.
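One possible shape for this, as a sketch: every stage registers itself with a shared WaitGroup before its goroutine starts. The `stage` and `run` functions and the cleanup counter are hypothetical, with the counter standing in for real cleanup work such as removing temporary files.

```go
package main

import (
    "context"
    "fmt"
    "sync"
    "sync/atomic"
)

// stage forwards values until its input closes or ctx is canceled. It
// registers with wg so the caller can wait for its cleanup to finish.
func stage(ctx context.Context, wg *sync.WaitGroup, in <-chan int,
    cleanups *atomic.Int32) <-chan int {
    out := make(chan int)
    wg.Add(1) // register before the goroutine starts
    go func() {
        defer wg.Done()       // runs last
        defer cleanups.Add(1) // stands in for real cleanup work
        defer close(out)      // runs first
        for {
            select {
            case <-ctx.Done():
                return
            case v, ok := <-in:
                if !ok {
                    return
                }
                select {
                case out <- v:
                case <-ctx.Done():
                    return
                }
            }
        }
    }()
    return out
}

// run builds three stages, cancels them, and reports how many had finished
// their cleanup by the time Wait returned.
func run() int32 {
    ctx, cancel := context.WithCancel(context.Background())
    var wg sync.WaitGroup
    var cleanups atomic.Int32

    src := make(chan int) // never closed: only cancellation stops the stages
    a := stage(ctx, &wg, src, &cleanups)
    b := stage(ctx, &wg, a, &cleanups)
    stage(ctx, &wg, b, &cleanups)

    cancel()
    wg.Wait() // returns only after all three stages have cleaned up
    return cleanups.Load()
}

func main() {
    fmt.Println(run()) // prints 3
}
```

If only the last stage were tracked, `Wait` could return while the earlier stages were still unwinding.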

    Restructuring

    The code has lots of repetitive concurrency / error-handling patterns that exist solely for the plumbing, which tend to clutter the domain logic. Regardless of what approach you take, it would be simpler to wrap them into plumbing helpers so that the domain logic is more readable. Example:

        import (
            "context"
            "errors"
            "fmt"
            "io"
            "sync"
        )

        // Read receives one value from ch, or fails if ctx is canceled first.
        // It returns io.EOF once ch is closed.
        func Read[T any](ctx context.Context, ch <-chan T) (T, error) {
            var empty T
            select {
            case <-ctx.Done():
                return empty, ctx.Err()
            case val, ok := <-ch:
                if !ok {
                    return empty, io.EOF
                }
                return val, nil
            }
        }

        type Pipeline struct {
            onError func(error)
            wg      sync.WaitGroup
        }

        func NewPipeline(onError func(error)) *Pipeline {
            return &Pipeline{onError: onError}
        }

        func (p *Pipeline) Wait() {
            p.wg.Wait()
        }

        // Go runs f in a goroutine tracked by p and returns f's output channel.
        func Go[T any](p *Pipeline, name string, f func(chan<- T) error) <-chan T {
            ch := make(chan T)
            p.wg.Add(1)
            go func() {
                defer p.wg.Done()
                // Deferred so the channel closes after f returns, not before
                // it runs.
                defer close(ch)
                if err := f(ch); err != nil {
                    p.onError(fmt.Errorf("%s: %w", name, err))
                }
            }()
            return ch
        }

        func ReadObjects(ctx context.Context, result chan<- int) error { ... }

        func DoWork(ctx context.Context, result chan<- int, objs <-chan int) error {
            for {
                i, err := Read(ctx, objs)
                if errors.Is(err, io.EOF) {
                    return nil // input exhausted: a normal exit, not a failure
                }
                if err != nil {
                    return err
                }
                newI := i + 1
                if newI == IllegalNumber {
                    return errors.New("created illegal number")
                }
                select {
                case result <- newI:
                case <-ctx.Done():
                    // Don't block forever if the consumer has already stopped.
                    return ctx.Err()
                }
            }
        }

        func WriteObjects(ctx context.Context, outObjects <-chan int) error { ... }

        func RunMyPipeline(ctx context.Context) error {
            ctx, cancel := context.WithCancelCause(ctx)
            defer cancel(nil)
            p := NewPipeline(cancel)
            inObjects := Go(p, "ReadObjects", func(r chan<- int) error {
                return ReadObjects(ctx, r)
            })
            outObjects := Go(p, "DoWork", func(r chan<- int) error {
                return DoWork(ctx, r, inObjects)
            })
            Go(p, "WriteObjects", func(chan<- struct{}) error {
                return WriteObjects(ctx, outObjects)
            })
            p.Wait()
            return context.Cause(ctx)
        }
    \$\endgroup\$
