Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

autobatch: thread-safe, debounce, max delay and implement Batching #180

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from

Conversation

MichaelMure
Copy link
Contributor

Revamp the venerable autobatch in different ways:

  • make it thread safe (test -race is happy)
  • deduplicate writes (avoid writing the same key multiple time), which make it act as a debounce
  • introduce a maximum delay before the write happen, to avoid keeping the pending writes in memory if the count trigger is not reached
  • implement the Batching interface for compatibility with more usecases

@MichaelMure MichaelMure force-pushed the autobatch-improvements branch 2 times, most recently from b3d30ea to e51f68e Compare October 13, 2021 16:06
Revamp the venerable autobatch in different ways:
- make it thread safe (test -race is happy)
- deduplicate writes (avoid writing the same key multiple time), which make it act as a debounce
- introduce a maximum delay before the write happen, to avoid keeping the pending writes in memory if the count trigger is not reached
- implement the Batching interface for compatibility with more usecases
@MichaelMure MichaelMure force-pushed the autobatch-improvements branch from e51f68e to 80b7406 Compare October 13, 2021 16:15
@aschmahmann aschmahmann requested a review from guseggert October 15, 2021 15:39
@aschmahmann
Copy link
Contributor

@whyrusleeping you might be interested in this.

// batches writes using the given Batching datastore. The maximum number of
// write before triggering a batch is given by maxWrite. The maximum delay
// before triggering a batch is given by maxDelay.
func NewAutoBatching(child ds.Batching, maxWrite int, maxDelay time.Duration) *Datastore {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Strictly speaking this does break compatibility, although it's an easy change and I'm not sure many people use it.
It would be possible to maintain the previous signature with maxDelay set to infinity and have a second constructor with the new parameter.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to use varargs functional opts here to a) preserve backwards compat and b) give us a way to extend this in the future? (I think backwards compat would also require disabling the async flushing if maxDelay=0)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That can be done but that would be a bit weird to keep maxWrite as a parameter and maxDelay as a functional arg.
Also, not sure how much time I can find to work on this.

Copy link
Contributor

@guseggert guseggert left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't comment on Query(), so will comment here. Is the intention of flushing in Query() before issuing the query, to make sure that there are no pending ops before sending the query? If so, then I think that's racy (it unlocks after flushing and re-locks before querying).

autobatch/autobatch.go Outdated Show resolved Hide resolved
var timer <-chan time.Time

write := func() {
timer = nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This leaks the timer until it fires, e.g. if there is a high rate of ops that hit the maxWrite threshold really fast, we could end up with a lot of concurrent no-op timers running. I think it'd be better to use a proper timer and stop it here.

@@ -164,5 +236,32 @@ func (d *Datastore) Close() error {
if err2 != nil {
return err2
}
close(d.exit)
close(d.newWrite)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this might be racy if the timer fires and prepares a batch, then this flushes and closes the underlying datastore, then the async writer tries to commit the batch but the underlying DS is now closed

Also this could return before the async writer finishes flushing things, which is surprising.

@@ -70,6 +72,9 @@ func TestFlushing(t *testing.T) {
t.Fatal(err)
}

// flushing is async so we can rely on having it happening immediately
time.Sleep(100 * time.Millisecond)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we make these deterministic to avoid flaky tests? (stub out time fns, inject mock clock. etc)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a library you would recommend for that?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@MichaelMure MichaelMure force-pushed the autobatch-improvements branch from 69e2e34 to 83ddbef Compare November 3, 2021 07:24
@MichaelMure
Copy link
Contributor Author

I can't comment on Query(), so will comment here. Is the intention of flushing in Query() before issuing the query, to make sure that there are no pending ops before sending the query? If so, then I think that's racy (it unlocks after flushing and re-locks before querying).

This is correct, but at the same time Query can be very long lived (I used some hours-long Query before) so I don't think blocking writes during a Query is reasonable.

@guseggert
Copy link
Contributor

I can't comment on Query(), so will comment here. Is the intention of flushing in Query() before issuing the query, to make sure that there are no pending ops before sending the query? If so, then I think that's racy (it unlocks after flushing and re-locks before querying).

This is correct, but at the same time Query can be very long lived (I used some hours-long Query before) so I don't think blocking writes during a Query is reasonable.

Ah that makes sense, thanks :)

@guseggert guseggert self-requested a review November 3, 2021 13:34
@BigLep BigLep marked this pull request as draft January 7, 2022 16:40
@BigLep
Copy link

BigLep commented Jan 7, 2022

@MichaelMure : I've converted this to a draft. If/once you've incorporated feedback, feel free to publish and we'll happily take a look.

@BigLep
Copy link

BigLep commented May 6, 2022

@MichaelMure : are you going to take this, or should we close this for now?

@MichaelMure
Copy link
Contributor Author

@BigLep I'm sure there is value in there but I ended up not using any of this in my project as things took a different turn. I suppose we can close.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
No open projects
Status: 🥞 Todo
Development

Successfully merging this pull request may close these issues.

4 participants