diff --git a/consumer_group.go b/consumer_group.go index a0a1e1a41..01cdb669c 100644 --- a/consumer_group.go +++ b/consumer_group.go @@ -114,6 +114,9 @@ func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerG // necessary to call Close() on the underlying client when shutting down this consumer. // PLEASE NOTE: consumer groups can only re-use but not share clients. func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) { + if client == nil { + return nil, ConfigurationError("client must not be nil") + } // For clients passed in by the client, ensure we don't // call Close() on it. cli := &nopCloserClient{client} diff --git a/consumer_group_test.go b/consumer_group_test.go index 912b6aa4f..5bfdcc8f3 100644 --- a/consumer_group_test.go +++ b/consumer_group_test.go @@ -27,6 +27,14 @@ func (h *handler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupCla return nil } +func TestNewConsumerGroupFromClient(t *testing.T) { + t.Run("should not permit nil client", func(t *testing.T) { + group, err := NewConsumerGroupFromClient("group", nil) + assert.Nil(t, group) + assert.Error(t, err) + }) +} + // TestConsumerGroupNewSessionDuringOffsetLoad ensures that the consumer group // will retry Join and Sync group operations, if it receives a temporary // OffsetsLoadInProgress error response, in the same way as it would for a