From 3c91bbd14c67d0a035f57186a1514b52a3ca8a8d Mon Sep 17 00:00:00 2001 From: Travis Minke Date: Wed, 16 Jun 2021 09:46:22 -0600 Subject: [PATCH] Update control-protocol version (#718) --- go.mod | 2 +- go.sum | 4 +- pkg/source/reconciler/source/kafkasource.go | 2 +- .../async_command_notification_store.go | 26 +++++++++---- .../pkg/reconciler/connection_pool.go | 37 +++++++++++++------ .../pkg/reconciler/connection_pool_option.go | 4 +- vendor/modules.txt | 2 +- 7 files changed, 50 insertions(+), 27 deletions(-) diff --git a/go.mod b/go.mod index b9a9d95121..c375f2e0a8 100644 --- a/go.mod +++ b/go.mod @@ -34,7 +34,7 @@ require ( k8s.io/apimachinery v0.20.7 k8s.io/client-go v0.20.7 k8s.io/utils v0.0.0-20201110183641-67b214c5f920 - knative.dev/control-protocol v0.0.0-20210608143842-1c4b3e61cbc0 + knative.dev/control-protocol v0.0.0-20210616092621-1469a722be79 knative.dev/eventing v0.23.1-0.20210615213121-4eb1738b5e35 knative.dev/hack v0.0.0-20210614141220-66ab1a098940 knative.dev/networking v0.0.0-20210615114921-e291c8011a20 diff --git a/go.sum b/go.sum index d2f4bf8c14..95a0e43b04 100644 --- a/go.sum +++ b/go.sum @@ -1346,8 +1346,8 @@ k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd/go.mod h1:WOJ3KddDSol4tAG k8s.io/utils v0.0.0-20200729134348-d5654de09c73/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= k8s.io/utils v0.0.0-20201110183641-67b214c5f920 h1:CbnUZsM497iRC5QMVkHwyl8s2tB3g7yaSHkYPkpgelw= k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA= -knative.dev/control-protocol v0.0.0-20210608143842-1c4b3e61cbc0 h1:dakdOWnuJDlykt6/UkfiFz6ddoUd2P3Bc4sL1aF0VvE= -knative.dev/control-protocol v0.0.0-20210608143842-1c4b3e61cbc0/go.mod h1:iRE6O4mY88gH6xWl6ZFhG6sJobzrZjFrz5xhX4h93Ns= +knative.dev/control-protocol v0.0.0-20210616092621-1469a722be79 h1:CepZkHLRpqnQtdbYtXa7R/V5LbjxzkKIQdQxNOFyrhY= +knative.dev/control-protocol v0.0.0-20210616092621-1469a722be79/go.mod h1:iRE6O4mY88gH6xWl6ZFhG6sJobzrZjFrz5xhX4h93Ns= knative.dev/eventing v0.23.1-0.20210615213121-4eb1738b5e35 h1:+XbEA0ge3iiWi7on4tLhM3T14YR7FtC8ts2GPAVvAYk= knative.dev/eventing v0.23.1-0.20210615213121-4eb1738b5e35/go.mod h1:d4zElgG7AL4zDt7CY7aW6BHdITgkUifk7Hskl5p1JEI= knative.dev/hack v0.0.0-20210601210329-de04b70e00d0/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI= diff --git a/pkg/source/reconciler/source/kafkasource.go b/pkg/source/reconciler/source/kafkasource.go index 49a181f69a..b31f2eb67f 100644 --- a/pkg/source/reconciler/source/kafkasource.go +++ b/pkg/source/reconciler/source/kafkasource.go @@ -109,7 +109,7 @@ type Reconciler struct { configs KafkaSourceConfigAccessor podIpGetter ctrlreconciler.PodIpGetter - connectionPool *ctrlreconciler.ControlPlaneConnectionPool + connectionPool ctrlreconciler.ControlPlaneConnectionPool claimsNotificationStore *ctrlreconciler.NotificationStore } diff --git a/vendor/knative.dev/control-protocol/pkg/reconciler/async_command_notification_store.go b/vendor/knative.dev/control-protocol/pkg/reconciler/async_command_notification_store.go index e4661642f9..f99e69d1f0 100644 --- a/vendor/knative.dev/control-protocol/pkg/reconciler/async_command_notification_store.go +++ b/vendor/knative.dev/control-protocol/pkg/reconciler/async_command_notification_store.go @@ -25,14 +25,24 @@ import ( "knative.dev/control-protocol/pkg/message" ) -// AsyncCommandNotificationStore is a specialized NotificationStore that is capable to handle message.AsyncCommandResult -type AsyncCommandNotificationStore struct { +// AsyncCommandNotificationStore defines a specialized NotificationStore capable of handling message.AsyncCommandResults +type AsyncCommandNotificationStore interface { + GetCommandResult(srcName types.NamespacedName, pod string, command message.AsyncCommand) *message.AsyncCommandResult + CleanPodsNotifications(srcName types.NamespacedName) + CleanPodNotification(srcName types.NamespacedName, pod string) + MessageHandler(srcName types.NamespacedName, pod string) control.MessageHandler +} + +var _ AsyncCommandNotificationStore = (*asyncCommandNotificationStoreImpl)(nil) + +// asyncCommandNotificationStoreImpl is a specialized NotificationStore that is capable to handle message.AsyncCommandResult +type asyncCommandNotificationStoreImpl struct { ns *NotificationStore } // NewAsyncCommandNotificationStore creates an AsyncCommandNotificationStore -func NewAsyncCommandNotificationStore(enqueueKey func(name types.NamespacedName)) *AsyncCommandNotificationStore { - return &AsyncCommandNotificationStore{ +func NewAsyncCommandNotificationStore(enqueueKey func(name types.NamespacedName)) AsyncCommandNotificationStore { + return &asyncCommandNotificationStoreImpl{ ns: &NotificationStore{ enqueueKey: enqueueKey, payloadParser: message.ParseAsyncCommandResult, @@ -42,7 +52,7 @@ func NewAsyncCommandNotificationStore(enqueueKey func(name types.NamespacedName) } // GetCommandResult returns the message.AsyncCommandResult when the notification store contains the command result matching srcName, pod and generation -func (ns *AsyncCommandNotificationStore) GetCommandResult(srcName types.NamespacedName, pod string, command message.AsyncCommand) *message.AsyncCommandResult { +func (ns *asyncCommandNotificationStoreImpl) GetCommandResult(srcName types.NamespacedName, pod string, command message.AsyncCommand) *message.AsyncCommandResult { val, ok := ns.ns.GetPodNotification(srcName, pod) if !ok { return nil @@ -58,16 +68,16 @@ func (ns *AsyncCommandNotificationStore) GetCommandResult(srcName types.Namespac } // CleanPodsNotifications is like NotificationStore.CleanPodsNotifications -func (ns *AsyncCommandNotificationStore) CleanPodsNotifications(srcName types.NamespacedName) { +func (ns *asyncCommandNotificationStoreImpl) CleanPodsNotifications(srcName types.NamespacedName) { ns.ns.CleanPodsNotifications(srcName) } // CleanPodNotification is like NotificationStore.CleanPodNotification -func (ns *AsyncCommandNotificationStore) CleanPodNotification(srcName types.NamespacedName, pod string) { +func (ns *asyncCommandNotificationStoreImpl) CleanPodNotification(srcName types.NamespacedName, pod string) { ns.ns.CleanPodNotification(srcName, pod) } // MessageHandler is like NotificationStore.MessageHandler -func (ns *AsyncCommandNotificationStore) MessageHandler(srcName types.NamespacedName, pod string) control.MessageHandler { +func (ns *asyncCommandNotificationStoreImpl) MessageHandler(srcName types.NamespacedName, pod string) control.MessageHandler { return ns.ns.MessageHandler(srcName, pod, PassNewValue) } diff --git a/vendor/knative.dev/control-protocol/pkg/reconciler/connection_pool.go b/vendor/knative.dev/control-protocol/pkg/reconciler/connection_pool.go index c48869c97b..4fe890a2b0 100644 --- a/vendor/knative.dev/control-protocol/pkg/reconciler/connection_pool.go +++ b/vendor/knative.dev/control-protocol/pkg/reconciler/connection_pool.go @@ -33,7 +33,20 @@ const ( keepAlive = 30 * time.Second ) -type ControlPlaneConnectionPool struct { +type ControlPlaneConnectionPool interface { + GetConnectedHosts(key string) []string + GetServices(key string) map[string]control.Service + ResolveControlInterface(key string, host string) (string, control.Service) + RemoveConnection(ctx context.Context, key string, host string) + RemoveAllConnections(ctx context.Context, key string) + Close(ctx context.Context) + ReconcileConnections(ctx context.Context, key string, wantConnections []string, newServiceCb func(string, control.Service), oldServiceCb func(string)) (map[string]control.Service, error) + DialControlService(ctx context.Context, key string, host string) (string, control.Service, error) +} + +var _ ControlPlaneConnectionPool = (*controlPlaneConnectionPoolImpl)(nil) + +type controlPlaneConnectionPoolImpl struct { tlsDialerFactory TLSDialerFactory baseDialOptions *net.Dialer @@ -48,12 +61,12 @@ type clientServiceHolder struct { cancelFn context.CancelFunc } -func NewInsecureControlPlaneConnectionPool(opts ...ControlPlaneConnectionPoolOption) *ControlPlaneConnectionPool { +func NewInsecureControlPlaneConnectionPool(opts ...ControlPlaneConnectionPoolOption) ControlPlaneConnectionPool { return NewControlPlaneConnectionPool(nil, opts...) } -func NewControlPlaneConnectionPool(tlsDialerFactory TLSDialerFactory, opts ...ControlPlaneConnectionPoolOption) *ControlPlaneConnectionPool { - pool := &ControlPlaneConnectionPool{ +func NewControlPlaneConnectionPool(tlsDialerFactory TLSDialerFactory, opts ...ControlPlaneConnectionPoolOption) ControlPlaneConnectionPool { + pool := &controlPlaneConnectionPoolImpl{ tlsDialerFactory: tlsDialerFactory, baseDialOptions: &net.Dialer{ KeepAlive: keepAlive, @@ -69,7 +82,7 @@ func NewControlPlaneConnectionPool(tlsDialerFactory TLSDialerFactory, opts ...Co return pool } -func (cc *ControlPlaneConnectionPool) GetConnectedHosts(key string) []string { +func (cc *controlPlaneConnectionPoolImpl) GetConnectedHosts(key string) []string { cc.connsLock.Lock() defer cc.connsLock.Unlock() var m map[string]clientServiceHolder @@ -84,7 +97,7 @@ func (cc *ControlPlaneConnectionPool) GetConnectedHosts(key string) []string { return hosts } -func (cc *ControlPlaneConnectionPool) GetServices(key string) map[string]control.Service { +func (cc *controlPlaneConnectionPoolImpl) GetServices(key string) map[string]control.Service { cc.connsLock.Lock() defer cc.connsLock.Unlock() var m map[string]clientServiceHolder @@ -99,7 +112,7 @@ func (cc *ControlPlaneConnectionPool) GetServices(key string) map[string]control return svcs } -func (cc *ControlPlaneConnectionPool) ResolveControlInterface(key string, host string) (string, control.Service) { +func (cc *controlPlaneConnectionPoolImpl) ResolveControlInterface(key string, host string) (string, control.Service) { cc.connsLock.Lock() defer cc.connsLock.Unlock() if m, ok := cc.conns[key]; !ok { @@ -111,7 +124,7 @@ func (cc *ControlPlaneConnectionPool) ResolveControlInterface(key string, host s return "", nil } -func (cc *ControlPlaneConnectionPool) RemoveConnection(ctx context.Context, key string, host string) { +func (cc *controlPlaneConnectionPoolImpl) RemoveConnection(ctx context.Context, key string, host string) { cc.connsLock.Lock() defer cc.connsLock.Unlock() m, ok := cc.conns[key] @@ -129,7 +142,7 @@ func (cc *ControlPlaneConnectionPool) RemoveConnection(ctx context.Context, key } } -func (cc *ControlPlaneConnectionPool) RemoveAllConnections(ctx context.Context, key string) { +func (cc *controlPlaneConnectionPoolImpl) RemoveAllConnections(ctx context.Context, key string) { cc.connsLock.Lock() defer cc.connsLock.Unlock() m, ok := cc.conns[key] @@ -142,7 +155,7 @@ func (cc *ControlPlaneConnectionPool) RemoveAllConnections(ctx context.Context, delete(cc.conns, key) } -func (cc *ControlPlaneConnectionPool) Close(ctx context.Context) { +func (cc *controlPlaneConnectionPoolImpl) Close(ctx context.Context) { cc.connsLock.Lock() defer cc.connsLock.Unlock() for _, m := range cc.conns { @@ -154,7 +167,7 @@ func (cc *ControlPlaneConnectionPool) Close(ctx context.Context) { cc.conns = make(map[string]map[string]clientServiceHolder) } -func (cc *ControlPlaneConnectionPool) ReconcileConnections(ctx context.Context, key string, wantConnections []string, newServiceCb func(string, control.Service), oldServiceCb func(string)) (map[string]control.Service, error) { +func (cc *controlPlaneConnectionPoolImpl) ReconcileConnections(ctx context.Context, key string, wantConnections []string, newServiceCb func(string, control.Service), oldServiceCb func(string)) (map[string]control.Service, error) { existingConnections := cc.GetConnectedHosts(key) newConnections := setDifference(wantConnections, existingConnections) @@ -190,7 +203,7 @@ func (cc *ControlPlaneConnectionPool) ReconcileConnections(ctx context.Context, return cc.GetServices(key), nil } -func (cc *ControlPlaneConnectionPool) DialControlService(ctx context.Context, key string, host string) (string, control.Service, error) { +func (cc *controlPlaneConnectionPoolImpl) DialControlService(ctx context.Context, key string, host string) (string, control.Service, error) { var dialer network.Dialer dialer = cc.baseDialOptions // Check if tlsDialerFactory is set up, otherwise connect without tls diff --git a/vendor/knative.dev/control-protocol/pkg/reconciler/connection_pool_option.go b/vendor/knative.dev/control-protocol/pkg/reconciler/connection_pool_option.go index 9384bc78ea..7b8c56b86f 100644 --- a/vendor/knative.dev/control-protocol/pkg/reconciler/connection_pool_option.go +++ b/vendor/knative.dev/control-protocol/pkg/reconciler/connection_pool_option.go @@ -18,10 +18,10 @@ package reconciler import control "knative.dev/control-protocol/pkg" -type ControlPlaneConnectionPoolOption func(*ControlPlaneConnectionPool) +type ControlPlaneConnectionPoolOption func(*controlPlaneConnectionPoolImpl) func WithServiceWrapper(wrapper control.ServiceWrapper) ControlPlaneConnectionPoolOption { - return func(pool *ControlPlaneConnectionPool) { + return func(pool *controlPlaneConnectionPoolImpl) { pool.serviceWrapperFactories = append(pool.serviceWrapperFactories, wrapper) } } diff --git a/vendor/modules.txt b/vendor/modules.txt index 47457de05b..6977ad0a52 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1110,7 +1110,7 @@ k8s.io/utils/buffer k8s.io/utils/integer k8s.io/utils/pointer k8s.io/utils/trace -# knative.dev/control-protocol v0.0.0-20210608143842-1c4b3e61cbc0 +# knative.dev/control-protocol v0.0.0-20210616092621-1469a722be79 ## explicit knative.dev/control-protocol/pkg knative.dev/control-protocol/pkg/certificates