
Handling Concurrency #12

Closed
git-blame opened this issue Jul 6, 2021 · 5 comments

Comments

@git-blame

Dumb question: can tasks be submitted concurrently to a pool or do I need to use a mutex to control access to it?

I'm adding tasks concurrently. I'm also debugging by printing out the total tasks as well as the waiting tasks (which I call load) and the numbers are all over the place.

2021/07/06 15:34:46 Events.go:632: Event processing total: 119 load: 0
2021/07/06 15:34:46 Events.go:632: Event processing total: 124 load: 0
2021/07/06 15:34:46 Events.go:632: Event processing total: 124 load: 0
2021/07/06 15:34:46 Events.go:632: Event processing total: 124 load: 0
2021/07/06 15:34:46 Events.go:632: Event processing total: 124 load: 0
2021/07/06 15:34:46 Events.go:632: Event processing total: 124 load: 18446744073709551615
2021/07/06 15:34:46 Events.go:632: Event processing total: 124 load: 18446744073709551615
@git-blame
Author

I'm not sure about the concurrency.

But the waiting tasks count seems to have an edge condition when it is already 0: the number "wraps around" with an unsigned integer underflow, which matches my output.

From this code:

	// Decrement waiting task count
	atomic.AddUint64(&p.waitingTaskCount, ^uint64(0))

This standalone program demonstrates the wrap-around:

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var a = ^uint64(0)
	var b uint64 = 1
	fmt.Println(a)
	fmt.Println(b)
	atomic.AddUint64(&b, a)
	fmt.Println(b)
	atomic.AddUint64(&b, a)
	fmt.Println(b)
}

yields:

18446744073709551615
1
0
18446744073709551615

@alitto
Owner

alitto commented Jul 6, 2021

Hey @git-blame,

can tasks be submitted concurrently to a pool or do I need to use a mutex to control access to it?

Yes, pools can safely be shared across goroutines without using a mutex.

Regarding the WaitingTasks() method, it seems to be possible to see 18446744073709551615 for a brief period of time, but it should always go back to 0 eventually (unless I'm missing some edge case).
Could you provide a minimal example of this behavior? I'd like to reproduce it to see if this can be improved.

Thanks for reaching out 🙂

@git-blame
Author

I'm just testing with processing a large number of concurrent POST requests. I'm trying to find a point in my system when the pool queue might be "too large" (e.g., would take X seconds to process) and switch to writing the requests to disk for later processing. That's why I'm checking WaitingTasks() value before submitting a task to see if it exceeds my threshold.

I guess my in-memory processing is fast enough that there are rarely any tasks in the queue. Yet there are enough concurrent calls to WaitingTasks() that some of them see this temporary value.
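The threshold check described above also shows why the transient underflow matters: a counter that momentarily reads 2^64-1 makes every request spill to disk. A small sketch of that routing decision (`route`, `maxQueue`, and the "disk" fallback are hypothetical names for illustration; only `WaitingTasks()` exists in pond):

```go
package main

import "fmt"

// Threshold-based load shedding: if the pool's backlog exceeds maxQueue,
// spill the request to disk for later processing instead of submitting it.
const maxQueue = 50

func route(waitingTasks uint64) string {
	if waitingTasks > maxQueue {
		return "disk" // defer: write the request to disk
	}
	return "pool" // submit for in-memory processing
}

func main() {
	fmt.Println(route(0))  // pool
	fmt.Println(route(51)) // disk
	// An underflowed counter spills everything, which is the bug's impact:
	fmt.Println(route(^uint64(0))) // disk
}
```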

@alitto
Owner

alitto commented Jul 10, 2021

I see. Yes, for that use case you need a reliable counter. I just submitted a fix for this particular scenario (see https://github.com/alitto/pond/pull/14/files) that ensures the waitingTasks counter never wraps around. What I'm doing there is essentially changing the order in which the counter is incremented and decremented, to guarantee the increment always executes before the decrement. The version that includes the fix is v1.5.1.
Please try it out and let me know your thoughts. Thanks again!

@git-blame
Author

Great. No more underflow counts. Thanks.
