From 75ec2f81ec10d351796f860ce0c92d9a29e7b09b Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Thu, 3 Sep 2020 21:08:14 -0400 Subject: [PATCH 1/8] prepare pingsource controller for codegen --- pkg/adapter/mtping/adapter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/adapter/mtping/adapter.go b/pkg/adapter/mtping/adapter.go index 3f911cb0dcd..33065b4e8ae 100644 --- a/pkg/adapter/mtping/adapter.go +++ b/pkg/adapter/mtping/adapter.go @@ -42,7 +42,7 @@ const ( // mtpingAdapter implements the PingSource mt adapter to sinks type mtpingAdapter struct { logger *zap.SugaredLogger - runner CronJobRunner + runner *cronJobsRunner entryidMu sync.RWMutex entryids map[string]cron.EntryID // key: resource namespace/name } From c7ae5cf8589cfb2ff7018a35ab3fd10af3e2d3fd Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Mon, 14 Sep 2020 12:16:45 -0400 Subject: [PATCH 2/8] add more tests --- pkg/adapter/mtping/adapter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/adapter/mtping/adapter.go b/pkg/adapter/mtping/adapter.go index 33065b4e8ae..3f911cb0dcd 100644 --- a/pkg/adapter/mtping/adapter.go +++ b/pkg/adapter/mtping/adapter.go @@ -42,7 +42,7 @@ const ( // mtpingAdapter implements the PingSource mt adapter to sinks type mtpingAdapter struct { logger *zap.SugaredLogger - runner *cronJobsRunner + runner CronJobRunner entryidMu sync.RWMutex entryids map[string]cron.EntryID // key: resource namespace/name } From e43d705d8c7acd281524dbf5c808de8b06f13c4b Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Thu, 3 Sep 2020 21:08:14 -0400 Subject: [PATCH 3/8] prepare pingsource controller for codegen --- pkg/adapter/mtping/adapter.go | 1 + pkg/adapter/mtping/runner.go | 25 +++++++++++++++++++ .../configmappropagation/controller.go | 3 +++ .../eventing/v1/broker/controller.go | 3 +++ .../eventing/v1/trigger/controller.go | 3 +++ .../eventing/v1beta1/broker/controller.go | 3 +++ .../eventing/v1beta1/eventtype/controller.go | 3 +++ .../eventing/v1beta1/trigger/controller.go | 3 +++ .../flows/v1/parallel/controller.go | 3 +++ .../flows/v1/sequence/controller.go | 3 +++ .../flows/v1beta1/parallel/controller.go | 3 +++ .../flows/v1beta1/sequence/controller.go | 3 +++ .../messaging/v1/channel/controller.go | 3 +++ .../v1/inmemorychannel/controller.go | 3 +++ .../messaging/v1/subscription/controller.go | 3 +++ .../messaging/v1beta1/channel/controller.go | 3 +++ .../v1beta1/inmemorychannel/controller.go | 3 +++ .../v1beta1/subscription/controller.go | 3 +++ .../v1alpha2/apiserversource/controller.go | 3 +++ .../v1alpha2/containersource/controller.go | 3 +++ .../sources/v1alpha2/pingsource/controller.go | 3 +++ .../v1beta1/apiserversource/controller.go | 3 +++ .../v1beta1/containersource/controller.go | 3 +++ .../sources/v1beta1/pingsource/controller.go | 3 +++ 24 files changed, 92 insertions(+) diff --git a/pkg/adapter/mtping/adapter.go b/pkg/adapter/mtping/adapter.go index 3f911cb0dcd..6baf6ec9deb 100644 --- a/pkg/adapter/mtping/adapter.go +++ b/pkg/adapter/mtping/adapter.go @@ -27,6 +27,7 @@ import ( cloudevents "github.com/cloudevents/sdk-go/v2" "github.com/robfig/cron/v3" "go.uber.org/zap" + kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/controller" "knative.dev/pkg/logging" diff --git a/pkg/adapter/mtping/runner.go b/pkg/adapter/mtping/runner.go index 1e3018c5ae6..67507a7b46b 100644 --- a/pkg/adapter/mtping/runner.go +++ b/pkg/adapter/mtping/runner.go @@ -22,6 +22,8 @@ import ( "math/rand" "time" + corev1 "k8s.io/api/core/v1" + cloudevents "github.com/cloudevents/sdk-go/v2" cecontext "github.com/cloudevents/sdk-go/v2/context" "github.com/google/uuid" @@ -57,6 +59,29 @@ type cronJobsRunner struct { kubeClient kubernetes.Interface } +type PingConfig struct { + corev1.ObjectReference `json:",inline"` + + // Schedule is the cronjob schedule. Defaults to `* * * * *`. + Schedule string `json:"schedule"` + + // JsonData is json encoded data used as the body of the event posted to + // the sink. Default is empty. If set, datacontenttype will also be set + // to "application/json". + // +optional + JsonData string `json:"jsonData,omitempty"` + + // Extensions specify what attribute are added or overridden on the + // outbound event. Each `Extensions` key-value pair are set on the event as + // an attribute extension independently. + // +optional + Extensions map[string]string `json:"extensions,omitempty"` + + // SinkURI is the current active sink URI that has been configured for the + // Source. + SinkURI string `json:"sinkUri,omitempty"` +} + const ( resourceGroup = "pingsources.sources.knative.dev" ) diff --git a/pkg/client/injection/reconciler/configs/v1alpha1/configmappropagation/controller.go b/pkg/client/injection/reconciler/configs/v1alpha1/configmappropagation/controller.go index a34f44214d1..bf27ae2f521 100644 --- a/pkg/client/injection/reconciler/configs/v1alpha1/configmappropagation/controller.go +++ b/pkg/client/injection/reconciler/configs/v1alpha1/configmappropagation/controller.go @@ -105,6 +105,9 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } + if opts.DemoteFunc != nil { + rec.DemoteFunc = opts.DemoteFunc + } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/eventing/v1/broker/controller.go b/pkg/client/injection/reconciler/eventing/v1/broker/controller.go index 41e1231c62f..14980deb152 100644 --- a/pkg/client/injection/reconciler/eventing/v1/broker/controller.go +++ b/pkg/client/injection/reconciler/eventing/v1/broker/controller.go @@ -109,6 +109,9 @@ func NewImpl(ctx context.Context, r Interface, classValue string, optionsFns ... if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } + if opts.DemoteFunc != nil { + rec.DemoteFunc = opts.DemoteFunc + } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/eventing/v1/trigger/controller.go b/pkg/client/injection/reconciler/eventing/v1/trigger/controller.go index 504a628cf1a..b9ef6380f23 100644 --- a/pkg/client/injection/reconciler/eventing/v1/trigger/controller.go +++ b/pkg/client/injection/reconciler/eventing/v1/trigger/controller.go @@ -105,6 +105,9 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } + if opts.DemoteFunc != nil { + rec.DemoteFunc = opts.DemoteFunc + } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/eventing/v1beta1/broker/controller.go b/pkg/client/injection/reconciler/eventing/v1beta1/broker/controller.go index 9e757e2cd8e..6da43b9d823 100644 --- a/pkg/client/injection/reconciler/eventing/v1beta1/broker/controller.go +++ b/pkg/client/injection/reconciler/eventing/v1beta1/broker/controller.go @@ -109,6 +109,9 @@ func NewImpl(ctx context.Context, r Interface, classValue string, optionsFns ... if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } + if opts.DemoteFunc != nil { + rec.DemoteFunc = opts.DemoteFunc + } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/eventing/v1beta1/eventtype/controller.go b/pkg/client/injection/reconciler/eventing/v1beta1/eventtype/controller.go index d2bd35e93e4..7fc14fe1adb 100644 --- a/pkg/client/injection/reconciler/eventing/v1beta1/eventtype/controller.go +++ b/pkg/client/injection/reconciler/eventing/v1beta1/eventtype/controller.go @@ -105,6 +105,9 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } + if opts.DemoteFunc != nil { + rec.DemoteFunc = opts.DemoteFunc + } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/eventing/v1beta1/trigger/controller.go b/pkg/client/injection/reconciler/eventing/v1beta1/trigger/controller.go index 2a33794242b..394cd346308 100644 --- a/pkg/client/injection/reconciler/eventing/v1beta1/trigger/controller.go +++ b/pkg/client/injection/reconciler/eventing/v1beta1/trigger/controller.go @@ -105,6 +105,9 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } + if opts.DemoteFunc != nil { + rec.DemoteFunc = opts.DemoteFunc + } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/flows/v1/parallel/controller.go b/pkg/client/injection/reconciler/flows/v1/parallel/controller.go index f533f092749..b321f203abe 100644 --- a/pkg/client/injection/reconciler/flows/v1/parallel/controller.go +++ b/pkg/client/injection/reconciler/flows/v1/parallel/controller.go @@ -105,6 +105,9 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } + if opts.DemoteFunc != nil { + rec.DemoteFunc = opts.DemoteFunc + } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/flows/v1/sequence/controller.go b/pkg/client/injection/reconciler/flows/v1/sequence/controller.go index f64d0221c6c..e873986539c 100644 --- a/pkg/client/injection/reconciler/flows/v1/sequence/controller.go +++ b/pkg/client/injection/reconciler/flows/v1/sequence/controller.go @@ -105,6 +105,9 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } + if opts.DemoteFunc != nil { + rec.DemoteFunc = opts.DemoteFunc + } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/flows/v1beta1/parallel/controller.go b/pkg/client/injection/reconciler/flows/v1beta1/parallel/controller.go index c16ff28300b..1b84ddf40a4 100644 --- a/pkg/client/injection/reconciler/flows/v1beta1/parallel/controller.go +++ b/pkg/client/injection/reconciler/flows/v1beta1/parallel/controller.go @@ -105,6 +105,9 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } + if opts.DemoteFunc != nil { + rec.DemoteFunc = opts.DemoteFunc + } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/flows/v1beta1/sequence/controller.go b/pkg/client/injection/reconciler/flows/v1beta1/sequence/controller.go index 808c8aba550..656b5a2a39b 100644 --- a/pkg/client/injection/reconciler/flows/v1beta1/sequence/controller.go +++ b/pkg/client/injection/reconciler/flows/v1beta1/sequence/controller.go @@ -105,6 +105,9 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } + if opts.DemoteFunc != nil { + rec.DemoteFunc = opts.DemoteFunc + } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/messaging/v1/channel/controller.go b/pkg/client/injection/reconciler/messaging/v1/channel/controller.go index 940dc9bd903..2a614bf6a56 100644 --- a/pkg/client/injection/reconciler/messaging/v1/channel/controller.go +++ b/pkg/client/injection/reconciler/messaging/v1/channel/controller.go @@ -105,6 +105,9 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } + if opts.DemoteFunc != nil { + rec.DemoteFunc = opts.DemoteFunc + } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/messaging/v1/inmemorychannel/controller.go b/pkg/client/injection/reconciler/messaging/v1/inmemorychannel/controller.go index 346de276b40..ccc76254c58 100644 --- a/pkg/client/injection/reconciler/messaging/v1/inmemorychannel/controller.go +++ b/pkg/client/injection/reconciler/messaging/v1/inmemorychannel/controller.go @@ -105,6 +105,9 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } + if opts.DemoteFunc != nil { + rec.DemoteFunc = opts.DemoteFunc + } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/messaging/v1/subscription/controller.go b/pkg/client/injection/reconciler/messaging/v1/subscription/controller.go index 235ce2f1b66..665b6a54c3c 100644 --- a/pkg/client/injection/reconciler/messaging/v1/subscription/controller.go +++ b/pkg/client/injection/reconciler/messaging/v1/subscription/controller.go @@ -105,6 +105,9 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } + if opts.DemoteFunc != nil { + rec.DemoteFunc = opts.DemoteFunc + } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/messaging/v1beta1/channel/controller.go b/pkg/client/injection/reconciler/messaging/v1beta1/channel/controller.go index 44f432c71d8..7130bec1d7c 100644 --- a/pkg/client/injection/reconciler/messaging/v1beta1/channel/controller.go +++ b/pkg/client/injection/reconciler/messaging/v1beta1/channel/controller.go @@ -105,6 +105,9 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } + if opts.DemoteFunc != nil { + rec.DemoteFunc = opts.DemoteFunc + } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/messaging/v1beta1/inmemorychannel/controller.go b/pkg/client/injection/reconciler/messaging/v1beta1/inmemorychannel/controller.go index 3fb2e152800..c79b7c6324c 100644 --- a/pkg/client/injection/reconciler/messaging/v1beta1/inmemorychannel/controller.go +++ b/pkg/client/injection/reconciler/messaging/v1beta1/inmemorychannel/controller.go @@ -105,6 +105,9 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } + if opts.DemoteFunc != nil { + rec.DemoteFunc = opts.DemoteFunc + } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/messaging/v1beta1/subscription/controller.go b/pkg/client/injection/reconciler/messaging/v1beta1/subscription/controller.go index cc2a9dc938b..504e9d53c41 100644 --- a/pkg/client/injection/reconciler/messaging/v1beta1/subscription/controller.go +++ b/pkg/client/injection/reconciler/messaging/v1beta1/subscription/controller.go @@ -105,6 +105,9 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } + if opts.DemoteFunc != nil { + rec.DemoteFunc = opts.DemoteFunc + } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/sources/v1alpha2/apiserversource/controller.go b/pkg/client/injection/reconciler/sources/v1alpha2/apiserversource/controller.go index 5f29860b633..c5ab56a175e 100644 --- a/pkg/client/injection/reconciler/sources/v1alpha2/apiserversource/controller.go +++ b/pkg/client/injection/reconciler/sources/v1alpha2/apiserversource/controller.go @@ -105,6 +105,9 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } + if opts.DemoteFunc != nil { + rec.DemoteFunc = opts.DemoteFunc + } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/sources/v1alpha2/containersource/controller.go b/pkg/client/injection/reconciler/sources/v1alpha2/containersource/controller.go index 6534a7d987b..0da1c4c1b3c 100644 --- a/pkg/client/injection/reconciler/sources/v1alpha2/containersource/controller.go +++ b/pkg/client/injection/reconciler/sources/v1alpha2/containersource/controller.go @@ -105,6 +105,9 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } + if opts.DemoteFunc != nil { + rec.DemoteFunc = opts.DemoteFunc + } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/sources/v1alpha2/pingsource/controller.go b/pkg/client/injection/reconciler/sources/v1alpha2/pingsource/controller.go index a074c02418b..df5e232217f 100644 --- a/pkg/client/injection/reconciler/sources/v1alpha2/pingsource/controller.go +++ b/pkg/client/injection/reconciler/sources/v1alpha2/pingsource/controller.go @@ -105,6 +105,9 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } + if opts.DemoteFunc != nil { + rec.DemoteFunc = opts.DemoteFunc + } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/sources/v1beta1/apiserversource/controller.go b/pkg/client/injection/reconciler/sources/v1beta1/apiserversource/controller.go index 5bf729d06ab..ffd6ee0b503 100644 --- a/pkg/client/injection/reconciler/sources/v1beta1/apiserversource/controller.go +++ b/pkg/client/injection/reconciler/sources/v1beta1/apiserversource/controller.go @@ -105,6 +105,9 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } + if opts.DemoteFunc != nil { + rec.DemoteFunc = opts.DemoteFunc + } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/sources/v1beta1/containersource/controller.go b/pkg/client/injection/reconciler/sources/v1beta1/containersource/controller.go index 8d5ee04ba18..79141b40e9e 100644 --- a/pkg/client/injection/reconciler/sources/v1beta1/containersource/controller.go +++ b/pkg/client/injection/reconciler/sources/v1beta1/containersource/controller.go @@ -105,6 +105,9 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } + if opts.DemoteFunc != nil { + rec.DemoteFunc = opts.DemoteFunc + } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/sources/v1beta1/pingsource/controller.go b/pkg/client/injection/reconciler/sources/v1beta1/pingsource/controller.go index bb2eb85686e..7b50f50349a 100644 --- a/pkg/client/injection/reconciler/sources/v1beta1/pingsource/controller.go +++ b/pkg/client/injection/reconciler/sources/v1beta1/pingsource/controller.go @@ -105,6 +105,9 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } + if opts.DemoteFunc != nil { + rec.DemoteFunc = opts.DemoteFunc + } } rec.Recorder = createRecorder(ctx, agentName) From 452945753fce166fbe1da4b2e5a7c125adbe3321 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Thu, 3 Sep 2020 21:08:14 -0400 Subject: [PATCH 4/8] prepare pingsource controller for codegen --- .../reconciler/sources/v1beta1/pingsource/reconciler.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/client/injection/reconciler/sources/v1beta1/pingsource/reconciler.go b/pkg/client/injection/reconciler/sources/v1beta1/pingsource/reconciler.go index e9799929814..3de4f28273c 100644 --- a/pkg/client/injection/reconciler/sources/v1beta1/pingsource/reconciler.go +++ b/pkg/client/injection/reconciler/sources/v1beta1/pingsource/reconciler.go @@ -169,6 +169,9 @@ func NewReconciler(ctx context.Context, logger *zap.SugaredLogger, client versio if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } + if opts.DemoteFunc != nil { + rec.DemoteFunc = opts.DemoteFunc + } } return rec From a722d664584e7463376e654736e425a5b276d674 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Thu, 3 Sep 2020 21:08:14 -0400 Subject: [PATCH 5/8] prepare pingsource controller for codegen --- pkg/adapter/mtping/controller.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/adapter/mtping/controller.go b/pkg/adapter/mtping/controller.go index b09f51cc616..155c0e8a82f 100644 --- a/pkg/adapter/mtping/controller.go +++ b/pkg/adapter/mtping/controller.go @@ -65,6 +65,7 @@ func NewController(ctx context.Context, adapter adapter.Adapter) *controller.Imp // }, // } //} + impl := pingsourcereconciler.NewImpl(ctx, r) logging.FromContext(ctx).Info("Setting up event handlers") From 2d2c1836e2756a07eabbdd6fb208b6a74a82bed7 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Fri, 4 Sep 2020 14:41:20 -0400 Subject: [PATCH 6/8] ha buckets --- cmd/mtping/main.go | 2 +- .../deployments/pingsource-mt-adapter.yaml | 2 +- docs/source/receive-adapter.md | 4 +- go.sum | 22 +++ pkg/adapter/mtping/adapter.go | 8 +- pkg/adapter/v2/config.go | 8 +- pkg/adapter/v2/config_watcher.go | 90 ---------- pkg/adapter/v2/config_watcher_test.go | 42 ----- pkg/adapter/v2/context.go | 65 +++++++ .../v2/context_test.go} | 19 ++- pkg/adapter/v2/main.go | 72 +++----- pkg/adapter/v2/main_message.go | 21 +-- pkg/leaderelection/context.go | 158 ------------------ pkg/leaderelection/context_test.go | 115 ------------- pkg/leaderelection/leader_election_test.go | 123 -------------- .../testdata/config-leader-election.yaml | 1 - vendor/knative.dev/pkg/reconciler/leader.go | 4 + 17 files changed, 147 insertions(+), 609 deletions(-) delete mode 100644 pkg/adapter/v2/config_watcher.go delete mode 100644 pkg/adapter/v2/config_watcher_test.go create mode 100644 pkg/adapter/v2/context.go rename pkg/{leaderelection/leader_election.go => adapter/v2/context_test.go} (60%) delete mode 100644 pkg/leaderelection/context.go delete mode 100644 pkg/leaderelection/context_test.go delete mode 100644 pkg/leaderelection/leader_election_test.go delete mode 120000 pkg/leaderelection/testdata/config-leader-election.yaml diff --git a/cmd/mtping/main.go b/cmd/mtping/main.go index 9a1af2e4f5b..9c67295a4c1 100644 --- a/cmd/mtping/main.go +++ b/cmd/mtping/main.go @@ -37,7 +37,7 @@ func main() { // which under the cover delays the release of the lease. ctx := mtping.NewDelayingContext(sctx, mtping.GetNoShutDownAfterValue()) - ctx = adapter.WithInjectorEnabled(ctx) + ctx = adapter.WithController(ctx, mtping.NewController) ctx = adapter.WithHAEnabled(ctx) adapter.MainWithContext(ctx, component, mtping.NewEnvConfig, mtping.NewAdapter) } diff --git a/config/core/deployments/pingsource-mt-adapter.yaml b/config/core/deployments/pingsource-mt-adapter.yaml index bbaf579e8ca..93391f51fa5 100644 --- a/config/core/deployments/pingsource-mt-adapter.yaml +++ b/config/core/deployments/pingsource-mt-adapter.yaml @@ -21,7 +21,7 @@ metadata: eventing.knative.dev/release: devel spec: # when set to 0 (and only 0) will be set to 1 when the first PingSource is created. - replicas: 0 + replicas: 3 selector: matchLabels: eventing.knative.dev/source: ping-source-controller diff --git a/docs/source/receive-adapter.md b/docs/source/receive-adapter.md index bb025034c11..041482d456e 100644 --- a/docs/source/receive-adapter.md +++ b/docs/source/receive-adapter.md @@ -83,12 +83,12 @@ handle more than one source instance at a time, either all source instances in one namespace or all source instances in the cluster. In order to support multi-resources per receive adapter, you need to enable the -ConfigMap watcher feature, as follows: +Controller watcher feature, as follows: ```go func main() { ctx := signals.NewContext() - ctx = adapter.WithInjectorEnabled(ctx) + ctx = adapter.WithController(ctx, youradapter.NewController) ctx = adapter.WithConfigMapWatcherEnabled(ctx) adapter.MainWithContext(ctx, "yourcomponent", youradapter.NewEnvConfig, diff --git a/go.sum b/go.sum index 2386a3c8d48..c88d07c7c7b 100644 --- a/go.sum +++ b/go.sum @@ -191,8 +191,12 @@ github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkY github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/asaskevich/govalidator v0.0.0-20200108200545-475eaeb16496/go.mod h1:oGkLhpf+kjZl6xBf758TQhh5XrAeiJv/7FRz/2spLIg= +<<<<<<< HEAD github.com/aws/aws-k8s-tester v0.0.0-20190114231546-b411acf57dfe/go.mod h1:1ADF5tAtU1/mVtfMcHAYSm2fPw71DA7fFk0yed64/0I= github.com/aws/aws-k8s-tester v0.9.3/go.mod h1:nsh1f7joi8ZI1lvR+Ron6kJM2QdCYPU/vFePghSSuTc= +======= +github.com/aslakhellesoy/gox v1.0.100/go.mod h1:AJl542QsKKG96COVsv0N74HHzVQgDIQPceVUh1aeU2M= +>>>>>>> ha buckets github.com/aws/aws-k8s-tester v1.0.0/go.mod h1:NUNd9k43+h9O5tvwL+4N1Ctb//SapmeeFX1G0/2/0Qc= github.com/aws/aws-sdk-go v1.15.11/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0= github.com/aws/aws-sdk-go v1.15.27/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0= @@ -291,6 +295,13 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/cucumber/gherkin-go/v11 v11.0.0 h1:cwVwN1Qn2VRSfHZNLEh5x00tPBmZcjATBWDpxsR5Xug= +github.com/cucumber/gherkin-go/v11 v11.0.0/go.mod h1:CX33k2XU2qog4e+TFjOValoq6mIUq0DmVccZs238R9w= +github.com/cucumber/godog v0.10.0 h1:W01u1+o8bRpgqJRLrclN3iAanU1jAao+TwOMoSV9g1Y= +github.com/cucumber/godog v0.10.0/go.mod h1:0Q+MOUg8Z9AhzLV+nNMbThQ2x1b17yYwGyahApTLjJA= +github.com/cucumber/messages-go/v10 v10.0.1/go.mod h1:kA5T38CBlBbYLU12TIrJ4fk4wSkVVOgyh7Enyy8WnSg= +github.com/cucumber/messages-go/v10 v10.0.3 h1:m/9SD/K/A15WP7i1aemIv7cwvUw+viS51Ui5HBw1cdE= +github.com/cucumber/messages-go/v10 v10.0.3/go.mod h1:9jMZ2Y8ZxjLY6TG2+x344nt5rXstVVDYSdS5ySfI1WY= github.com/cyphar/filepath-securejoin v0.2.2/go.mod h1:FpkQEhXnPnOthhzymB7CGsFk2G9VLXONKD9G7QGMM+4= github.com/davecgh/go-spew v0.0.0-20151105211317-5215b55f46b2/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -471,6 +482,8 @@ github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJA github.com/godbus/dbus v0.0.0-20190422162347-ade71ed3457e/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4= github.com/gofrs/flock v0.0.0-20190320160742-5135e617513b/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gofrs/flock v0.7.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= +github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE= +github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/protobuf v1.0.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= @@ -662,6 +675,10 @@ github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtng github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI= github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= +github.com/hashicorp/go-immutable-radix v1.2.0 h1:l6UW37iCXwZkZoAbEYnptSHVE/cQ5bOTPYG5W3vf9+8= +github.com/hashicorp/go-immutable-radix v1.2.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= +github.com/hashicorp/go-memdb v1.2.1 h1:wI9btDjYUOJJHTCnRlAG/TkRyD/ij7meJMrLK9X31Cc= +github.com/hashicorp/go-memdb v1.2.1/go.mod h1:OSvLJ662Jim8hMM+gWGyhktyWk2xPCnWMc7DWIqtkGA= github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-multierror v0.0.0-20161216184304-ed905158d874/go.mod h1:JMRHfdO9jKNzS/+BTlxCjKNQHg/jZAft8U7LloJvN7I= github.com/hashicorp/go-multierror v0.0.0-20171204182908-b7773ae21874/go.mod h1:JMRHfdO9jKNzS/+BTlxCjKNQHg/jZAft8U7LloJvN7I= @@ -675,6 +692,7 @@ github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerX github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/go-version v1.0.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -1082,6 +1100,8 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= github.com/tdakkota/asciicheck v0.0.0-20200416190851-d7f85be797a2/go.mod h1:yHp0ai0Z9gUljN3o0xMhYJnH/IcvkdTBOX2fmJ93JEM= @@ -1696,6 +1716,8 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20190709130402-674ba3eaed22/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20191026110619-0b21df46bc1d h1:LCPbGQ34PMrwad11aMZ+dbz5SAsq/0ySjRwQ8I9Qwd8= gopkg.in/yaml.v3 v3.0.0-20191026110619-0b21df46bc1d/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= helm.sh/helm/v3 v3.1.1/go.mod h1:WYsFJuMASa/4XUqLyv54s0U/f3mlAaRErGmyy4z921g= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pkg/adapter/mtping/adapter.go b/pkg/adapter/mtping/adapter.go index 6baf6ec9deb..268af638efc 100644 --- a/pkg/adapter/mtping/adapter.go +++ b/pkg/adapter/mtping/adapter.go @@ -29,7 +29,6 @@ import ( "go.uber.org/zap" kubeclient "knative.dev/pkg/client/injection/kube/client" - "knative.dev/pkg/controller" "knative.dev/pkg/logging" "knative.dev/eventing/pkg/adapter/v2" @@ -71,14 +70,9 @@ func NewAdapter(ctx context.Context, _ adapter.EnvConfigAccessor, ceClient cloud // Start implements adapter.Adapter func (a *mtpingAdapter) Start(ctx context.Context) error { - ctrl := NewController(ctx, a) - - a.logger.Info("Starting controllers...") - go controller.StartAll(ctx, ctrl) - - defer a.runner.Stop() a.logger.Info("Starting job runner...") a.runner.Start(ctx.Done()) + defer a.runner.Stop() a.logger.Infof("runner stopped") return nil diff --git a/pkg/adapter/v2/config.go b/pkg/adapter/v2/config.go index 817f5fc0236..c96535a46bb 100644 --- a/pkg/adapter/v2/config.go +++ b/pkg/adapter/v2/config.go @@ -179,18 +179,20 @@ func (e *EnvConfig) GetCloudEventOverrides() (*duckv1.CloudEventOverrides, error func (e *EnvConfig) GetLeaderElectionConfig() (*kle.ComponentConfig, error) { if e.LeaderElectionConfigJson == "" { - return defaultLeaderElectionConfig(), nil + return e.defaultLeaderElectionConfig(), nil } var config kle.ComponentConfig if err := json.Unmarshal([]byte(e.LeaderElectionConfigJson), &config); err != nil { - return defaultLeaderElectionConfig(), err + return e.defaultLeaderElectionConfig(), err } + config.Component = e.Component return &config, nil } -func defaultLeaderElectionConfig() *kle.ComponentConfig { +func (e *EnvConfig) defaultLeaderElectionConfig() *kle.ComponentConfig { return &kle.ComponentConfig{ + Component: e.Component, Buckets: 1, LeaseDuration: 15 * time.Second, RenewDeadline: 10 * time.Second, diff --git a/pkg/adapter/v2/config_watcher.go b/pkg/adapter/v2/config_watcher.go deleted file mode 100644 index 9676116c24c..00000000000 --- a/pkg/adapter/v2/config_watcher.go +++ /dev/null @@ -1,90 +0,0 @@ -/* -Copyright 2020 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package adapter - -import ( - "context" - "fmt" - "log" - - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/selection" - - kubeclient "knative.dev/pkg/client/injection/kube/client" - "knative.dev/pkg/configmap" - - "knative.dev/eventing/pkg/utils/cache" -) - -type enableConfigMapWatcherKey struct{} - -// WithConfigMapWatcherEnabled signals to MainWithContext that it should -// create and configure a ConfigMap watcher -func WithConfigMapWatcherEnabled(ctx context.Context) context.Context { - return context.WithValue(ctx, enableConfigMapWatcherKey{}, struct{}{}) -} - -// IsConfigMapWatcherEnabled tells MainWithContext that the ConfigMapWatcher -// is enabled -func IsConfigMapWatcherEnabled(ctx context.Context) bool { - val := ctx.Value(enableConfigMapWatcherKey{}) - return val != nil -} - -type configMapWatcherKey struct{} - -// WithConfigWatcher signals to MainWithContext that it should -// create and configure a ConfigMap watcher -func WithConfigMapWatcher(ctx context.Context, watcher configmap.Watcher) context.Context { - return context.WithValue(ctx, configMapWatcherKey{}, watcher) -} - -// ConfigMapWatcherFromContext gets the ConfigMapWatchers from the context -// or nil if there is none suitable -func ConfigMapWatcherFromContext(ctx context.Context) configmap.Watcher { - val := ctx.Value(configMapWatcherKey{}) - if val == nil { - return nil - } - if watcher, ok := val.(configmap.Watcher); ok { - return watcher - } - return nil -} - -// SetupConfigMapWatchOrDie establishes a watch of the configmaps in the component -// namespace that are labeled to be watched or dies by calling log.Fatalf. -func SetupConfigMapWatchOrDie(ctx context.Context, component string, namespace string) configmap.Watcher { - kc := kubeclient.Get(ctx) - // Create ConfigMaps watcher with label-based filter. - cmLabelReqs, err := FilterConfigByLabelEquals(cache.ComponentLabelKey, component) - if err != nil { - log.Fatalf("Failed to generate requirement for label %s: %v", cache.ComponentLabelKey, err) - } - - return configmap.NewInformedWatcher(kc, namespace, *cmLabelReqs) -} - -// FilterConfigByLabelEquals returns an "equas" label requirement for the -// given label key and value -func FilterConfigByLabelEquals(labelKey string, labelValue string) (*labels.Requirement, error) { - req, err := labels.NewRequirement(labelKey, selection.Equals, []string{labelValue}) - if err != nil { - return nil, fmt.Errorf("could not construct label requirement: %w", err) - } - return req, nil -} diff --git a/pkg/adapter/v2/config_watcher_test.go b/pkg/adapter/v2/config_watcher_test.go deleted file mode 100644 index d4f4098aed2..00000000000 --- a/pkg/adapter/v2/config_watcher_test.go +++ /dev/null @@ -1,42 +0,0 @@ -/* -Copyright 2020 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package adapter - -import ( - "context" - "testing" - - _ "knative.dev/pkg/client/injection/kube/client/fake" - rectesting "knative.dev/pkg/reconciler/testing" -) - -func TestConfigMapWatcherEnabled(t *testing.T) { - ctx := WithConfigMapWatcherEnabled(context.TODO()) - if !IsConfigMapWatcherEnabled(ctx) { - t.Error("expected config watcher to be enabled") - } -} - -func TestConfigMapWatcher(t *testing.T) { - ctx, _ := rectesting.SetupFakeContext(t) - watcher := SetupConfigMapWatchOrDie(ctx, "component", "test-ns") - ctx = WithConfigMapWatcher(ctx, watcher) - - if ConfigMapWatcherFromContext(ctx) == nil { - t.Error("expected config watcher, got nothing") - } -} diff --git a/pkg/adapter/v2/context.go b/pkg/adapter/v2/context.go new file mode 100644 index 00000000000..afcf2ef886f --- /dev/null +++ b/pkg/adapter/v2/context.go @@ -0,0 +1,65 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package adapter + +import ( + "context" +) + +type haEnabledKey struct{} + +// WithHAEnabled signals to MainWithContext that it should set up an appropriate leader elector for this component. +func WithHAEnabled(ctx context.Context) context.Context { + return context.WithValue(ctx, haEnabledKey{}, struct{}{}) +} + +// IsHAEnabled checks the context for the desire to enable leader elector. +func IsHAEnabled(ctx context.Context) bool { + val := ctx.Value(haEnabledKey{}) + return val != nil +} + +type haDisabledFlagKey struct{} + +// withHADisabledFlag signals to MainWithConfig that it should not set up an appropriate leader elector for this component. +func withHADisabledFlag(ctx context.Context) context.Context { + return context.WithValue(ctx, haDisabledFlagKey{}, struct{}{}) +} + +// isHADisabledFlag checks the context for the desired to disable leader elector. +func isHADisabledFlag(ctx context.Context) bool { + val := ctx.Value(haDisabledFlagKey{}) + return val != nil +} + +type controllerKey struct{} + +// WithController signals to MainWithContext that it should +// create and configure a controller notifying the adapter +// when a resource is ready and removed +func WithController(ctx context.Context, ctor ControllerConstructor) context.Context { + return context.WithValue(ctx, controllerKey{}, ctor) +} + +// ControllerFromContext gets the controller constructor from the context +func ControllerFromContext(ctx context.Context) ControllerConstructor { + value := ctx.Value(controllerKey{}) + if value == nil { + return nil + } + return value.(ControllerConstructor) +} diff --git a/pkg/leaderelection/leader_election.go b/pkg/adapter/v2/context_test.go similarity index 60% rename from pkg/leaderelection/leader_election.go rename to pkg/adapter/v2/context_test.go index 6e9a729bbd8..75e393b24e3 100644 --- a/pkg/leaderelection/leader_election.go +++ b/pkg/adapter/v2/context_test.go @@ -14,10 +14,23 @@ See the License for the specific language governing permissions and limitations under the License. */ -package leaderelection +package adapter import ( - kle "knative.dev/pkg/leaderelection" + "context" + "testing" + + "knative.dev/pkg/controller" + + _ "knative.dev/pkg/client/injection/kube/client/fake" ) -var ValidateConfig = kle.NewConfigFromConfigMap +func TestWithController(t *testing.T) { + ctx := WithController(context.TODO(), func(ctx context.Context, adapter Adapter) *controller.Impl { + return nil + }) + + if ControllerFromContext(ctx) == nil { + t.Error("expected non-nil controller constructor") + } +} diff --git a/pkg/adapter/v2/main.go b/pkg/adapter/v2/main.go index 2811280e4f7..5ab731f0824 100644 --- a/pkg/adapter/v2/main.go +++ b/pkg/adapter/v2/main.go @@ -20,6 +20,7 @@ import ( "context" "flag" "fmt" + "log" "net/http" "strconv" @@ -34,14 +35,15 @@ import ( "knative.dev/pkg/controller" "knative.dev/pkg/injection" "knative.dev/pkg/injection/sharedmain" + "knative.dev/pkg/leaderelection" "knative.dev/pkg/logging" "knative.dev/pkg/metrics" "knative.dev/pkg/profiling" + "knative.dev/pkg/reconciler" "knative.dev/pkg/signals" "knative.dev/pkg/source" "knative.dev/eventing/pkg/adapter/v2/util/crstatusevent" - "knative.dev/eventing/pkg/leaderelection" ) // Adapter is the interface receive adapters are expected to implement @@ -51,40 +53,20 @@ type Adapter interface { type AdapterConstructor func(ctx context.Context, env EnvConfigAccessor, client cloudevents.Client) Adapter -type haEnabledKey struct{} - -// WithHAEnabled signals to MainWithContext that it should set up an appropriate leader elector for this component. -func WithHAEnabled(ctx context.Context) context.Context { - return context.WithValue(ctx, haEnabledKey{}, struct{}{}) -} - -// IsHAEnabled checks the context for the desire to enable leader elector. -func IsHAEnabled(ctx context.Context) bool { - val := ctx.Value(haEnabledKey{}) - return val != nil -} - -type haDisabledFlagKey struct{} - -// withHADisabledFlag signals to MainWithConfig that it should not set up an appropriate leader elector for this component. -func withHADisabledFlag(ctx context.Context) context.Context { - return context.WithValue(ctx, haDisabledFlagKey{}, struct{}{}) -} - -// isHADisabledFlag checks the context for the desired to disable leader elector. -func isHADisabledFlag(ctx context.Context) bool { - val := ctx.Value(haDisabledFlagKey{}) - return val != nil -} +// ControllerConstructor is the function signature for creating controllers synchronizing +// the multi-tenant receive adapter state +type ControllerConstructor func(ctx context.Context, adapter Adapter) *controller.Impl type injectorEnabledKey struct{} // WithInjectorEnabled signals to MainWithInjectors that it should try to run injectors. +// TODO: deprecated. Use WithController instead func WithInjectorEnabled(ctx context.Context) context.Context { return context.WithValue(ctx, injectorEnabledKey{}, struct{}{}) } // IsInjectorEnabled checks the context for the desire to enable injectors +// TODO: deprecated. func IsInjectorEnabled(ctx context.Context) bool { val := ctx.Value(injectorEnabledKey{}) return val != nil @@ -104,7 +86,7 @@ func MainWithEnv(ctx context.Context, component string, env EnvConfigAccessor, c flag.Bool("disable-ha", false, "Whether to disable high-availability functionality for this component.") } - if IsInjectorEnabled(ctx) { + if ControllerFromContext(ctx) != nil || IsInjectorEnabled(ctx) { ictx, informers := SetupInformers(ctx, env.GetLogger()) if informers != nil { StartInformers(ctx, informers) // none-blocking @@ -188,23 +170,9 @@ func MainWithInformers(ctx context.Context, component string, env EnvConfigAcces logger.Fatal("Error building cloud event client", zap.Error(err)) } - // Setup config watcher if enabled. - if IsConfigMapWatcherEnabled(ctx) { - cmw := SetupConfigMapWatchOrDie(ctx, component, env.GetNamespace()) - ctx = WithConfigMapWatcher(ctx, cmw) - } - // Configuring the adapter adapter := ctor(ctx, env, eventsClient) - // Start config watcher if enabled. - if IsConfigMapWatcherEnabled(ctx) { - logger.Info("Starting configuration manager...") - if err := ConfigMapWatcherFromContext(ctx).Start(ctx.Done()); err != nil { - logger.Fatalw("Failed to start configuration manager", zap.Error(err)) - } - } - // Build the leader elector leConfig, err := env.GetLeaderElectionConfig() if err != nil { @@ -213,15 +181,29 @@ func MainWithInformers(ctx context.Context, component string, env EnvConfigAcces if !isHADisabledFlag(ctx) && IsHAEnabled(ctx) { // Signal that we are executing in a context with leader election. + logger.Info("Leader election mode enabled") ctx = leaderelection.WithStandardLeaderElectorBuilder(ctx, kubeclient.Get(ctx), *leConfig) } - elector, err := leaderelection.BuildAdapterElector(ctx, adapter) - if err != nil { - logger.Fatal("Error creating the adapter elector", zap.Error(err)) + // Create and start controller is needed + if ctor := ControllerFromContext(ctx); ctor != nil { + ctrl := ctor(ctx, adapter) + + if leaderelection.HasLeaderElection(ctx) { + // the reconciler MUST implement LeaderAware. + if _, ok := ctrl.Reconciler.(reconciler.LeaderAware); !ok { + log.Fatalf("%T is not leader-aware, all reconcilers must be leader-aware to enable fine-grained leader election.", ctrl.Reconciler) + } + } + + logger.Info("Starting controller") + go controller.StartAll(ctx, ctrl) } - elector.Run(ctx) + // Finally start the adapter (blocking) + if err := adapter.Start(ctx); err != nil { + logging.FromContext(ctx).Warn("Start returned an error", zap.Error(err)) + } } func ConstructEnvOrDie(ector EnvConfigConstructor) EnvConfigAccessor { diff --git a/pkg/adapter/v2/main_message.go b/pkg/adapter/v2/main_message.go index e298ad1b9cb..916c5a32c28 100644 --- a/pkg/adapter/v2/main_message.go +++ b/pkg/adapter/v2/main_message.go @@ -26,8 +26,6 @@ import ( "github.com/kelseyhightower/envconfig" "go.opencensus.io/stats/view" "go.uber.org/zap" - "knative.dev/eventing/pkg/leaderelection" - kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/logging" "knative.dev/pkg/metrics" "knative.dev/pkg/profiling" @@ -118,21 +116,8 @@ func MainMessageAdapterWithContext(ctx context.Context, component string, ector // Configuring the adapter adapter := ctor(ctx, env, httpBindingsSender, reporter) - // Build the leader elector - leConfig, err := env.GetLeaderElectionConfig() - if err != nil { - logger.Error("Error loading the leader election configuration", zap.Error(err)) - } - - if IsHAEnabled(ctx) { - // Signal that we are executing in a context with leader election. - ctx = leaderelection.WithStandardLeaderElectorBuilder(ctx, kubeclient.Get(ctx), *leConfig) + // Finally start the adapter (blocking) + if err := adapter.Start(ctx); err != nil { + logging.FromContext(ctx).Warn("Start returned an error", zap.Error(err)) } - - elector, err := leaderelection.BuildAdapterElector(ctx, adapter) - if err != nil { - logger.Fatal("Error creating the adapter elector", zap.Error(err)) - } - - elector.Run(ctx) } diff --git a/pkg/leaderelection/context.go b/pkg/leaderelection/context.go deleted file mode 100644 index 9d56f0b21f9..00000000000 --- a/pkg/leaderelection/context.go +++ /dev/null @@ -1,158 +0,0 @@ -/* -Copyright 2020 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package leaderelection - -import ( - "context" - - "go.uber.org/zap" - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/scheme" - typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1" - "k8s.io/client-go/tools/leaderelection" - "k8s.io/client-go/tools/leaderelection/resourcelock" - "k8s.io/client-go/tools/record" - - "knative.dev/pkg/controller" - pkgleaderelection "knative.dev/pkg/leaderelection" - "knative.dev/pkg/logging" - "knative.dev/pkg/system" -) - -// WithStandardLeaderElectorBuilder infuses a context with the ability to build -// LeaderElectors with the provided component configuration acquiring resource -// locks via the provided kubernetes client -func WithStandardLeaderElectorBuilder(ctx context.Context, kc kubernetes.Interface, cc pkgleaderelection.ComponentConfig) context.Context { - return context.WithValue(ctx, builderKey{}, &standardBuilder{ - kc: kc, - lec: cc, - }) -} - -// Elector is the interface for running a leader elector. -type Elector interface { - Run(context.Context) -} - -// Adapter is the interface for running adapter -type Adapter interface { - Start(ctx context.Context) error -} - -// BuildAdapterElector builds a leaderelection.LeaderElector for the given adapter -// using a builder added to the context via WithXXLeaderElectorBuilder. -func BuildAdapterElector(ctx context.Context, adapter Adapter) (Elector, error) { - if val := ctx.Value(builderKey{}); val != nil { - switch builder := val.(type) { - case *standardBuilder: - return builder.BuildElector(ctx, adapter) - } - } - - return &unopposedElector{ - adapter: adapter, - }, nil -} - -type builderKey struct{} - -type standardBuilder struct { - kc kubernetes.Interface - lec pkgleaderelection.ComponentConfig -} - -func (b *standardBuilder) BuildElector(ctx context.Context, adapter Adapter) (Elector, error) { - logger := logging.FromContext(ctx) - recorder := controller.GetEventRecorder(ctx) - if recorder == nil { - // Create event broadcaster - logger.Debug("Creating event broadcaster") - eventBroadcaster := record.NewBroadcaster() - watches := []watch.Interface{ - eventBroadcaster.StartLogging(logger.Named("event-broadcaster").Infof), - eventBroadcaster.StartRecordingToSink( - &typedcorev1.EventSinkImpl{Interface: b.kc.CoreV1().Events(system.Namespace())}), - } - recorder = eventBroadcaster.NewRecorder( - scheme.Scheme, corev1.EventSource{Component: b.lec.Component}) - go func() { - <-ctx.Done() - for _, w := range watches { - w.Stop() - } - }() - } - - // Create a unique identifier so that two controllers on the same host don't - // race. - id, err := pkgleaderelection.UniqueID() - if err != nil { - logger.Fatalw("Failed to get unique ID for leader election", zap.Error(err)) - } - logger.Infof("%v will run in leader-elected mode with id %v", b.lec.Component, id) - - // rl is the resource used to hold the leader election lock. - rl, err := resourcelock.New(pkgleaderelection.KnativeResourceLock, - system.Namespace(), // use namespace we are running in - b.lec.Component, // component is used as the resource name - b.kc.CoreV1(), - b.kc.CoordinationV1(), - resourcelock.ResourceLockConfig{ - Identity: id, - EventRecorder: recorder, - }) - if err != nil { - logger.Fatalw("Error creating lock", zap.Error(err)) - } - - return leaderelection.NewLeaderElector(leaderelection.LeaderElectionConfig{ - Lock: rl, - LeaseDuration: b.lec.LeaseDuration, - RenewDeadline: b.lec.RenewDeadline, - RetryPeriod: b.lec.RetryPeriod, - Callbacks: leaderelection.LeaderCallbacks{ - OnStartedLeading: func(ctx context.Context) { - logger.Infof("%q has started leading %q", rl.Identity(), b.lec.Component) - err := adapter.Start(ctx) - if err != nil { - logger.Fatalw("The adapter failed to start", zap.Error(err)) - } - }, - OnStoppedLeading: func() { - logger.Infof("%q has stopped leading %q", rl.Identity(), b.lec.Component) - }, - }, - ReleaseOnCancel: true, - // TODO: use health check watchdog, knative/pkg#1048 - Name: b.lec.Component, - }) -} - -// unopposedElector runs the adapter without needing to be elected. -type unopposedElector struct { - adapter Adapter -} - -// Run implements Elector -func (ue *unopposedElector) Run(ctx context.Context) { - err := ue.adapter.Start(ctx) - if err != nil { - logging.FromContext(ctx).Warn("Start returned an error", zap.Error(err)) - } -} diff --git a/pkg/leaderelection/context_test.go b/pkg/leaderelection/context_test.go deleted file mode 100644 index 3214abeed0b..00000000000 --- a/pkg/leaderelection/context_test.go +++ /dev/null @@ -1,115 +0,0 @@ -// +build !race -// TODO(https://github.com/kubernetes/kubernetes/issues/90952): Remove the above. - -/* -Copyright 2020 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package leaderelection - -import ( - "context" - "testing" - "time" - - "k8s.io/apimachinery/pkg/runtime" - fakekube "k8s.io/client-go/kubernetes/fake" - ktesting "k8s.io/client-go/testing" - "knative.dev/pkg/leaderelection" - _ "knative.dev/pkg/system/testing" -) - -type fakeAdapter struct{} - -func (a *fakeAdapter) Start(ctx context.Context) error { - <-ctx.Done() - return nil -} - -func TestWithBuilder(t *testing.T) { - cc := leaderelection.ComponentConfig{ - Component: "component", - LeaseDuration: 15 * time.Second, - RenewDeadline: 10 * time.Second, - RetryPeriod: 2 * time.Second, - } - kc := fakekube.NewSimpleClientset() - ctx := context.Background() - a := &fakeAdapter{} - - created := make(chan struct{}) - kc.PrependReactor("create", "leases", - func(action ktesting.Action) (bool, runtime.Object, error) { - close(created) - return false, nil, nil - }, - ) - - updated := make(chan struct{}) - kc.PrependReactor("update", "leases", - func(action ktesting.Action) (bool, runtime.Object, error) { - // Only close update once. - select { - case <-updated: - default: - close(updated) - } - return false, nil, nil - }, - ) - - if le, err := BuildAdapterElector(ctx, a); err != nil { - t.Errorf("BuildElector() = %v, wanted an unopposedElector", err) - } else if _, ok := le.(*unopposedElector); !ok { - t.Errorf("BuildElector() = %T, wanted an unopposedElector", le) - } - - ctx = WithStandardLeaderElectorBuilder(ctx, kc, cc) - - le, err := BuildAdapterElector(ctx, a) - if err != nil { - t.Errorf("BuildElector() = %v", err) - } - - // We shouldn't see leases until we Run the elector. - select { - case <-created: - t.Error("Got created, want no actions.") - case <-updated: - t.Error("Got updated, want no actions.") - default: - } - - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go le.Run(ctx) - - select { - case <-created: - // We expect the lease to be created. - case <-time.After(1 * time.Second): - t.Fatal("Timed out waiting for lease creation.") - } - - // Cancelling the context should case us to give up leadership. - cancel() - - select { - case <-updated: - // We expect the lease to be updated. - case <-time.After(1 * time.Second): - t.Fatal("Timed out waiting for lease update.") - } -} diff --git a/pkg/leaderelection/leader_election_test.go b/pkg/leaderelection/leader_election_test.go deleted file mode 100644 index e2aa8b5a7b7..00000000000 --- a/pkg/leaderelection/leader_election_test.go +++ /dev/null @@ -1,123 +0,0 @@ -/* -Copyright 2020 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package leaderelection - -import ( - "strings" - "testing" - "time" - - "github.com/google/go-cmp/cmp" - corev1 "k8s.io/api/core/v1" - - . "knative.dev/pkg/configmap/testing" - "knative.dev/pkg/kmeta" - kle "knative.dev/pkg/leaderelection" -) - -func okConfig() *kle.Config { - return &kle.Config{ - Buckets: 1, - LeaseDuration: 15 * time.Second, - RenewDeadline: 10 * time.Second, - RetryPeriod: 2 * time.Second, - } -} - -func okData() map[string]string { - return map[string]string{ - // values in this data come from the defaults suggested in the - // code: - // https://github.com/kubernetes/client-go/blob/kubernetes-1.16.0/tools/leaderelection/leaderelection.go - "leaseDuration": "15s", - "renewDeadline": "10s", - "retryPeriod": "2s", - } -} - -func TestValidateConfig(t *testing.T) { - cases := []struct { - name string - data map[string]string - expected *kle.Config - err string - }{{ - name: "OK", - data: okData(), - expected: okConfig(), - }, { - name: "invalid config", - data: kmeta.UnionMaps(okData(), map[string]string{ - "leaseDuration": "this-is-the-end", - }), - err: `failed to parse "leaseDuration": time: invalid duration`, - }} - - for _, tc := range cases { - t.Run(tc.name, func(t *testing.T) { - actualConfig, actualErr := ValidateConfig(&corev1.ConfigMap{Data: tc.data}) - if actualErr != nil { - // Different versions of Go quote input differently, so check prefix. - if got, want := actualErr.Error(), tc.err; !strings.HasPrefix(got, want) { - t.Fatalf("Err = '%s', want: '%s'", got, want) - } - } else if tc.err != "" { - t.Fatal("Expected an error, got none") - } - if got, want := actualConfig, tc.expected; !cmp.Equal(got, want) { - t.Errorf("Config = %v, want: %v, diff(-want,+got):\n%s", got, want, cmp.Diff(want, got)) - } - }) - } -} - -func TestEventingConfig(t *testing.T) { - actual, example := ConfigMapsFromTestFile(t, "config-leader-election") - for _, test := range []struct { - name string - data *corev1.ConfigMap - want *kle.Config - }{{ - name: "Default config", - want: &kle.Config{ - Buckets: 1, - LeaseDuration: 15 * time.Second, - RenewDeadline: 10 * time.Second, - RetryPeriod: 2 * time.Second, - }, - data: actual, - }, { - name: "Example config", - want: &kle.Config{ - Buckets: 1, - LeaseDuration: 15 * time.Second, - RenewDeadline: 10 * time.Second, - RetryPeriod: 2 * time.Second, - }, - data: example, - }} { - t.Run(test.name, func(t *testing.T) { - cm, err := ValidateConfig(test.data) - if err != nil { - t.Fatalf("Error parsing config = %v", err) - } - if got, want := cm, test.want; !cmp.Equal(got, want) { - t.Errorf("Config mismatch: (-want,+got):\n%s", cmp.Diff(want, got)) - } - }) - } -} diff --git a/pkg/leaderelection/testdata/config-leader-election.yaml b/pkg/leaderelection/testdata/config-leader-election.yaml deleted file mode 120000 index b4397c307d4..00000000000 --- a/pkg/leaderelection/testdata/config-leader-election.yaml +++ /dev/null @@ -1 +0,0 @@ -../../../config/config-leader-election.yaml \ No newline at end of file diff --git a/vendor/knative.dev/pkg/reconciler/leader.go b/vendor/knative.dev/pkg/reconciler/leader.go index 6fece8526df..ecff3023b12 100644 --- a/vendor/knative.dev/pkg/reconciler/leader.go +++ b/vendor/knative.dev/pkg/reconciler/leader.go @@ -18,6 +18,7 @@ package reconciler import ( "sync" + "fmt" "k8s.io/apimachinery/pkg/types" ) @@ -88,6 +89,7 @@ func (laf *LeaderAwareFuncs) IsLeaderFor(key types.NamespacedName) bool { // Promote implements LeaderAware func (laf *LeaderAwareFuncs) Promote(b Bucket, enq func(Bucket, types.NamespacedName)) error { + fmt.Println("PROMOTE") func() { laf.Lock() defer laf.Unlock() @@ -98,6 +100,8 @@ func (laf *LeaderAwareFuncs) Promote(b Bucket, enq func(Bucket, types.Namespaced }() if promote := laf.PromoteFunc; promote != nil { + fmt.Println("PROMOTE FUNC") + return promote(b, enq) } return nil From 456471852e1d12aeced264cac138040583c2db50 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Mon, 14 Sep 2020 09:49:55 -0400 Subject: [PATCH 7/8] rebasing --- .../deployments/pingsource-mt-adapter.yaml | 2 +- go.sum | 22 ------- pkg/adapter/mtping/adapter_test.go | 4 -- pkg/adapter/v2/context_test.go | 17 ++++++ pkg/adapter/v2/main.go | 2 +- pkg/adapter/v2/main_test.go | 61 ------------------- .../configmappropagation/controller.go | 3 - .../eventing/v1/broker/controller.go | 3 - .../eventing/v1/trigger/controller.go | 3 - .../eventing/v1beta1/broker/controller.go | 3 - .../eventing/v1beta1/eventtype/controller.go | 3 - .../eventing/v1beta1/trigger/controller.go | 3 - .../flows/v1/parallel/controller.go | 3 - .../flows/v1/sequence/controller.go | 3 - .../flows/v1beta1/parallel/controller.go | 3 - .../flows/v1beta1/sequence/controller.go | 3 - .../messaging/v1/channel/controller.go | 3 - .../v1/inmemorychannel/controller.go | 3 - .../messaging/v1/subscription/controller.go | 3 - .../messaging/v1beta1/channel/controller.go | 3 - .../v1beta1/inmemorychannel/controller.go | 3 - .../v1beta1/subscription/controller.go | 3 - .../v1alpha2/apiserversource/controller.go | 3 - .../v1alpha2/containersource/controller.go | 3 - .../sources/v1alpha2/pingsource/controller.go | 3 - .../v1beta1/apiserversource/controller.go | 3 - .../v1beta1/containersource/controller.go | 3 - .../sources/v1beta1/pingsource/controller.go | 3 - .../sources/v1beta1/pingsource/reconciler.go | 3 - vendor/knative.dev/pkg/reconciler/leader.go | 4 -- 30 files changed, 19 insertions(+), 162 deletions(-) diff --git a/config/core/deployments/pingsource-mt-adapter.yaml b/config/core/deployments/pingsource-mt-adapter.yaml index 93391f51fa5..bbaf579e8ca 100644 --- a/config/core/deployments/pingsource-mt-adapter.yaml +++ b/config/core/deployments/pingsource-mt-adapter.yaml @@ -21,7 +21,7 @@ metadata: eventing.knative.dev/release: devel spec: # when set to 0 (and only 0) will be set to 1 when the first PingSource is created. - replicas: 3 + replicas: 0 selector: matchLabels: eventing.knative.dev/source: ping-source-controller diff --git a/go.sum b/go.sum index c88d07c7c7b..2386a3c8d48 100644 --- a/go.sum +++ b/go.sum @@ -191,12 +191,8 @@ github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkY github.com/asaskevich/govalidator v0.0.0-20180720115003-f9ffefc3facf/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:lB+ZfQJz7igIIfQNfa7Ml4HSf2uFQQRzpGGRXenZAgY= github.com/asaskevich/govalidator v0.0.0-20200108200545-475eaeb16496/go.mod h1:oGkLhpf+kjZl6xBf758TQhh5XrAeiJv/7FRz/2spLIg= -<<<<<<< HEAD github.com/aws/aws-k8s-tester v0.0.0-20190114231546-b411acf57dfe/go.mod h1:1ADF5tAtU1/mVtfMcHAYSm2fPw71DA7fFk0yed64/0I= github.com/aws/aws-k8s-tester v0.9.3/go.mod h1:nsh1f7joi8ZI1lvR+Ron6kJM2QdCYPU/vFePghSSuTc= -======= -github.com/aslakhellesoy/gox v1.0.100/go.mod h1:AJl542QsKKG96COVsv0N74HHzVQgDIQPceVUh1aeU2M= ->>>>>>> ha buckets github.com/aws/aws-k8s-tester v1.0.0/go.mod h1:NUNd9k43+h9O5tvwL+4N1Ctb//SapmeeFX1G0/2/0Qc= github.com/aws/aws-sdk-go v1.15.11/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0= github.com/aws/aws-sdk-go v1.15.27/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0= @@ -295,13 +291,6 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= -github.com/cucumber/gherkin-go/v11 v11.0.0 h1:cwVwN1Qn2VRSfHZNLEh5x00tPBmZcjATBWDpxsR5Xug= -github.com/cucumber/gherkin-go/v11 v11.0.0/go.mod h1:CX33k2XU2qog4e+TFjOValoq6mIUq0DmVccZs238R9w= -github.com/cucumber/godog v0.10.0 h1:W01u1+o8bRpgqJRLrclN3iAanU1jAao+TwOMoSV9g1Y= -github.com/cucumber/godog v0.10.0/go.mod h1:0Q+MOUg8Z9AhzLV+nNMbThQ2x1b17yYwGyahApTLjJA= -github.com/cucumber/messages-go/v10 v10.0.1/go.mod h1:kA5T38CBlBbYLU12TIrJ4fk4wSkVVOgyh7Enyy8WnSg= -github.com/cucumber/messages-go/v10 v10.0.3 h1:m/9SD/K/A15WP7i1aemIv7cwvUw+viS51Ui5HBw1cdE= -github.com/cucumber/messages-go/v10 v10.0.3/go.mod h1:9jMZ2Y8ZxjLY6TG2+x344nt5rXstVVDYSdS5ySfI1WY= github.com/cyphar/filepath-securejoin v0.2.2/go.mod h1:FpkQEhXnPnOthhzymB7CGsFk2G9VLXONKD9G7QGMM+4= github.com/davecgh/go-spew v0.0.0-20151105211317-5215b55f46b2/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -482,8 +471,6 @@ github.com/gobwas/glob v0.2.3/go.mod h1:d3Ez4x06l9bZtSvzIay5+Yzi0fmZzPgnTbPcKjJA github.com/godbus/dbus v0.0.0-20190422162347-ade71ed3457e/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4= github.com/gofrs/flock v0.0.0-20190320160742-5135e617513b/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= github.com/gofrs/flock v0.7.1/go.mod h1:F1TvTiK9OcQqauNUHlbJvyl9Qa1QvF/gOUDKA14jxHU= -github.com/gofrs/uuid v3.2.0+incompatible h1:y12jRkkFxsd7GpqdSZ+/KCs/fJbqpEXSGd4+jfEaewE= -github.com/gofrs/uuid v3.2.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/protobuf v1.0.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= @@ -675,10 +662,6 @@ github.com/hashicorp/go-cleanhttp v0.5.1/go.mod h1:JpRdi6/HCYpAwUzNwuwqhbovhLtng github.com/hashicorp/go-hclog v0.9.2 h1:CG6TE5H9/JXsFWJCfoIVpKFIkFe6ysEuHirp4DxCsHI= github.com/hashicorp/go-hclog v0.9.2/go.mod h1:5CU+agLiy3J7N7QjHK5d05KxGsuXiQLrjA0H7acj2lQ= github.com/hashicorp/go-immutable-radix v1.0.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= -github.com/hashicorp/go-immutable-radix v1.2.0 h1:l6UW37iCXwZkZoAbEYnptSHVE/cQ5bOTPYG5W3vf9+8= -github.com/hashicorp/go-immutable-radix v1.2.0/go.mod h1:0y9vanUI8NX6FsYoO3zeMjhV/C5i9g4Q3DwcSNZ4P60= -github.com/hashicorp/go-memdb v1.2.1 h1:wI9btDjYUOJJHTCnRlAG/TkRyD/ij7meJMrLK9X31Cc= -github.com/hashicorp/go-memdb v1.2.1/go.mod h1:OSvLJ662Jim8hMM+gWGyhktyWk2xPCnWMc7DWIqtkGA= github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM= github.com/hashicorp/go-multierror v0.0.0-20161216184304-ed905158d874/go.mod h1:JMRHfdO9jKNzS/+BTlxCjKNQHg/jZAft8U7LloJvN7I= github.com/hashicorp/go-multierror v0.0.0-20171204182908-b7773ae21874/go.mod h1:JMRHfdO9jKNzS/+BTlxCjKNQHg/jZAft8U7LloJvN7I= @@ -692,7 +675,6 @@ github.com/hashicorp/go-sockaddr v1.0.0/go.mod h1:7Xibr9yA9JjQq1JpNB2Vw7kxv8xerX github.com/hashicorp/go-syslog v1.0.0/go.mod h1:qPfqrKkXGihmCqbJM2mZgkZGvKG1dFdvsLplgctolz4= github.com/hashicorp/go-uuid v1.0.0/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.1/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= -github.com/hashicorp/go-version v1.0.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90= github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= @@ -1100,8 +1082,6 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.5.1 h1:nOGnQDM7FYENwehXlg/kFVnos3rEvtKTjRvOWSzb6H4= github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= -github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= github.com/tdakkota/asciicheck v0.0.0-20200416190851-d7f85be797a2/go.mod h1:yHp0ai0Z9gUljN3o0xMhYJnH/IcvkdTBOX2fmJ93JEM= @@ -1716,8 +1696,6 @@ gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20190709130402-674ba3eaed22/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20191026110619-0b21df46bc1d h1:LCPbGQ34PMrwad11aMZ+dbz5SAsq/0ySjRwQ8I9Qwd8= gopkg.in/yaml.v3 v3.0.0-20191026110619-0b21df46bc1d/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= helm.sh/helm/v3 v3.1.1/go.mod h1:WYsFJuMASa/4XUqLyv54s0U/f3mlAaRErGmyy4z921g= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pkg/adapter/mtping/adapter_test.go b/pkg/adapter/mtping/adapter_test.go index 4a928ea2f3e..8ba2876c77b 100644 --- a/pkg/adapter/mtping/adapter_test.go +++ b/pkg/adapter/mtping/adapter_test.go @@ -32,16 +32,12 @@ import ( "knative.dev/pkg/logging" rectesting "knative.dev/pkg/reconciler/testing" - pkgadapter "knative.dev/eventing/pkg/adapter/v2" adaptertest "knative.dev/eventing/pkg/adapter/v2/test" ) func TestStartStopAdapter(t *testing.T) { ctx, _ := rectesting.SetupFakeContext(t) ctx, cancel := context.WithCancel(ctx) - cmw := pkgadapter.SetupConfigMapWatchOrDie(ctx, "component", "test-ns") - ctx = pkgadapter.WithConfigMapWatcher(ctx, cmw) - envCfg := NewEnvConfig() ce := adaptertest.NewTestClient() diff --git a/pkg/adapter/v2/context_test.go b/pkg/adapter/v2/context_test.go index 75e393b24e3..24f47f454b6 100644 --- a/pkg/adapter/v2/context_test.go +++ b/pkg/adapter/v2/context_test.go @@ -34,3 +34,20 @@ func TestWithController(t *testing.T) { t.Error("expected non-nil controller constructor") } } + +func TestWithHAEnabled(t *testing.T) { + ctx := context.Background() + ctx = WithHAEnabled(ctx) + if !IsHAEnabled(ctx) { + t.Error("Expected HA to be enabled") + } + + ctx = withHADisabledFlag(ctx) + if !IsHAEnabled(ctx) { + t.Error("Expected HA to be enabled") + } + if !isHADisabledFlag(ctx) { + t.Error("Expected HA to be disabled via commandline flag") + } + +} diff --git a/pkg/adapter/v2/main.go b/pkg/adapter/v2/main.go index 5ab731f0824..02b0340e281 100644 --- a/pkg/adapter/v2/main.go +++ b/pkg/adapter/v2/main.go @@ -190,7 +190,7 @@ func MainWithInformers(ctx context.Context, component string, env EnvConfigAcces ctrl := ctor(ctx, adapter) if leaderelection.HasLeaderElection(ctx) { - // the reconciler MUST implement LeaderAware. + // the reconciler MUST implement LeaderAware. if _, ok := ctrl.Reconciler.(reconciler.LeaderAware); !ok { log.Fatalf("%T is not leader-aware, all reconcilers must be leader-aware to enable fine-grained leader election.", ctrl.Reconciler) } diff --git a/pkg/adapter/v2/main_test.go b/pkg/adapter/v2/main_test.go index f85041a18bc..a620f86363c 100644 --- a/pkg/adapter/v2/main_test.go +++ b/pkg/adapter/v2/main_test.go @@ -26,7 +26,6 @@ import ( _ "knative.dev/pkg/client/injection/kube/client/fake" "knative.dev/pkg/leaderelection" "knative.dev/pkg/metrics" - rectesting "knative.dev/pkg/reconciler/testing" _ "knative.dev/pkg/system/testing" ) @@ -111,49 +110,6 @@ func TestMainWithInformerNoLeaderElection(t *testing.T) { defer view.Unregister(metrics.NewMemStatsAll().DefaultViews()...) } -func TestMainWithContextWithConfigWatcher(t *testing.T) { - os.Setenv("K_SINK", "http://sink") - os.Setenv("NAMESPACE", "ns") - os.Setenv("K_METRICS_CONFIG", "{}") - os.Setenv("K_LOGGING_CONFIG", "{}") - os.Setenv("MODE", "mymode") - - defer func() { - os.Unsetenv("K_SINK") - os.Unsetenv("NAMESPACE") - os.Unsetenv("K_METRICS_CONFIG") - os.Unsetenv("K_LOGGING_CONFIG") - os.Unsetenv("MODE") - }() - - ctx, _ := rectesting.SetupFakeContext(t) - WithConfigMapWatcherEnabled(ctx) - ctx, cancel := context.WithCancel(ctx) - - MainWithContext(ctx, - "mycomponent", - func() EnvConfigAccessor { return &myEnvConfig{} }, - func(ctx context.Context, processed EnvConfigAccessor, client cloudevents.Client) Adapter { - env := processed.(*myEnvConfig) - - if env.Mode != "mymode" { - t.Errorf("Expected mode mymode, got: %s", env.Mode) - } - - if env.Sink != "http://sink" { - t.Errorf("Expected sinkURI http://sink, got: %s", env.Sink) - } - - if leaderelection.HasLeaderElection(ctx) { - t.Error("Expected no leader election, but got leader election") - } - return &myAdapter{} - }) - - cancel() - defer view.Unregister(metrics.NewMemStatsAll().DefaultViews()...) -} - func TestStartInformers(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) StartInformers(ctx, nil) @@ -163,20 +119,3 @@ func TestStartInformers(t *testing.T) { func (m *myAdapter) Start(_ context.Context) error { return nil } - -func TestHAContext(t *testing.T) { - ctx := context.Background() - ctx = WithHAEnabled(ctx) - if !IsHAEnabled(ctx) { - t.Error("Expected HA to be enabled") - } - - ctx = withHADisabledFlag(ctx) - if !IsHAEnabled(ctx) { - t.Error("Expected HA to be enabled") - } - if !isHADisabledFlag(ctx) { - t.Error("Expected HA to be disabled via commandline flag") - } - -} diff --git a/pkg/client/injection/reconciler/configs/v1alpha1/configmappropagation/controller.go b/pkg/client/injection/reconciler/configs/v1alpha1/configmappropagation/controller.go index bf27ae2f521..a34f44214d1 100644 --- a/pkg/client/injection/reconciler/configs/v1alpha1/configmappropagation/controller.go +++ b/pkg/client/injection/reconciler/configs/v1alpha1/configmappropagation/controller.go @@ -105,9 +105,6 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } - if opts.DemoteFunc != nil { - rec.DemoteFunc = opts.DemoteFunc - } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/eventing/v1/broker/controller.go b/pkg/client/injection/reconciler/eventing/v1/broker/controller.go index 14980deb152..41e1231c62f 100644 --- a/pkg/client/injection/reconciler/eventing/v1/broker/controller.go +++ b/pkg/client/injection/reconciler/eventing/v1/broker/controller.go @@ -109,9 +109,6 @@ func NewImpl(ctx context.Context, r Interface, classValue string, optionsFns ... if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } - if opts.DemoteFunc != nil { - rec.DemoteFunc = opts.DemoteFunc - } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/eventing/v1/trigger/controller.go b/pkg/client/injection/reconciler/eventing/v1/trigger/controller.go index b9ef6380f23..504a628cf1a 100644 --- a/pkg/client/injection/reconciler/eventing/v1/trigger/controller.go +++ b/pkg/client/injection/reconciler/eventing/v1/trigger/controller.go @@ -105,9 +105,6 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } - if opts.DemoteFunc != nil { - rec.DemoteFunc = opts.DemoteFunc - } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/eventing/v1beta1/broker/controller.go b/pkg/client/injection/reconciler/eventing/v1beta1/broker/controller.go index 6da43b9d823..9e757e2cd8e 100644 --- a/pkg/client/injection/reconciler/eventing/v1beta1/broker/controller.go +++ b/pkg/client/injection/reconciler/eventing/v1beta1/broker/controller.go @@ -109,9 +109,6 @@ func NewImpl(ctx context.Context, r Interface, classValue string, optionsFns ... if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } - if opts.DemoteFunc != nil { - rec.DemoteFunc = opts.DemoteFunc - } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/eventing/v1beta1/eventtype/controller.go b/pkg/client/injection/reconciler/eventing/v1beta1/eventtype/controller.go index 7fc14fe1adb..d2bd35e93e4 100644 --- a/pkg/client/injection/reconciler/eventing/v1beta1/eventtype/controller.go +++ b/pkg/client/injection/reconciler/eventing/v1beta1/eventtype/controller.go @@ -105,9 +105,6 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } - if opts.DemoteFunc != nil { - rec.DemoteFunc = opts.DemoteFunc - } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/eventing/v1beta1/trigger/controller.go b/pkg/client/injection/reconciler/eventing/v1beta1/trigger/controller.go index 394cd346308..2a33794242b 100644 --- a/pkg/client/injection/reconciler/eventing/v1beta1/trigger/controller.go +++ b/pkg/client/injection/reconciler/eventing/v1beta1/trigger/controller.go @@ -105,9 +105,6 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } - if opts.DemoteFunc != nil { - rec.DemoteFunc = opts.DemoteFunc - } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/flows/v1/parallel/controller.go b/pkg/client/injection/reconciler/flows/v1/parallel/controller.go index b321f203abe..f533f092749 100644 --- a/pkg/client/injection/reconciler/flows/v1/parallel/controller.go +++ b/pkg/client/injection/reconciler/flows/v1/parallel/controller.go @@ -105,9 +105,6 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } - if opts.DemoteFunc != nil { - rec.DemoteFunc = opts.DemoteFunc - } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/flows/v1/sequence/controller.go b/pkg/client/injection/reconciler/flows/v1/sequence/controller.go index e873986539c..f64d0221c6c 100644 --- a/pkg/client/injection/reconciler/flows/v1/sequence/controller.go +++ b/pkg/client/injection/reconciler/flows/v1/sequence/controller.go @@ -105,9 +105,6 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } - if opts.DemoteFunc != nil { - rec.DemoteFunc = opts.DemoteFunc - } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/flows/v1beta1/parallel/controller.go b/pkg/client/injection/reconciler/flows/v1beta1/parallel/controller.go index 1b84ddf40a4..c16ff28300b 100644 --- a/pkg/client/injection/reconciler/flows/v1beta1/parallel/controller.go +++ b/pkg/client/injection/reconciler/flows/v1beta1/parallel/controller.go @@ -105,9 +105,6 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } - if opts.DemoteFunc != nil { - rec.DemoteFunc = opts.DemoteFunc - } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/flows/v1beta1/sequence/controller.go b/pkg/client/injection/reconciler/flows/v1beta1/sequence/controller.go index 656b5a2a39b..808c8aba550 100644 --- a/pkg/client/injection/reconciler/flows/v1beta1/sequence/controller.go +++ b/pkg/client/injection/reconciler/flows/v1beta1/sequence/controller.go @@ -105,9 +105,6 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } - if opts.DemoteFunc != nil { - rec.DemoteFunc = opts.DemoteFunc - } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/messaging/v1/channel/controller.go b/pkg/client/injection/reconciler/messaging/v1/channel/controller.go index 2a614bf6a56..940dc9bd903 100644 --- a/pkg/client/injection/reconciler/messaging/v1/channel/controller.go +++ b/pkg/client/injection/reconciler/messaging/v1/channel/controller.go @@ -105,9 +105,6 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } - if opts.DemoteFunc != nil { - rec.DemoteFunc = opts.DemoteFunc - } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/messaging/v1/inmemorychannel/controller.go b/pkg/client/injection/reconciler/messaging/v1/inmemorychannel/controller.go index ccc76254c58..346de276b40 100644 --- a/pkg/client/injection/reconciler/messaging/v1/inmemorychannel/controller.go +++ b/pkg/client/injection/reconciler/messaging/v1/inmemorychannel/controller.go @@ -105,9 +105,6 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } - if opts.DemoteFunc != nil { - rec.DemoteFunc = opts.DemoteFunc - } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/messaging/v1/subscription/controller.go b/pkg/client/injection/reconciler/messaging/v1/subscription/controller.go index 665b6a54c3c..235ce2f1b66 100644 --- a/pkg/client/injection/reconciler/messaging/v1/subscription/controller.go +++ b/pkg/client/injection/reconciler/messaging/v1/subscription/controller.go @@ -105,9 +105,6 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } - if opts.DemoteFunc != nil { - rec.DemoteFunc = opts.DemoteFunc - } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/messaging/v1beta1/channel/controller.go b/pkg/client/injection/reconciler/messaging/v1beta1/channel/controller.go index 7130bec1d7c..44f432c71d8 100644 --- a/pkg/client/injection/reconciler/messaging/v1beta1/channel/controller.go +++ b/pkg/client/injection/reconciler/messaging/v1beta1/channel/controller.go @@ -105,9 +105,6 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } - if opts.DemoteFunc != nil { - rec.DemoteFunc = opts.DemoteFunc - } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/messaging/v1beta1/inmemorychannel/controller.go b/pkg/client/injection/reconciler/messaging/v1beta1/inmemorychannel/controller.go index c79b7c6324c..3fb2e152800 100644 --- a/pkg/client/injection/reconciler/messaging/v1beta1/inmemorychannel/controller.go +++ b/pkg/client/injection/reconciler/messaging/v1beta1/inmemorychannel/controller.go @@ -105,9 +105,6 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } - if opts.DemoteFunc != nil { - rec.DemoteFunc = opts.DemoteFunc - } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/messaging/v1beta1/subscription/controller.go b/pkg/client/injection/reconciler/messaging/v1beta1/subscription/controller.go index 504e9d53c41..cc2a9dc938b 100644 --- a/pkg/client/injection/reconciler/messaging/v1beta1/subscription/controller.go +++ b/pkg/client/injection/reconciler/messaging/v1beta1/subscription/controller.go @@ -105,9 +105,6 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } - if opts.DemoteFunc != nil { - rec.DemoteFunc = opts.DemoteFunc - } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/sources/v1alpha2/apiserversource/controller.go b/pkg/client/injection/reconciler/sources/v1alpha2/apiserversource/controller.go index c5ab56a175e..5f29860b633 100644 --- a/pkg/client/injection/reconciler/sources/v1alpha2/apiserversource/controller.go +++ b/pkg/client/injection/reconciler/sources/v1alpha2/apiserversource/controller.go @@ -105,9 +105,6 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } - if opts.DemoteFunc != nil { - rec.DemoteFunc = opts.DemoteFunc - } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/sources/v1alpha2/containersource/controller.go b/pkg/client/injection/reconciler/sources/v1alpha2/containersource/controller.go index 0da1c4c1b3c..6534a7d987b 100644 --- a/pkg/client/injection/reconciler/sources/v1alpha2/containersource/controller.go +++ b/pkg/client/injection/reconciler/sources/v1alpha2/containersource/controller.go @@ -105,9 +105,6 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } - if opts.DemoteFunc != nil { - rec.DemoteFunc = opts.DemoteFunc - } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/sources/v1alpha2/pingsource/controller.go b/pkg/client/injection/reconciler/sources/v1alpha2/pingsource/controller.go index df5e232217f..a074c02418b 100644 --- a/pkg/client/injection/reconciler/sources/v1alpha2/pingsource/controller.go +++ b/pkg/client/injection/reconciler/sources/v1alpha2/pingsource/controller.go @@ -105,9 +105,6 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } - if opts.DemoteFunc != nil { - rec.DemoteFunc = opts.DemoteFunc - } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/sources/v1beta1/apiserversource/controller.go b/pkg/client/injection/reconciler/sources/v1beta1/apiserversource/controller.go index ffd6ee0b503..5bf729d06ab 100644 --- a/pkg/client/injection/reconciler/sources/v1beta1/apiserversource/controller.go +++ b/pkg/client/injection/reconciler/sources/v1beta1/apiserversource/controller.go @@ -105,9 +105,6 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } - if opts.DemoteFunc != nil { - rec.DemoteFunc = opts.DemoteFunc - } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/sources/v1beta1/containersource/controller.go b/pkg/client/injection/reconciler/sources/v1beta1/containersource/controller.go index 79141b40e9e..8d5ee04ba18 100644 --- a/pkg/client/injection/reconciler/sources/v1beta1/containersource/controller.go +++ b/pkg/client/injection/reconciler/sources/v1beta1/containersource/controller.go @@ -105,9 +105,6 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } - if opts.DemoteFunc != nil { - rec.DemoteFunc = opts.DemoteFunc - } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/sources/v1beta1/pingsource/controller.go b/pkg/client/injection/reconciler/sources/v1beta1/pingsource/controller.go index 7b50f50349a..bb2eb85686e 100644 --- a/pkg/client/injection/reconciler/sources/v1beta1/pingsource/controller.go +++ b/pkg/client/injection/reconciler/sources/v1beta1/pingsource/controller.go @@ -105,9 +105,6 @@ func NewImpl(ctx context.Context, r Interface, optionsFns ...controller.OptionsF if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } - if opts.DemoteFunc != nil { - rec.DemoteFunc = opts.DemoteFunc - } } rec.Recorder = createRecorder(ctx, agentName) diff --git a/pkg/client/injection/reconciler/sources/v1beta1/pingsource/reconciler.go b/pkg/client/injection/reconciler/sources/v1beta1/pingsource/reconciler.go index 3de4f28273c..e9799929814 100644 --- a/pkg/client/injection/reconciler/sources/v1beta1/pingsource/reconciler.go +++ b/pkg/client/injection/reconciler/sources/v1beta1/pingsource/reconciler.go @@ -169,9 +169,6 @@ func NewReconciler(ctx context.Context, logger *zap.SugaredLogger, client versio if opts.SkipStatusUpdates { rec.skipStatusUpdates = true } - if opts.DemoteFunc != nil { - rec.DemoteFunc = opts.DemoteFunc - } } return rec diff --git a/vendor/knative.dev/pkg/reconciler/leader.go b/vendor/knative.dev/pkg/reconciler/leader.go index ecff3023b12..6fece8526df 100644 --- a/vendor/knative.dev/pkg/reconciler/leader.go +++ b/vendor/knative.dev/pkg/reconciler/leader.go @@ -18,7 +18,6 @@ package reconciler import ( "sync" - "fmt" "k8s.io/apimachinery/pkg/types" ) @@ -89,7 +88,6 @@ func (laf *LeaderAwareFuncs) IsLeaderFor(key types.NamespacedName) bool { // Promote implements LeaderAware func (laf *LeaderAwareFuncs) Promote(b Bucket, enq func(Bucket, types.NamespacedName)) error { - fmt.Println("PROMOTE") func() { laf.Lock() defer laf.Unlock() @@ -100,8 +98,6 @@ func (laf *LeaderAwareFuncs) Promote(b Bucket, enq func(Bucket, types.Namespaced }() if promote := laf.PromoteFunc; promote != nil { - fmt.Println("PROMOTE FUNC") - return promote(b, enq) } return nil From a0d0d2f32e96a978648050e22818a3bba093010a Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Mon, 21 Sep 2020 09:57:18 -0400 Subject: [PATCH 8/8] remove pingconfig --- pkg/adapter/mtping/runner.go | 25 ------------------------- 1 file changed, 25 deletions(-) diff --git a/pkg/adapter/mtping/runner.go b/pkg/adapter/mtping/runner.go index 67507a7b46b..1e3018c5ae6 100644 --- a/pkg/adapter/mtping/runner.go +++ b/pkg/adapter/mtping/runner.go @@ -22,8 +22,6 @@ import ( "math/rand" "time" - corev1 "k8s.io/api/core/v1" - cloudevents "github.com/cloudevents/sdk-go/v2" cecontext "github.com/cloudevents/sdk-go/v2/context" "github.com/google/uuid" @@ -59,29 +57,6 @@ type cronJobsRunner struct { kubeClient kubernetes.Interface } -type PingConfig struct { - corev1.ObjectReference `json:",inline"` - - // Schedule is the cronjob schedule. Defaults to `* * * * *`. - Schedule string `json:"schedule"` - - // JsonData is json encoded data used as the body of the event posted to - // the sink. Default is empty. If set, datacontenttype will also be set - // to "application/json". - // +optional - JsonData string `json:"jsonData,omitempty"` - - // Extensions specify what attribute are added or overridden on the - // outbound event. Each `Extensions` key-value pair are set on the event as - // an attribute extension independently. - // +optional - Extensions map[string]string `json:"extensions,omitempty"` - - // SinkURI is the current active sink URI that has been configured for the - // Source. - SinkURI string `json:"sinkUri,omitempty"` -} - const ( resourceGroup = "pingsources.sources.knative.dev" )