
CommitRecords and commitOffsetsSync blocked #668

Closed
vrischmann opened this issue Jan 29, 2024 · 3 comments
Labels
bug Something isn't working

Comments

@vrischmann

vrischmann commented Jan 29, 2024

Hi,

We had an issue with one of our applications where, for some reason, CommitRecords blocks. As a result AllowRebalance is never called, which in turn leads to more problems.

Here is a goroutine dump from an application that's blocked right now: goroutine_dump.txt.

Unfortunately I don't have a reproducer available right now; I could try to make one if needed.

Our code essentially works like this:

package main

import (
	"context"
	"log"

	"github.com/twmb/franz-go/pkg/kgo"
)

func process(records []*kgo.Record) error {
	return nil
}

func doOnePoll(client *kgo.Client) error {
	fetches := client.PollRecords(context.Background(), 2000)
	defer client.AllowRebalance()

	if err := fetches.Err0(); err != nil {
		return err
	}

	records := fetches.Records()

	err := process(records)
	if err != nil {
		return err
	}

	return client.CommitRecords(context.Background(), records...)
}

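func main() {
	client, err := kgo.NewClient(
		kgo.BlockRebalanceOnPoll(),
		kgo.SeedBrokers(`localhost:9092`),
		// Placeholder group and topic names: BlockRebalanceOnPoll and
		// CommitRecords only apply to a group consumer.
		kgo.ConsumerGroup("my-group"),
		kgo.ConsumeTopics("my-topic"),
		// The balancer in use when the hang occurred.
		kgo.Balancers(kgo.CooperativeStickyBalancer()),
	)
	if err != nil {
		log.Fatal(err)
	}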

	for {
		if err := doOnePoll(client); err != nil {
			log.Fatal(err)
		}
	}
}
  • We block rebalances while processing
  • We poll for records, process and commit them, and then allow rebalances
  • We use franz-go v1.15.3

We experienced this with the CooperativeStickyBalancer. This application also ran with the range balancer for a couple of weeks without problems, but that may have been luck.

We have hit this problem only once, and this application has been running with the cooperative sticky balancer for ~12 days.

After some digging in the source code, starting from this blocked goroutine:

1 @ 0x43e28e 0x4099ad 0x4095b2 0x7cf7b9 0x7cf2cc 0x7ce896 0x7cdba7 0xf8582f 0xf8350b 0xf83f05 0xf84de6 0xfa08bf 0xd51076 0x4713e1
#	0x7cf7b8	github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).commitOffsetsSync.func1+0x18	/app/vendor/github.com/twmb/franz-go/pkg/kgo/consumer_group.go:2572
#	0x7cf2cb	github.com/twmb/franz-go/pkg/kgo.(*groupConsumer).commitOffsetsSync+0x4eb	/app/vendor/github.com/twmb/franz-go/pkg/kgo/consumer_group.go:2582
#	0x7ce895	github.com/twmb/franz-go/pkg/kgo.(*Client).CommitOffsetsSync+0xb5		/app/vendor/github.com/twmb/franz-go/pkg/kgo/consumer_group.go:2521
#	0x7cdba6	github.com/twmb/franz-go/pkg/kgo.(*Client).CommitRecords+0x206			/app/vendor/github.com/twmb/franz-go/pkg/kgo/consumer_group.go:2364
#	0xf8582e	golang.b47ch.com/kafka/v8.(*batchProcessor).ProcessBatch+0x56e			/app/vendor/golang.b47ch.com/kafka/v8/consumer.go:441
#	0xf8350a	golang.b47ch.com/kafka/v8.(*Consumer).processAllBatches+0x22a			/app/vendor/golang.b47ch.com/kafka/v8/consumer.go:190
#	0xf83f04	golang.b47ch.com/kafka/v8.(*Consumer).doOnePoll+0x8a4				/app/vendor/golang.b47ch.com/kafka/v8/consumer.go:310
#	0xf84de5	golang.b47ch.com/kafka/v8.(*Consumer).Run+0x65					/app/vendor/golang.b47ch.com/kafka/v8/consumer.go:358
#	0xfa08be	golang.b47ch.com/base/v14.(*App).runConsumers.func1+0x1e			/app/vendor/golang.b47ch.com/base/v14/app.go:969
#	0xd51075	golang.org/x/sync/errgroup.(*Group).Go.func1+0x55				/app/vendor/golang.org/x/sync/errgroup/errgroup.go:75

It's blocked at consumer_group.go:2572, and as far as I can tell the onDone callback is never called, so commitOffsetsSync never returns. I can't figure out why, though.

Does this look like a bug in franz-go, or do you think we're doing something wrong in our code?

@twmb
Owner

twmb commented Jan 29, 2024

Good find, that's clearly a bug.

@twmb added the bug (Something isn't working) label on Jan 29, 2024
@twmb
Owner

twmb commented Jan 29, 2024

This isn't fixed in the latest release either. You hit this if the commit is canceled (its context is done) while the group is undergoing a rebalance.

@vrischmann
Author

Ha, thanks. For some reason I was looking at the commit method, but it's waitJoinSyncMu that returns an error, after which done is never released.

In our production code we use a single context, with a 2s timeout, for both processing and committing; I think that's why we called CommitRecords with a canceled context.
In the meantime we'll use a non-cancellable context for committing.

@twmb closed this as completed in cd65d77 on Feb 7, 2024