diff --git a/config/channel/consolidated/deployments/dispatcher.yaml b/config/channel/consolidated/deployments/dispatcher.yaml index 896f9ea9e0..c168904a50 100644 --- a/config/channel/consolidated/deployments/dispatcher.yaml +++ b/config/channel/consolidated/deployments/dispatcher.yaml @@ -30,7 +30,9 @@ spec: template: metadata: labels: + # Do not change. Used by the controller for probing. messaging.knative.dev/channel: kafka-channel + # Do not change. Used by the controller for probing. messaging.knative.dev/role: dispatcher kafka.eventing.knative.dev/release: devel spec: diff --git a/config/channel/consolidated/roles/controller-clusterrole.yaml b/config/channel/consolidated/roles/controller-clusterrole.yaml index 42cacb2b40..dc46e7034b 100644 --- a/config/channel/consolidated/roles/controller-clusterrole.yaml +++ b/config/channel/consolidated/roles/controller-clusterrole.yaml @@ -39,7 +39,6 @@ rules: - apiGroups: - "" # Core API group. resources: - - services - configmaps - secrets verbs: @@ -51,6 +50,7 @@ rules: - "" # Core API group. resources: - services + - serviceaccounts verbs: &everything - get - list @@ -74,18 +74,11 @@ rules: - create - patch - update - - apiGroups: - - "" - resources: - - configmaps - verbs: - - get - - list - - watch - apiGroups: - "" # Core API group. resources: - endpoints + - pods verbs: - get - list @@ -96,11 +89,6 @@ rules: - deployments - deployments/status verbs: *everything - - apiGroups: - - "" # Core API group. - resources: - - serviceaccounts - verbs: *everything - apiGroups: - rbac.authorization.k8s.io resources: diff --git a/go.mod b/go.mod index 65d9ec4c76..d0c7ad1bfa 100644 --- a/go.mod +++ b/go.mod @@ -25,6 +25,7 @@ require ( github.com/stretchr/testify v1.6.1 github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c go.opencensus.io v0.22.6 + go.uber.org/atomic v1.7.0 go.uber.org/zap v1.16.0 golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897 // indirect golang.org/x/sync v0.0.0-20201207232520-09787c993a3a @@ -35,5 +36,6 @@ require ( k8s.io/utils v0.0.0-20200729134348-d5654de09c73 knative.dev/eventing v0.21.1-0.20210309092525-37e702765dbc knative.dev/hack v0.0.0-20210305150220-f99a25560134 + knative.dev/networking v0.0.0-20210304153916-f813b5904943 knative.dev/pkg v0.0.0-20210309024624-0f8d8de5949d ) diff --git a/go.sum b/go.sum index fe12d25c9b..5494498f96 100644 --- a/go.sum +++ b/go.sum @@ -330,6 +330,8 @@ github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls= github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= +github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/groupcache v0.0.0-20160516000752-02826c3e7903/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -510,6 +512,7 @@ github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dv github.com/kelseyhightower/envconfig v1.4.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= +github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.9.7/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.11.0 h1:wJbzvpYMVGG9iTI9VxpnNZfd4DzMPoCWze3GgSqz8yg= @@ -687,6 +690,7 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So github.com/rogpeppe/fastuuid v1.2.0 h1:Ppwyp6VYCF1nvBTXL3trRso7mXMlRrw9ooo375wvi2s= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rs/dnscache v0.0.0-20210201191234-295bba877686/go.mod h1:qe5TWALJ8/a1Lqznoc5BDHpYX/8HU60Hm2AwRmqzxqA= github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts= github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= @@ -1035,6 +1039,7 @@ golang.org/x/tools v0.0.0-20200512131952-2bc93b1c0c88/go.mod h1:EkVYQZoAsY45+roY golang.org/x/tools v0.0.0-20200515010526-7d3b6ebf133d/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200616133436-c1934b75d054/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200618134242-20370b0cb4b2/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= +golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA= @@ -1043,6 +1048,8 @@ golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20201201161351-ac6f37ff4c2a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818 h1:u2ssHESKr0HP2d1wlnjMKH+V/22Vg1lGCVuXmOYU1qA= golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a h1:CB3a9Nez8M13wwlr/E2YtwoU+qYHKfC+JrDa45RXXoQ= +golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -1244,7 +1251,8 @@ knative.dev/hack v0.0.0-20210203173706-8368e1f6eacf h1:u4cY4jr2LYvhoz/1HBWEPsMiL knative.dev/hack v0.0.0-20210203173706-8368e1f6eacf/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI= knative.dev/hack v0.0.0-20210305150220-f99a25560134 h1:lUllAp28TkevQIgWrsjow8ZLnXJy3AraRzGFm/ffD2c= knative.dev/hack v0.0.0-20210305150220-f99a25560134/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI= -knative.dev/pkg v0.0.0-20210303192215-8fbab7ebb77b h1:AynUh7MBUe44E60vh0vIyF2Bes4AOoTT2ejy9xrF0FU= +knative.dev/networking v0.0.0-20210304153916-f813b5904943 h1:EEAnCZzqVoTNNPMYyONXqOD3e/45OPVahA4jm8ET4/g= +knative.dev/networking v0.0.0-20210304153916-f813b5904943/go.mod h1:G+KCelFuLocMrnfayHoxqsFG+IYX4t8To1celZes77k= knative.dev/pkg v0.0.0-20210303192215-8fbab7ebb77b/go.mod h1:TJSdebQOWX5N2bszohOYVi0H1QtXbtlYLuMghAFBMhY= knative.dev/pkg v0.0.0-20210308052421-737401c38b22/go.mod h1:fP690UCcs5x+qQVhjJxNcm97OWIiUdFC1dqbD3Gsp64= knative.dev/pkg v0.0.0-20210309024624-0f8d8de5949d h1:2Uc3qyLRLIYOqJrGGKFkJc69X+cxlhoH3jk7p4b4KFM= diff --git a/pkg/channel/consolidated/dispatcher/consumer_message_handler.go b/pkg/channel/consolidated/dispatcher/consumer_message_handler.go index 0d08deb96f..5b01e6b916 100644 --- a/pkg/channel/consolidated/dispatcher/consumer_message_handler.go +++ b/pkg/channel/consolidated/dispatcher/consumer_message_handler.go @@ -34,12 +34,17 @@ type consumerMessageHandler struct { sub Subscription dispatcher *eventingchannels.MessageDispatcherImpl kafkaSubscription *KafkaSubscription + consumerGroup string } var _ consumer.KafkaConsumerHandler = (*consumerMessageHandler)(nil) -func (c consumerMessageHandler) SetReady(ready bool) { - c.kafkaSubscription.SetReady(c.sub.UID, ready) +func (c consumerMessageHandler) GetConsumerGroup() string { + return c.consumerGroup +} + +func (c consumerMessageHandler) SetReady(partition int32, ready bool) { + c.kafkaSubscription.SetReady(c.sub.UID, partition, ready) } func (c consumerMessageHandler) Handle(ctx context.Context, consumerMessage *sarama.ConsumerMessage) (bool, error) { diff --git a/pkg/channel/consolidated/dispatcher/dispatcher.go b/pkg/channel/consolidated/dispatcher/dispatcher.go index cc47515263..2a83999c17 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher.go @@ -25,11 +25,6 @@ import ( "sync" "sync/atomic" - "k8s.io/apimachinery/pkg/util/sets" - - "knative.dev/eventing-kafka/pkg/common/client" - "knative.dev/eventing-kafka/pkg/common/tracing" - "github.com/Shopify/sarama" protocolkafka "github.com/cloudevents/sdk-go/protocol/kafka_sarama/v2" "github.com/cloudevents/sdk-go/v2/binding" @@ -37,13 +32,16 @@ import ( "go.opencensus.io/trace" "go.uber.org/zap" "k8s.io/apimachinery/pkg/types" - eventingchannels "knative.dev/eventing/pkg/channel" - "knative.dev/eventing/pkg/kncloudevents" - "knative.dev/pkg/kmeta" + "k8s.io/apimachinery/pkg/util/sets" "knative.dev/eventing-kafka/pkg/channel/consolidated/utils" "knative.dev/eventing-kafka/pkg/channel/distributed/common/env" + "knative.dev/eventing-kafka/pkg/common/client" "knative.dev/eventing-kafka/pkg/common/consumer" + "knative.dev/eventing-kafka/pkg/common/tracing" + eventingchannels "knative.dev/eventing/pkg/channel" + "knative.dev/eventing/pkg/kncloudevents" + "knative.dev/pkg/kmeta" ) const ( @@ -191,9 +189,11 @@ func (d *KafkaDispatcher) ServeHTTP(w nethttp.ResponseWriter, r *nethttp.Request } d.channelSubscriptions[channelRef].readySubscriptionsLock.RLock() defer d.channelSubscriptions[channelRef].readySubscriptionsLock.RUnlock() - var subscriptions = make(map[string][]string) + var subscriptions = make(map[string][]int32) w.Header().Set(dispatcherReadySubHeader, channelRefName) - subscriptions[channelRefNamespace+"/"+channelRefName] = d.channelSubscriptions[channelRef].channelReadySubscriptions.List() + for s, ps := range d.channelSubscriptions[channelRef].channelReadySubscriptions { + subscriptions[s] = ps.List() + } jsonResult, err := json.Marshal(subscriptions) if err != nil { d.logger.Errorf("Error marshalling json for sub-status channelref: %s/%s, %w", channelRefNamespace, channelRefName, err) @@ -234,8 +234,9 @@ func (d *KafkaDispatcher) UpdateKafkaConsumers(config *Config) (map[types.UID]er } } else { //ensure the pointer is populated or things go boom d.channelSubscriptions[channelRef] = &KafkaSubscription{ + logger: d.logger, subs: []types.UID{}, - channelReadySubscriptions: sets.String{}, + channelReadySubscriptions: map[string]sets.Int32{}, } } @@ -291,13 +292,18 @@ func (d *KafkaDispatcher) UpdateHostToChannelMap(config *Config) error { // subscribe reads kafkaConsumers which gets updated in UpdateConfig in a separate go-routine. // subscribe must be called under updateLock. func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference, sub Subscription) error { - d.logger.Info("Subscribing", zap.Any("channelRef", channelRef), zap.Any("subscription", sub.UID)) - + d.logger.Infow("Subscribing to Kafka Channel", zap.Any("channelRef", channelRef), zap.Any("subscription", sub.UID)) topicName := d.topicFunc(utils.KafkaChannelSeparator, channelRef.Namespace, channelRef.Name) groupID := fmt.Sprintf("kafka.%s.%s.%s", channelRef.Namespace, channelRef.Name, string(sub.UID)) - - handler := &consumerMessageHandler{d.logger, sub, d.dispatcher, d.channelSubscriptions[channelRef]} - + handler := &consumerMessageHandler{ + d.logger, + sub, + d.dispatcher, + d.channelSubscriptions[channelRef], + groupID, + } + d.logger.Debugw("Starting consumer group", zap.Any("channelRef", channelRef), + zap.Any("subscription", sub.UID), zap.String("topic", topicName), zap.String("consumer group", groupID)) consumerGroup, err := d.kafkaConsumerFactory.StartConsumerGroup(groupID, []string{topicName}, d.logger, handler) if err != nil { @@ -324,7 +330,7 @@ func (d *KafkaDispatcher) subscribe(channelRef eventingchannels.ChannelReference // unsubscribe reads kafkaConsumers which gets updated in UpdateConfig in a separate go-routine. // unsubscribe must be called under updateLock. func (d *KafkaDispatcher) unsubscribe(channel eventingchannels.ChannelReference, sub Subscription) error { - d.logger.Infow("Unsubscribing from channel", zap.Any("channel", channel), zap.String("subscription", sub.String())) + d.logger.Infow("Unsubscribing from channel", zap.Any("channel", channel), zap.Any("subscription", sub.UID)) delete(d.subscriptions, sub.UID) if _, ok := d.channelSubscriptions[channel]; !ok { return nil @@ -340,6 +346,7 @@ func (d *KafkaDispatcher) unsubscribe(channel eventingchannels.ChannelReference, } if consumer, ok := d.subsConsumerGroups[sub.UID]; ok { delete(d.subsConsumerGroups, sub.UID) + d.logger.Debugw("Closing cached consumer group", zap.Any("consumer group", consumer)) return consumer.Close() } return nil diff --git a/pkg/channel/consolidated/dispatcher/dispatcher_test.go b/pkg/channel/consolidated/dispatcher/dispatcher_test.go index 50375af28f..f968bba865 100644 --- a/pkg/channel/consolidated/dispatcher/dispatcher_test.go +++ b/pkg/channel/consolidated/dispatcher/dispatcher_test.go @@ -33,10 +33,12 @@ import ( "go.uber.org/zap/zaptest" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" + "knative.dev/eventing-kafka/pkg/channel/consolidated/utils" "knative.dev/eventing-kafka/pkg/common/consumer" eventingchannels "knative.dev/eventing/pkg/channel" "knative.dev/eventing/pkg/channel/fanout" + klogtesting "knative.dev/pkg/logging/testing" _ "knative.dev/pkg/system/testing" ) @@ -444,63 +446,122 @@ func TestNewDispatcher(t *testing.T) { } func TestSetReady(t *testing.T) { + logger := klogtesting.TestLogger(t) testCases := []struct { name string ready bool subID types.UID + partition int32 originalKafkaSub *KafkaSubscription desiredKafkaSub *KafkaSubscription }{ { - name: "doesn't have the sub, add it (on ready)", - ready: true, - subID: "foo", + name: "doesn't have the sub, add it (on ready)", + ready: true, + subID: "foo", + partition: 0, originalKafkaSub: &KafkaSubscription{ - channelReadySubscriptions: sets.String{"bar": sets.Empty{}}, + channelReadySubscriptions: map[string]sets.Int32{"bar": sets.NewInt32(0)}, }, desiredKafkaSub: &KafkaSubscription{ - subs: []types.UID{}, - channelReadySubscriptions: sets.String{"bar": sets.Empty{}, "foo": sets.Empty{}}, + subs: []types.UID{}, + channelReadySubscriptions: map[string]sets.Int32{ + "bar": sets.NewInt32(0), + "foo": sets.NewInt32(0), + }, }, }, { - name: "has the sub already (on ready)", - ready: true, - subID: "foo", + name: "has the sub but not the partition, add it (on ready)", + ready: true, + subID: "foo", + partition: 1, originalKafkaSub: &KafkaSubscription{ - channelReadySubscriptions: sets.String{"foo": sets.Empty{}, "bar": sets.Empty{}}, + channelReadySubscriptions: map[string]sets.Int32{ + "bar": sets.NewInt32(0), + "foo": sets.NewInt32(0), + }, }, desiredKafkaSub: &KafkaSubscription{ - channelReadySubscriptions: sets.String{"foo": sets.Empty{}, "bar": sets.Empty{}}, + subs: []types.UID{}, + channelReadySubscriptions: map[string]sets.Int32{ + "bar": sets.NewInt32(0), + "foo": sets.NewInt32(0, 1), + }, }, }, { - name: "has the sub, delete it (on !ready)", - ready: false, - subID: "foo", + name: "has the sub and partition already (on ready)", + ready: true, + subID: "foo", + partition: 0, originalKafkaSub: &KafkaSubscription{ - channelReadySubscriptions: sets.String{"foo": sets.Empty{}, "bar": sets.Empty{}}, + channelReadySubscriptions: map[string]sets.Int32{ + "bar": sets.NewInt32(0), + "foo": sets.NewInt32(0), + }}, + desiredKafkaSub: &KafkaSubscription{ + channelReadySubscriptions: map[string]sets.Int32{ + "bar": sets.NewInt32(0), + "foo": sets.NewInt32(0), + }}, + }, + { + name: "has the sub with two partition, delete one (on !ready)", + ready: false, + subID: "foo", + partition: 1, + originalKafkaSub: &KafkaSubscription{ + channelReadySubscriptions: map[string]sets.Int32{ + "bar": sets.NewInt32(0), + "foo": sets.NewInt32(0, 1), + }, }, desiredKafkaSub: &KafkaSubscription{ - channelReadySubscriptions: sets.String{"bar": sets.Empty{}}, + channelReadySubscriptions: map[string]sets.Int32{ + "bar": sets.NewInt32(0), + "foo": sets.NewInt32(0), + }, }, }, { - name: "doesn't have the sub to delete (on !ready)", - ready: false, - subID: "foo", + name: "has the sub with one partition, delete sub (on !ready)", + ready: false, + subID: "foo", + partition: 0, originalKafkaSub: &KafkaSubscription{ - channelReadySubscriptions: sets.String{"bar": sets.Empty{}}, + channelReadySubscriptions: map[string]sets.Int32{ + "bar": sets.NewInt32(0), + "foo": sets.NewInt32(0), + }, }, desiredKafkaSub: &KafkaSubscription{ - channelReadySubscriptions: sets.String{"bar": sets.Empty{}}, + channelReadySubscriptions: map[string]sets.Int32{ + "bar": sets.NewInt32(0), + }, + }, + }, + { + name: "doesn't have the sub to delete (on !ready)", + ready: false, + subID: "foo", + partition: 0, + originalKafkaSub: &KafkaSubscription{ + channelReadySubscriptions: map[string]sets.Int32{ + "bar": sets.NewInt32(0), + }}, + desiredKafkaSub: &KafkaSubscription{ + channelReadySubscriptions: map[string]sets.Int32{ + "bar": sets.NewInt32(0), + }, }, }, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { t.Logf("Running %s", t.Name()) - tc.originalKafkaSub.SetReady(tc.subID, tc.ready) + tc.originalKafkaSub.logger = logger + tc.originalKafkaSub.SetReady(tc.subID, tc.partition, tc.ready) if diff := cmp.Diff(tc.desiredKafkaSub.channelReadySubscriptions, tc.originalKafkaSub.channelReadySubscriptions); diff != "" { t.Errorf("unexpected ChannelReadySubscription (-want, +got) = %v", diff) } @@ -536,11 +597,11 @@ func TestServeHTTP(t *testing.T) { name: "no ready subscribers", httpMethod: httpGet, responseReturnCode: http.StatusOK, - desiredJson: []byte(`{"bar/foo":[]}`), + desiredJson: []byte(`{}`), channelSubs: map[eventingchannels.ChannelReference]*KafkaSubscription{ {Name: "foo", Namespace: "bar"}: { subs: []types.UID{}, - channelReadySubscriptions: sets.String{}, + channelReadySubscriptions: map[string]sets.Int32{}, }, }, requestURI: "/bar/foo", @@ -551,36 +612,47 @@ func TestServeHTTP(t *testing.T) { responseReturnCode: http.StatusNotFound, channelSubs: map[eventingchannels.ChannelReference]*KafkaSubscription{ {Name: "foo", Namespace: "baz"}: { - subs: []types.UID{"a", "b"}, - channelReadySubscriptions: sets.String{"a": sets.Empty{}, "b": sets.Empty{}}, + subs: []types.UID{"a", "b"}, + channelReadySubscriptions: map[string]sets.Int32{ + "a": sets.NewInt32(0), + "b": sets.NewInt32(0), + }, }, }, requestURI: "/bar/foo", }, { name: "return correct subscription", httpMethod: httpGet, - desiredJson: []byte(`{"bar/foo":["a","b"]}`), + desiredJson: []byte(`{"a":[0],"b":[0,2,5]}`), responseReturnCode: http.StatusOK, channelSubs: map[eventingchannels.ChannelReference]*KafkaSubscription{ {Name: "foo", Namespace: "bar"}: { - subs: []types.UID{"a", "b"}, - channelReadySubscriptions: sets.String{"a": sets.Empty{}, "b": sets.Empty{}}, + subs: []types.UID{"a", "b"}, + channelReadySubscriptions: map[string]sets.Int32{ + "a": sets.NewInt32(0), + "b": sets.NewInt32(0, 2, 5), + }, }, }, requestURI: "/bar/foo", }, { name: "return correct subscription from multiple chanrefs", httpMethod: httpGet, - desiredJson: []byte(`{"bar/foo":["a","b"]}`), + desiredJson: []byte(`{"a":[0],"b":[0,2,5]}`), responseReturnCode: http.StatusOK, channelSubs: map[eventingchannels.ChannelReference]*KafkaSubscription{ {Name: "table", Namespace: "flip"}: { - subs: []types.UID{"c", "d"}, - channelReadySubscriptions: sets.String{"c": sets.Empty{}}, - }, + subs: []types.UID{"c", "d"}, + channelReadySubscriptions: map[string]sets.Int32{ + "c": sets.NewInt32(0), + "d": sets.NewInt32(0), + }}, {Name: "foo", Namespace: "bar"}: { - subs: []types.UID{"a", "b"}, - channelReadySubscriptions: sets.String{"a": sets.Empty{}, "b": sets.Empty{}}, + subs: []types.UID{"a", "b"}, + channelReadySubscriptions: map[string]sets.Int32{ + "a": sets.NewInt32(0), + "b": sets.NewInt32(0, 2, 5), + }, }, }, requestURI: "/bar/foo", @@ -599,7 +671,7 @@ func TestServeHTTP(t *testing.T) { } d := &KafkaDispatcher{ channelSubscriptions: make(map[eventingchannels.ChannelReference]*KafkaSubscription), - logger: zaptest.NewLogger(t).Sugar(), + logger: klogtesting.TestLogger(t), } ts := httptest.NewServer(d) defer ts.Close() diff --git a/pkg/channel/consolidated/dispatcher/kafka_subscription.go b/pkg/channel/consolidated/dispatcher/kafka_subscription.go index ab240c8c9d..8f54c4a77c 100644 --- a/pkg/channel/consolidated/dispatcher/kafka_subscription.go +++ b/pkg/channel/consolidated/dispatcher/kafka_subscription.go @@ -19,29 +19,40 @@ package dispatcher import ( "sync" + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" ) type KafkaSubscription struct { - subs []types.UID - + logger *zap.SugaredLogger + subs []types.UID // readySubscriptionsLock must be used to synchronize access to channelReadySubscriptions readySubscriptionsLock sync.RWMutex - channelReadySubscriptions sets.String + channelReadySubscriptions map[string]sets.Int32 } // SetReady will mark the subid in the KafkaSubscription and call any registered callbacks -func (ks *KafkaSubscription) SetReady(subID types.UID, ready bool) { +func (ks *KafkaSubscription) SetReady(subID types.UID, partition int32, ready bool) { + ks.logger.Debugw("Setting subscription readiness", zap.Any("subscription", subID), zap.Bool("ready", ready)) ks.readySubscriptionsLock.Lock() defer ks.readySubscriptionsLock.Unlock() if ready { - if !ks.channelReadySubscriptions.Has(string(subID)) { - ks.channelReadySubscriptions.Insert(string(subID)) + if subs, ok := ks.channelReadySubscriptions[string(subID)]; ok { + ks.logger.Debugw("Adding ready ready partition to cached subscription", zap.Any("subscription", subID), zap.Int32("partition", partition)) + subs.Insert(partition) + } else { + ks.logger.Debugw("Caching ready subscription", zap.Any("subscription", subID), zap.Int32("partition", partition)) + ks.channelReadySubscriptions[string(subID)] = sets.NewInt32(partition) } } else { - if ks.channelReadySubscriptions.Has(string(subID)) { - ks.channelReadySubscriptions.Delete(string(subID)) + if subs, ok := ks.channelReadySubscriptions[string(subID)]; ok { + ks.logger.Debugw("Ejecting cached ready subscription", zap.Any("subscription", subID), zap.Int32("partition", partition)) + subs.Delete(partition) + if subs.Len() == 0 { + delete(ks.channelReadySubscriptions, string(subID)) + } } } } diff --git a/pkg/channel/consolidated/kafka/admin.go b/pkg/channel/consolidated/kafka/admin.go deleted file mode 100644 index 9bfb1ea140..0000000000 --- a/pkg/channel/consolidated/kafka/admin.go +++ /dev/null @@ -1,109 +0,0 @@ -/* -Copyright 2020 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kafka - -import ( - "context" - "fmt" - "math" - "sync" - "time" - - "k8s.io/apimachinery/pkg/util/sets" - - "go.uber.org/zap" - - "github.com/Shopify/sarama" - "knative.dev/pkg/logging" -) - -var mutex sync.Mutex - -type ClusterAdminFactory func() (sarama.ClusterAdmin, error) - -type AdminClient interface { - // ListConsumerGroups Lists the consumer groups - ListConsumerGroups() ([]string, error) -} - -// AdminClientManager manages a ClusterAdmin connection and recreates one when needed -// it is made to overcome https://github.com/Shopify/sarama/issues/1162 -type AdminClientManager struct { - logger *zap.SugaredLogger - adminFactory ClusterAdminFactory - clusterAdmin sarama.ClusterAdmin -} - -func NewAdminClient(ctx context.Context, caFactory ClusterAdminFactory) (AdminClient, error) { - logger := logging.FromContext(ctx) - logger.Debug("Creating a new AdminClient") - kafkaClusterAdmin, err := caFactory() - if err != nil { - logger.Errorw("error while creating ClusterAdmin", zap.Error(err)) - return nil, err - } - return &AdminClientManager{ - logger: logger, - adminFactory: caFactory, - clusterAdmin: kafkaClusterAdmin, - }, nil -} - -// ListConsumerGroups Returns a list of the consumer groups. -// -// In the occasion of errors, there will be a retry with an exponential backoff. -// Due to a known issue in Sarama ClusterAdmin https://github.com/Shopify/sarama/issues/1162, -// a new ClusterAdmin will be created with every retry until the call succeeds or -// the timeout is reached. -func (c *AdminClientManager) ListConsumerGroups() ([]string, error) { - c.logger.Debug("Attempting to list consumer group") - mutex.Lock() - defer mutex.Unlock() - r := 0 - // This gives us around ~13min of exponential backoff - max := 13 - cgsMap, err := c.clusterAdmin.ListConsumerGroups() - for err != nil && r <= max { - // There's on error, let's retry and presume a new ClusterAdmin can fix it - - // Calculate incremental delay following this https://docs.aws.amazon.com/general/latest/gr/api-retries.html - t := int(math.Pow(2, float64(r)) * 100) - d := time.Duration(t) * time.Millisecond - c.logger.Errorw("listing consumer group failed. Refreshing the ClusterAdmin and retrying.", - zap.Error(err), - zap.Duration("retry after", d), - zap.Int("Retry attempt", r), - zap.Int("Max retries", max), - ) - time.Sleep(d) - - // let's reconnect and try again - c.clusterAdmin, err = c.adminFactory() - r += 1 - if err != nil { - // skip this attempt - continue - } - cgsMap, err = c.clusterAdmin.ListConsumerGroups() - } - - if r > max { - return nil, fmt.Errorf("failed to refresh the culster admin and retry: %v", err) - } - - return sets.StringKeySet(cgsMap).List(), nil -} diff --git a/pkg/channel/consolidated/kafka/admin_test.go b/pkg/channel/consolidated/kafka/admin_test.go deleted file mode 100644 index 3e671c22a2..0000000000 --- a/pkg/channel/consolidated/kafka/admin_test.go +++ /dev/null @@ -1,93 +0,0 @@ -/* -Copyright 2020 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package kafka - -import ( - "fmt" - "sync" - "testing" - "time" - - "github.com/Shopify/sarama" - pkgtesting "knative.dev/pkg/logging/testing" -) - -const testCG = "cg1" - -var m sync.RWMutex - -type FakeClusterAdmin struct { - sarama.ClusterAdmin - faulty bool -} - -func (f *FakeClusterAdmin) ListConsumerGroups() (map[string]string, error) { - cgs := map[string]string{ - testCG: "cg", - } - m.RLock() - defer m.RUnlock() - if f.faulty { - return nil, fmt.Errorf("Error") - } - return cgs, nil -} - -func TestAdminClient(t *testing.T) { - var wg sync.WaitGroup - wg.Add(10) - ctx := pkgtesting.TestContextWithLogger(t) - f := &FakeClusterAdmin{} - ac, err := NewAdminClient(ctx, func() (sarama.ClusterAdmin, error) { - return f, nil - }) - if err != nil { - t.Error("failed to obtain new client", err) - } - for i := 0; i < 10; i += 1 { - go func() { - doList(t, ac) - check := make(chan struct{}) - go func() { - m.Lock() - f.faulty = true - m.Unlock() - check <- struct{}{} - time.Sleep(2 * time.Second) - m.Lock() - f.faulty = false - m.Unlock() - check <- struct{}{} - }() - <-check - doList(t, ac) - <-check - wg.Done() - }() - } - wg.Wait() -} - -func doList(t *testing.T, ac AdminClient) { - cgs, _ := ac.ListConsumerGroups() - if len(cgs) != 1 { - t.Fatalf("list consumer group: got %d, want %d", len(cgs), 1) - } - if cgs[0] != testCG { - t.Fatalf("consumer group: got %s, want %s", cgs[0], testCG) - } -} diff --git a/pkg/channel/consolidated/reconciler/controller/consumer_group_watcher.go b/pkg/channel/consolidated/reconciler/controller/consumer_group_watcher.go deleted file mode 100644 index 353811c264..0000000000 --- a/pkg/channel/consolidated/reconciler/controller/consumer_group_watcher.go +++ /dev/null @@ -1,201 +0,0 @@ -/* -Copyright 2020 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controller - -import ( - "context" - "sync" - "time" - - "k8s.io/apimachinery/pkg/util/sets" - - "go.uber.org/zap" - "knative.dev/pkg/logging" - - "knative.dev/eventing-kafka/pkg/channel/consolidated/kafka" -) - -var ( - watchersMtx sync.RWMutex - cacheMtx sync.RWMutex - // Hooks into the poll logic for testing - after = time.After - done = func() {} -) - -type ConsumerGroupHandler func() -type Matcher func(string) bool - -type ConsumerGroupWatcher interface { - // Start instructs the watcher to start polling for the consumer groups and - // notify any observers on the event of any changes - Start() error - - // Terminate instructs the watcher to stop polling and clear the watchers cache - Terminate() - - // Watch registers callback on the event of any changes observed - // on the consumer groups. watcherID is an arbitrary string the user provides - // that will be used to identify his callbacks when provided to Forget(watcherID). - // - // To ensure this is event-triggered, level-driven, - // we don't pass the updates to the callback, instead the observer is expected - // to use List() to get the updated list of ConsumerGroups. - Watch(watcherID string, callback ConsumerGroupHandler) error - - // Forget removes all callbacks that correspond to the watcherID - Forget(watcherID string) - - // List returns all the cached consumer groups that match matcher. - // It will return an empty slice if none matched or the cache is empty - List(matcher Matcher) []string -} - -type WatcherImpl struct { - logger *zap.SugaredLogger - //TODO name? - watchers map[string]ConsumerGroupHandler - cachedConsumerGroups sets.String - adminClient kafka.AdminClient - pollDuration time.Duration - done chan struct{} -} - -func NewConsumerGroupWatcher(ctx context.Context, ac kafka.AdminClient, pollDuration time.Duration) ConsumerGroupWatcher { - return &WatcherImpl{ - logger: logging.FromContext(ctx), - adminClient: ac, - pollDuration: pollDuration, - watchers: make(map[string]ConsumerGroupHandler), - cachedConsumerGroups: sets.String{}, - } -} - -func (w *WatcherImpl) Start() error { - w.logger.Infow("ConsumerGroupWatcher starting. Polling for consumer groups", zap.Duration("poll duration", w.pollDuration)) - go func() { - for { - select { - case <-after(w.pollDuration): - // let's get current observed consumer groups - observedCGs, err := w.adminClient.ListConsumerGroups() - if err != nil { - w.logger.Errorw("error while listing consumer groups", zap.Error(err)) - continue - } - var notify bool - var changedGroup string - observedCGsSet := sets.String{}.Insert(observedCGs...) - // Look for observed CGs - for c := range observedCGsSet { - if !w.cachedConsumerGroups.Has(c) { - // This is the first appearance. - w.logger.Debugw("Consumer group observed. Caching.", - zap.String("consumer group", c)) - changedGroup = c - notify = true - break - } - } - // Look for disappeared CGs - for c := range w.cachedConsumerGroups { - if !observedCGsSet.Has(c) { - // This CG was cached but it's no longer there. - w.logger.Debugw("Consumer group deleted.", - zap.String("consumer group", c)) - changedGroup = c - notify = true - break - } - } - if notify { - cacheMtx.Lock() - w.cachedConsumerGroups = observedCGsSet - cacheMtx.Unlock() - w.notify(changedGroup) - } - done() - case <-w.done: - break - } - } - }() - return nil -} - -func (w *WatcherImpl) Terminate() { - watchersMtx.Lock() - cacheMtx.Lock() - defer watchersMtx.Unlock() - defer cacheMtx.Unlock() - - w.watchers = nil - w.cachedConsumerGroups = nil - if w.done != nil { - w.done <- struct{}{} - } -} - -// TODO explore returning a channel instead of a taking callback -func (w *WatcherImpl) Watch(watcherID string, cb ConsumerGroupHandler) error { - w.logger.Debugw("Adding a new watcher", zap.String("watcherID", watcherID)) - watchersMtx.Lock() - defer watchersMtx.Unlock() - w.watchers[watcherID] = cb - - // notify at least once to get the current state - cb() - return nil -} - -func (w *WatcherImpl) Forget(watcherID string) { - w.logger.Debugw("Forgetting watcher", zap.String("watcherID", watcherID)) - watchersMtx.Lock() - defer watchersMtx.Unlock() - delete(w.watchers, watcherID) -} - -func (w *WatcherImpl) List(matcher Matcher) []string { - w.logger.Debug("Listing consumer groups") - cacheMtx.RLock() - defer cacheMtx.RUnlock() - cgs := make([]string, 0) - for cg := range w.cachedConsumerGroups { - if matcher(cg) { - cgs = append(cgs, cg) - } - } - return cgs -} - -func (w *WatcherImpl) notify(cg string) { - watchersMtx.RLock() - defer watchersMtx.RUnlock() - - for _, cb := range w.watchers { - cb() - } -} - -func Find(list []string, item string) bool { - for _, i := range list { - if i == item { - return true - } - } - return false -} diff --git a/pkg/channel/consolidated/reconciler/controller/consumer_group_watcher_test.go b/pkg/channel/consolidated/reconciler/controller/consumer_group_watcher_test.go deleted file mode 100644 index e543c01ced..0000000000 --- a/pkg/channel/consolidated/reconciler/controller/consumer_group_watcher_test.go +++ /dev/null @@ -1,84 +0,0 @@ -/* -Copyright 2020 The Knative Authors - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package controller - -import ( - "sync" - "testing" - "time" - - "k8s.io/apimachinery/pkg/util/sets" - - pkgtesting "knative.dev/pkg/logging/testing" -) - -//TODO how to mock the sarama AdminClient -type FakeClusterAdmin struct { - mutex sync.RWMutex - cgs sets.String -} - -func (fake *FakeClusterAdmin) ListConsumerGroups() ([]string, error) { - fake.mutex.RLock() - defer fake.mutex.RUnlock() - return fake.cgs.List(), nil -} - -func (fake *FakeClusterAdmin) deleteCG(cg string) { - fake.mutex.Lock() - defer fake.mutex.Unlock() - fake.cgs.Delete(cg) -} - -func TestKafkaWatcher(t *testing.T) { - cgname := "kafka.event-example.default-kne-trigger.0d9c4383-1e68-42b5-8c3a-3788274404c5" - wid := "channel-abc" - cgs := sets.String{} - cgs.Insert(cgname) - ca := FakeClusterAdmin{ - cgs: cgs, - } - - ch := make(chan sets.String, 1) - - w := NewConsumerGroupWatcher(pkgtesting.TestContextWithLogger(t), &ca, 2*time.Second) - w.Watch(wid, func() { - cgs := w.List(func(cg string) bool { - return cgname == cg - }) - result := sets.String{} - result.Insert(cgs...) - ch <- result - }) - - w.Start() - <-ch - assertSync(t, ch, cgs) - ca.deleteCG(cgname) - assertSync(t, ch, sets.String{}) -} - -func assertSync(t *testing.T, ch chan sets.String, cgs sets.String) { - select { - case syncedCGs := <-ch: - if !syncedCGs.Equal(cgs) { - t.Errorf("observed and expected consumer groups do not match. got %v expected %v", syncedCGs, cgs) - } - case <-time.After(6 * time.Second): - t.Errorf("timedout waiting for consumer groups to sync") - } -} diff --git a/pkg/channel/consolidated/reconciler/controller/controller.go b/pkg/channel/consolidated/reconciler/controller/controller.go index 309b6f1779..32b10cb911 100644 --- a/pkg/channel/consolidated/reconciler/controller/controller.go +++ b/pkg/channel/consolidated/reconciler/controller/controller.go @@ -19,27 +19,40 @@ package controller import ( "context" + corev1 "k8s.io/api/core/v1" + knativeReconciler "knative.dev/pkg/reconciler" + "github.com/kelseyhightower/envconfig" "go.uber.org/zap" - commonconfig "knative.dev/eventing-kafka/pkg/common/config" - + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/cache" + + "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" + "knative.dev/eventing-kafka/pkg/channel/consolidated/status" + kafkaChannelClient "knative.dev/eventing-kafka/pkg/client/injection/client" + "knative.dev/eventing-kafka/pkg/client/injection/informers/messaging/v1beta1/kafkachannel" + kafkaChannelReconciler "knative.dev/eventing-kafka/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel" + commonconfig "knative.dev/eventing-kafka/pkg/common/config" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + eventingClient "knative.dev/eventing/pkg/client/injection/client" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/client/injection/kube/informers/apps/v1/deployment" - "knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints" + endpointsinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints" + podinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/pod" "knative.dev/pkg/client/injection/kube/informers/core/v1/service" "knative.dev/pkg/client/injection/kube/informers/core/v1/serviceaccount" "knative.dev/pkg/client/injection/kube/informers/rbac/v1/rolebinding" - "knative.dev/pkg/configmap" "knative.dev/pkg/controller" "knative.dev/pkg/logging" "knative.dev/pkg/system" +) - kafkaChannelClient "knative.dev/eventing-kafka/pkg/client/injection/client" - "knative.dev/eventing-kafka/pkg/client/injection/informers/messaging/v1beta1/kafkachannel" - kafkaChannelReconciler "knative.dev/eventing-kafka/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel" - eventingClient "knative.dev/eventing/pkg/client/injection/client" +const ( + channelLabelKey = "messaging.knative.dev/channel" + channelLabelValue = "kafka-channel" + roleLabelKey = "messaging.knative.dev/role" + roleLabelValue = "dispatcher" ) // NewController initializes the controller and is called by the generated code. @@ -48,13 +61,14 @@ func NewController( ctx context.Context, cmw configmap.Watcher, ) *controller.Impl { - + logger := logging.FromContext(ctx) kafkaChannelInformer := kafkachannel.Get(ctx) deploymentInformer := deployment.Get(ctx) - endpointsInformer := endpoints.Get(ctx) + endpointsInformer := endpointsinformer.Get(ctx) serviceAccountInformer := serviceaccount.Get(ctx) roleBindingInformer := rolebinding.Get(ctx) serviceInformer := service.Get(ctx) + podInformer := podinformer.Get(ctx) r := &Reconciler{ systemNamespace: system.Namespace(), @@ -70,8 +84,6 @@ func NewController( roleBindingLister: roleBindingInformer.Lister(), } - logger := logging.FromContext(ctx) - env := &envConfig{} if err := envconfig.Process("", env); err != nil { logger.Panicf("unable to process Kafka channel's required environment variables: %v", err) @@ -85,6 +97,16 @@ func NewController( impl := kafkaChannelReconciler.NewImpl(ctx, r) + statusProber := status.NewProber( + logger.Named("status-manager"), + NewProbeTargetLister(logger, endpointsInformer.Lister()), + func(c v1beta1.KafkaChannel, s eventingduckv1.SubscriberSpec) { + logger.Debugf("Ready callback triggered for channel: %s/%s subscription: %s", c.Namespace, c.Name, string(s.UID)) + impl.EnqueueKey(types.NamespacedName{Namespace: c.Namespace, Name: c.Name}) + }, + ) + r.statusManager = statusProber + statusProber.Start(ctx.Done()) // Get and Watch the Kakfa config map and dynamically update Kafka configuration. err := commonconfig.InitializeKafkaConfigMapWatcher(ctx, cmw, logger, r.updateKafkaConfig, system.Namespace()) if err != nil { @@ -125,5 +147,24 @@ func NewController( Handler: controller.HandleAll(grCh), }) + podInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ + FilterFunc: knativeReconciler.ChainFilterFuncs( + knativeReconciler.LabelFilterFunc(channelLabelKey, channelLabelValue, false), + knativeReconciler.LabelFilterFunc(roleLabelKey, roleLabelValue, false), + ), + Handler: cache.ResourceEventHandlerFuncs{ + // Cancel probing when a Pod is deleted + DeleteFunc: func(obj interface{}) { + pod, ok := obj.(*corev1.Pod) + if ok && pod != nil { + logger.Debugw("Dispatcher pod deleted. Canceling pod probing.", + zap.String("pod", pod.GetName())) + statusProber.CancelPodProbing(*pod) + impl.GlobalResync(kafkaChannelInformer.Informer()) + } + }, + }, + }) + return impl } diff --git a/pkg/channel/consolidated/reconciler/controller/kafkachannel.go b/pkg/channel/consolidated/reconciler/controller/kafkachannel.go index a48c6d1bb2..8a2fe567f7 100644 --- a/pkg/channel/consolidated/reconciler/controller/kafkachannel.go +++ b/pkg/channel/consolidated/reconciler/controller/kafkachannel.go @@ -19,12 +19,6 @@ package controller import ( "context" "fmt" - "strings" - "time" - - "knative.dev/eventing-kafka/pkg/channel/consolidated/kafka" - "knative.dev/eventing-kafka/pkg/common/client" - "knative.dev/eventing-kafka/pkg/common/constants" "github.com/Shopify/sarama" "go.uber.org/zap" @@ -45,11 +39,14 @@ import ( "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" "knative.dev/eventing-kafka/pkg/channel/consolidated/reconciler/controller/resources" + "knative.dev/eventing-kafka/pkg/channel/consolidated/status" "knative.dev/eventing-kafka/pkg/channel/consolidated/utils" kafkaclientset "knative.dev/eventing-kafka/pkg/client/clientset/versioned" kafkaScheme "knative.dev/eventing-kafka/pkg/client/clientset/versioned/scheme" kafkaChannelReconciler "knative.dev/eventing-kafka/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel" listers "knative.dev/eventing-kafka/pkg/client/listers/messaging/v1beta1" + "knative.dev/eventing-kafka/pkg/common/client" + "knative.dev/eventing-kafka/pkg/common/constants" v1 "knative.dev/eventing/pkg/apis/duck/v1" "knative.dev/eventing/pkg/apis/eventing" eventingclientset "knative.dev/eventing/pkg/client/clientset/versioned" @@ -76,8 +73,11 @@ const ( dispatcherRoleBindingCreated = "DispatcherRoleBindingCreated" dispatcherName = "kafka-ch-dispatcher" +) - pollInterval = 2 * time.Second +var ( + scopeNamespace = "namespace" + scopeCluster = "cluster" ) func newReconciledNormal(namespace, name string) pkgreconciler.Event { @@ -122,7 +122,6 @@ type Reconciler struct { // Using a shared kafkaClusterAdmin does not work currently because of an issue with // Shopify/sarama, see https://github.com/Shopify/sarama/issues/1162. kafkaClusterAdmin sarama.ClusterAdmin - consumerGroupWatcher ConsumerGroupWatcher kafkachannelLister listers.KafkaChannelLister kafkachannelInformer cache.SharedIndexInformer deploymentLister appsv1listers.DeploymentLister @@ -130,13 +129,9 @@ type Reconciler struct { endpointsLister corev1listers.EndpointsLister serviceAccountLister corev1listers.ServiceAccountLister roleBindingLister rbacv1listers.RoleBindingLister + statusManager status.Manager } -var ( - scopeNamespace = "namespace" - scopeCluster = "cluster" -) - type envConfig struct { Image string `envconfig:"DISPATCHER_IMAGE" required:"true"` } @@ -147,7 +142,6 @@ var _ kafkaChannelReconciler.Finalizer = (*Reconciler)(nil) func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1beta1.KafkaChannel) pkgreconciler.Event { kc.Status.InitializeConditions() - logger := logging.FromContext(ctx) // Verify channel is valid. kc.SetDefaults(ctx) @@ -155,7 +149,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1beta1.KafkaChannel logger.Errorw("Invalid kafka channel", zap.String("channel", kc.Name), zap.Error(err)) return err } - if r.kafkaConfig == nil { if r.kafkaConfigError == nil { r.kafkaConfigError = fmt.Errorf("the config map '%s' does not exist", constants.SettingsConfigMapName) @@ -256,29 +249,12 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1beta1.KafkaChannel return newReconciledNormal(kc.Namespace, kc.Name) } -func (r *Reconciler) setupSubscriptionStatusWatcher(ctx context.Context, channel *v1beta1.KafkaChannel) error { - var err error - groupIDPrefix := fmt.Sprintf("kafka.%s.%s", channel.Namespace, channel.Name) - - m := func(cg string) bool { - return strings.HasPrefix(cg, groupIDPrefix) - } - err = r.consumerGroupWatcher.Watch(string(channel.ObjectMeta.UID), func() { - err := r.markSubscriptionReadiness(ctx, channel, r.consumerGroupWatcher.List(m)) - if err != nil { - logging.FromContext(ctx).Errorw("error updating subscription readiness", zap.Error(err)) - } - }) - return err -} - -func (r *Reconciler) markSubscriptionReadiness(ctx context.Context, ch *v1beta1.KafkaChannel, cgs []string) error { +func (r *Reconciler) setupSubscriptionStatusWatcher(ctx context.Context, ch *v1beta1.KafkaChannel) error { after := ch.DeepCopy() after.Status.Subscribers = make([]v1.SubscriberStatus, 0) for _, s := range ch.Spec.Subscribers { - cg := fmt.Sprintf("kafka.%s.%s.%s", ch.Namespace, ch.Name, s.UID) - if Find(cgs, cg) { + if r, _ := r.statusManager.IsReady(ctx, *ch, s); r { logging.FromContext(ctx).Debugw("marking subscription", zap.Any("subscription", s)) after.Status.Subscribers = append(after.Status.Subscribers, v1.SubscriberStatus{ UID: s.UID, @@ -565,6 +541,7 @@ func (r *Reconciler) deleteTopic(ctx context.Context, channel *v1beta1.KafkaChan logger.Infow("Deleting topic on Kafka Cluster", zap.String("topic", topicName)) err := kafkaClusterAdmin.DeleteTopic(topicName) if err == sarama.ErrUnknownTopicOrPartition { + logger.Debugw("Received an unknown topic or partition response. Ignoring") return nil } else if err != nil { logger.Errorw("Error deleting topic", zap.String("topic", topicName), zap.Error(err)) @@ -590,36 +567,32 @@ func (r *Reconciler) updateKafkaConfig(ctx context.Context, configMap *corev1.Co // Eventually the previous config should be snapshotted to delete Kafka topics r.kafkaConfig = kafkaConfig r.kafkaConfigError = err - ac, err := kafka.NewAdminClient(ctx, func() (sarama.ClusterAdmin, error) { - return client.MakeAdminClient(ctx, controllerAgentName, r.kafkaAuthConfig, kafkaConfig.SaramaSettingsYamlString, kafkaConfig.Brokers) - }) if err != nil { logger.Errorw("Error creating AdminClient", zap.Error(err)) return } - - if r.consumerGroupWatcher != nil { - logger.Info("terminating consumer group watcher") - r.consumerGroupWatcher.Terminate() - logger.Info("terminated consumer group watcher") - } - - r.consumerGroupWatcher = NewConsumerGroupWatcher(ctx, ac, pollInterval) - //TODO handle error - r.consumerGroupWatcher.Start() } func (r *Reconciler) FinalizeKind(ctx context.Context, kc *v1beta1.KafkaChannel) pkgreconciler.Event { // Do not attempt retrying creating the client because it might be a permanent error // in which case the finalizer will never get removed. - if kafkaClusterAdmin, err := r.createClient(ctx); err == nil && r.kafkaConfig != nil { + logger := logging.FromContext(ctx) + channel := fmt.Sprintf("%s/%s", kc.GetNamespace(), kc.GetName()) + logger.Debugw("FinalizeKind", zap.String("channel", channel)) + kafkaClusterAdmin, err := r.createClient(ctx) + if err != nil || r.kafkaConfig == nil { + logger.Errorw("Can't obtain Kafka Client", zap.String("channel", channel), zap.Error(err)) + } else { + logger.Debugw("Got client, about to delete topic") if err := r.deleteTopic(ctx, kc, kafkaClusterAdmin); err != nil { + logger.Errorw("Error deleting Kafka channel topic", zap.String("channel", channel), zap.Error(err)) return err } } - if r.consumerGroupWatcher != nil { - r.consumerGroupWatcher.Forget(string(kc.ObjectMeta.UID)) + for _, s := range kc.Spec.Subscribers { + logger.Debugw("Canceling probing", zap.String("channel", channel), zap.Any("subscription", s)) + r.statusManager.CancelProbing(s) } return newReconciledNormal(kc.Namespace, kc.Name) //ok to remove finalizer } diff --git a/pkg/channel/consolidated/reconciler/controller/kafkachannel_test.go b/pkg/channel/consolidated/reconciler/controller/kafkachannel_test.go index 0bccf4490d..4d4454c1e3 100644 --- a/pkg/channel/consolidated/reconciler/controller/kafkachannel_test.go +++ b/pkg/channel/consolidated/reconciler/controller/kafkachannel_test.go @@ -20,15 +20,9 @@ import ( "context" "fmt" "testing" - "time" - - eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" - "knative.dev/pkg/apis" "github.com/Shopify/sarama" - "go.uber.org/zap" - appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -36,8 +30,15 @@ import ( "k8s.io/client-go/kubernetes/scheme" clientgotesting "k8s.io/client-go/testing" + "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" + "knative.dev/eventing-kafka/pkg/channel/consolidated/reconciler/controller/resources" + reconcilertesting "knative.dev/eventing-kafka/pkg/channel/consolidated/reconciler/testing" + . "knative.dev/eventing-kafka/pkg/channel/consolidated/utils" + fakekafkaclient "knative.dev/eventing-kafka/pkg/client/injection/client/fake" + "knative.dev/eventing-kafka/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" eventingClient "knative.dev/eventing/pkg/client/injection/client" - + "knative.dev/pkg/apis" duckv1 "knative.dev/pkg/apis/duck/v1" kubeclient "knative.dev/pkg/client/injection/kube/client" "knative.dev/pkg/configmap" @@ -46,13 +47,6 @@ import ( "knative.dev/pkg/logging" "knative.dev/pkg/network" . "knative.dev/pkg/reconciler/testing" - - "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" - "knative.dev/eventing-kafka/pkg/channel/consolidated/reconciler/controller/resources" - reconcilertesting "knative.dev/eventing-kafka/pkg/channel/consolidated/reconciler/testing" - . "knative.dev/eventing-kafka/pkg/channel/consolidated/utils" - fakekafkaclient "knative.dev/eventing-kafka/pkg/client/injection/client/fake" - "knative.dev/eventing-kafka/pkg/client/injection/reconciler/messaging/v1beta1/kafkachannel" ) const ( @@ -64,6 +58,7 @@ const ( finalizerName = "kafkachannels.messaging.knative.dev" sub1UID = "2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1" sub2UID = "34c5aec8-deb6-11e8-9f32-f2801f1b9fd1" + twoSubscribersPatch = `[{"op":"add","path":"/status/subscribers","value":[{"observedGeneration":1,"ready":"True","uid":"2f9b5e8e-deb6-11e8-9f32-f2801f1b9fd1"},{"observedGeneration":2,"ready":"True","uid":"34c5aec8-deb6-11e8-9f32-f2801f1b9fd1"}]}]` ) var ( @@ -263,6 +258,9 @@ func TestAllCases(t *testing.T) { WantEvents: []string{ Eventf(corev1.EventTypeNormal, "KafkaChannelReconciled", `KafkaChannel reconciled: "test-namespace/test-kc"`), }, + WantPatches: []clientgotesting.PatchActionImpl{ + makePatch(testNS, kcName, twoSubscribersPatch), + }, }, { Name: "channel exists, not owned by us", Key: kcKey, @@ -334,8 +332,7 @@ func TestAllCases(t *testing.T) { kafkaConfig: &KafkaConfig{ Brokers: []string{brokerName}, }, - consumerGroupWatcher: NewConsumerGroupWatcher(ctx, &FakeClusterAdmin{}, 100*time.Millisecond), - kafkachannelLister: listers.GetKafkaChannelLister(), + kafkachannelLister: listers.GetKafkaChannelLister(), // TODO fix kafkachannelInformer: nil, deploymentLister: listers.GetDeploymentLister(), @@ -345,6 +342,12 @@ func TestAllCases(t *testing.T) { kafkaClientSet: fakekafkaclient.Get(ctx), KubeClientSet: kubeclient.Get(ctx), EventingClientSet: eventingClient.Get(ctx), + statusManager: &fakeStatusManager{ + FakeIsReady: func(ctx context.Context, ch v1beta1.KafkaChannel, + sub eventingduckv1.SubscriberSpec) (bool, error) { + return true, nil + }, + }, } return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r) }, zap.L())) @@ -392,8 +395,7 @@ func TestTopicExists(t *testing.T) { kafkaConfig: &KafkaConfig{ Brokers: []string{brokerName}, }, - consumerGroupWatcher: NewConsumerGroupWatcher(ctx, &FakeClusterAdmin{}, 100*time.Millisecond), - kafkachannelLister: listers.GetKafkaChannelLister(), + kafkachannelLister: listers.GetKafkaChannelLister(), // TODO fix kafkachannelInformer: nil, deploymentLister: listers.GetDeploymentLister(), @@ -411,6 +413,12 @@ func TestTopicExists(t *testing.T) { kafkaClientSet: fakekafkaclient.Get(ctx), KubeClientSet: kubeclient.Get(ctx), EventingClientSet: eventingClient.Get(ctx), + statusManager: &fakeStatusManager{ + FakeIsReady: func(ctx context.Context, channel v1beta1.KafkaChannel, + spec eventingduckv1.SubscriberSpec) (bool, error) { + return true, nil + }, + }, } return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r) }, zap.L())) @@ -462,8 +470,7 @@ func TestDeploymentUpdatedOnImageChange(t *testing.T) { kafkaConfig: &KafkaConfig{ Brokers: []string{brokerName}, }, - consumerGroupWatcher: NewConsumerGroupWatcher(ctx, &FakeClusterAdmin{}, 100*time.Millisecond), - kafkachannelLister: listers.GetKafkaChannelLister(), + kafkachannelLister: listers.GetKafkaChannelLister(), // TODO fix kafkachannelInformer: nil, deploymentLister: listers.GetDeploymentLister(), @@ -481,6 +488,12 @@ func TestDeploymentUpdatedOnImageChange(t *testing.T) { kafkaClientSet: fakekafkaclient.Get(ctx), KubeClientSet: kubeclient.Get(ctx), EventingClientSet: eventingClient.Get(ctx), + statusManager: &fakeStatusManager{ + FakeIsReady: func(ctx context.Context, channel v1beta1.KafkaChannel, + spec eventingduckv1.SubscriberSpec) (bool, error) { + return true, nil + }, + }, } return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r) }, zap.L())) @@ -532,8 +545,7 @@ func TestDeploymentZeroReplicas(t *testing.T) { kafkaConfig: &KafkaConfig{ Brokers: []string{brokerName}, }, - consumerGroupWatcher: NewConsumerGroupWatcher(ctx, &FakeClusterAdmin{}, 100*time.Millisecond), - kafkachannelLister: listers.GetKafkaChannelLister(), + kafkachannelLister: listers.GetKafkaChannelLister(), // TODO fix kafkachannelInformer: nil, deploymentLister: listers.GetDeploymentLister(), @@ -551,6 +563,12 @@ func TestDeploymentZeroReplicas(t *testing.T) { kafkaClientSet: fakekafkaclient.Get(ctx), KubeClientSet: kubeclient.Get(ctx), EventingClientSet: eventingClient.Get(ctx), + statusManager: &fakeStatusManager{ + FakeIsReady: func(ctx context.Context, channel v1beta1.KafkaChannel, + spec eventingduckv1.SubscriberSpec) (bool, error) { + return true, nil + }, + }, } return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r) }, zap.L())) @@ -599,8 +617,7 @@ func TestDeploymentMoreThanOneReplicas(t *testing.T) { kafkaConfig: &KafkaConfig{ Brokers: []string{brokerName}, }, - consumerGroupWatcher: NewConsumerGroupWatcher(ctx, &FakeClusterAdmin{}, 100*time.Millisecond), - kafkachannelLister: listers.GetKafkaChannelLister(), + kafkachannelLister: listers.GetKafkaChannelLister(), // TODO fix kafkachannelInformer: nil, deploymentLister: listers.GetDeploymentLister(), @@ -618,6 +635,12 @@ func TestDeploymentMoreThanOneReplicas(t *testing.T) { kafkaClientSet: fakekafkaclient.Get(ctx), KubeClientSet: kubeclient.Get(ctx), EventingClientSet: eventingClient.Get(ctx), + statusManager: &fakeStatusManager{ + FakeIsReady: func(ctx context.Context, channel v1beta1.KafkaChannel, + spec eventingduckv1.SubscriberSpec) (bool, error) { + return true, nil + }, + }, } return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r) }, zap.L())) @@ -828,3 +851,29 @@ func subscribers() []eventingduckv1.SubscriberSpec { ReplyURI: apis.HTTP("sink2"), }} } + +type fakeStatusManager struct { + FakeIsReady func(context.Context, v1beta1.KafkaChannel, eventingduckv1.SubscriberSpec) (bool, error) +} + +func (m *fakeStatusManager) IsReady(ctx context.Context, ch v1beta1.KafkaChannel, sub eventingduckv1.SubscriberSpec) (bool, error) { + return m.FakeIsReady(ctx, ch, sub) +} + +func (m *fakeStatusManager) CancelProbing(sub eventingduckv1.SubscriberSpec) { + //do nothing +} + +func (m *fakeStatusManager) CancelPodProbing(pod corev1.Pod) { + //do nothing +} + +func makePatch(namespace, name, patch string) clientgotesting.PatchActionImpl { + return clientgotesting.PatchActionImpl{ + ActionImpl: clientgotesting.ActionImpl{ + Namespace: namespace, + }, + Name: name, + Patch: []byte(patch), + } +} diff --git a/pkg/channel/consolidated/reconciler/controller/lister.go b/pkg/channel/consolidated/reconciler/controller/lister.go new file mode 100644 index 0000000000..db39a2b251 --- /dev/null +++ b/pkg/channel/consolidated/reconciler/controller/lister.go @@ -0,0 +1,84 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controller + +import ( + "context" + "fmt" + "net/url" + + "knative.dev/eventing-kafka/pkg/channel/consolidated/status" + + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/util/sets" + v1 "k8s.io/client-go/listers/core/v1" + "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" + "knative.dev/eventing/pkg/apis/eventing" + "knative.dev/pkg/system" +) + +type DispatcherPodsLister struct { + logger *zap.SugaredLogger + endpointLister v1.EndpointsLister +} + +func (t *DispatcherPodsLister) ListProbeTargets(ctx context.Context, kc v1beta1.KafkaChannel) (*status.ProbeTarget, error) { + scope, ok := kc.Annotations[eventing.ScopeAnnotationKey] + if !ok { + scope = scopeCluster + } + + dispatcherNamespace := system.Namespace() + if scope == scopeNamespace { + dispatcherNamespace = kc.Namespace + } + + // Get the Dispatcher Service Endpoints and propagate the status to the Channel + // endpoints has the same name as the service, so not a bug. + eps, err := t.endpointLister.Endpoints(dispatcherNamespace).Get(dispatcherName) + if err != nil { + return nil, fmt.Errorf("failed to get internal service: %w", err) + } + var readyIPs []string + + for _, sub := range eps.Subsets { + for _, address := range sub.Addresses { + readyIPs = append(readyIPs, address.IP) + } + } + + if len(readyIPs) == 0 { + return nil, fmt.Errorf("no gateway pods available") + } + + u, _ := url.Parse(fmt.Sprintf("http://%s.%s/%s/%s", dispatcherName, dispatcherNamespace, kc.Namespace, kc.Name)) + + return &status.ProbeTarget{ + PodIPs: sets.NewString(readyIPs...), + PodPort: "8081", + Port: "8081", + URL: u, + }, nil +} + +func NewProbeTargetLister(logger *zap.SugaredLogger, lister v1.EndpointsLister) status.ProbeTargetLister { + tl := DispatcherPodsLister{ + logger: logger, + endpointLister: lister, + } + return &tl +} diff --git a/pkg/channel/consolidated/status/status.go b/pkg/channel/consolidated/status/status.go new file mode 100644 index 0000000000..b42f7bf0f1 --- /dev/null +++ b/pkg/channel/consolidated/status/status.go @@ -0,0 +1,475 @@ +/* +Copyright 2019 The Knative Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package status + +import ( + "context" + "encoding/json" + "fmt" + "net" + "net/http" + "net/url" + "reflect" + "sync" + "time" + + "go.uber.org/atomic" + "go.uber.org/zap" + "golang.org/x/time/rate" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/client-go/util/workqueue" + + messagingv1beta1 "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + "knative.dev/networking/pkg/prober" + "knative.dev/pkg/logging" +) + +const ( + // probeConcurrency defines how many probing calls can be issued simultaneously + probeConcurrency = 100 + // probeTimeout defines the maximum amount of time a request will wait + probeTimeout = 1 * time.Second + // initialDelay defines the delay before enqueuing a probing request the first time. + // It gives times for the change to propagate and prevents unnecessary retries. + initialDelay = 200 * time.Millisecond +) + +var dialContext = (&net.Dialer{Timeout: probeTimeout}).DialContext + +// targetState represents the probing state of a subscription +type targetState struct { + sub eventingduckv1.SubscriberSpec + ch messagingv1beta1.KafkaChannel + + readyLock sync.RWMutex + // pendingCount is the number of pods that haven't been successfully probed yet + pendingCount atomic.Int32 + // readyCount is the number of pods that have the subscription ready + readyPartitions sets.Int + initialCount int + lastAccessed time.Time + + cancel func() +} + +// podState represents the probing state of a Pod (for a specific subscription) +type podState struct { + // pendingCount is the number of probes for the Pod + pendingCount atomic.Int32 + + cancel func() +} + +// cancelContext is a pair of a Context and its cancel function +type cancelContext struct { + context context.Context + cancel func() +} + +type workItem struct { + targetStates *targetState + podState *podState + context context.Context + url *url.URL + podIP string + podPort string + logger *zap.SugaredLogger +} + +// ProbeTarget contains the URLs to probes for a set of Pod IPs serving out of the same port. +type ProbeTarget struct { + PodIPs sets.String + PodPort string + Port string + URL *url.URL +} + +// ProbeTargetLister lists all the targets that requires probing. +type ProbeTargetLister interface { + // ListProbeTargets returns a list of targets to be probed + ListProbeTargets(ctx context.Context, ch messagingv1beta1.KafkaChannel) (*ProbeTarget, error) +} + +// Manager provides a way to check if an Ingress is ready +type Manager interface { + IsReady(ctx context.Context, ch messagingv1beta1.KafkaChannel, sub eventingduckv1.SubscriberSpec) (bool, error) + CancelProbing(sub eventingduckv1.SubscriberSpec) + CancelPodProbing(pod corev1.Pod) +} + +// Prober provides a way to check if a VirtualService is ready by probing the Envoy pods +// handling that VirtualService. +type Prober struct { + logger *zap.SugaredLogger + + // mu guards targetStates and podContexts + mu sync.Mutex + targetStates map[types.UID]*targetState + podContexts map[string]cancelContext + + workQueue workqueue.RateLimitingInterface + + targetLister ProbeTargetLister + + readyCallback func(messagingv1beta1.KafkaChannel, eventingduckv1.SubscriberSpec) + + probeConcurrency int + + opts []interface{} +} + +// NewProber creates a new instance of Prober +func NewProber( + logger *zap.SugaredLogger, + targetLister ProbeTargetLister, + readyCallback func(messagingv1beta1.KafkaChannel, eventingduckv1.SubscriberSpec), opts ...interface{}) *Prober { + return &Prober{ + logger: logger, + targetStates: make(map[types.UID]*targetState), + podContexts: make(map[string]cancelContext), + workQueue: workqueue.NewNamedRateLimitingQueue( + workqueue.NewMaxOfRateLimiter( + // Per item exponential backoff + workqueue.NewItemExponentialFailureRateLimiter(50*time.Millisecond, 30*time.Second), + // Global rate limiter + &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(50), 100)}, + ), + "ProbingQueue"), + targetLister: targetLister, + readyCallback: readyCallback, + probeConcurrency: probeConcurrency, + opts: opts, + } +} + +func (m *Prober) checkReadiness(state *targetState) bool { + consumers := int32(state.initialCount) + partitions := state.ch.Spec.NumPartitions + m.logger.Debugw("Checking subscription readiness", + zap.Any("initial probed consumers", consumers), + zap.Any("channel partitions", partitions), + zap.Any("ready partitions", state.readyPartitions.List()), + ) + return state.readyPartitions.Len() == int(partitions) +} + +func (m *Prober) IsReady(ctx context.Context, ch messagingv1beta1.KafkaChannel, sub eventingduckv1.SubscriberSpec) (bool, error) { + subscriptionKey := sub.UID + logger := logging.FromContext(ctx) + + if ready, ok := func() (bool, bool) { + m.mu.Lock() + defer m.mu.Unlock() + if state, ok := m.targetStates[subscriptionKey]; ok { + if state.sub.Generation == sub.Generation { + state.lastAccessed = time.Now() + logger.Debugw("Subscription is cached. Checking readiness", + zap.Any("subscription", sub.UID)) + return m.checkReadiness(state), true + } + + // Cancel the polling for the outdated version + state.cancel() + delete(m.targetStates, subscriptionKey) + } + return false, false + }(); ok { + return ready, nil + } + + subCtx, cancel := context.WithCancel(context.Background()) + subscriptionState := &targetState{ + sub: sub, + ch: ch, + lastAccessed: time.Now(), + cancel: cancel, + } + + // Get the probe targets and group them by IP + target, err := m.targetLister.ListProbeTargets(ctx, ch) + if err != nil { + logger.Errorw("Error listing probe targets", zap.Error(err), + zap.Any("subscription", sub.UID)) + return false, err + } + + workItems := make(map[string][]*workItem) + for ip := range target.PodIPs { + workItems[ip] = append(workItems[ip], &workItem{ + targetStates: subscriptionState, + url: target.URL, + podIP: ip, + podPort: target.PodPort, + logger: logger, + }) + } + + subscriptionState.initialCount = target.PodIPs.Len() + subscriptionState.pendingCount.Store(int32(len(workItems))) + subscriptionState.readyPartitions = sets.Int{} + + for ip, ipWorkItems := range workItems { + // Get or create the context for that IP + ipCtx := func() context.Context { + m.mu.Lock() + defer m.mu.Unlock() + cancelCtx, ok := m.podContexts[ip] + if !ok { + ctx, cancel := context.WithCancel(context.Background()) + cancelCtx = cancelContext{ + context: ctx, + cancel: cancel, + } + m.podContexts[ip] = cancelCtx + } + return cancelCtx.context + }() + + podCtx, cancel := context.WithCancel(subCtx) + podState := &podState{ + pendingCount: *atomic.NewInt32(int32(len(ipWorkItems))), + cancel: cancel, + } + + // Quick and dirty way to join two contexts (i.e. podCtx is cancelled when either subCtx or ipCtx are cancelled) + go func() { + select { + case <-podCtx.Done(): + // This is the actual context, there is nothing to do except + // break to avoid leaking this goroutine. + break + case <-ipCtx.Done(): + // Cancel podCtx + cancel() + } + }() + + // Update the states when probing is cancelled + go func() { + <-podCtx.Done() + m.onProbingCancellation(subscriptionState, podState) + }() + + for _, wi := range ipWorkItems { + wi.podState = podState + wi.context = podCtx + m.workQueue.AddAfter(wi, initialDelay) + logger.Infof("Queuing probe for %s, IP: %s:%s (depth: %d)", + wi.url, wi.podIP, wi.podPort, m.workQueue.Len()) + } + } + + func() { + m.mu.Lock() + defer m.mu.Unlock() + m.targetStates[subscriptionKey] = subscriptionState + }() + return false, nil +} + +// Start starts the Manager background operations +func (m *Prober) Start(done <-chan struct{}) chan struct{} { + var wg sync.WaitGroup + + // Start the worker goroutines + for i := 0; i < m.probeConcurrency; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for m.processWorkItem() { + } + }() + } + + // Stop processing the queue when cancelled + go func() { + <-done + m.workQueue.ShutDown() + }() + + // Return a channel closed when all work is done + ch := make(chan struct{}) + go func() { + wg.Wait() + close(ch) + }() + return ch +} + +// CancelProbing cancels probing of the provided Subscription +func (m *Prober) CancelProbing(sub eventingduckv1.SubscriberSpec) { + m.mu.Lock() + defer m.mu.Unlock() + if state, ok := m.targetStates[sub.UID]; ok { + m.logger.Debugw("Canceling state", zap.Any("subscription", sub)) + state.cancel() + delete(m.targetStates, sub.UID) + } +} + +// CancelPodProbing cancels probing of the provided Pod IP. +func (m *Prober) CancelPodProbing(pod corev1.Pod) { + m.mu.Lock() + defer m.mu.Unlock() + + if ctx, ok := m.podContexts[pod.Status.PodIP]; ok { + ctx.cancel() + delete(m.podContexts, pod.Status.PodIP) + } +} + +// processWorkItem processes a single work item from workQueue. +// It returns false when there is no more items to process, true otherwise. +func (m *Prober) processWorkItem() bool { + obj, shutdown := m.workQueue.Get() + if shutdown { + return false + } + + defer m.workQueue.Done(obj) + + // Crash if the item is not of the expected type + item, ok := obj.(*workItem) + if !ok { + m.logger.Fatalf("Unexpected work item type: want: %s, got: %s\n", + reflect.TypeOf(&workItem{}).Name(), reflect.TypeOf(obj).Name()) + } + item.logger.Infof("Processing probe for %s, IP: %s:%s (depth: %d)", + item.url, item.podIP, item.podPort, m.workQueue.Len()) + + transport := http.DefaultTransport.(*http.Transport).Clone() + + transport.DialContext = func(ctx context.Context, network, addr string) (conn net.Conn, e error) { + // http.Request.URL is set to the hostname and it is substituted in here with the target IP. + return dialContext(ctx, network, net.JoinHostPort(item.podIP, item.podPort)) + } + + probeURL := deepCopy(item.url) + + ctx, cancel := context.WithTimeout(item.context, probeTimeout) + defer cancel() + var opts []interface{} + opts = append(opts, m.opts...) + opts = append(opts, m.probeVerifier(item)) + + ok, err := prober.Do( + ctx, + transport, + probeURL.String(), + opts...) + + // In case of cancellation, drop the work item + select { + case <-item.context.Done(): + m.workQueue.Forget(obj) + return true + default: + } + + if err != nil || !ok { + // In case of error, enqueue for retry + m.workQueue.AddRateLimited(obj) + item.logger.Debugw("Probing of %s failed, IP: %s:%s, ready: %t, error: %v (depth: %d)", + item.url, item.podIP, item.podPort, ok, err, m.workQueue.Len()) + } else { + m.onProbingSuccess(item.targetStates, item.podState) + } + return true +} + +func (m *Prober) onProbingSuccess(subscriptionState *targetState, podState *podState) { + // The last probe call for the Pod succeeded, the Pod is ready + if podState.pendingCount.Dec() == 0 { + // Unlock the goroutine blocked on <-podCtx.Done() + podState.cancel() + // This is the last pod being successfully probed, the subscription is ready + if m.checkReadiness(subscriptionState) { + subscriptionState.cancel() + m.readyCallback(subscriptionState.ch, subscriptionState.sub) + } + } +} + +func (m *Prober) onProbingCancellation(subscriptionState *targetState, podState *podState) { + for { + pendingCount := podState.pendingCount.Load() + if pendingCount <= 0 { + // Probing succeeded, nothing to do + return + } + + // Attempt to set pendingCount to 0. + if podState.pendingCount.CAS(pendingCount, 0) { + // This is the last pod being successfully probed, the subscription is ready + if subscriptionState.pendingCount.Dec() == 0 { + subscriptionState.cancel() + m.readyCallback(subscriptionState.ch, subscriptionState.sub) + } + return + } + } +} + +func (m *Prober) probeVerifier(item *workItem) prober.Verifier { + return func(r *http.Response, b []byte) (bool, error) { + m.logger.Debugw("Verifying response", zap.Int("status code", r.StatusCode), + zap.ByteString("body", b)) + switch r.StatusCode { + case http.StatusOK: + var subscriptions = make(map[string][]int) + err := json.Unmarshal(b, &subscriptions) + if err != nil { + m.logger.Errorw("error unmarshaling", err) + return false, err + } + uid := string(item.targetStates.sub.UID) + key := fmt.Sprintf("%s/%s", item.targetStates.ch.Namespace, item.targetStates.ch.Name) + m.logger.Debugw("Received proper probing response from target", + zap.Any("found subscriptions", subscriptions), + zap.String("pod ip", item.podIP), + zap.String("want channel", key), + zap.String("want subscription", uid), + ) + if partitions, ok := subscriptions[uid]; ok { + item.targetStates.readyLock.Lock() + defer item.targetStates.readyLock.Unlock() + item.targetStates.readyPartitions.Insert(partitions...) + return true, nil + } else { + return false, nil + } + case http.StatusNotFound, http.StatusServiceUnavailable: + m.logger.Errorf("unexpected status code: want %v, got %v", http.StatusOK, r.StatusCode) + return false, fmt.Errorf("unexpected status code: want %v, got %v", http.StatusOK, r.StatusCode) + default: + item.logger.Errorf("Probing of %s abandoned, IP: %s:%s: the response status is %v, expected one of: %v", + item.url, item.podIP, item.podPort, r.StatusCode, + []int{http.StatusOK, http.StatusNotFound, http.StatusServiceUnavailable}) + return true, nil + } + } +} + +// deepCopy copies a URL into a new one +func deepCopy(in *url.URL) *url.URL { + // Safe to ignore the error since this is a deep copy + newURL, _ := url.Parse(in.String()) + return newURL +} diff --git a/pkg/channel/consolidated/status/status_test.go b/pkg/channel/consolidated/status/status_test.go new file mode 100644 index 0000000000..c9d0efe60a --- /dev/null +++ b/pkg/channel/consolidated/status/status_test.go @@ -0,0 +1,218 @@ +/* +Copyright 2019 The Knative Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package status + +import ( + "context" + "encoding/json" + "errors" + "net/http" + "net/http/httptest" + "net/url" + "strconv" + "testing" + "time" + + "go.uber.org/atomic" + "go.uber.org/zap/zaptest" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "k8s.io/apimachinery/pkg/util/sets" + + "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" + messagingv1beta1 "knative.dev/eventing-kafka/pkg/apis/messaging/v1beta1" + eventingduckv1 "knative.dev/eventing/pkg/apis/duck/v1" + "knative.dev/pkg/apis" +) + +var ( + channelTemplate = &v1beta1.KafkaChannel{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "default", + Name: "chan4prober", + }, + Spec: v1beta1.KafkaChannelSpec{ + NumPartitions: 1, + ReplicationFactor: 1, + }, + } + subscriptionTemplate = eventingduckv1.SubscriberSpec{ + UID: types.UID("90713ffd-f527-42bf-b158-57630b68ebe2"), + Generation: 1, + SubscriberURI: getURL("http://subscr.ns.local"), + } +) + +const dispatcherReadySubHeader = "K-Subscriber-Status" + +func getURL(s string) *apis.URL { + u, _ := apis.ParseURL(s) + return u +} + +func handleProbe(t *testing.T) func(http.ResponseWriter, *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + channelRefName := channelTemplate.ObjectMeta.Name + channelRefNamespace := channelTemplate.ObjectMeta.Namespace + var subscriptions = map[string][]int{ + string(subscriptionTemplate.UID): {0}, + } + w.Header().Set(dispatcherReadySubHeader, channelRefName) + jsonResult, err := json.Marshal(subscriptions) + if err != nil { + t.Fatalf("Error marshalling json for sub-status channelref: %s/%s, %v", channelRefNamespace, channelRefName, err) + } + _, err = w.Write(jsonResult) + if err != nil { + t.Fatalf("Error writing jsonResult to serveHTTP writer: %v", err) + } + } +} + +type ReadyPair struct { + c v1beta1.KafkaChannel + s eventingduckv1.SubscriberSpec +} + +func TestProbeSinglePod(t *testing.T) { + var succeed atomic.Bool + + ch := channelTemplate.DeepCopy() + sub := subscriptionTemplate.DeepCopy() + + probeHandler := http.HandlerFunc(handleProbe(t)) + + // Probes only succeed if succeed is true + probeRequests := make(chan *http.Request) + finalHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + probeRequests <- r + if !succeed.Load() { + w.WriteHeader(http.StatusNotFound) + return + } + + probeHandler.ServeHTTP(w, r) + }) + + ts := httptest.NewServer(finalHandler) + defer ts.Close() + tsURL, err := url.Parse(ts.URL) + if err != nil { + t.Fatalf("Failed to parse URL %q: %v", ts.URL, err) + } + port, err := strconv.Atoi(tsURL.Port()) + if err != nil { + t.Fatalf("Failed to parse port %q: %v", tsURL.Port(), err) + } + hostname := tsURL.Hostname() + + ready := make(chan *ReadyPair) + + prober := NewProber( + zaptest.NewLogger(t).Sugar(), + fakeProbeTargetLister{ + PodIPs: sets.NewString(hostname), + PodPort: strconv.Itoa(port), + URL: tsURL, + }, + func(c v1beta1.KafkaChannel, s eventingduckv1.SubscriberSpec) { + ready <- &ReadyPair{ + c, + s, + } + }) + + done := make(chan struct{}) + cancelled := prober.Start(done) + defer func() { + close(done) + <-cancelled + }() + + // The first call to IsReady must succeed and return false + ok, err := prober.IsReady(context.Background(), *ch, *sub) + if err != nil { + t.Fatal("IsReady failed:", err) + } + if ok { + t.Fatal("IsReady() returned true") + } + + select { + case <-ready: + // Since succeed is still false and we don't return 200, the prober shouldn't be ready + t.Fatal("Prober shouldn't be ready") + case <-time.After(1 * time.Second): + // Not ideal but it gives time to the prober to write to ready + break + } + + // Make probes to hostB succeed + succeed.Store(true) + + // Just drain the requests in the channel to not block the handler + go func() { + for range probeRequests { + } + }() + + select { + case <-ready: + // Wait for the probing to eventually succeed + case <-time.After(5 * time.Second): + t.Error("Timed out waiting for probing to succeed.") + } +} + +func TestProbeListerFail(t *testing.T) { + ch := channelTemplate.DeepCopy() + sub := subscriptionTemplate.DeepCopy() + + ready := make(chan *ReadyPair) + defer close(ready) + prober := NewProber( + zaptest.NewLogger(t).Sugar(), + notFoundLister{}, + func(c v1beta1.KafkaChannel, s eventingduckv1.SubscriberSpec) { + ready <- &ReadyPair{ + c, + s, + } + }) + + // If we can't list, this must fail and return false + ok, err := prober.IsReady(context.Background(), *ch, *sub) + if err == nil { + t.Fatal("IsReady returned unexpected success") + } + if ok { + t.Fatal("IsReady() returned true") + } +} + +type fakeProbeTargetLister ProbeTarget + +func (l fakeProbeTargetLister) ListProbeTargets(ctx context.Context, kc messagingv1beta1.KafkaChannel) (*ProbeTarget, error) { + t := ProbeTarget(l) + return &t, nil +} + +type notFoundLister struct{} + +func (l notFoundLister) ListProbeTargets(ctx context.Context, kc messagingv1beta1.KafkaChannel) (*ProbeTarget, error) { + return nil, errors.New("not found") +} diff --git a/pkg/common/consumer/consumer_handler.go b/pkg/common/consumer/consumer_handler.go index 092b3083fc..681262877d 100644 --- a/pkg/common/consumer/consumer_handler.go +++ b/pkg/common/consumer/consumer_handler.go @@ -28,7 +28,8 @@ type KafkaConsumerHandler interface { // When this function returns true, the consumer group offset is marked as consumed. // The returned error is enqueued in errors channel. Handle(context context.Context, message *sarama.ConsumerMessage) (bool, error) - SetReady(ready bool) + SetReady(partition int32, ready bool) + GetConsumerGroup() string } // ConsumerHandler implements sarama.ConsumerGroupHandler and provides some glue code to simplify message handling @@ -59,15 +60,21 @@ func (consumer *SaramaConsumerHandler) Setup(sarama.ConsumerGroupSession) error // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited func (consumer *SaramaConsumerHandler) Cleanup(session sarama.ConsumerGroupSession) error { - consumer.logger.Info("cleanup handler") - consumer.handler.SetReady(false) + consumer.logger.Infow("Cleanup handler") + for t, ps := range session.Claims() { + for _, p := range ps { + consumer.logger.Debugw("Cleanup handler: Setting partition readiness to false", zap.String("topic", t), + zap.Int32("partition", p)) + consumer.handler.SetReady(p, false) + } + } return nil } // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages(). func (consumer *SaramaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error { - consumer.logger.Info(fmt.Sprintf("Starting partition consumer, topic: %s, partition: %d, initialOffset: %d", claim.Topic(), claim.Partition(), claim.InitialOffset())) - + consumer.logger.Infow(fmt.Sprintf("Starting partition consumer, topic: %s, partition: %d, initialOffset: %d", claim.Topic(), claim.Partition(), claim.InitialOffset()), zap.String("ConsumeGroup", consumer.handler.GetConsumerGroup())) + consumer.handler.SetReady(claim.Partition(), true) // NOTE: // Do not move the code below to a goroutine. // The `ConsumeClaim` itself is called within a goroutine, see: @@ -85,7 +92,7 @@ func (consumer *SaramaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroup if err != nil { consumer.logger.Infow("Failure while handling a message", zap.String("topic", message.Topic), zap.Int32("partition", message.Partition), zap.Int64("offset", message.Offset), zap.Error(err)) consumer.errors <- err - consumer.handler.SetReady(false) + consumer.handler.SetReady(claim.Partition(), false) } if mustMark { @@ -93,7 +100,6 @@ func (consumer *SaramaConsumerHandler) ConsumeClaim(session sarama.ConsumerGroup if ce := consumer.logger.Desugar().Check(zap.DebugLevel, "debugging"); ce != nil { consumer.logger.Debugw("Message marked", zap.String("topic", message.Topic), zap.Binary("value", message.Value)) } - consumer.handler.SetReady(true) } } diff --git a/pkg/common/consumer/consumer_handler_test.go b/pkg/common/consumer/consumer_handler_test.go index 14953e6c1a..2835872580 100644 --- a/pkg/common/consumer/consumer_handler_test.go +++ b/pkg/common/consumer/consumer_handler_test.go @@ -114,7 +114,11 @@ func (m mockMessageHandler) Handle(ctx context.Context, message *sarama.Consumer } } -func (m mockMessageHandler) SetReady(ready bool) { +func (m mockMessageHandler) SetReady(int32, bool) { +} + +func (m mockMessageHandler) GetConsumerGroup() string { + return "consumer group" } //------ Tests diff --git a/pkg/source/adapter/adapter.go b/pkg/source/adapter/adapter.go index 0a3c7b1985..6c7dd48c58 100644 --- a/pkg/source/adapter/adapter.go +++ b/pkg/source/adapter/adapter.go @@ -82,6 +82,9 @@ func NewAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, httpMe keyTypeMapper: getKeyTypeMapper(config.KeyType), } } +func (a *Adapter) GetConsumerGroup() string { + return a.config.ConsumerGroup +} func (a *Adapter) Start(ctx context.Context) error { a.logger.Infow("Starting with config: ", @@ -122,7 +125,7 @@ func (a *Adapter) Start(ctx context.Context) error { return nil } -func (a *Adapter) SetReady(_ bool) {} +func (a *Adapter) SetReady(int32, bool) {} func (a *Adapter) Handle(ctx context.Context, msg *sarama.ConsumerMessage) (bool, error) { if a.rateLimiter != nil { diff --git a/third_party/VENDOR-LICENSE/knative.dev/networking/pkg/prober/LICENSE b/third_party/VENDOR-LICENSE/knative.dev/networking/pkg/prober/LICENSE new file mode 100644 index 0000000000..261eeb9e9f --- /dev/null +++ b/third_party/VENDOR-LICENSE/knative.dev/networking/pkg/prober/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/github.com/gogo/protobuf/proto/text_parser.go b/vendor/github.com/gogo/protobuf/proto/text_parser.go index 1ce0be2fa9..f85c0cc81a 100644 --- a/vendor/github.com/gogo/protobuf/proto/text_parser.go +++ b/vendor/github.com/gogo/protobuf/proto/text_parser.go @@ -318,7 +318,7 @@ func unescape(s string) (ch string, tail string, err error) { if i > utf8.MaxRune { return "", "", fmt.Errorf(`\%c%s is not a valid Unicode code point`, r, ss) } - return string(i), s, nil + return string(rune(i)), s, nil } return "", "", fmt.Errorf(`unknown escape \%c`, r) } diff --git a/vendor/golang.org/x/tools/cmd/goimports/doc.go b/vendor/golang.org/x/tools/cmd/goimports/doc.go index 7033e4d4cf..f344d8014a 100644 --- a/vendor/golang.org/x/tools/cmd/goimports/doc.go +++ b/vendor/golang.org/x/tools/cmd/goimports/doc.go @@ -1,3 +1,7 @@ +// Copyright 2013 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + /* Command goimports updates your Go import lines, diff --git a/vendor/golang.org/x/tools/go/ast/astutil/util.go b/vendor/golang.org/x/tools/go/ast/astutil/util.go index 7630629824..919d5305ab 100644 --- a/vendor/golang.org/x/tools/go/ast/astutil/util.go +++ b/vendor/golang.org/x/tools/go/ast/astutil/util.go @@ -1,3 +1,7 @@ +// Copyright 2015 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + package astutil import "go/ast" diff --git a/vendor/golang.org/x/tools/imports/forward.go b/vendor/golang.org/x/tools/imports/forward.go index a4e40adba0..8be18a66b3 100644 --- a/vendor/golang.org/x/tools/imports/forward.go +++ b/vendor/golang.org/x/tools/imports/forward.go @@ -1,3 +1,7 @@ +// Copyright 2019 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + // Package imports implements a Go pretty-printer (like package "go/format") // that also adds or removes import statements as necessary. package imports // import "golang.org/x/tools/imports" diff --git a/vendor/golang.org/x/tools/internal/gocommand/version.go b/vendor/golang.org/x/tools/internal/gocommand/version.go index 60d45ac0e6..0cebac6e66 100644 --- a/vendor/golang.org/x/tools/internal/gocommand/version.go +++ b/vendor/golang.org/x/tools/internal/gocommand/version.go @@ -16,9 +16,20 @@ func GoVersion(ctx context.Context, inv Invocation, r *Runner) (int, error) { inv.Verb = "list" inv.Args = []string{"-e", "-f", `{{context.ReleaseTags}}`} inv.Env = append(append([]string{}, inv.Env...), "GO111MODULE=off") - // Unset any unneeded flags. + // Unset any unneeded flags, and remove them from BuildFlags, if they're + // present. inv.ModFile = "" inv.ModFlag = "" + var buildFlags []string + for _, flag := range inv.BuildFlags { + // Flags can be prefixed by one or two dashes. + f := strings.TrimPrefix(strings.TrimPrefix(flag, "-"), "-") + if strings.HasPrefix(f, "mod=") || strings.HasPrefix(f, "modfile=") { + continue + } + buildFlags = append(buildFlags, flag) + } + inv.BuildFlags = buildFlags stdoutBytes, err := r.Run(ctx, inv) if err != nil { return 0, err diff --git a/vendor/golang.org/x/tools/internal/imports/mod.go b/vendor/golang.org/x/tools/internal/imports/mod.go index ce3269a430..901449a820 100644 --- a/vendor/golang.org/x/tools/internal/imports/mod.go +++ b/vendor/golang.org/x/tools/internal/imports/mod.go @@ -1,3 +1,7 @@ +// Copyright 2019 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + package imports import ( diff --git a/vendor/golang.org/x/tools/internal/imports/mod_cache.go b/vendor/golang.org/x/tools/internal/imports/mod_cache.go index 5b4f03accd..18dada495c 100644 --- a/vendor/golang.org/x/tools/internal/imports/mod_cache.go +++ b/vendor/golang.org/x/tools/internal/imports/mod_cache.go @@ -1,3 +1,7 @@ +// Copyright 2019 The Go Authors. All rights reserved. +// Use of this source code is governed by a BSD-style +// license that can be found in the LICENSE file. + package imports import ( diff --git a/vendor/knative.dev/networking/LICENSE b/vendor/knative.dev/networking/LICENSE new file mode 100644 index 0000000000..261eeb9e9f --- /dev/null +++ b/vendor/knative.dev/networking/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. diff --git a/vendor/knative.dev/networking/pkg/prober/doc.go b/vendor/knative.dev/networking/pkg/prober/doc.go new file mode 100644 index 0000000000..1c971e14c6 --- /dev/null +++ b/vendor/knative.dev/networking/pkg/prober/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package prober contains functionality for implementing probing in knative serving. +package prober diff --git a/vendor/knative.dev/networking/pkg/prober/prober.go b/vendor/knative.dev/networking/pkg/prober/prober.go new file mode 100644 index 0000000000..6b216609aa --- /dev/null +++ b/vendor/knative.dev/networking/pkg/prober/prober.go @@ -0,0 +1,199 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package prober + +import ( + "context" + "fmt" + "io/ioutil" + "net/http" + "sync" + "time" + + "go.uber.org/zap" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + "knative.dev/pkg/logging" +) + +// Preparer is a way for the caller to modify the HTTP request before it goes out. +type Preparer func(r *http.Request) *http.Request + +// Verifier is a way for the caller to validate the HTTP response after it comes back. +type Verifier func(r *http.Response, b []byte) (bool, error) + +// WithHeader sets a header in the probe request. +func WithHeader(name, value string) Preparer { + return func(r *http.Request) *http.Request { + r.Header.Set(name, value) + return r + } +} + +// WithHost sets the host in the probe request. +func WithHost(host string) Preparer { + return func(r *http.Request) *http.Request { + r.Host = host + return r + } +} + +// WithPath sets the path in the probe request. +func WithPath(path string) Preparer { + return func(r *http.Request) *http.Request { + r.URL.Path = path + return r + } +} + +// ExpectsBody validates that the body of the probe response matches the provided string. +func ExpectsBody(body string) Verifier { + return func(r *http.Response, b []byte) (bool, error) { + if string(b) == body { + return true, nil + } + return false, fmt.Errorf("unexpected body: want %q, got %q", body, string(b)) + } +} + +// ExpectsHeader validates that the given header of the probe response matches the provided string. +func ExpectsHeader(name, value string) Verifier { + return func(r *http.Response, _ []byte) (bool, error) { + if r.Header.Get(name) == value { + return true, nil + } + return false, fmt.Errorf("unexpected header %q: want %q, got %q", name, value, r.Header.Get(name)) + } +} + +// ExpectsStatusCodes validates that the given status code of the probe response matches the provided int. +func ExpectsStatusCodes(statusCodes []int) Verifier { + return func(r *http.Response, _ []byte) (bool, error) { + for _, v := range statusCodes { + if r.StatusCode == v { + return true, nil + } + } + return false, fmt.Errorf("unexpected status code: want %v, got %v", statusCodes, r.StatusCode) + } +} + +// Do sends a single probe to given target, e.g. `http://revision.default.svc.cluster.local:81`. +// Do returns whether the probe was successful or not, or there was an error probing. +func Do(ctx context.Context, transport http.RoundTripper, target string, ops ...interface{}) (bool, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, target, nil) + if err != nil { + return false, fmt.Errorf("%s is not a valid URL: %w", target, err) + } + for _, op := range ops { + if po, ok := op.(Preparer); ok { + req = po(req) + } + } + + resp, err := transport.RoundTrip(req) + if err != nil { + return false, fmt.Errorf("error roundtripping %s: %w", target, err) + } + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return false, fmt.Errorf("error reading body: %w", err) + } + + for _, op := range ops { + if vo, ok := op.(Verifier); ok { + if ok, err := vo(resp, body); err != nil || !ok { + return false, err + } + } + } + return true, nil +} + +// Done is a callback that is executed when the async probe has finished. +// `arg` is given by the caller at the offering time, while `success` and `err` +// are the return values of the `Do` call. +// It is assumed that the opaque arg is consistent for a given target and +// we will coalesce concurrent Offer invocations on target. +type Done func(arg interface{}, success bool, err error) + +// Manager manages async probes and makes sure we run concurrently only a single +// probe for the same key. +type Manager struct { + cb Done + // NB: it is paramount to use a transport that will close the connection + // after every request here. Otherwise the cached connections will prohibit + // scaling to zero, due to unsuccessful probes to the Activator. + transport http.RoundTripper + + // mu guards keys. + mu sync.Mutex + keys sets.String +} + +// New creates a new Manager, that will invoke the given callback when +// async probing is finished. +func New(cb Done, transport http.RoundTripper) *Manager { + return &Manager{ + keys: sets.NewString(), + cb: cb, + transport: transport, + } +} + +// Offer executes asynchronous probe using `target` as the key. +// If a probe with the same key already exists, Offer will return false and the +// call is discarded. If the request is accepted, Offer returns true. +// Otherwise Offer starts a goroutine that periodically executes +// `Do`, until timeout is reached, the probe succeeds, or fails with an error. +// In the end the callback is invoked with the provided `arg` and probing results. +func (m *Manager) Offer(ctx context.Context, target string, arg interface{}, period, timeout time.Duration, ops ...interface{}) bool { + m.mu.Lock() + defer m.mu.Unlock() + if m.keys.Has(target) { + return false + } + m.keys.Insert(target) + m.doAsync(ctx, target, arg, period, timeout, ops...) + return true +} + +// doAsync starts a go routine that probes the target with given period. +func (m *Manager) doAsync(ctx context.Context, target string, arg interface{}, period, timeout time.Duration, ops ...interface{}) { + logger := logging.FromContext(ctx) + go func() { + defer func() { + m.mu.Lock() + defer m.mu.Unlock() + m.keys.Delete(target) + }() + var ( + result bool + inErr error + ) + err := wait.PollImmediate(period, timeout, func() (bool, error) { + result, inErr = Do(ctx, m.transport, target, ops...) + // Do not return error, which is from verifierError, as retry is expected until timeout. + return result, nil + }) + if inErr != nil { + logger.Errorw("Unable to read sockstat", zap.Error(inErr)) + } + m.cb(arg, result, err) + }() +} diff --git a/vendor/knative.dev/pkg/client/injection/kube/informers/core/v1/pod/pod.go b/vendor/knative.dev/pkg/client/injection/kube/informers/core/v1/pod/pod.go new file mode 100644 index 0000000000..8f8a6fffbc --- /dev/null +++ b/vendor/knative.dev/pkg/client/injection/kube/informers/core/v1/pod/pod.go @@ -0,0 +1,52 @@ +/* +Copyright 2020 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Code generated by injection-gen. DO NOT EDIT. + +package pod + +import ( + context "context" + + v1 "k8s.io/client-go/informers/core/v1" + factory "knative.dev/pkg/client/injection/kube/informers/factory" + controller "knative.dev/pkg/controller" + injection "knative.dev/pkg/injection" + logging "knative.dev/pkg/logging" +) + +func init() { + injection.Default.RegisterInformer(withInformer) +} + +// Key is used for associating the Informer inside the context.Context. +type Key struct{} + +func withInformer(ctx context.Context) (context.Context, controller.Informer) { + f := factory.Get(ctx) + inf := f.Core().V1().Pods() + return context.WithValue(ctx, Key{}, inf), inf.Informer() +} + +// Get extracts the typed informer from the context. +func Get(ctx context.Context) v1.PodInformer { + untyped := ctx.Value(Key{}) + if untyped == nil { + logging.FromContext(ctx).Panic( + "Unable to fetch k8s.io/client-go/informers/core/v1.PodInformer from context.") + } + return untyped.(v1.PodInformer) +} diff --git a/vendor/modules.txt b/vendor/modules.txt index be865db6ea..36b4b3b5b9 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -185,7 +185,7 @@ github.com/go-openapi/spec github.com/go-openapi/swag # github.com/gobuffalo/flect v0.2.2 github.com/gobuffalo/flect -# github.com/gogo/protobuf v1.3.1 +# github.com/gogo/protobuf v1.3.2 github.com/gogo/protobuf/proto github.com/gogo/protobuf/sortkeys # github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b @@ -396,6 +396,7 @@ go.opentelemetry.io/otel/label go.opentelemetry.io/otel/propagation go.opentelemetry.io/otel/trace # go.uber.org/atomic v1.7.0 +## explicit go.uber.org/atomic # go.uber.org/automaxprocs v1.4.0 go.uber.org/automaxprocs/internal/cgroups @@ -471,7 +472,7 @@ golang.org/x/text/width # golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e ## explicit golang.org/x/time/rate -# golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818 +# golang.org/x/tools v0.0.0-20210106214847-113979e3529a golang.org/x/tools/cmd/goimports golang.org/x/tools/go/ast/astutil golang.org/x/tools/go/gcexportdata @@ -1211,6 +1212,9 @@ knative.dev/eventing/test/test_images/recordevents # knative.dev/hack v0.0.0-20210305150220-f99a25560134 ## explicit knative.dev/hack +# knative.dev/networking v0.0.0-20210304153916-f813b5904943 +## explicit +knative.dev/networking/pkg/prober # knative.dev/pkg v0.0.0-20210309024624-0f8d8de5949d ## explicit knative.dev/pkg/apiextensions/storageversion @@ -1241,6 +1245,7 @@ knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset knative.dev/pkg/client/injection/kube/informers/apps/v1/statefulset/fake knative.dev/pkg/client/injection/kube/informers/core/v1/endpoints knative.dev/pkg/client/injection/kube/informers/core/v1/namespace +knative.dev/pkg/client/injection/kube/informers/core/v1/pod knative.dev/pkg/client/injection/kube/informers/core/v1/secret knative.dev/pkg/client/injection/kube/informers/core/v1/secret/fake knative.dev/pkg/client/injection/kube/informers/core/v1/service