-
Notifications
You must be signed in to change notification settings - Fork 220
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WorkerTuner & Resource based autotuning #1546
Conversation
0ce54d7
to
927ff6f
Compare
9643c0a
to
85ba756
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Admittedly didn't do a deep dive comparing against existing/changed semaphore/permit logic because I found it a bit hard to follow and I haven't been in this code in a while. Regardless, it does take a solid amount of bravery to work in these depths, would welcome running as many possible tests as we can muster.
// on system resources. Specify the target CPU and memory usage as a value between 0 and 1. | ||
// | ||
// WARNING: Resource based tuning is currently experimental. | ||
func CreateResourceBasedTuner(targetCpu, targetMem float64) (worker.WorkerTuner, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would recommend accepting an options struct w/ just these two fields set (same elsewhere). Go has terrible future proofing for adding params.
contrib/resourcetuner/go.mod
Outdated
@@ -0,0 +1,48 @@ | |||
module resourcetuner |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
module resourcetuner | |
module go.temporal.io/sdk/contrib/resourcetuner |
if reserveCtx.NumIssuedSlots() < r.options.MinSlots { | ||
return &worker.SlotPermit{}, nil | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hrmm, I wonder if NumIssuedSlots
can change during the life of this call. If it is mutable, can you confirm that the source of that value is not referenced in any other tuner reserve call? Just want to make sure multiple suppliers aren't going to see that it's 1 below min at the same time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, good catch, it'd make sense to put a lock around this bit to avoid bursting from 0 too fast.
maxConcurrentWFT := options.MaxConcurrentWorkflowTaskExecutionSize | ||
maxConcurrentAct := options.MaxConcurrentActivityExecutionSize | ||
maxConcurrentLA := options.MaxConcurrentLocalActivityExecutionSize |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can probably just mutate the options still at this point since I don't expect these three fields are ever even referenced anymore after this function returns, but no big deal of course
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That actually causes problems since sometimes the tests will end up passing these around and this function gets run more than once and then the checks that you haven't set both maxConcurrentXXX and Tuner fail.
internal/internal_worker_base.go
Outdated
if !errors.Is(err, context.Canceled) { | ||
bw.logger.Error(fmt.Sprintf("Error while trying to reserve slot: %v", err)) | ||
} else { | ||
close(reserveChan) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's rare to ever need to close channels in Go except as a broadcast mechanism. It can lead to confusing behavior. In this case the next send attempt in this loop will panic and every receive from here on out will immediately succeed with zero value
Did you maybe mean to create the channel inside the loop? Even if, would recommend sending nil
to it instead of erroring.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll just send an explicit nil. In any case this is one of the things I needed to add a test for still so will do that for sure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, so, there was a good reason for this. The explicit send of nil will block forever since, very likely, the other side waiting on it has already exited b/c of the stop channel. In fact, nothing needs to be done at all in this branch. In the branch with the logging, sending nil works.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do sending in a select that is bounded by context close as well (like you do below w/ non-nil send). It is dangerous to send something to a unbuffered channel unless you're 100% sure the receiver will get it. You can get goroutine leaks otherwise because the send will hang.
if permit == nil { // There was an error reserving a slot | ||
// Avoid spamming reserve hard in the event it's constantly failing | ||
time.Sleep(time.Second) | ||
continue |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC the channel can never be used again at this point correct, it is closed. Which means this select call will always resolve immediately with a nil value. So why continue
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Sushisource I agree this looks like a bug
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This channel doesn't get closed any more
c438ef2
to
6b3edf4
Compare
// Confirm that, if we have a max, held count isn't already there | ||
if e.maxConcurrent > 0 && e.heldSlotCount >= e.maxConcurrent { | ||
func (e *eagerActivityExecutor) reserveOnePendingSlot() *SlotPermit { | ||
// Confirm that, if we have a max, issued count isn't already there |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why was the previous logic not sufficient? It already handled the case where maxConcurrent
could be unset.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the same logic - heldSlotCount
is gone now
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Brought this back per discussion
ffb37e1
to
69a7a34
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
None of my existing concerns remain or are blockers (should wait on @Quinn-With-Two-Ns though). Always a bit scary changing internal polling machinations, so goes without saying that tests will be very important. Part of me wonders if we should make a larger effort to make this all new code instead of changing existing code just so we could have some oh-no flag to flip back, but that would seem hard without a lot of just copied functions.
internal/internal_worker_base.go
Outdated
if err != nil { | ||
if !errors.Is(err, context.Canceled) { | ||
bw.logger.Error(fmt.Sprintf("Error while trying to reserve slot: %v", err)) | ||
reserveChan <- nil |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can deadlock if the worker never pulls from reserveChan
, because it is stopping. you need to select with ctx.Done(). I think it would be better to just retry the ReserveSlot
call in this goroutine rather then coordinating with the outside goroutine, but your call
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried it and turned out to not be much simpler. You need two loops now and it was giving me some nonobvious problems.
df3112d
to
82721df
Compare
Make sure PID unit test won't see NaN values
82721df
to
6242cae
Compare
What was changed
Refactor slot reservation behind interface, interface exposed experimentally
Added resource based autotuner
Why?
Parity with ongoing work in all other SDKs
Checklist
Closes
How was this tested:
Added lots of tests, will be stressing in omes.
Any docs updates needed?
Public docs to come in public preview