Init offsets when initialOffset is latest in KafkaSource (#1399)
* Init offsets when initialOffset is latest in KafkaSource

Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com>

* Rename file

Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com>

* Rename false reason to plural offsets

Signed-off-by: Pierangelo Di Pilato <pdipilat@redhat.com>
pierDipi authored Nov 3, 2021
1 parent f327038 commit 1b47375
Showing 7 changed files with 209 additions and 6 deletions.
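
For context before the file-by-file diff: the new code path only runs for a KafkaSource whose spec.initialOffset is latest. Below is a minimal, hypothetical example object built with the v1beta1 API used throughout this commit (topic, group, and bootstrap server names are illustrative, not taken from the commit):

package example

import (
	sources "knative.dev/eventing-kafka/pkg/apis/sources/v1beta1"
)

// exampleKafkaSource returns a KafkaSource that exercises the new branch:
// spec.initialOffset == "latest" makes the reconciler commit the consumer
// group's initial offsets during reconciliation.
func exampleKafkaSource() *sources.KafkaSource {
	ks := &sources.KafkaSource{
		Spec: sources.KafkaSourceSpec{
			Topics:        []string{"my-topic"},
			ConsumerGroup: "my-consumer-group",
			InitialOffset: sources.OffsetLatest,
		},
	}
	// BootstrapServers is promoted from the embedded KafkaAuthSpec.
	ks.Spec.BootstrapServers = []string{"my-cluster-kafka-bootstrap.kafka:9092"}
	return ks
}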
25 changes: 20 additions & 5 deletions control-plane/pkg/reconciler/base/receiver_condition_set.go
@@ -32,11 +32,12 @@ import (
)

const (
ConditionAddressable apis.ConditionType = "Addressable"
ConditionDataPlaneAvailable apis.ConditionType = "DataPlaneAvailable"
ConditionTopicReady apis.ConditionType = "TopicReady"
ConditionConfigMapUpdated apis.ConditionType = "ConfigMapUpdated"
ConditionConfigParsed apis.ConditionType = "ConfigParsed"
ConditionAddressable apis.ConditionType = "Addressable"
ConditionDataPlaneAvailable apis.ConditionType = "DataPlaneAvailable"
ConditionTopicReady apis.ConditionType = "TopicReady"
ConditionConfigMapUpdated apis.ConditionType = "ConfigMapUpdated"
ConditionConfigParsed apis.ConditionType = "ConfigParsed"
ConditionInitialOffsetsCommitted apis.ConditionType = "InitialOffsetsCommitted"
)

var IngressConditionSet = apis.NewLivingConditionSet(
@@ -51,6 +52,7 @@ var EgressConditionSet = apis.NewLivingConditionSet(
ConditionDataPlaneAvailable,
ConditionTopicReady,
ConditionConfigMapUpdated,
ConditionInitialOffsetsCommitted,
)

const (
@@ -250,3 +252,16 @@ func (manager *StatusConditionManager) TopicsNotPresentOrInvalid(topics []string
)
return fmt.Errorf("topics %v not present or invalid: check topic configuration", topics)
}

func (manager *StatusConditionManager) InitialOffsetNotCommitted(err error) error {
manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkFalse(
ConditionInitialOffsetsCommitted,
"InitialOffsetsNotCommitted",
err.Error(),
)
return err
}

func (manager *StatusConditionManager) InitialOffsetsCommitted() {
manager.Object.GetConditionSet().Manage(manager.Object.GetStatus()).MarkTrue(ConditionInitialOffsetsCommitted)
}
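
A small, hypothetical helper (not part of this commit; import paths are assumed) showing how a test might check the failure path added above, i.e. that InitialOffsetNotCommitted leaves the condition false with the reason "InitialOffsetsNotCommitted":

package basetesting

import (
	sources "knative.dev/eventing-kafka/pkg/apis/sources/v1beta1"

	"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/base"
)

// offsetsNotCommitted reports whether the KafkaSource status carries the new
// condition as false with the reason set by InitialOffsetNotCommitted.
// Hypothetical helper for illustration; the check mirrors the MarkFalse call above.
func offsetsNotCommitted(ks *sources.KafkaSource) bool {
	cond := ks.GetConditionSet().Manage(ks.GetStatus()).GetCondition(base.ConditionInitialOffsetsCommitted)
	return cond != nil && cond.IsFalse() && cond.Reason == "InitialOffsetsNotCommitted"
}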
31 changes: 31 additions & 0 deletions control-plane/pkg/reconciler/kafka/consumer_group.go
@@ -0,0 +1,31 @@
/*
* Copyright 2021 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka

import (
"context"

"github.com/Shopify/sarama"
"knative.dev/eventing-kafka/pkg/common/kafka/offset"
)

// InitOffsetsFunc initializes offsets for a provided set of topics and a provided consumer group ID.
type InitOffsetsFunc func(ctx context.Context, kafkaClient sarama.Client, kafkaAdminClient sarama.ClusterAdmin, topics []string, consumerGroup string) (int32, error)

var (
_ InitOffsetsFunc = offset.InitOffsets
)
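
The body of offset.InitOffsets is outside this diff. As a rough, simplified sketch of what an InitOffsetsFunc is expected to do (the function name is made up; unlike the real implementation, this sketch does not use the admin client to skip partitions that already have committed offsets):

package kafka

import (
	"context"

	"github.com/Shopify/sarama"
)

// initOffsetsSketch is a simplified, hypothetical InitOffsetsFunc: it marks the
// latest offset of every partition of the given topics and commits them for the
// consumer group, so a brand-new group starts consuming from "now" rather than
// replaying the whole topic.
func initOffsetsSketch(ctx context.Context, client sarama.Client, admin sarama.ClusterAdmin, topics []string, group string) (int32, error) {
	om, err := sarama.NewOffsetManagerFromClient(group, client)
	if err != nil {
		return 0, err
	}
	defer om.Close()

	var marked int32
	for _, topic := range topics {
		partitions, err := client.Partitions(topic)
		if err != nil {
			return marked, err
		}
		for _, partition := range partitions {
			// Log-end offset of the partition: committing it means the group
			// starts from the next message produced after initialization.
			latest, err := client.GetOffset(topic, partition, sarama.OffsetNewest)
			if err != nil {
				return marked, err
			}
			pom, err := om.ManagePartition(topic, partition)
			if err != nil {
				return marked, err
			}
			pom.MarkOffset(latest, "")
			marked++
		}
	}
	om.Commit() // flush the marked offsets to the group coordinator
	return marked, nil
}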
111 changes: 111 additions & 0 deletions control-plane/pkg/reconciler/kafka/testing/client_mock.go
@@ -0,0 +1,111 @@
/*
* Copyright 2021 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package testing

import (
"github.com/Shopify/sarama"
)

type MockKafkaClient struct {
CloseError error
IsClosed bool
}

var (
_ sarama.Client = &MockKafkaClient{}
)

func (m MockKafkaClient) Config() *sarama.Config {
return sarama.NewConfig()
}

func (m MockKafkaClient) Controller() (*sarama.Broker, error) {
panic("implement me")
}

func (m MockKafkaClient) RefreshController() (*sarama.Broker, error) {
panic("implement me")
}

func (m MockKafkaClient) Brokers() []*sarama.Broker {
panic("implement me")
}

func (m MockKafkaClient) Broker(brokerID int32) (*sarama.Broker, error) {
panic("implement me")
}

func (m MockKafkaClient) Topics() ([]string, error) {
panic("implement me")
}

func (m MockKafkaClient) Partitions(topic string) ([]int32, error) {
panic("implement me")
}

func (m MockKafkaClient) WritablePartitions(topic string) ([]int32, error) {
panic("implement me")
}

func (m MockKafkaClient) Leader(topic string, partitionID int32) (*sarama.Broker, error) {
panic("implement me")
}

func (m MockKafkaClient) Replicas(topic string, partitionID int32) ([]int32, error) {
panic("implement me")
}

func (m MockKafkaClient) InSyncReplicas(topic string, partitionID int32) ([]int32, error) {
panic("implement me")
}

func (m MockKafkaClient) OfflineReplicas(topic string, partitionID int32) ([]int32, error) {
panic("implement me")
}

func (m MockKafkaClient) RefreshBrokers(addrs []string) error {
panic("implement me")
}

func (m MockKafkaClient) RefreshMetadata(topics ...string) error {
panic("implement me")
}

func (m MockKafkaClient) GetOffset(topic string, partitionID int32, time int64) (int64, error) {
panic("implement me")
}

func (m MockKafkaClient) Coordinator(consumerGroup string) (*sarama.Broker, error) {
panic("implement me")
}

func (m MockKafkaClient) RefreshCoordinator(consumerGroup string) error {
panic("implement me")
}

func (m MockKafkaClient) InitProducerID() (*sarama.InitProducerIDResponse, error) {
panic("implement me")
}

func (m *MockKafkaClient) Close() error {
m.IsClosed = true
return m.CloseError
}

func (m MockKafkaClient) Closed() bool {
return m.IsClosed
}
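
A hypothetical convenience (not part of this commit) showing how the mock is meant to be plugged into the Reconciler's NewKafkaClient field, so tests can later inspect IsClosed or inject a CloseError:

package sourcetesting

import (
	"github.com/Shopify/sarama"

	kafkatesting "knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/kafka/testing"
)

// newMockClientFunc returns a factory compatible with the Reconciler's
// NewKafkaClient field that always hands back the same MockKafkaClient,
// so a test can assert it was closed or make Close fail via CloseError.
func newMockClientFunc(mock *kafkatesting.MockKafkaClient) func([]string, *sarama.Config) (sarama.Client, error) {
	return func(addrs []string, config *sarama.Config) (sarama.Client, error) {
		return mock, nil
	}
}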
3 changes: 3 additions & 0 deletions control-plane/pkg/reconciler/source/controller.go
@@ -21,6 +21,7 @@ import (

"github.com/Shopify/sarama"
sources "knative.dev/eventing-kafka/pkg/apis/sources/v1beta1"
"knative.dev/eventing-kafka/pkg/common/kafka/offset"
kubeclient "knative.dev/pkg/client/injection/kube/client"
podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod"
secretinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/secret"
@@ -51,7 +52,9 @@ func NewController(ctx context.Context, _ configmap.Watcher, configs *config.Env
DispatcherLabel: base.SourceDispatcherLabel,
},
Env: configs,
NewKafkaClient: sarama.NewClient,
NewKafkaClusterAdminClient: sarama.NewClusterAdmin,
InitOffsetsFunc: offset.InitOffsets,
}

impl := kafkasource.NewImpl(ctx, r)
29 changes: 28 additions & 1 deletion control-plane/pkg/reconciler/source/source.go
@@ -57,9 +57,16 @@ type Reconciler struct {

Resolver *resolver.URIResolver

// NewKafkaClient creates a new sarama Client. It's convenient to add this as a Reconciler field so that we can
// mock the function used during the reconciliation loop.
NewKafkaClient kafka.NewClientFunc
// NewKafkaClusterAdminClient creates a new sarama ClusterAdmin. It's convenient to add this as a Reconciler field so that we can
// mock the function used during the reconciliation loop.
NewKafkaClusterAdminClient kafka.NewClusterAdminClientFunc
// InitOffsetsFunc initializes offsets for a provided set of topics and a provided consumer group ID.
// It's convenient to add this as a Reconciler field so that we can mock the function used during the
// reconciliation loop.
InitOffsetsFunc kafka.InitOffsetsFunc
}

func (r *Reconciler) ReconcileKind(ctx context.Context, ks *sources.KafkaSource) reconciler.Event {
@@ -104,9 +111,15 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *sources.KafkaSource)
return fmt.Errorf("error getting cluster admin sarama config: %w", err)
}

kafkaClient, err := r.NewKafkaClient(ks.Spec.BootstrapServers, saramaConfig)
if err != nil {
return statusConditionManager.TopicsNotPresentOrInvalidErr(ks.Spec.Topics, fmt.Errorf("error getting sarama config: %w", err))
}
defer kafkaClient.Close()

kafkaClusterAdminClient, err := r.NewKafkaClusterAdminClient(ks.Spec.BootstrapServers, saramaConfig)
if err != nil {
return fmt.Errorf("cannot obtain Kafka cluster admin, %w", err)
return statusConditionManager.TopicsNotPresentOrInvalidErr(ks.Spec.Topics, fmt.Errorf("cannot obtain Kafka cluster admin, %w", err))
}
defer kafkaClusterAdminClient.Close()

@@ -119,6 +132,20 @@ func (r *Reconciler) reconcileKind(ctx context.Context, ks *sources.KafkaSource)
}
statusConditionManager.TopicReady(strings.Join(ks.Spec.Topics, ", "))

if ks.Spec.InitialOffset == sources.OffsetLatest {
logger.Debug("Initializing initial offset",
zap.String("initialOffset", string(ks.Spec.InitialOffset)),
zap.String("consumerGroup", ks.Spec.ConsumerGroup),
zap.Strings("topics", ks.Spec.Topics),
)
if _, err := r.InitOffsetsFunc(ctx, kafkaClient, kafkaClusterAdminClient, ks.Spec.Topics, ks.Spec.ConsumerGroup); err != nil {
return statusConditionManager.InitialOffsetNotCommitted(
fmt.Errorf("unable to initialize consumer group %s offsets: %w", ks.Spec.ConsumerGroup, err),
)
}
}
statusConditionManager.InitialOffsetsCommitted()

// Get contract config map.
contractConfigMap, err := r.GetOrCreateDataPlaneConfigMap(ctx)
if err != nil {
12 changes: 12 additions & 0 deletions control-plane/pkg/reconciler/source/source_test.go
@@ -116,6 +116,7 @@ func TestReconcileKind(t *testing.T) {
SourceConfigMapUpdatedReady(&configs),
SourceTopicsReady,
SourceDataPlaneAvailable,
InitialOffsetsCommitted,
),
},
},
@@ -182,6 +183,7 @@ func TestReconcileKind(t *testing.T) {
SourceConfigMapUpdatedReady(&configs),
SourceTopicsReady,
SourceDataPlaneAvailable,
InitialOffsetsCommitted,
),
},
},
@@ -242,6 +244,7 @@ func TestReconcileKind(t *testing.T) {
SourceConfigMapUpdatedReady(&configs),
SourceTopicsReady,
SourceDataPlaneAvailable,
InitialOffsetsCommitted,
),
},
},
@@ -302,6 +305,7 @@ func TestReconcileKind(t *testing.T) {
SourceConfigMapUpdatedReady(&configs),
SourceTopicsReady,
SourceDataPlaneAvailable,
InitialOffsetsCommitted,
),
},
},
@@ -362,6 +366,7 @@ func TestReconcileKind(t *testing.T) {
SourceConfigMapUpdatedReady(&configs),
SourceTopicsReady,
SourceDataPlaneAvailable,
InitialOffsetsCommitted,
),
},
},
@@ -422,6 +427,7 @@ func TestReconcileKind(t *testing.T) {
SourceConfigMapUpdatedReady(&configs),
SourceTopicsReady,
SourceDataPlaneAvailable,
InitialOffsetsCommitted,
),
},
},
@@ -457,6 +463,12 @@ func useTable(t *testing.T, table TableTest, configs broker.Configs) {
DispatcherLabel: base.SourceDispatcherLabel,
},
Env: &configs.Env,
InitOffsetsFunc: func(ctx context.Context, kafkaClient sarama.Client, kafkaAdminClient sarama.ClusterAdmin, topics []string, consumerGroup string) (int32, error) {
return 1, nil
},
NewKafkaClient: func(addrs []string, config *sarama.Config) (sarama.Client, error) {
return &kafkatesting.MockKafkaClient{}, nil
},
NewKafkaClusterAdminClient: func(_ []string, _ *sarama.Config) (sarama.ClusterAdmin, error) {
return &kafkatesting.MockKafkaClusterAdmin{
ExpectedTopicName: "",
4 changes: 4 additions & 0 deletions control-plane/pkg/reconciler/testing/objects_source.go
@@ -131,6 +131,10 @@ func SourceTopicsReady(source *sources.KafkaSource) {
)
}

func InitialOffsetsCommitted(source *sources.KafkaSource) {
source.GetConditionSet().Manage(source.GetStatus()).MarkTrue(base.ConditionInitialOffsetsCommitted)
}

func SourceDataPlaneAvailable(source *sources.KafkaSource) {
source.GetConditionSet().Manage(source.GetStatus()).MarkTrue(base.ConditionDataPlaneAvailable)
}