Proposal: concurrency with 1.23 iterator #137

Open
amikai opened this issue Aug 1, 2024 · 3 comments


@amikai

amikai commented Aug 1, 2024

Proposal

I propose the following four functions:

package iter
// unordered: results are yielded as they complete
func SeqMap[In, Out any](iter.Seq[In], func(In) Out) iter.Seq[Out]
func Seq2Map[In1, In2, Out1, Out2 any](iter.Seq2[In1, In2], func(In1, In2) (Out1, Out2)) iter.Seq2[Out1, Out2]

package stream
// ordered: results are yielded in input order
func SeqMap[In, Out any](iter.Seq[In], func(In) Out) iter.Seq[Out]
func Seq2Map[In1, In2, Out1, Out2 any](iter.Seq2[In1, In2], func(In1, In2) (Out1, Out2)) iter.Seq2[Out1, Out2]

SeqMap under conc iter:

  • On the user side, users should be able to consume concurrently processed results through a sequential for loop. When users break out of the loop, the running goroutines should be canceled to avoid wasting resources.
  • Input values are read in order and passed to the function. However, because they are processed concurrently, we don't know which one finishes first.
  • SeqMap should limit the number of goroutines it runs, like iter.Map.
  • Users can decide whether or not to map to a different type.
for v := range iter.SeqMap(slices.Values([]int{1, 2, 3, 4, 5, 6, 7}), func(x int) int { return x * x }) {
	if v == 25 {
		break
	}
	fmt.Println(v)
}
// Output is unordered, for example:
// 4
// 1
// 9
// 16
// 36

SeqMap2 under conc iter: similar to iter.SeqMap, but it yields two values, so it can be used for error handling.

// Assume that seq2WithError produces the following pairs:
// 1, nil
// 2, nil
// 3, errors.New("123")
// 4, nil
// 5, nil
var seq2WithError iter.Seq2[int, error] = ...

for v, err := range iter.SeqMap2(seq2WithError, func(x int, err error) (int, error) { return x * x, err }) {
	if err != nil {
		break
	}
	fmt.Println(v)
}
// Output is unordered, for example:
// 4
// 1
// 16

SeqMap under conc stream: similar to iter.SeqMap, but results are yielded in input order.

for v := range stream.SeqMap(slices.Values([]int{1, 2, 3, 4, 5, 6, 7}), func(x int) int { return x * x }) {
	if v == 25 {
		break
	}
	fmt.Println(v)
}
// Output is ordered:
// 1
// 4
// 9
// 16

SeqMap2 under conc stream: similar to stream.SeqMap, but it yields two values, so it can be used for error handling.

// Assume that seq2WithError produces the following pairs:
// 1, nil
// 2, nil
// 3, errors.New("123")
// 4, nil
// 5, nil
var seq2WithError iter.Seq2[int, error] = ...

for v, err := range stream.SeqMap2(seq2WithError, func(x int, err error) (int, error) { return x * x, err }) {
	if err != nil {
		break
	}
	fmt.Println(v)
}
// Output is ordered:
// 1
// 4

Some thoughts

I believe the best aspects of a concurrent iterator are its ability to calculate lazily and its resource-saving capability. Functions like iter.Map and iter.ForEach will exhaust all elements, but with an iterator, when the output iterator stops, the input iterator should also cease retrieving elements and stop all goroutines.

@amikai
Author

amikai commented Aug 1, 2024

The part I find most troublesome to implement is that letting the user decide when to stop the loop means the developer has to implement a complex mechanism to close the channel on the receiver side.

I think we also need to consider how to use it with the context. However, I have no idea how to do that at the moment.

@camdencheek
Member

Hello @amikai! I've been toying with ideas in this direction on my end as well, so I'm glad to see others think this is an exciting possibility. Thank you for writing up this proposal.

> When users break the loop, the running goroutines should be canceled to avoid wasting resources.

This one is quite tricky since it requires conc to know how to cancel running goroutines. The standard way to do this is to have a cancellable context, but that means that conc must either be given a cancel() func, or it must own the context that child goroutines respect.

One way around it is just to leave cancellation to the caller. I personally like this because although it is a little harder to use, it's more predictable IMO and leaves control fully in the hands of the user.

ids := slices.Values([]int{1,2,3,4,5,6})
func fetchSquare(ctx context.Context, id int) int {
	...
}

ctx, cancel := context.WithCancel(ctx)

for v := range iter.SeqMap(ids, func(x int) int { return fetchSquare(ctx, x) }) {
	if v == 25 {
		cancel() // the caller, not conc, cancels the in-flight work
		break
	}
	fmt.Println(v)
}

> letting the user decide when to stop the loop means the developer has to implement a complex mechanism to close the channel on the receiver side.

I think we can design this so that the consumer doesn't have to worry about it. Because of the design of the iter package, control will be yielded back to conc when the user breaks the loop (the yield func returns false), which gives us the opportunity to clean up goroutines and propagate panics.

@amikai
Author

amikai commented Aug 1, 2024

This is a good idea, but I want to ask whether it might cause a goroutine leak if the user breaks out of the for loop without calling cancel.

Or, in another situation, the context is already canceled but the for loop never breaks.

Perhaps we can pass the context by changing the method to iter.SeqMapCtx(ctx, xxx), or by creating a struct with WithContext like Pool.WithContext(ctx). This way, users have the choice to cancel themselves, but after a break, iter.SeqMap will definitely cancel (we can derive a cancellable context during implementation).

This will make it easier for users because they won’t need to worry about forgetting to cancel it. But I think this will cause problems for developers, including increased implementation difficulty and how to handle panic situations.

// function declaration
func SeqMapCtx[In, Out any](context.Context, iter.Seq[In], func(context.Context, In) Out) iter.Seq[Out] {...}

ids := slices.Values([]int{1,2,3,4,5,6})
func fetchSquare(ctx context.Context, id int) int {
	...
}

ctx := context.Background()

// SeqMapCtx derives a cancellable context from ctx
for v := range iter.SeqMapCtx(ctx, ids, fetchSquare) {
	if v == 25 {
		break // when yield returns false, cancel the context and clean up goroutines
	}
	fmt.Println(v)
}

Summary of my opinion

This is my opinion. I hope it helps.

  • Users have the option to pass a context or not, similar to choosing between Pool and ContextPool. If users choose to pass a context, conc cancels the tasks and cleans up the goroutines for them after a break.
  • If users do not pass a context, it is their responsibility to handle cancellation themselves.

NOTE: I haven't built a proof of concept yet, so I'm not sure whether this can be implemented.
