diff --git a/control-plane/pkg/kafka/clientpool/clientpool.go b/control-plane/pkg/kafka/clientpool/clientpool.go index b2d8c9c49d..f8261f2b2b 100644 --- a/control-plane/pkg/kafka/clientpool/clientpool.go +++ b/control-plane/pkg/kafka/clientpool/clientpool.go @@ -66,6 +66,16 @@ type GetKafkaClientFunc func(ctx context.Context, bootstrapServers []string, sec type GetKafkaClusterAdminFunc func(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.ClusterAdmin, error) func (cp *ClientPool) GetClient(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) { + client, err := cp.getClient(ctx, bootstrapServers, secret) + if err != nil { + return nil, err + } + + client.incrementCallers() + return client, nil +} + +func (cp *ClientPool) getClient(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (*client, error) { // (bootstrapServers, secret) uniquely identifies a sarama client config with the options we allow users to configure key := makeClusterAdminKey(bootstrapServers, secret) @@ -76,7 +86,6 @@ func (cp *ClientPool) GetClient(ctx context.Context, bootstrapServers []string, // if a corresponding connection already exists, lets use it if val, ok := cp.cache.Get(key); ok && val.hasCorrectSecretVersion(secret) { logger.Debug("successfully got a client from the clientpool") - val.incrementCallers() return val, nil } logger.Debug("failed to get an existing client, going to create one") @@ -108,13 +117,12 @@ func (cp *ClientPool) GetClient(ctx context.Context, bootstrapServers []string, logger.Debug("Closing client") - if err := value.client.Close(); !errors.Is(err, sarama.ErrClosedClient) { + if err := value.client.Close(); err != nil && !errors.Is(err, sarama.ErrClosedClient) { logger.Errorw("Failed to close client", zap.Error(err)) } }() }) - val.incrementCallers() return val, nil } diff --git a/control-plane/pkg/kafka/clientpool/clientpool_test.go b/control-plane/pkg/kafka/clientpool/clientpool_test.go index 91f3e2d0b5..fabcd2ff4d 100644 --- a/control-plane/pkg/kafka/clientpool/clientpool_test.go +++ b/control-plane/pkg/kafka/clientpool/clientpool_test.go @@ -23,6 +23,7 @@ import ( "github.com/IBM/sarama" "github.com/stretchr/testify/assert" + "go.uber.org/atomic" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kafkatesting "knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/testing" @@ -117,6 +118,48 @@ func TestGetClusterAdmin(t *testing.T) { cancel() } +func TestClientCloses(t *testing.T) { + t.Parallel() + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*30) + + cache := prober.NewLocalExpiringCache[clientKey, *client, struct{}](ctx, time.Second*1) + clientClosed := atomic.NewBool(false) + adminClosed := atomic.NewBool(false) + + clients := &ClientPool{ + cache: cache, + newSaramaClient: func(_ []string, _ *sarama.Config) (sarama.Client, error) { + return &kafkatesting.MockKafkaClient{OnClose: func() { + clientClosed.Toggle() + }}, nil + }, + newClusterAdminFromClient: func(c sarama.Client) (sarama.ClusterAdmin, error) { + return &kafkatesting.MockKafkaClusterAdmin{ExpectedTopics: []string{"topic1"}, OnClose: func() { + c.Close() + adminClosed.Toggle() + }}, nil + }, + } + + client1, err := clients.GetClient(ctx, []string{"localhost:9092"}, nil) + assert.NoError(t, err) + + clusterAdmin, err := clients.GetClusterAdmin(ctx, []string{"localhost:9092"}, nil) + assert.NoError(t, err) + + clusterAdmin.Close() + client1.Close() + + time.Sleep(time.Second * 2) + + // the client should have been closed successfully now + assert.True(t, clientClosed.Load()) + assert.True(t, adminClosed.Load()) + + cancel() +} + func TestMakeClientKey(t *testing.T) { key1 := makeClusterAdminKey([]string{"localhost:9090", "localhost:9091", "localhost:9092"}, &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "knative-eventing"}}) key2 := makeClusterAdminKey([]string{"localhost:9092", "localhost:9091", "localhost:9090"}, &corev1.Secret{ObjectMeta: metav1.ObjectMeta{Name: "secret", Namespace: "knative-eventing"}}) diff --git a/control-plane/pkg/kafka/clientpool/clusteradmin.go b/control-plane/pkg/kafka/clientpool/clusteradmin.go index e0d8867da8..f6147b569c 100644 --- a/control-plane/pkg/kafka/clientpool/clusteradmin.go +++ b/control-plane/pkg/kafka/clientpool/clusteradmin.go @@ -46,8 +46,6 @@ func clusterAdminFromClient(saramaClient sarama.Client, makeClusterAdmin kafka.N return nil, err } - c.incrementCallers() - return &clusterAdmin{ client: c, clusterAdmin: ca, @@ -291,5 +289,5 @@ func (a *clusterAdmin) RemoveMemberFromConsumerGroup(groupId string, groupInstan } func (a *clusterAdmin) Close() error { - return a.client.Close() + return a.clusterAdmin.Close() } diff --git a/control-plane/pkg/kafka/testing/admin_mock.go b/control-plane/pkg/kafka/testing/admin_mock.go index ec1c672877..bad95e8176 100644 --- a/control-plane/pkg/kafka/testing/admin_mock.go +++ b/control-plane/pkg/kafka/testing/admin_mock.go @@ -58,6 +58,8 @@ type MockKafkaClusterAdmin struct { ErrorOnDeleteConsumerGroup error + OnClose func() + T *testing.T } @@ -301,5 +303,8 @@ func (m *MockKafkaClusterAdmin) RemoveMemberFromConsumerGroup(groupId string, gr func (m *MockKafkaClusterAdmin) Close() error { m.ExpectedClose = true + if m.OnClose != nil { + m.OnClose() + } return m.ExpectedCloseError } diff --git a/control-plane/pkg/kafka/testing/client_mock.go b/control-plane/pkg/kafka/testing/client_mock.go index e237b59247..eadc2b9b91 100644 --- a/control-plane/pkg/kafka/testing/client_mock.go +++ b/control-plane/pkg/kafka/testing/client_mock.go @@ -28,6 +28,7 @@ type MockKafkaClient struct { ShouldFailRefreshMetadata bool ShouldFailRefreshBrokers bool ShouldFailBrokenPipe bool + OnClose func() } var _ sarama.Client = &MockKafkaClient{} @@ -195,6 +196,9 @@ func (m MockKafkaClient) LeastLoadedBroker() *sarama.Broker { func (m *MockKafkaClient) Close() error { m.IsClosed = true + if m.OnClose != nil { + m.OnClose() + } return m.CloseError }