Skip to content

Commit

Permalink
reenable stopping st adapter when a change occurs (openshift-knative#406
Browse files Browse the repository at this point in the history
)
  • Loading branch information
lionelvillard authored Feb 19, 2021
1 parent 42f3935 commit ae8020f
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 14 deletions.
14 changes: 8 additions & 6 deletions pkg/common/scheduler/statefulset/autoscaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,14 @@ func (s *StatefulSetScheduler) autoscale(ctx context.Context) {
// Desired ratio is 0.5 (TODO: configurable)
new := int32(math.Ceil(float64(s.usedCapacity(snapshot)+s.pendingVReplicas()) / (float64(s.capacity) * 0.5)))

// Make sure not to scale down past the last pod with placed vpods
if new < snapshot.lastOrdinal+1 {
new = snapshot.lastOrdinal + 1
}

if new != scale.Spec.Replicas {
//// Make sure not to scale down past the last pod with placed vpods
//if new < snapshot.lastOrdinal+1 {
// new = snapshot.lastOrdinal + 1
//}
// Disable scaling down until we have a better story with finalizers
// See https://github.com/knative-sandbox/eventing-kafka/issues/412

if new > scale.Spec.Replicas {
scale.Spec.Replicas = new
s.logger.Infow("updating adapter replicas", zap.Int32("replicas", scale.Spec.Replicas))

Expand Down
16 changes: 9 additions & 7 deletions pkg/source/mtadapter/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,23 +92,22 @@ func (a *Adapter) Start(ctx context.Context) error {
func (a *Adapter) Update(ctx context.Context, obj *v1beta1.KafkaSource) {
a.sourcesMu.Lock()
defer a.sourcesMu.Unlock()
a.logger.Infow("adding source", "name", obj.Name)

key := obj.Namespace + "/" + obj.Name

_, ok := a.sources[key]
cancel, ok := a.sources[key]

if ok {
// TODO: see https://github.com/knative-sandbox/eventing-kafka/issues/382
return
// TODO: do not stop if the only thing that changes is the number of vreplicas
//a.logger.Info("stopping adapter", zap.String("key", key))
//cancel()
a.logger.Info("stopping adapter", zap.String("key", key))
cancel()
}

placement := scheduler.GetPlacementForPod(obj.GetPlacements(), a.config.PodName)
if placement == nil || placement.VReplicas == 0 {
// this pod does not handle this source. Skipping
a.logger.Infow("no replicas assigned to this pod. skipping", zap.String("key", key))
a.logger.Infow("no replicas assigned to this source. skipping", zap.String("key", key))
return
}

Expand Down Expand Up @@ -166,27 +165,30 @@ func (a *Adapter) Update(ctx context.Context, obj *v1beta1.KafkaSource) {
if err != nil {
a.logger.Errorw("adapter failed to start", zap.Error(err))
}

}(ctx)

a.sources[key] = cancelFn
a.logger.Infow("source added", "name", obj.Name)
}

func (a *Adapter) Remove(ctx context.Context, obj *v1beta1.KafkaSource) {
a.sourcesMu.Lock()
defer a.sourcesMu.Unlock()
a.logger.Infow("removing source", "name", obj.Name)

key := obj.Namespace + "/" + obj.Name

cancel, ok := a.sources[key]

if !ok {
a.logger.Infow("source not found", "name", obj.Name)
return
}

cancel()

delete(a.sources, key)
a.logger.Infow("source removed", "name", obj.Name, "count", len(a.sources))
}

// ResolveSecret resolves the secret reference
Expand Down
5 changes: 4 additions & 1 deletion pkg/source/mtadapter/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package mtadapter

import (
"context"
"strings"

"knative.dev/pkg/controller"
"knative.dev/pkg/logging"
Expand All @@ -41,7 +42,7 @@ type MTAdapter interface {
// NewController initializes the controller and
// registers event handlers to enqueue events.
func NewController(ctx context.Context, adapter adapter.Adapter) *controller.Impl {
mtadapter, ok := adapter.(MTAdapter)
mtadapter, ok := adapter.(*Adapter)
if !ok {
logging.FromContext(ctx).Fatal("Multi-tenant adapters must implement the MTAdapter interface")
}
Expand All @@ -50,9 +51,11 @@ func NewController(ctx context.Context, adapter adapter.Adapter) *controller.Imp
mtadapter: mtadapter,
}

podName := mtadapter.config.PodName
impl := kafkasourcereconciler.NewImpl(ctx, r, func(impl *controller.Impl) controller.Options {
return controller.Options{
SkipStatusUpdates: true,
FinalizerName: "kafkasources.sources.knative.dev." + podName[strings.LastIndex(podName, "-")+1:],
}
})

Expand Down
13 changes: 13 additions & 0 deletions test/e2e-common.sh
Original file line number Diff line number Diff line change
Expand Up @@ -455,8 +455,21 @@ function test_mt_source() {
echo "Testing the multi-tenant source"
install_mt_source || return 1

export TEST_MT_SOURCE
go_test_e2e -tags=source,mtsource -timeout=20m -test.parallel=${TEST_PARALLEL} ./test/e2e/... || fail_test

# wait for all KafkaSources to be deleted
local iterations=0
local progress="Waiting for KafkaSources to be deleted..."
while [[ "$(kubectl get kafkasources --all-namespaces)" != "" && $iterations -lt 60 ]]
do
echo -ne "${progress}\r"
progress="${progress}."
iterations=$((iterations + 1))
kubectl get kafkasources --all-namespaces -oyaml
sleep 3
done

uninstall_mt_source || return 1
}

Expand Down
21 changes: 21 additions & 0 deletions test/e2e/kafka_source_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (
"context"
"encoding/json"
"fmt"
"os"
"strings"
"testing"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
. "github.com/cloudevents/sdk-go/v2/test"
Expand Down Expand Up @@ -63,6 +65,10 @@ type authSetup struct {
TLSEnabled bool
}

var (
test_mt_source = os.Getenv("TEST_MT_SOURCE")
)

func withAuthEnablementV1Beta1(auth authSetup) contribresources.KafkaSourceV1Beta1Option {
// We test with sasl512 and enable tls with it, so check tls first
if auth.TLSEnabled == true {
Expand Down Expand Up @@ -244,6 +250,11 @@ func testKafkaSource(t *testing.T, name string, version string, messageKey strin

client.WaitForAllTestResourcesReadyOrFail(context.Background())

// See https://github.com/knative-sandbox/eventing-kafka/issues/411
if test_mt_source == "1" {
time.Sleep(20 * time.Second)
}

helpers.MustPublishKafkaMessage(client, kafkaBootstrapUrlPlain, kafkaTopicName, messageKey, messageHeaders, messagePayload)

eventTracker.AssertExact(1, recordevents.MatchEvent(matcherGen(cloudEventsSourceName, cloudEventsEventType)))
Expand Down Expand Up @@ -534,6 +545,11 @@ func testKafkaSourceUpdate(t *testing.T, name string, test updateTest) {
))
client.WaitForAllTestResourcesReadyOrFail(context.Background())

// See https://github.com/knative-sandbox/eventing-kafka/issues/411
if test_mt_source == "1" {
time.Sleep(20 * time.Second)
}

t.Logf("Send update event to kafkatopic")
helpers.MustPublishKafkaMessage(client, kafkaBootstrapUrlPlain,
defaultKafkaSource.topicName+name,
Expand Down Expand Up @@ -570,6 +586,11 @@ func testKafkaSourceUpdate(t *testing.T, name string, test updateTest) {
contribtestlib.UpdateKafkaSourceV1Beta1OrFail(client, ksObj)
client.WaitForAllTestResourcesReadyOrFail(context.Background())

// See https://github.com/knative-sandbox/eventing-kafka/issues/411
if test_mt_source == "1" {
time.Sleep(20 * time.Second)
}

t.Logf("Send update event to kafkatopic")
helpers.MustPublishKafkaMessage(client, kafkaBootstrapUrlPlain,
test.topicName+name,
Expand Down

0 comments on commit ae8020f

Please sign in to comment.