
group.Wait() doesn't wait for all started jobs to finish unlike pool.StopAndWait() #68

Open
mikegleasonjr opened this issue Aug 21, 2024 · 8 comments

@mikegleasonjr

mikegleasonjr commented Aug 21, 2024

Given this test:

package something

import (
	"context"
	"errors"
	"testing"
	"time"

	"github.com/alitto/pond"
	"github.com/stretchr/testify/assert"
)

func TestStopAndWait(t *testing.T) {
	workers := 2

	jobsStarted := []bool{false, false, false, false}
	jobsEnded := []bool{false, false, false, false}
	jobsDuration := []time.Duration{51 * time.Millisecond, 100 * time.Millisecond, 51 * time.Millisecond, 50 * time.Millisecond}
	jobsReturnValue := []error{nil, errors.New("error"), nil, nil}

	pool := pond.New(workers, len(jobsEnded))
	group, _ := pool.GroupContext(context.Background())

	for i := 0; i < len(jobsStarted); i++ {
		group.Submit(func() error {
			t.Log("job started", i+1)
			jobsStarted[i] = true
			defer func() {
				t.Log("job ended", i+1)
				jobsEnded[i] = true
			}()
			time.Sleep(jobsDuration[i])
			return jobsReturnValue[i]
		})
	}

	// uncomment this to make the test pass:
	// pool.StopAndWait()
	// or
	// time.Sleep(100 * time.Millisecond)

	err := group.Wait()
	assert.Equal(t, []bool{true, true, true, false}, jobsStarted, "unexpected jobsStarted")
	assert.Equal(t, []bool{true, true, true, false}, jobsEnded, "unexpected jobsEnded")
	assert.EqualError(t, err, "error")
}

It has the following output:

=== RUN   TestStopAndWait
    bug_report_test.go:26: job started 2
    bug_report_test.go:26: job started 1
    bug_report_test.go:29: job ended 1
    bug_report_test.go:26: job started 3
    bug_report_test.go:29: job ended 2        <---- job 3 still running but test ended
    bug_report_test.go:44: 
        	Error Trace:	bug_report_test.go:44
        	Error:      	Not equal: 
        	            	expected: []bool{true, true, true, false}
        	            	actual  : []bool{true, true, false, false}
        	            	
        	            	Diff:
        	            	--- Expected
        	            	+++ Actual
        	            	@@ -3,3 +3,3 @@
        	            	  (bool) true,
        	            	- (bool) true,
        	            	+ (bool) false,
        	            	  (bool) false
        	Test:       	TestStopAndWait
        	Messages:   	unexpected jobsEnded
--- FAIL: TestStopAndWait (0.10s)

If we uncomment pool.StopAndWait() or time.Sleep(100 * time.Millisecond), the test passes:

=== RUN   TestStopAndWait
    bug_report_test.go:26: job started 2
    bug_report_test.go:26: job started 1
    bug_report_test.go:29: job ended 1
    bug_report_test.go:26: job started 3
    bug_report_test.go:29: job ended 2
    bug_report_test.go:29: job ended 3       <---- we waited for job 3
--- PASS: TestStopAndWait (0.15s)
PASS

What should happen:

0 ms ------------------ 50 ms ------------------ 100 ms ------------------ 150 ms
job 1 starts
job 2 starts
                   job 1 finishes
                    job 3 starts
                                           job 2 errors out
                                     (global context gets cancelled)
                                                                      job 3 finishes
job 4 never starts
-------------------------------------------------------------------------------

What happens:

group.Wait() is not waiting for job 3.

Side effects:

If job 3 creates and returns resources that must be freed (such as an io.Closer), those resources are leaked, because the pool manager never learns that the job finished and returned something. It should not be up to the worker to check whether the pool has stopped before returning its value, and to clean up if so.

What should happen:

group.Wait() should behave like pool.StopAndWait() (minus the stop)

Temporary fix:

	[...]
	pool := pond.New(workers, len(jobsEnded))
	group, _ := pool.GroupContext(context.Background())

	func() {
		defer pool.StopAndWait()

		for i := 0; i < len(jobsEnded); i++ {
			group.Submit(func() error {
				t.Log("job started", i+1)
				jobsStarted[i] = true
				defer func() {
					t.Log("job ended", i+1)
					jobsEnded[i] = true
				}()
				time.Sleep(jobsDuration[i])
				return jobsReturnValue[i]
			})
		}
	}()

	err := group.Wait()
	[...]
@alitto
Owner

alitto commented Aug 26, 2024

Hi @mikegleasonjr!

Yes, the behaviour you are describing is correct, and it's by design. Unlike the StopAndWait() method on the worker pool, the context group's Wait() method waits until either one of the tasks exits with an error or all tasks complete successfully (nil error).
These two methods are meant to be used in different circumstances. Given that worker pool instances are meant to be reused across all goroutines (singleton), the StopAndWait() method is usually invoked when tearing down the application (e.g. after receiving the exit or kill signal). In this scenario, the goroutine that executes the shutdown procedure doesn't really expect to handle an error from one of the tasks that have been sent to the pool; all it does is wait for any pending tasks to complete (successfully or not). This is why this method doesn't return an error object.
The context group's Wait() method, on the other hand, is meant to be called after submitting a batch of inter-related tasks (e.g. uploading a collection of images within the handler of a single HTTP request), which usually also means they share a context.Context. The semantics of this method are inspired by the errgroup package. When dealing with a bunch of tasks that are tied to the same context, you usually want to fail fast and return as soon as the first error occurs. Moreover, if the group's context.Context object gets cancelled, any pending task is not executed.
That said, there's another kind of "group of tasks" that behaves the way you described, and that's the one created with pool.Group(). If you create the group using this method, then the Wait() method will wait for all tasks to complete, regardless of any error. However, this kind of task group doesn't share a context.Context object, and task functions cannot return an error (each task is expected to handle errors internally).

@alitto
Owner

alitto commented Oct 20, 2024

Hey @mikegleasonjr

I wanted to let you know that v2 of this library now comes with a new kind of pool that could be useful in this particular use case. You can see more details in this section of the readme, but let me show you what the workaround you described would look like with it:

// This struct represents the result of a job
type JobResult struct {
    value string
    err   error
}

// Create a pool
pool := pond.NewResultPool[JobResult](10)

// Create a task group
group := pool.NewGroup()

// Submit all jobs as part of the group
for i := 0; i < jobCount; i++ {
    group.Submit(func() JobResult {
        // Execute the job
        value, err := runJob()
        
        // Wrap the result in a struct that holds the value and any error that occurred
        return JobResult{
            value: value,
            err: err,
        }
    })
}

// Wait for all jobs to be executed (concurrently) and retrieve the results in the order they were submitted
jobResults, err := group.Wait()

Notice I am using the Submit(func() R) method, which doesn't return an error explicitly. This means that all jobs will complete their execution and the Wait call will effectively wait for all of them to complete (successfully or not) instead of returning upon the first error.

Please let me know your thoughts on this 🙂. Thanks!

@pkovacs

pkovacs commented Oct 21, 2024

In my use case my task is a tree of tasks. I would like each task and its sub-tasks to share the same context, so any error in the tree propagates through the tree via a cancelled context. I want to submit each child task onto the pond and have the top of the tree, the main task, wait for all the children to finish and respect the context. I assume I have to use groups with contexts for that approach?

@alitto
Owner

alitto commented Oct 21, 2024

Hey @pkovacs, I'd like to know more about that use case, could you share a pseudo-code snippet of what your tree of tasks looks like?

In v2, if you need to pass a context to a group of related tasks, you need to pass it directly to the task's closure. Here's a full working example, but it would look like this:

// Create a pool with a result type of string
pool := pond.NewResultPool[string](10)

// Create a task group
group := pool.NewGroup()

// Create a custom context for this particular group
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Create tasks for fetching URLs
for _, url := range urls {
    url := url
    group.SubmitErr(func() (string, error) {
        return fetchURL(ctx, url) // <- context is passed directly in the task's function body
    })
}

// Wait for all HTTP requests to complete.
responses, err := group.Wait()

If ctx gets cancelled, then all tasks in the group will be cancelled too (it's up to the task itself to handle the context's Done event).
Please note that every pool also has a context associated with it; when that context is cancelled, the pool and all its tasks are stopped. However, pools are meant to be shared across multiple requests/threads, so I wouldn't recommend passing your custom context to the pool via the pond.WithContext option. The pool context is meant to be cancelled when the app is shutting down.

@pkovacs

pkovacs commented Oct 21, 2024

Sure, essentially this simplified example. I've omitted any context cancellations, so just imagine that any of the tasks might produce an error serious enough to warrant cancellation of anything in that context that has not started yet. I have several layers of tasks in my real application. The code below seems to work just fine.

package main

import (
        "context"
        "fmt"

        "github.com/alitto/pond/v2"
)

var pool pond.Pool

type Task struct {
        n int
}

type SubTask struct {
        n int
}

func main() {
        pool = pond.NewPool(100)
        ctx := context.Background()
        group := pool.NewGroup()
        for i := 0; i < 5; i++ {
                task := &Task{
                        n: i,
                }
                group.Submit(doTask(ctx, task))
        }
        group.Wait()
        pool.StopAndWait()
}

func doTask(ctx context.Context, t *Task) func() {
        return func() {
                select {
                case <-ctx.Done():
                        // something cancelled us
                default:
                        // run normally
                        fmt.Printf("Running task #%d\n", t.n)
                        group := pool.NewGroup()
                        for i := 0; i < 5; i++ {
                                subtask := &SubTask{
                                        n: i,
                                }
                                group.Submit(doSubTask(ctx, subtask))
                        }
                        group.Wait()
                }
        }
}

func doSubTask(ctx context.Context, t *SubTask) func() {
        return func() {
                select {
                case <-ctx.Done():
                        // something cancelled us
                default:
                        // run normally
                        fmt.Printf("...running subtask #%d\n", t.n)
                }
        }
}

@alitto
Owner

alitto commented Oct 21, 2024

What do you think about this example?

package main

import (
	"context"
	"fmt"

	"github.com/alitto/pond/v2"
)

var pool pond.Pool

type Task struct {
	n int
}

type SubTask struct {
	n int
}

func main() {
	pool = pond.NewPool(100)
	ctx := context.Background()
	group := pool.NewGroup()
	for i := 0; i < 5; i++ {
		task := &Task{
			n: i,
		}
		group.SubmitErr(doTask(ctx, task))
	}
	group.Wait()
	pool.StopAndWait()
}

func doTask(ctx context.Context, t *Task) func() error {
	return func() error {
		select {
		case <-ctx.Done():
			// something cancelled us
			return ctx.Err()
		default:
			// run normally
			fmt.Printf("Running task #%d\n", t.n)
			group := pool.NewGroup()
			for i := 0; i < 5; i++ {
				subtask := &SubTask{
					n: i,
				}
				group.SubmitErr(doSubTask(ctx, t.n, subtask))
			}

			// return the first error encountered
			err := group.Wait()

			if err != nil {
				fmt.Printf("Task #%d failed: %v\n", t.n, err)
			} else {
				fmt.Printf("Task #%d completed successfully\n", t.n)
			}

			return err
		}
	}
}

func doSubTask(ctx context.Context, task int, t *SubTask) func() error {
	return func() error {
		select {
		case <-ctx.Done():
			// something cancelled us
			return ctx.Err()
		default:
			// run normally
			fmt.Printf(" - Running subtask #%d-%d\n", task, t.n)

			if task%2 == 1 && t.n == 2 {
				return fmt.Errorf("subtask #%d encountered an error", t.n)
			}

			return nil
		}
	}
}

I modified the snippet you posted to use SubmitErr(func() error) instead of Submit. This way, if any of the subtasks fails with an error, the rest of the subtasks in the same group will be ignored. Notice that I'm also returning the context's error (ctx.Err()) to make it clear that the subtask failed.

@pkovacs

pkovacs commented Oct 21, 2024

Yeah, that's helpful -- propagating the error up via the return. I just have to decide how serious each particular error is and make decisions on whether to cancel or continue. Some errors are more appropriate to log but continue.

Anyway it's a nice package you've written and it looks like my timing for v2 is fortuitous.

@alitto
Owner

alitto commented Oct 21, 2024

I'm glad you find it useful! 🙂
Cheers
