
Deadlock producing during shutdown #831

Closed
asg0451 opened this issue Oct 8, 2024 · 17 comments · Fixed by #832
Labels: bug, has pr

Comments

@asg0451 (Contributor) commented Oct 8, 2024

We have observed what looks like a deadlock in franz-go.
We are using the ProduceSync method, and when we cancel the passed-in context, the call blocks forever.

Client configuration:

kgo.DisableIdempotentWrite(),
kgo.SeedBrokers(bootstrapAddrs),
kgo.WithLogger(kgoLogAdapter{ctx: ctx}),
kgo.RecordPartitioner(newKgoChangefeedPartitioner()),
kgo.ProducerBatchMaxBytes(256 << 20), // 256MiB
kgo.BrokerMaxWriteBytes(1 << 30),     // 1GiB
kgo.AllowAutoTopicCreation(),
kgo.RecordRetries(5),
kgo.RequestRetries(5),
kgo.ProducerOnDataLossDetected(func(topic string, part int32) {
        log.Errorf(ctx, `kafka sink detected data loss for topic %s partition %d`, redact.SafeString(topic), redact.SafeInt(part))
}),

kgo version: v1.17.1
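For context, the call site looks roughly like this. A minimal, hedged sketch: the topic name, payload, and stop channel are placeholders, and the client is assumed to be built with the options above.

package sketch

import (
	"context"

	"github.com/twmb/franz-go/pkg/kgo"
)

// produceOnce sketches the failure mode: we cancel the context we passed
// to ProduceSync, expecting it to return context.Canceled, but instead
// the call blocks forever.
func produceOnce(client *kgo.Client, stop <-chan struct{}) error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		<-stop // hypothetical shutdown signal
		cancel()
	}()
	return client.ProduceSync(ctx, &kgo.Record{
		Topic: "events", // placeholder topic
		Value: []byte("payload"),
	}).FirstErr()
}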

Of note is that the client seems to be busy-looping attempting to connect to the Kafka broker, which I guess went away:

// kgo is spamming the logs with this at like 4k logs/hr (only ~1/sec but still)
WARN unable to open connection to broker ‹addr›=‹redacted.servicebus.windows.net:9093› ‹broker›=‹seed_0› ‹err›=dial tcp: ‹lookup redacted.servicebus.windows.net on 127.0.0.53:53: no such host›
INFO metadata update triggered ‹why›=‹re-updating metadata due to err: unable to dial: dial tcp: lookup redacted.servicebus.windows.net on 127.0.0.53:53: no such host›
WARN unable to open connection to broker ‹addr›=‹redacted.servicebus.windows.net:9093› ‹broker›=‹0› ‹err›=dial tcp: ‹lookup cdcdrt.servicebus.windows.net on 127.0.0.53:53: no such host›

Some relevant call stacks:

// (1) ProduceSync call being blocked on waiting for the individual produce calls to finish
1 @ 0x4a014e 0x4b30e5 0x4b30b4 0x4d46c5 0x4e6348 0x4607d2f 0x48bf9a6 0x48bf8db 0x486608b 0x489e12a 0xf07f73 0xf07d06 0xf07ebe 0x489df9d 0x489dc14 0x489db3f 0x1cdf7f6 0x4d8d21
# labels: {"job":"CHANGEFEED id=1004661810832080897", "n":"5"}
#	0x4d46c4	sync.runtime_Semacquire+0x24										GOROOT/src/runtime/sema.go:62
#	0x4e6347	sync.(*WaitGroup).Wait+0x47										GOROOT/src/sync/waitgroup.go:116
#	0x4607d2e	github.com/twmb/franz-go/pkg/kgo.(*Client).ProduceSync+0x14e						github.com/twmb/franz-go/pkg/kgo/external/com_github_twmb_franz_go/pkg/kgo/producer.go:263
#	0x48bf9a5	github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl.(*kafkaSinkClientV2).Flush.func1+0x65		github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/sink_kafka_v2.go:155
#	0x48bf8da	github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl.(*kafkaSinkClientV2).Flush+0x9a			github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/sink_kafka_v2.go:181
#	0x486608a	github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl.(*batchingSink).runBatchingWorker.func1+0xea	github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/batching_sink.go:353
#	0x489e129	github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl.(*ParallelIO).processIO.func1.1+0x129		github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/parallel_io.go:265
#	0xf07f72	github.com/cockroachdb/cockroach/pkg/util/retry.WithMaxAttempts.func1+0x12				github.com/cockroachdb/cockroach/pkg/util/retry/retry.go:200
#	0xf07d05	github.com/cockroachdb/cockroach/pkg/util/retry.Options.Do+0x245					github.com/cockroachdb/cockroach/pkg/util/retry/retry.go:177
#	0xf07ebd	github.com/cockroachdb/cockroach/pkg/util/retry.WithMaxAttempts+0xdd					github.com/cockroachdb/cockroach/pkg/util/retry/retry.go:199
#	0x489df9c	github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl.(*ParallelIO).processIO.func1+0x1dc		github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/parallel_io.go:260
#	0x489dc13	github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl.(*ParallelIO).processIO.func2+0xb3		github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/parallel_io.go:277
#	0x489db3e	github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl.(*ParallelIO).processIO.Group.GoCtx.func6+0x1e	github.com/cockroachdb/cockroach/pkg/util/ctxgroup/ctxgroup.go:168
#	0x1cdf7f5	golang.org/x/sync/errgroup.(*Group).Go.func1+0x55							golang.org/x/sync/errgroup/external/org_golang_x_sync/errgroup/errgroup.go:78


// (59) stuck re-locking the mutex after being woken from the condvar wait
59 @ 0x4a014e 0x4b30e5 0x4b30b4 0x4d4785 0x4e4a3d 0x4e4872 0x4e2cf7 0x4609149 0x4d8d21
# labels: {"job":"CHANGEFEED id=1004661810832080897", "n":"5"}
#	0x4d4784	sync.runtime_SemacquireMutex+0x24				GOROOT/src/runtime/sema.go:77
#	0x4e4a3c	sync.(*Mutex).lockSlow+0x15c					GOROOT/src/sync/mutex.go:171
#	0x4e4871	sync.(*Mutex).Lock+0x31						GOROOT/src/sync/mutex.go:90
#	0x4e2cf6	sync.(*Cond).Wait+0x96						GOROOT/src/sync/cond.go:71
#	0x4609148	github.com/twmb/franz-go/pkg/kgo.(*Client).produce.func2+0xc8	github.com/twmb/franz-go/pkg/kgo/external/com_github_twmb_franz_go/pkg/kgo/producer.go:475


// (43) stuck in <-wait
43 @ 0x4a014e 0x468ddf 0x4689f2 0x4608f31 0x4608b54 0x4607d0e 0x4607cef 0x48bf9a6 0x48bf8db 0x486608b 0x489e12a 0xf07f73 0xf07d06 0xf07ebe 0x489df9d 0x489dc14 0x489db3f 0x1cdf7f6 0x4d8d21
# labels: {"job":"CHANGEFEED id=1004661810832080897", "n":"5"}
#	0x4608f30	github.com/twmb/franz-go/pkg/kgo.(*Client).produce.func3+0x110						github.com/twmb/franz-go/pkg/kgo/external/com_github_twmb_franz_go/pkg/kgo/producer.go:487
#	0x4608b53	github.com/twmb/franz-go/pkg/kgo.(*Client).produce+0xb73						github.com/twmb/franz-go/pkg/kgo/external/com_github_twmb_franz_go/pkg/kgo/producer.go:500
#	0x4607d0d	github.com/twmb/franz-go/pkg/kgo.(*Client).Produce+0x12d						github.com/twmb/franz-go/pkg/kgo/external/com_github_twmb_franz_go/pkg/kgo/producer.go:379
#	0x4607cee	github.com/twmb/franz-go/pkg/kgo.(*Client).ProduceSync+0x10e						github.com/twmb/franz-go/pkg/kgo/external/com_github_twmb_franz_go/pkg/kgo/producer.go:261
#	0x48bf9a5	github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl.(*kafkaSinkClientV2).Flush.func1+0x65		github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/sink_kafka_v2.go:155
#	0x48bf8da	github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl.(*kafkaSinkClientV2).Flush+0x9a			github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/sink_kafka_v2.go:181
#	0x486608a	github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl.(*batchingSink).runBatchingWorker.func1+0xea	github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/batching_sink.go:353
#	0x489e129	github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl.(*ParallelIO).processIO.func1.1+0x129		github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/parallel_io.go:265
#	0xf07f72	github.com/cockroachdb/cockroach/pkg/util/retry.WithMaxAttempts.func1+0x12				github.com/cockroachdb/cockroach/pkg/util/retry/retry.go:200
#	0xf07d05	github.com/cockroachdb/cockroach/pkg/util/retry.Options.Do+0x245					github.com/cockroachdb/cockroach/pkg/util/retry/retry.go:177
#	0xf07ebd	github.com/cockroachdb/cockroach/pkg/util/retry.WithMaxAttempts+0xdd					github.com/cockroachdb/cockroach/pkg/util/retry/retry.go:199
#	0x489df9c	github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl.(*ParallelIO).processIO.func1+0x1dc		github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/parallel_io.go:260
#	0x489dc13	github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl.(*ParallelIO).processIO.func2+0xb3		github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/parallel_io.go:277
#	0x489db3e	github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl.(*ParallelIO).processIO.Group.GoCtx.func6+0x1e	github.com/cockroachdb/cockroach/pkg/util/ctxgroup/ctxgroup.go:168
#	0x1cdf7f5	golang.org/x/sync/errgroup.(*Group).Go.func1+0x55							golang.org/x/sync/errgroup/external/org_golang_x_sync/errgroup/errgroup.go:78

// (30) stuck in mu.Lock() in drainBuffered?
30 @ 0x4a014e 0x4b30e5 0x4b30b4 0x4d4785 0x4e4a3d 0x4608eb7 0x4608e85 0x4608b54 0x4607d0e 0x4607cef 0x48bf9a6 0x48bf8db 0x486608b 0x489e12a 0xf07f73 0xf07d06 0xf07ebe 0x489df9d 0x489dc14 0x489db3f 0x1cdf7f6 0x4d8d21
# labels: {"job":"CHANGEFEED id=1004661810832080897", "n":"5"}
#	0x4d4784	sync.runtime_SemacquireMutex+0x24									GOROOT/src/runtime/sema.go:77
#	0x4e4a3c	sync.(*Mutex).lockSlow+0x15c										GOROOT/src/sync/mutex.go:171
#	0x4608eb6	sync.(*Mutex).Lock+0x96											GOROOT/src/sync/mutex.go:90
#	0x4608e84	github.com/twmb/franz-go/pkg/kgo.(*Client).produce.func3+0x64						github.com/twmb/franz-go/pkg/kgo/external/com_github_twmb_franz_go/pkg/kgo/producer.go:483
#	0x4608b53	github.com/twmb/franz-go/pkg/kgo.(*Client).produce+0xb73						github.com/twmb/franz-go/pkg/kgo/external/com_github_twmb_franz_go/pkg/kgo/producer.go:500
#	0x4607d0d	github.com/twmb/franz-go/pkg/kgo.(*Client).Produce+0x12d						github.com/twmb/franz-go/pkg/kgo/external/com_github_twmb_franz_go/pkg/kgo/producer.go:379
#	0x4607cee	github.com/twmb/franz-go/pkg/kgo.(*Client).ProduceSync+0x10e						github.com/twmb/franz-go/pkg/kgo/external/com_github_twmb_franz_go/pkg/kgo/producer.go:261
#	0x48bf9a5	github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl.(*kafkaSinkClientV2).Flush.func1+0x65		github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/sink_kafka_v2.go:155
#	0x48bf8da	github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl.(*kafkaSinkClientV2).Flush+0x9a			github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/sink_kafka_v2.go:181
#	0x486608a	github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl.(*batchingSink).runBatchingWorker.func1+0xea	github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/batching_sink.go:353
#	0x489e129	github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl.(*ParallelIO).processIO.func1.1+0x129		github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/parallel_io.go:265
#	0xf07f72	github.com/cockroachdb/cockroach/pkg/util/retry.WithMaxAttempts.func1+0x12				github.com/cockroachdb/cockroach/pkg/util/retry/retry.go:200
#	0xf07d05	github.com/cockroachdb/cockroach/pkg/util/retry.Options.Do+0x245					github.com/cockroachdb/cockroach/pkg/util/retry/retry.go:177
#	0xf07ebd	github.com/cockroachdb/cockroach/pkg/util/retry.WithMaxAttempts+0xdd					github.com/cockroachdb/cockroach/pkg/util/retry/retry.go:199
#	0x489df9c	github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl.(*ParallelIO).processIO.func1+0x1dc		github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/parallel_io.go:260
#	0x489dc13	github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl.(*ParallelIO).processIO.func2+0xb3		github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/parallel_io.go:277
#	0x489db3e	github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl.(*ParallelIO).processIO.Group.GoCtx.func6+0x1e	github.com/cockroachdb/cockroach/pkg/util/ctxgroup/ctxgroup.go:168
#	0x1cdf7f5	golang.org/x/sync/errgroup.(*Group).Go.func1+0x55							golang.org/x/sync/errgroup/external/org_golang_x_sync/errgroup/errgroup.go:78


// (13) stuck in mu.Lock() in the goroutine?
13 @ 0x4a014e 0x4b30e5 0x4b30b4 0x4d4785 0x4e4a3d 0x460911a 0x46090ff 0x4d8d21
# labels: {"job":"CHANGEFEED id=1004661810832080897", "n":"5"}
#	0x4d4784	sync.runtime_SemacquireMutex+0x24				GOROOT/src/runtime/sema.go:77
#	0x4e4a3c	sync.(*Mutex).lockSlow+0x15c					GOROOT/src/sync/mutex.go:171
#	0x4609119	sync.(*Mutex).Lock+0x99						GOROOT/src/sync/mutex.go:90
#	0x46090fe	github.com/twmb/franz-go/pkg/kgo.(*Client).produce.func2+0x7e	github.com/twmb/franz-go/pkg/kgo/external/com_github_twmb_franz_go/pkg/kgo/producer.go:472


// (1) waiting on lock in finishing promises? not sure
1 @ 0x4a014e 0x4b30e5 0x4b30b4 0x4d4785 0x4e4a3d 0x46098fd 0x46098ca 0x460971f 0x4d8d21
# labels: {"job":"CHANGEFEED id=1004661810832080897", "n":"5"}
#	0x4d4784	sync.runtime_SemacquireMutex+0x24					GOROOT/src/runtime/sema.go:77
#	0x4e4a3c	sync.(*Mutex).lockSlow+0x15c						GOROOT/src/sync/mutex.go:171
#	0x46098fc	sync.(*Mutex).Lock+0x13c						GOROOT/src/sync/mutex.go:90
#	0x46098c9	github.com/twmb/franz-go/pkg/kgo.(*Client).finishRecordPromise+0x109	github.com/twmb/franz-go/pkg/kgo/external/com_github_twmb_franz_go/pkg/kgo/producer.go:587
#	0x460971e	github.com/twmb/franz-go/pkg/kgo.(*producer).finishPromises+0x2be	github.com/twmb/franz-go/pkg/kgo/external/com_github_twmb_franz_go/pkg/kgo/producer.go:549

I'll keep looking at this, but your assistance would be greatly appreciated.

@asg0451 (Contributor, Author) commented Oct 8, 2024

Some thoughts:

This chan recv seems to be where we're mostly blocked:

<-wait

and it should be closed by this goroutine above:

defer close(wait)

which is normally blocked on this condvar wait:

p.c.Wait()

which is woken up right before the chan recv above:

p.c.Broadcast() // wake the goroutine above

But the condvar wait is blocked on re-locking the mutex, per the stack trace above. So we are blocked on shutdown until whoever has that lock gives it up. Who has it?
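For reference, a hedged reconstruction of the shape of this code path, pieced together from the quotes above and the patch further down the thread. Names mirror producer.go, but this is a paraphrase, not the exact v1.17.1 source.

package sketch

import (
	"context"
	"sync"
)

type producer struct {
	mu   sync.Mutex
	c    *sync.Cond  // c = sync.NewCond(&mu)
	over func() bool // stands in for overMaxRecs || overMaxBytes
}

func (p *producer) blockUntilRoom(ctx context.Context) {
	quit := false
	wait := make(chan struct{})
	go func() {
		defer close(wait)
		p.mu.Lock()
		for !quit && p.over() {
			p.c.Wait() // releases p.mu while asleep, re-locks on wake
		}
		// Deliberately returns with p.mu still held; whoever receives
		// from wait inherits the locked mutex.
	}()

	drainBuffered := func() {
		p.mu.Lock() // deadlocks if the goroutine above already exited,
		// since it left p.mu locked and nothing will ever unlock it
		quit = true
		p.mu.Unlock()
		p.c.Broadcast() // wake the goroutine above
		<-wait
		p.mu.Unlock() // the goroutine left the mutex locked
	}

	select {
	case <-wait: // the chan recv quoted above
		p.mu.Unlock()
	case <-ctx.Done():
		drainBuffered()
	}
}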

@asg0451 (Contributor, Author) commented Oct 8, 2024

We no longer believe that the Kafka cluster's unavailability was related, as it was taken offline several days into this deadlock.

@twmb (Owner) commented Oct 8, 2024

I think I see the problem. The goroutine leaves the mutex locked (for reasons). If it runs, returns, and closes wait while the record context is canceled or the client is closing, it's possible a different select case is chosen. The other select cases call drainBuffered -- which tries to immediately lock.

I'll confirm if this is the problem, fix it, and add a test case for this.
If it's not the problem, well, I guess more staring.
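The select behavior in question is standard Go: when multiple cases are ready, one is chosen pseudo-randomly, so the context-canceled case can win even though wait is already closed. A standalone demo (not franz-go code):

package main

import (
	"context"
	"fmt"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	cancel() // the record context is already canceled

	wait := make(chan struct{})
	close(wait) // the goroutine already exited (leaving the mutex locked)

	counts := map[string]int{}
	for i := 0; i < 1000; i++ {
		select {
		case <-wait:
			counts["wait"]++
		case <-ctx.Done():
			counts["ctx.Done"]++
		}
	}
	fmt.Println(counts) // both cases get chosen; neither is guaranteed
}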

@twmb (Owner) commented Oct 8, 2024

diff --git a/pkg/kgo/producer.go b/pkg/kgo/producer.go
index d9cca99..1ea2a29 100644
--- a/pkg/kgo/producer.go
+++ b/pkg/kgo/producer.go
@@ -480,12 +480,24 @@ func (cl *Client) produce(
                }()

                drainBuffered := func(err error) {
-                       p.mu.Lock()
-                       quit = true
+                       // The expected case here is that a context was
+                       // canceled while we were waiting for space, so we are
+                       // exiting and need to kill the goro above.
+                       //
+                       // However, it is possible that the goro above has
+                       // already exited AND the context was canceled, and
+                       // `select` chose the context-canceled case.
+                       //
+                       // So, to avoid a deadlock, we need to wakeup the
+                       // goro above in another goroutine.
+                       go func() {
+                               p.mu.Lock()
+                               quit = true
+                               p.mu.Unlock()
+                               p.c.Broadcast()
+                       }()
+                       <-wait // we wait for the goroutine to exit, then unlock again (since the goroutine leaves the mutex locked)
                        p.mu.Unlock()
-                       p.c.Broadcast() // wake the goroutine above
-                       <-wait
-                       p.mu.Unlock() // we wait for the goroutine to exit, then unlock again (since the goroutine leaves the mutex locked)
                        p.promiseRecordBeforeBuf(promisedRec{ctx, promise, r}, err)
                }

is what I'm thinking

@asg0451 (Contributor, Author) commented Oct 8, 2024

Thanks for the quick response @twmb! I'm trying to understand your proposed fix right now...

@twmb (Owner) commented Oct 8, 2024

tl;dr summary is:

  • the original goroutine deliberately leaves the mutex locked when it exits
  • the context-canceled select case can still be chosen, and it also tries to lock, specifically so it can wake up the original goroutine

Both can happen concurrently: the goroutine exits holding the lock at the same moment the canceled case tries to take it, and nothing will ever release it.

@twmb added the bug label Oct 8, 2024
@asg0451 (Contributor, Author) commented Oct 8, 2024

How does the spawned goroutine exit if quit is false? And quit can only be set true by drainBuffered, which is only called if ctx is already canceled. Right?

@twmb (Owner) commented Oct 8, 2024

Standard case: overMaxRecs || overMaxBytes becomes false due to a record finishing.

@asg0451 (Contributor, Author) commented Oct 8, 2024

I see, thanks. I think that makes sense.

@asg0451 (Contributor, Author) commented Oct 10, 2024

@twmb do you have an estimate of how long it will take to fix this issue? I really appreciate your work on it -- just want an estimate so I can make decisions on how to move forward on our end. I'm working on a repro unit test in this repository currently as well, and I'll let you know if that bears fruit.

@asg0451 (Contributor, Author) commented Oct 10, 2024

@twmb I have a reliable repro unit test here: https://github.com/asg0451/franz-go/blob/7fe24ae1ac0bf8150a18c09459509d014a9f05e7/pkg/kgo/produce_request_test.go#L624C1-L724C2 -- it usually fails after a matter of seconds.

Significantly, this never fails when each worker thread has its own client. Produce/ProduceSync is supposed to be thread-safe, right?

EDIT: I applied the patch you posted above, and the test seems to not be failing with it.
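The linked test is authoritative; its shape is roughly the following hedged sketch. The broker address, buffer limit, and topic are assumptions, not the real test's setup.

package kgo_test

import (
	"context"
	"sync"
	"testing"
	"time"

	"github.com/twmb/franz-go/pkg/kgo"
)

func TestProduceSyncCancelDeadlockSketch(t *testing.T) {
	client, err := kgo.NewClient(
		kgo.SeedBrokers("localhost:9092"), // assumption: a reachable test broker
		kgo.MaxBufferedRecords(1),         // saturate the buffer immediately
	)
	if err != nil {
		t.Fatal(err)
	}
	defer client.Close()

	// Many workers share ONE client; with per-worker clients the hang
	// never reproduced.
	var wg sync.WaitGroup
	for w := 0; w < 8; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 200; i++ {
				ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
				_ = client.ProduceSync(ctx, &kgo.Record{Topic: "t", Value: []byte("v")}).FirstErr()
				cancel()
			}
		}()
	}
	wg.Wait() // hangs here when the deadlock reproduces
}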

@twmb (Owner) commented Oct 10, 2024

PR #832 also has a reliable reproducer -- I had this in a branch, sorry you put in the effort as well (the test in 832 is a bit smaller, hope it was fun to write 😅)

I'm going to work on this repo the rest of the day, trying to fix some other misc, more minor problems, and tag either today or tomorrow. If I can get through the feature requests as well, I'll roll this into a 1.18 release; if not, a patch for 1.17.

@asg0451 (Contributor, Author) commented Oct 10, 2024

No problem, it was kind of fun to write ;)

That's great to hear, thanks. I'll be watching this issue eagerly 👀

@twmb closed this as completed in #832 Oct 10, 2024
@twmb closed this as completed in 6a75940 Oct 10, 2024
@twmb (Owner) commented Oct 10, 2024

Merging my PR, but await the tag.

@twmb (Owner) commented Oct 14, 2024

ETA today. I'll reopen and then close once I push.

@twmb reopened this Oct 14, 2024
@twmb added the has pr label Oct 14, 2024
@twmb (Owner) commented Oct 15, 2024

Releasing this evening (merging PRs at the moment), closing now to ensure I'm addressing everything for the next release.

@twmb closed this as completed Oct 15, 2024
@asg0451 (Contributor, Author) commented Oct 16, 2024

Hey @twmb, I see a changelog entry for 1.18.0 -- thanks for getting this done!

ortuman pushed a commit to grafana/franz-go that referenced this issue Oct 17, 2024
Problem:
* Record A exceeds max, is on path to block
* Record B finishes concurrently
* Record A's context cancels
* Record A's goroutine waiting to be unblocked returns, leaves accounting mutex in locked state
* Record A's select statement chooses context-canceled case, trying to grab the accounting mutex lock

See twmb#831 for more details.

Closes twmb#831.
ortuman added a commit to grafana/franz-go that referenced this issue Oct 17, 2024