
Batch completer + additional completer test suite and benchmarks #258

Merged
merged 1 commit into from
Mar 17, 2024

Conversation


@brandur brandur commented Mar 9, 2024

Here, add a new completer using a completion strategy designed to be
much faster than what we're doing right now. Rather than blindly
throwing completion work into goroutine slots, it accumulates "batches"
of completions to be carried out, using a debounced channel that fires
periodically (currently, up to every 100 milliseconds) to submit entire
batches of up to 2,000 jobs for completion at once.

For the purposes of not grossly expanding the riverdriver interface,
the completer only batches jobs being set to completed, which under
most normal workloads we expect to be the vast majority of cases. Jobs
going to other states are fed into a member AsyncCompleter, thereby
allowing the BatchCompleter to keep its implementation quite simple.

According to in-package benchmarking, the new completer is in the range
of 3-5x faster than AsyncCompleter (the one currently in use by the
River client), and 10-15x faster than InlineCompleter.

$ go test -bench=. ./internal/jobcompleter
goos: darwin
goarch: arm64
pkg: github.com/riverqueue/river/internal/jobcompleter
BenchmarkAsyncCompleter_Concurrency10/Completion-8                 10851            112318 ns/op
BenchmarkAsyncCompleter_Concurrency10/RotatingStates-8             11386            120706 ns/op
BenchmarkAsyncCompleter_Concurrency100/Completion-8                 9763            116773 ns/op
BenchmarkAsyncCompleter_Concurrency100/RotatingStates-8            10884            115718 ns/op
BenchmarkBatchCompleter/Completion-8                               54916             27314 ns/op
BenchmarkBatchCompleter/RotatingStates-8                           11518            100997 ns/op
BenchmarkInlineCompleter/Completion-8                               4656            369281 ns/op
BenchmarkInlineCompleter/RotatingStates-8                           1561            794136 ns/op
PASS
ok      github.com/riverqueue/river/internal/jobcompleter       21.123s

Along with the new completer, we also add a vastly more thorough test
suite to help tease out race conditions and test edges that were
previously being ignored completely. For most cases we drop the heavy
mocking that was happening before, which had the effect of minimizing
the surface area under test and producing misleading, unrealistic
timings.

Similarly, we bring in a new benchmark framework to allow us to easily
vet and compare completer implementations relative to each other. The
expectation is that this will act as a more synthetic proxy, with the
new benchmarking tool in #254 providing a more realistic end-to-end
measurement.

@brandur brandur requested a review from bgentry March 9, 2024 22:35
@brandur brandur force-pushed the brandur-fast-completer branch from 9e5e7d2 to 84aa1f0 Compare March 9, 2024 22:37
brandur commented Mar 9, 2024

@bgentry Hoping we can bring in #254 so I can run an end-to-end benchmark on this, but results from more synthetic benchmarks are currently looking very good. I suspect that the completer is currently our bottleneck because even with the async completer, it's easy for all 100 errgroup slots to become saturated, and once they do, completion probably isn't that much faster than just completing inline.

@brandur brandur force-pushed the brandur-fast-completer branch 2 times, most recently from 019b989 to 8207832 Compare March 9, 2024 22:41
brandur commented Mar 10, 2024

Actually, I ended up hacking my benchmarker into this branch. 5x+ improvement at >35k jobs/sec on my commodity MBA (compared to 6k jobs/sec previously).

Before:

$ go run main.go bench --database-url $DATABASE_URL --duration 1m
bench: jobs worked [          0 ], inserted [      50000 ], job/sec [        0.0 ] [0s]
bench: jobs worked [       3634 ], inserted [       4000 ], job/sec [     1817.0 ] [2s]
bench: jobs worked [       4230 ], inserted [       4000 ], job/sec [     2115.0 ] [2s]
bench: jobs worked [       4238 ], inserted [       4000 ], job/sec [     2119.0 ] [2s]
bench: jobs worked [       4309 ], inserted [       6000 ], job/sec [     2154.5 ] [2s]
bench: jobs worked [       4251 ], inserted [       4000 ], job/sec [     2125.5 ] [2s]
bench: jobs worked [       4246 ], inserted [       4000 ], job/sec [     2123.0 ] [2s]
bench: jobs worked [       4202 ], inserted [       4000 ], job/sec [     2101.0 ] [2s]
bench: jobs worked [       3982 ], inserted [       4000 ], job/sec [     1991.0 ] [2s]
bench: jobs worked [       4055 ], inserted [       4000 ], job/sec [     2027.5 ] [2s]
bench: jobs worked [       4060 ], inserted [       4000 ], job/sec [     2030.0 ] [2s]
bench: jobs worked [       4100 ], inserted [       4000 ], job/sec [     2050.0 ] [2s]
bench: jobs worked [       3985 ], inserted [       4000 ], job/sec [     1992.5 ] [2s]
bench: jobs worked [       4002 ], inserted [       4000 ], job/sec [     2001.0 ] [2s]
bench: jobs worked [       4016 ], inserted [       4000 ], job/sec [     2008.0 ] [2s]
bench: jobs worked [       3834 ], inserted [       4000 ], job/sec [     1917.0 ] [2s]
bench: jobs worked [       3775 ], inserted [       4000 ], job/sec [     1887.5 ] [2s]
bench: jobs worked [       3815 ], inserted [       4000 ], job/sec [     1907.5 ] [2s]
bench: jobs worked [       3617 ], inserted [       4000 ], job/sec [     1808.5 ] [2s]
bench: jobs worked [       9917 ], inserted [      10000 ], job/sec [     4958.5 ] [2s]
bench: jobs worked [       7186 ], inserted [       6000 ], job/sec [     3593.0 ] [2s]
bench: jobs worked [      25564 ], inserted [      26000 ], job/sec [    12782.0 ] [2s]
bench: jobs worked [      24033 ], inserted [      24000 ], job/sec [    12016.5 ] [2s]
bench: jobs worked [      27612 ], inserted [      26000 ], job/sec [    13806.0 ] [2s]
bench: jobs worked [      26952 ], inserted [      28000 ], job/sec [    13476.0 ] [2s]
bench: jobs worked [      27083 ], inserted [      28000 ], job/sec [    13541.5 ] [2s]
bench: jobs worked [      27501 ], inserted [      26000 ], job/sec [    13750.5 ] [2s]
bench: jobs worked [      27636 ], inserted [      28000 ], job/sec [    13818.0 ] [2s]
bench: jobs worked [      27914 ], inserted [      28000 ], job/sec [    13957.0 ] [2s]
bench: jobs worked [      28998 ], inserted [      30000 ], job/sec [    14499.0 ] [2s]
bench: total jobs worked [     366572 ], total jobs inserted [     416000 ], overall job/sec [     6109.5 ], running 1m0.00017125s

After:

$ go run main.go bench --database-url $DATABASE_URL --duration 1m
bench: jobs worked [          0 ], inserted [      50000 ], job/sec [        0.0 ] [0s]
bench: jobs worked [      50005 ], inserted [      52000 ], job/sec [    25002.5 ] [2s]
bench: jobs worked [      79998 ], inserted [      74000 ], job/sec [    39999.0 ] [2s]
bench: jobs worked [      70013 ], inserted [      76000 ], job/sec [    35006.5 ] [2s]
bench: jobs worked [      92621 ], inserted [      88000 ], job/sec [    46310.5 ] [2s]
bench: jobs worked [      87398 ], inserted [      86000 ], job/sec [    43699.0 ] [2s]
bench: jobs worked [      61233 ], inserted [      66000 ], job/sec [    30616.5 ] [2s]
bench: jobs worked [      90732 ], inserted [      76000 ], job/sec [    45366.0 ] [2s]
bench: jobs worked [      62004 ], inserted [      78000 ], job/sec [    31002.0 ] [2s]
bench: jobs worked [      93999 ], inserted [      86000 ], job/sec [    46999.5 ] [2s]
bench: jobs worked [      83998 ], inserted [      82000 ], job/sec [    41999.0 ] [2s]
bench: jobs worked [      73999 ], inserted [      78000 ], job/sec [    36999.5 ] [2s]
bench: jobs worked [      71581 ], inserted [      76000 ], job/sec [    35790.5 ] [2s]
bench: jobs worked [      76939 ], inserted [      78000 ], job/sec [    38469.5 ] [2s]
bench: jobs worked [      73105 ], inserted [      68000 ], job/sec [    36552.5 ] [2s]
bench: jobs worked [      80354 ], inserted [      86000 ], job/sec [    40177.0 ] [2s]
bench: jobs worked [      70004 ], inserted [      60000 ], job/sec [    35002.0 ] [2s]
bench: jobs worked [      70011 ], inserted [      76000 ], job/sec [    35005.5 ] [2s]
bench: jobs worked [      84107 ], inserted [      84000 ], job/sec [    42053.5 ] [2s]
bench: jobs worked [      65873 ], inserted [      70000 ], job/sec [    32936.5 ] [2s]
bench: jobs worked [      80005 ], inserted [      72000 ], job/sec [    40002.5 ] [2s]
bench: jobs worked [      70009 ], inserted [      74000 ], job/sec [    35004.5 ] [2s]
bench: jobs worked [      80015 ], inserted [      78000 ], job/sec [    40007.5 ] [2s]
bench: jobs worked [      70016 ], inserted [      76000 ], job/sec [    35008.0 ] [2s]
bench: jobs worked [      69993 ], inserted [      70000 ], job/sec [    34996.5 ] [2s]
bench: jobs worked [      80020 ], inserted [      80000 ], job/sec [    40010.0 ] [2s]
bench: jobs worked [      90008 ], inserted [      80000 ], job/sec [    45004.0 ] [2s]
bench: jobs worked [      60011 ], inserted [      70000 ], job/sec [    30005.5 ] [2s]
bench: jobs worked [      70566 ], inserted [      70000 ], job/sec [    35283.0 ] [2s]
bench: jobs worked [      79411 ], inserted [      80000 ], job/sec [    39705.5 ] [2s]
bench: total jobs worked [    2268151 ], total jobs inserted [    2314000 ], overall job/sec [    37802.2 ], running 1m0.000476583s

@brandur brandur force-pushed the brandur-fast-completer branch 2 times, most recently from c62aa0a to e9ddb3c Compare March 10, 2024 01:06
event.go Outdated
@@ -69,7 +69,7 @@ func jobStatisticsFromInternal(stats *jobstats.JobStatistics) *JobStatistics {

// The maximum size of the subscribe channel. Events that would overflow it will
// be dropped.
const subscribeChanSize = 100
const subscribeChanSize = 50_000
@brandur brandur Mar 10, 2024

Had to substantially increase this number for the benchmarker since it's now possible to insert 100 jobs into the channel almost instantaneously, and even a fast channel listener can lose some.

Contributor

I was wondering earlier if this would be an issue with the way you were now measuring. It’s unlikely someone would hit this in practice but I suppose it’s still a bit of a wart with the current fixed size buffered channel for subscriptions.

@brandur brandur force-pushed the brandur-fast-completer branch from e9ddb3c to 3a85805 Compare March 10, 2024 04:04
brandur commented Mar 10, 2024

Benchmark settings tweaks discussed in #254 improved these numbers even more — now regularly seeing > 40k jobs/sec with an average ~37k jobs/sec.

@brandur brandur force-pushed the brandur-fast-completer branch from 3a85805 to 3974581 Compare March 10, 2024 04:16
brandur commented Mar 10, 2024

With the tweaks from #259 these got even better, showing >50k jobs/sec, although oddly, the longer the benchmark goes on, the more degraded the numbers become, and this is reproducible. I'm fairly sure that's not related to the completer changes here, but it's something to look into as we try to optimize. The fact that some of these iterations performed exactly 50k jobs/sec is also suspicious and needs more investigation.

$ go run main.go bench --database-url $DATABASE_URL --duration 1m
bench: jobs worked [          0 ], inserted [      50000 ], job/sec [        0.0 ] [0s]
bench: jobs worked [      82080 ], inserted [      82000 ], job/sec [    41040.0 ] [2s]
bench: jobs worked [      89920 ], inserted [      90000 ], job/sec [    44960.0 ] [2s]
bench: jobs worked [     100000 ], inserted [      92000 ], job/sec [    50000.0 ] [2s]
bench: jobs worked [      99034 ], inserted [      94000 ], job/sec [    49517.0 ] [2s]
bench: jobs worked [      74966 ], inserted [      88000 ], job/sec [    37483.0 ] [2s]
bench: jobs worked [     100000 ], inserted [     100000 ], job/sec [    50000.0 ] [2s]
bench: jobs worked [      96000 ], inserted [      96000 ], job/sec [    48000.0 ] [2s]
bench: jobs worked [     109049 ], inserted [      94000 ], job/sec [    54524.5 ] [2s]
bench: jobs worked [      70065 ], inserted [      86000 ], job/sec [    35032.5 ] [2s]
bench: jobs worked [      73861 ], inserted [      74000 ], job/sec [    36930.5 ] [2s]
bench: jobs worked [      75025 ], inserted [      74000 ], job/sec [    37512.5 ] [2s]
bench: jobs worked [      74023 ], inserted [      76000 ], job/sec [    37011.5 ] [2s]
bench: jobs worked [      49014 ], inserted [      48000 ], job/sec [    24507.0 ] [2s]
bench: jobs worked [      94971 ], inserted [      94000 ], job/sec [    47485.5 ] [2s]
bench: jobs worked [      59105 ], inserted [      60000 ], job/sec [    29552.5 ] [2s]
bench: jobs worked [      55060 ], inserted [      52000 ], job/sec [    27530.0 ] [2s]
bench: jobs worked [      48911 ], inserted [      50000 ], job/sec [    24455.5 ] [2s]
bench: jobs worked [      47193 ], inserted [      48000 ], job/sec [    23596.5 ] [2s]
bench: jobs worked [      45858 ], inserted [      46000 ], job/sec [    22929.0 ] [2s]
bench: jobs worked [      41228 ], inserted [      42000 ], job/sec [    20614.0 ] [2s]
bench: jobs worked [      35031 ], inserted [      34000 ], job/sec [    17515.5 ] [2s]
bench: jobs worked [      29986 ], inserted [      30000 ], job/sec [    14993.0 ] [2s]
bench: jobs worked [      29058 ], inserted [      28000 ], job/sec [    14529.0 ] [2s]
bench: jobs worked [      27033 ], inserted [      30000 ], job/sec [    13516.5 ] [2s]
bench: jobs worked [      29033 ], inserted [      28000 ], job/sec [    14516.5 ] [2s]
bench: jobs worked [      26030 ], inserted [      24000 ], job/sec [    13015.0 ] [2s]
bench: jobs worked [      28020 ], inserted [      30000 ], job/sec [    14010.0 ] [2s]
bench: jobs worked [      25026 ], inserted [      26000 ], job/sec [    12513.0 ] [2s]
bench: jobs worked [      27019 ], inserted [      26000 ], job/sec [    13509.5 ] [2s]
bench: total jobs worked [    1767497 ], total jobs inserted [    1818000 ], overall job/sec [    29457.5 ], running 1m0.001600541s

bgentry commented Mar 10, 2024

1 / .02 * 1000 = 50,000, the exact max theoretical throughput when you’re fetching 1000 jobs every 20ms 😁

brandur commented Mar 10, 2024

1 / .02 * 1000 = 50,000, the exact max theoretical throughput when you’re fetching 1000 jobs every 20ms 😁

Ah dang it. Good call :)

@brandur brandur force-pushed the brandur-fast-completer branch from 3974581 to df7af43 Compare March 10, 2024 16:55
event.go Outdated
@@ -69,7 +69,7 @@ func jobStatisticsFromInternal(stats *jobstats.JobStatistics) *JobStatistics {

// The maximum size of the subscribe channel. Events that would overflow it will
// be dropped.
const subscribeChanSize = 100
const subscribeChanSizeDefault = 50_000
Contributor Author

Okay, so unfortunately, it seems that Go eagerly allocates a channel's full buffer up front [1], and this 50k number is probably way more than most uses will ever need. However, we do need a number at least this high for benchmarking (50k is only ~1s worth of benching, which you could imagine falling behind).

@bgentry Thoughts on possibly exposing a second client.Subscribe variant that allows a channel size to be injected? (Or actually, it might take a SubscribeConfig object just for better future compatibility.) Not perfect, but it may be the best option.

I already had to add an internal subscribe variant so I could fix the test case that checked that subscribe channels can overflow.


[1] https://stackoverflow.com/questions/65417280/how-and-when-does-go-allocate-memory-for-bounded-queue-channels

Contributor Author

BTW, I went ahead and did this (add a second Subscribe function that takes SubscribeConfig). Feels pretty clean IMO and resolves this problem. Also bumped the default size to 1_000 which feels a little more reasonable.

brandur added a commit that referenced this pull request Mar 10, 2024
Here, fix an intermittent stress test in the notifier package. This is
one that I added last minute in #258 in response to code review, so I
missed the intermittency because it didn't fail on any of the last
pushes that I made, but I noticed it failing today [1].

Luckily, it's quite a simple fix where we just have to make sure that the
test waits for one of the test goroutines to return before quitting. Ran
at `-race -count 1000` locally and no longer seeing any intermittency.

[1] https://github.com/riverqueue/river/actions/runs/8223455278/job/22486174354
brandur added a commit that referenced this pull request Mar 10, 2024
bgentry pushed a commit that referenced this pull request Mar 11, 2024
@brandur brandur force-pushed the brandur-fast-completer branch 2 times, most recently from a46afee to 2e74e37 Compare March 12, 2024 00:23
@@ -2,19 +2,20 @@ module github.com/riverqueue/river/cmd/river

go 1.21.4

// replace github.com/riverqueue/river => ../..
replace github.com/riverqueue/river => ../..
Contributor Author

Have to temporarily uncomment the replace because the River API changed. I'll unwind this as I'm cutting a release.

@brandur brandur force-pushed the brandur-fast-completer branch 6 times, most recently from b7480ad to 0b783bd Compare March 12, 2024 01:56
// will be used to stop fetching new work whenever stop is initiated, or
// when the context provided to Run is itself cancelled.
fetchNewWorkCancel context.CancelCauseFunc
fetchWorkCancel context.CancelCauseFunc
Contributor Author

This was alternately named fetchWork or fetchNewWork depending on the place, so here I standardized on the former since it's a little more succinct and the naming still describes it nicely.

@@ -23,10 +24,9 @@ require (
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgservicefile v0.0.0-20221227161230-091c0ba34f0a // indirect
github.com/jackc/puddle/v2 v2.2.1 // indirect
github.com/oklog/ulid/v2 v2.1.0 // indirect
github.com/lmittmann/tint v1.0.4 // indirect
@brandur brandur Mar 16, 2024

So ... I brought in this package for use with the benchmarker, which is a colorized log handler for slog. I was debugging some benchmark problems and this makes things far easier to read because the default Go output is such absolute garbage.

The nice thing about Tint is that it itself has zero dependencies, and we only need it for the River CLI package without having to add it to the top level one, so I think it's okay.

Here's a screenshot of the benchmarker running in verbose mode (I also added a --debug that makes it even more verbose):

[screenshot omitted]

Here's roughly the same on normal slog:

[screenshot omitted]

Comparisons:

  • Slog's timestamps are just absurdly verbose, and combined with the needless level= and msg= on every single trace, it takes half the screen right there.
  • Slog using the full level name like INFO or DEBUG means that the traces don't align well because levels are of different lengths.
  • In slog there's no visual distinction between messages, keys, and values, which makes picking things out quickly really hard.

Contributor

Looks solid! I think I'm gonna pull this into my riverdemo app too.

brandur added a commit that referenced this pull request Mar 16, 2024
… options

This one's in pursuit of resolving #209, which I've decided to tackle
now because we're going to have to cut a CLI release across River
package versions for #258 anyway, so this'll reuse some work.

A new `river migrate-get` command becomes available, whose only job is
to dump SQL from River migrations so that it can easily be plugged into
other migration frameworks. Its use looks like:

    river migrate-get --version 3 --down > version3.down.sql
    river migrate-get --version 3 --up > version3.up.sql

Along with that, `migrate-down` and `migrate-up` get a few new useful
options:

* `--dry-run`: Prints information on migrations that would be run, but
  doesn't modify the database in any way.

* `--show-sql`: Prints SQL for each migration step that was applied.

This gives users an easy way to, after a River upgrade, run the CLI to
see what commands would be run were they to migrate, but without
actually performing the migration, likely a step that most production
users would perform to be cautious:

    river migrate-up --dry-run --show-sql

I've also done a little cleanup around the River CLI's `main.go`. The
`--verbose` and `--debug` flags added in #258 are now promoted to
persistent flag configuration so they're available for all commands, and
we now have one standardized way of initializing an appropriate logger.
brandur added a commit that referenced this pull request Mar 16, 2024
brandur added a commit that referenced this pull request Mar 16, 2024
brandur added a commit that referenced this pull request Mar 17, 2024
brandur added a commit that referenced this pull request Mar 17, 2024
brandur added a commit that referenced this pull request Mar 17, 2024
… options

This one's in pursuit of resolving #209, which I've decided to tackle
now because we're going to have to cut a CLI release across River
package versions for #258 anyway, so this'll reuse some work.

A new `river migrate-get` command becomes available, whose only job is
to dump SQL from River migrations so that it can easily be plugged into
other migration frameworks. Its use looks like:

    river migrate-get --version 3 --down > version3.down.sql
    river migrate-get --version 3 --up > version3.up.sql

It can also take multiple versions for those trying to dump the full
schema up to now:

    river migrate-get --version 3,2,1 --down > river.down.sql
    river migrate-get --version 1,2,3 --up > river.up.sql

Along with that, `migrate-down` and `migrate-up` get a few new useful
options:

* `--dry-run`: Prints information on migrations that would be run, but
  doesn't modify the database in any way.

* `--show-sql`: Prints SQL for each migration step that was applied.

This gives users an easy way to, after a River upgrade, run the CLI to
see what commands would be run were they to migrate, but without
actually performing the migration, likely a step that most production
users would perform to be cautious:

    river migrate-up --dry-run --show-sql

I've also done a little cleanup around the River CLI's `main.go`. The
`--verbose` and `--debug` flags added in #258 are now promoted to
persistent flag configuration so they're available for all commands, and
we now have one standardized way of initializing an appropriate logger.

Fixes #209.
brandur added a commit that referenced this pull request Mar 17, 2024
… options

This one's in pursuit of resolving #209, which I've decided to tackle
now because we're going to have to cut a CLI release across River
package versions for #258 anyway, so this'll reuse some work.

A new `river migrate-get` command becomes available, whose only job is
to dump SQL from River migrations so that it can easily be plugged into
other migration frameworks. Its use looks like:

    river migrate-get --version 3 --down > version3.down.sql
    river migrate-get --version 3 --up > version3.up.sql

It can also take multiple versions:

    river migrate-get --version 3,2,1 --down > river.down.sql
    river migrate-get --version 1,2,3 --up > river.up.sql

It can also dump _all_ migrations, which will be useful in cases where
users want to avoid River's internal migration framework completely, and
use their own:

    river migrate-get --all --exclude-version 1 --up > river_all.up.sql
    river migrate-get --all --exclude-version 1 --down > river_all.down.sql

Along with that, `migrate-down` and `migrate-up` get a few new useful
options:

* `--dry-run`: Prints information on migrations that would be run, but
  doesn't modify the database in any way.

* `--show-sql`: Prints SQL for each migration step that was applied.

This gives users an easy way, after a River upgrade, to run the CLI and
see which commands would be executed were they to migrate, without
actually performing the migration, a precaution most production users
will likely want to take:

    river migrate-up --dry-run --show-sql

I've also done a little cleanup around the River CLI's `main.go`. The
`--verbose` and `--debug` flags added in #258 are now promoted to
persistent flag configuration so they're available for all commands, and
we now have one standardized way of initializing an appropriate logger.

Fixes #209.
Contributor

@bgentry bgentry left a comment

Sorry for the slow review, but this looks great. I had just a couple questions and a concern about leaking a pgx-specific puddle error outside the driver abstraction.

I would also like to talk about how we can extend this to support other job states. For example, snooze seems like a good candidate to make use of this optimization as well. We can do that out of band from this PR though!

client.go Outdated

for _, service := range c.services {
// TODO(brandur): Reevaluate the use of fetchNewWorkCtx here. It's
Contributor
Missed the rename in this comment:

Suggested change
-// TODO(brandur): Reevaluate the use of fetchNewWorkCtx here. It's
+// TODO(brandur): Reevaluate the use of fetchCtx here. It's

Contributor Author

Ah good catch, thx.

@@ -94,17 +96,17 @@ func (b *Benchmarker[TTx]) Run(ctx context.Context) error {
// values against the wall, they perform quite well. Much better than
// the client's default values at any rate.
 FetchCooldown: 2 * time.Millisecond,
-FetchPollInterval: 5 * time.Millisecond,
+FetchPollInterval: 20 * time.Millisecond,
Contributor

did you find that this gave you better throughput at 20ms vs 5ms?

Contributor Author

Yeah ... I still wouldn't commit to any particular numbers because I can see fairly wild swings in performance between runs, but I played with these numbers more and I think these ones gave me slightly better results.

Comment on lines 252 to 255
// New jobs to complete may have come in while working the batch
// above. If enough have to bring us above the minimum complete
// threshold, do another again. Otherwise, break and listen for
// a new tick.
Contributor

bit of awkward phrasing in here, probably worth tweaking

Contributor Author

Fixed.

Comment on lines 446 to 448
// As configured, total time from initial attempt is ~7 seconds (1 + 2 + 4) (not
// including jitter).
const numRetries = 3
Contributor

We may want to bump this a bit. 7 seconds could be just a short connectivity blip, so it seems a bit extreme to potentially allow bulk data loss in that situation.

Is there harm from increasing it? Of course the client won't be able to keep working new jobs if it hits the max backlog because we're stuck retrying, but it probably won't be able to do that anyway in a scenario that causes these failures.

Contributor Author

Ugh, so this is really tricky to get right. It's 7 seconds, but if the loop is timing out every time (timeouts being a likely failure candidate), the total time is more like 37 seconds (7 seconds + 3 * 10 seconds), which is a really long time. I modified the comment above to be more explicit about that case.

This can be bad because the completer can still block a client shutdown, and blocking it for this length of time could be a serious problem. I've actually seen this happen to slightly smaller extents when a bad failure happens in the completer running a benchmark. I'll hit Ctrl+C to stop it, and it takes ~10 seconds to exit.

I think I'm going to punt on more changes on this in favor of a separate project thinking more about these retries. I'm not convinced that we're doing the right thing at all when it comes to this stuff. I think retrying once or maybe twice makes a certain amount of sense, but given these are likely indicative of something very degenerate happening in the DB, continuing to beat a dead horse by retrying over and over again is likely to lead to more failures and more unnecessary database contention. Even the retry schedule is probably wrong — the first retry should probably be happening even sooner after an initial failure because in the event of a truly intermittent failure, even 1s is quite a long time.

}

// A closed pool will never succeed, return immediately.
if errors.Is(err, puddle.ErrClosedPool) {
Contributor

This is somewhat pgx-specific, isn't it? Do we need a driver error type for this?

Contributor Author

Good catch. I pushed this down a layer into the driver and added a test.

@brandur brandur force-pushed the brandur-fast-completer branch 2 times, most recently from 148fb15 to e5d4268 March 17, 2024 20:19
@brandur
Contributor Author

brandur commented Mar 17, 2024

I would also like to talk about how we can extend this to support other job states. For example, snooze seems like a good candidate to make use of this optimization as well. We can do that out of band from this PR though!

Awesome. Yeah, I think adding support for other states would be great, and actually, probably reasonably doable even from a single query if we put our mind to it. This PR's touched files became more widespread than I would've liked though, so anxious to get it in. +1 looking into the other states out of band.

Here, add a new completer using a completion strategy designed to be
much faster than what we're doing right now. Rather than blindly
throwing completion work into goroutine slots, it accumulates "batches"
of completions to be carried out, using a debounced channel that fires
periodically (currently, up to every 100 milliseconds) to submit entire
batches for completion at once, up to 2,000 jobs at a time.

To avoid grossly expanding the `riverdriver` interface, the completer
only batches jobs being set to `completed`, which under most normal
workloads we expect to be by far the most common case. Jobs going to
other states are fed into a member `AsyncCompleter`, thereby allowing
the `BatchCompleter` to keep its implementation quite simple.
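The accumulate-then-flush loop described here can be sketched roughly as follows. This is a minimal model, not the real implementation; the names (`runBatchLoop`, `complete`) and the shape of the channels are illustrative assumptions:

```go
package main

import (
	"fmt"
	"time"
)

// runBatchLoop accumulates job IDs and flushes them through the complete
// callback either when the debounce ticker fires or when the batch hits
// maxBatchSize. Illustrative only; not River's actual API.
func runBatchLoop(jobs <-chan int64, done <-chan struct{}, complete func([]int64)) {
	const (
		tickInterval = 100 * time.Millisecond
		maxBatchSize = 2000
	)

	ticker := time.NewTicker(tickInterval)
	defer ticker.Stop()

	batch := make([]int64, 0, maxBatchSize)
	flush := func() {
		if len(batch) == 0 {
			return
		}
		complete(batch)
		batch = batch[:0] // reuse the backing array for the next batch
	}

	for {
		select {
		case id := <-jobs:
			batch = append(batch, id)
			if len(batch) >= maxBatchSize {
				flush() // cap batch size even under heavy load
			}
		case <-ticker.C:
			flush() // debounce: flush whatever accumulated this tick
		case <-done:
			flush() // drain any remainder before shutting down
			return
		}
	}
}

func main() {
	jobs := make(chan int64)
	done := make(chan struct{})
	go func() {
		for i := int64(1); i <= 5; i++ {
			jobs <- i
		}
		close(done)
	}()
	runBatchLoop(jobs, done, func(ids []int64) {
		// typically prints: completed 5 jobs in one query
		fmt.Printf("completed %d jobs in one query\n", len(ids))
	})
}
```

The key property is that many individual completions collapse into a single database round trip per tick, which is where the throughput gain over per-job goroutine completion comes from.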

According to in-package benchmarking, the new completer is in the range
of 3-5x faster than `AsyncCompleter` (the one currently used by the
River client), and 10-15x faster than `InlineCompleter`.

    $ go test -bench=. ./internal/jobcompleter
    goos: darwin
    goarch: arm64
    pkg: github.com/riverqueue/river/internal/jobcompleter
    BenchmarkAsyncCompleter_Concurrency10/Completion-8                 10851            112318 ns/op
    BenchmarkAsyncCompleter_Concurrency10/RotatingStates-8             11386            120706 ns/op
    BenchmarkAsyncCompleter_Concurrency100/Completion-8                 9763            116773 ns/op
    BenchmarkAsyncCompleter_Concurrency100/RotatingStates-8            10884            115718 ns/op
    BenchmarkBatchCompleter/Completion-8                               54916             27314 ns/op
    BenchmarkBatchCompleter/RotatingStates-8                           11518            100997 ns/op
    BenchmarkInlineCompleter/Completion-8                               4656            369281 ns/op
    BenchmarkInlineCompleter/RotatingStates-8                           1561            794136 ns/op
    PASS
    ok      github.com/riverqueue/river/internal/jobcompleter       21.123s

Along with the new completer, we also add a vastly more thorough test
suite to help tease out race conditions and test edges that were
previously being ignored completely. For most cases we drop the heavy
mocking that was happening before, which was having the effect of
minimizing the surface area under test, and producing misleading timing
that wasn't realistic.

Similarly, we bring in a new benchmark framework to allow us to easily
vet and compare completer implementations relative to each other. The
expectation is that this will act as a more synthetic proxy, with the
new benchmarking tool in #254 providing a more realistic end-to-end
measurement.
@brandur brandur force-pushed the brandur-fast-completer branch from e5d4268 to 71abf47 March 17, 2024 20:22
@brandur
Contributor Author

brandur commented Mar 17, 2024

Thanks!

@brandur brandur merged commit 702d5b2 into master Mar 17, 2024
10 checks passed
@brandur brandur deleted the brandur-fast-completer branch March 17, 2024 20:25
brandur added a commit that referenced this pull request Mar 17, 2024
… options (#273)
bgentry added a commit that referenced this pull request Jun 26, 2024
Back in #258 / 702d5b2, the batch completer was added to improve
throughput. As part of that refactor, it was turned into a startstop
service that took a context on start. We took care to ensure that
the context provided to the completer was _not_ the `fetchCtx`
(cancelled on `Stop()`) but instead was the raw user-provided `ctx`,
specifically to make sure the completer could finish its work even after
fetches were stopped.

This worked well if the whole shutdown process was done with `Stop` /
`StopAndCancel`, but it did not work if the user-provided context was
itself cancelled outside of River. In that scenario, the completer would
immediately begin shutting down upon cancellation, even without waiting
for producers to finish sending it any final jobs that needed to be
recorded. This went unnoticed until #379 / 0e57338 turned this scenario
into a panic instead of a silent misbehavior, which is what was
encountered in #400.

To fix this situation, we need to use Go 1.21's new
`context.WithoutCancel` API to fork the user-provided context so that we
maintain whatever else is stored in there (i.e. so anything used by slog
is still available) but we do not cancel this completer's context
_ever_. The completer will manage its own shutdown when its `Stop()` is
called as part of all of the other client services being stopped in
parallel.
bgentry added a commit that referenced this pull request Jun 26, 2024
* add failing test case for completer shutdown panic

This failing test case exposes the issue in #400 100% of the time, which
is caused by the `stopProducers()` call not actually waiting until the
producers are fully shut down before proceeding with the remaining
shutdown.

* fix shutdown panics by separating completer context

Fixes #400.