From 60305f9cdc59718f017a732cabff7de772c1630a Mon Sep 17 00:00:00 2001 From: Karel Bilek Date: Tue, 30 Apr 2024 13:48:52 +0200 Subject: [PATCH] kafka: healthcheck for consumer group coordinator currently, when you try to use consumer groups right away, it's broken and you need to wait a few seconds before the coordinator is ready. This fixes it. --- preset/kafka/preset.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/preset/kafka/preset.go b/preset/kafka/preset.go index 2cfaa3ff..1aa9c808 100644 --- a/preset/kafka/preset.go +++ b/preset/kafka/preset.go @@ -138,6 +138,20 @@ func (p *P) healthcheck(ctx context.Context, c *gnomock.Container) (err error) { return fmt.Errorf("can't create topic: %w", err) } + group, err := kafka.NewConsumerGroup(kafka.ConsumerGroupConfig{ + ID: "gnomock", + Brokers: []string{c.Address(BrokerPort)}, + Topics: []string{"gnomock"}, + }) + if err != nil { + return fmt.Errorf("can't create consumer group: %w", err) + } + defer group.Close() + + if _, err := group.Next(ctx); err != nil { + return fmt.Errorf("can't read next consumer group: %w", err) + } + if p.UseSchemaRegistry { url := "http://" + c.Address(SchemaRegistryPort)