Skip to content
This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Commit

Permalink
Update control-protocol version (#718)
Browse files Browse the repository at this point in the history
  • Loading branch information
travis-minke-sap authored Jun 16, 2021
1 parent 19a507e commit 3c91bbd
Show file tree
Hide file tree
Showing 7 changed files with 50 additions and 27 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ require (
k8s.io/apimachinery v0.20.7
k8s.io/client-go v0.20.7
k8s.io/utils v0.0.0-20201110183641-67b214c5f920
knative.dev/control-protocol v0.0.0-20210608143842-1c4b3e61cbc0
knative.dev/control-protocol v0.0.0-20210616092621-1469a722be79
knative.dev/eventing v0.23.1-0.20210615213121-4eb1738b5e35
knative.dev/hack v0.0.0-20210614141220-66ab1a098940
knative.dev/networking v0.0.0-20210615114921-e291c8011a20
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1346,8 +1346,8 @@ k8s.io/kube-openapi v0.0.0-20201113171705-d219536bb9fd/go.mod h1:WOJ3KddDSol4tAG
k8s.io/utils v0.0.0-20200729134348-d5654de09c73/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
k8s.io/utils v0.0.0-20201110183641-67b214c5f920 h1:CbnUZsM497iRC5QMVkHwyl8s2tB3g7yaSHkYPkpgelw=
k8s.io/utils v0.0.0-20201110183641-67b214c5f920/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
knative.dev/control-protocol v0.0.0-20210608143842-1c4b3e61cbc0 h1:dakdOWnuJDlykt6/UkfiFz6ddoUd2P3Bc4sL1aF0VvE=
knative.dev/control-protocol v0.0.0-20210608143842-1c4b3e61cbc0/go.mod h1:iRE6O4mY88gH6xWl6ZFhG6sJobzrZjFrz5xhX4h93Ns=
knative.dev/control-protocol v0.0.0-20210616092621-1469a722be79 h1:CepZkHLRpqnQtdbYtXa7R/V5LbjxzkKIQdQxNOFyrhY=
knative.dev/control-protocol v0.0.0-20210616092621-1469a722be79/go.mod h1:iRE6O4mY88gH6xWl6ZFhG6sJobzrZjFrz5xhX4h93Ns=
knative.dev/eventing v0.23.1-0.20210615213121-4eb1738b5e35 h1:+XbEA0ge3iiWi7on4tLhM3T14YR7FtC8ts2GPAVvAYk=
knative.dev/eventing v0.23.1-0.20210615213121-4eb1738b5e35/go.mod h1:d4zElgG7AL4zDt7CY7aW6BHdITgkUifk7Hskl5p1JEI=
knative.dev/hack v0.0.0-20210601210329-de04b70e00d0/go.mod h1:PHt8x8yX5Z9pPquBEfIj0X66f8iWkWfR0S/sarACJrI=
Expand Down
2 changes: 1 addition & 1 deletion pkg/source/reconciler/source/kafkasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ type Reconciler struct {
configs KafkaSourceConfigAccessor

podIpGetter ctrlreconciler.PodIpGetter
connectionPool *ctrlreconciler.ControlPlaneConnectionPool
connectionPool ctrlreconciler.ControlPlaneConnectionPool
claimsNotificationStore *ctrlreconciler.NotificationStore
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,24 @@ import (
"knative.dev/control-protocol/pkg/message"
)

// AsyncCommandNotificationStore is a specialized NotificationStore that is capable to handle message.AsyncCommandResult
type AsyncCommandNotificationStore struct {
// AsyncCommandNotificationStore defines a specialized NotificationStore capable of handling message.AsyncCommandResults
type AsyncCommandNotificationStore interface {
GetCommandResult(srcName types.NamespacedName, pod string, command message.AsyncCommand) *message.AsyncCommandResult
CleanPodsNotifications(srcName types.NamespacedName)
CleanPodNotification(srcName types.NamespacedName, pod string)
MessageHandler(srcName types.NamespacedName, pod string) control.MessageHandler
}

var _ AsyncCommandNotificationStore = (*asyncCommandNotificationStoreImpl)(nil)

// asyncCommandNotificationStoreImpl is a specialized NotificationStore that is capable to handle message.AsyncCommandResult
type asyncCommandNotificationStoreImpl struct {
ns *NotificationStore
}

// NewAsyncCommandNotificationStore creates an AsyncCommandNotificationStore
func NewAsyncCommandNotificationStore(enqueueKey func(name types.NamespacedName)) *AsyncCommandNotificationStore {
return &AsyncCommandNotificationStore{
func NewAsyncCommandNotificationStore(enqueueKey func(name types.NamespacedName)) AsyncCommandNotificationStore {
return &asyncCommandNotificationStoreImpl{
ns: &NotificationStore{
enqueueKey: enqueueKey,
payloadParser: message.ParseAsyncCommandResult,
Expand All @@ -42,7 +52,7 @@ func NewAsyncCommandNotificationStore(enqueueKey func(name types.NamespacedName)
}

// GetCommandResult returns the message.AsyncCommandResult when the notification store contains the command result matching srcName, pod and generation
func (ns *AsyncCommandNotificationStore) GetCommandResult(srcName types.NamespacedName, pod string, command message.AsyncCommand) *message.AsyncCommandResult {
func (ns *asyncCommandNotificationStoreImpl) GetCommandResult(srcName types.NamespacedName, pod string, command message.AsyncCommand) *message.AsyncCommandResult {
val, ok := ns.ns.GetPodNotification(srcName, pod)
if !ok {
return nil
Expand All @@ -58,16 +68,16 @@ func (ns *AsyncCommandNotificationStore) GetCommandResult(srcName types.Namespac
}

// CleanPodsNotifications is like NotificationStore.CleanPodsNotifications
func (ns *AsyncCommandNotificationStore) CleanPodsNotifications(srcName types.NamespacedName) {
func (ns *asyncCommandNotificationStoreImpl) CleanPodsNotifications(srcName types.NamespacedName) {
ns.ns.CleanPodsNotifications(srcName)
}

// CleanPodNotification is like NotificationStore.CleanPodNotification
func (ns *AsyncCommandNotificationStore) CleanPodNotification(srcName types.NamespacedName, pod string) {
func (ns *asyncCommandNotificationStoreImpl) CleanPodNotification(srcName types.NamespacedName, pod string) {
ns.ns.CleanPodNotification(srcName, pod)
}

// MessageHandler is like NotificationStore.MessageHandler
func (ns *AsyncCommandNotificationStore) MessageHandler(srcName types.NamespacedName, pod string) control.MessageHandler {
func (ns *asyncCommandNotificationStoreImpl) MessageHandler(srcName types.NamespacedName, pod string) control.MessageHandler {
return ns.ns.MessageHandler(srcName, pod, PassNewValue)
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,20 @@ const (
keepAlive = 30 * time.Second
)

type ControlPlaneConnectionPool struct {
type ControlPlaneConnectionPool interface {
GetConnectedHosts(key string) []string
GetServices(key string) map[string]control.Service
ResolveControlInterface(key string, host string) (string, control.Service)
RemoveConnection(ctx context.Context, key string, host string)
RemoveAllConnections(ctx context.Context, key string)
Close(ctx context.Context)
ReconcileConnections(ctx context.Context, key string, wantConnections []string, newServiceCb func(string, control.Service), oldServiceCb func(string)) (map[string]control.Service, error)
DialControlService(ctx context.Context, key string, host string) (string, control.Service, error)
}

var _ ControlPlaneConnectionPool = (*controlPlaneConnectionPoolImpl)(nil)

type controlPlaneConnectionPoolImpl struct {
tlsDialerFactory TLSDialerFactory
baseDialOptions *net.Dialer

Expand All @@ -48,12 +61,12 @@ type clientServiceHolder struct {
cancelFn context.CancelFunc
}

func NewInsecureControlPlaneConnectionPool(opts ...ControlPlaneConnectionPoolOption) *ControlPlaneConnectionPool {
func NewInsecureControlPlaneConnectionPool(opts ...ControlPlaneConnectionPoolOption) ControlPlaneConnectionPool {
return NewControlPlaneConnectionPool(nil, opts...)
}

func NewControlPlaneConnectionPool(tlsDialerFactory TLSDialerFactory, opts ...ControlPlaneConnectionPoolOption) *ControlPlaneConnectionPool {
pool := &ControlPlaneConnectionPool{
func NewControlPlaneConnectionPool(tlsDialerFactory TLSDialerFactory, opts ...ControlPlaneConnectionPoolOption) ControlPlaneConnectionPool {
pool := &controlPlaneConnectionPoolImpl{
tlsDialerFactory: tlsDialerFactory,
baseDialOptions: &net.Dialer{
KeepAlive: keepAlive,
Expand All @@ -69,7 +82,7 @@ func NewControlPlaneConnectionPool(tlsDialerFactory TLSDialerFactory, opts ...Co
return pool
}

func (cc *ControlPlaneConnectionPool) GetConnectedHosts(key string) []string {
func (cc *controlPlaneConnectionPoolImpl) GetConnectedHosts(key string) []string {
cc.connsLock.Lock()
defer cc.connsLock.Unlock()
var m map[string]clientServiceHolder
Expand All @@ -84,7 +97,7 @@ func (cc *ControlPlaneConnectionPool) GetConnectedHosts(key string) []string {
return hosts
}

func (cc *ControlPlaneConnectionPool) GetServices(key string) map[string]control.Service {
func (cc *controlPlaneConnectionPoolImpl) GetServices(key string) map[string]control.Service {
cc.connsLock.Lock()
defer cc.connsLock.Unlock()
var m map[string]clientServiceHolder
Expand All @@ -99,7 +112,7 @@ func (cc *ControlPlaneConnectionPool) GetServices(key string) map[string]control
return svcs
}

func (cc *ControlPlaneConnectionPool) ResolveControlInterface(key string, host string) (string, control.Service) {
func (cc *controlPlaneConnectionPoolImpl) ResolveControlInterface(key string, host string) (string, control.Service) {
cc.connsLock.Lock()
defer cc.connsLock.Unlock()
if m, ok := cc.conns[key]; !ok {
Expand All @@ -111,7 +124,7 @@ func (cc *ControlPlaneConnectionPool) ResolveControlInterface(key string, host s
return "", nil
}

func (cc *ControlPlaneConnectionPool) RemoveConnection(ctx context.Context, key string, host string) {
func (cc *controlPlaneConnectionPoolImpl) RemoveConnection(ctx context.Context, key string, host string) {
cc.connsLock.Lock()
defer cc.connsLock.Unlock()
m, ok := cc.conns[key]
Expand All @@ -129,7 +142,7 @@ func (cc *ControlPlaneConnectionPool) RemoveConnection(ctx context.Context, key
}
}

func (cc *ControlPlaneConnectionPool) RemoveAllConnections(ctx context.Context, key string) {
func (cc *controlPlaneConnectionPoolImpl) RemoveAllConnections(ctx context.Context, key string) {
cc.connsLock.Lock()
defer cc.connsLock.Unlock()
m, ok := cc.conns[key]
Expand All @@ -142,7 +155,7 @@ func (cc *ControlPlaneConnectionPool) RemoveAllConnections(ctx context.Context,
delete(cc.conns, key)
}

func (cc *ControlPlaneConnectionPool) Close(ctx context.Context) {
func (cc *controlPlaneConnectionPoolImpl) Close(ctx context.Context) {
cc.connsLock.Lock()
defer cc.connsLock.Unlock()
for _, m := range cc.conns {
Expand All @@ -154,7 +167,7 @@ func (cc *ControlPlaneConnectionPool) Close(ctx context.Context) {
cc.conns = make(map[string]map[string]clientServiceHolder)
}

func (cc *ControlPlaneConnectionPool) ReconcileConnections(ctx context.Context, key string, wantConnections []string, newServiceCb func(string, control.Service), oldServiceCb func(string)) (map[string]control.Service, error) {
func (cc *controlPlaneConnectionPoolImpl) ReconcileConnections(ctx context.Context, key string, wantConnections []string, newServiceCb func(string, control.Service), oldServiceCb func(string)) (map[string]control.Service, error) {
existingConnections := cc.GetConnectedHosts(key)

newConnections := setDifference(wantConnections, existingConnections)
Expand Down Expand Up @@ -190,7 +203,7 @@ func (cc *ControlPlaneConnectionPool) ReconcileConnections(ctx context.Context,
return cc.GetServices(key), nil
}

func (cc *ControlPlaneConnectionPool) DialControlService(ctx context.Context, key string, host string) (string, control.Service, error) {
func (cc *controlPlaneConnectionPoolImpl) DialControlService(ctx context.Context, key string, host string) (string, control.Service, error) {
var dialer network.Dialer
dialer = cc.baseDialOptions
// Check if tlsDialerFactory is set up, otherwise connect without tls
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ package reconciler

import control "knative.dev/control-protocol/pkg"

type ControlPlaneConnectionPoolOption func(*ControlPlaneConnectionPool)
type ControlPlaneConnectionPoolOption func(*controlPlaneConnectionPoolImpl)

func WithServiceWrapper(wrapper control.ServiceWrapper) ControlPlaneConnectionPoolOption {
return func(pool *ControlPlaneConnectionPool) {
return func(pool *controlPlaneConnectionPoolImpl) {
pool.serviceWrapperFactories = append(pool.serviceWrapperFactories, wrapper)
}
}
2 changes: 1 addition & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,7 @@ k8s.io/utils/buffer
k8s.io/utils/integer
k8s.io/utils/pointer
k8s.io/utils/trace
# knative.dev/control-protocol v0.0.0-20210608143842-1c4b3e61cbc0
# knative.dev/control-protocol v0.0.0-20210616092621-1469a722be79
## explicit
knative.dev/control-protocol/pkg
knative.dev/control-protocol/pkg/certificates
Expand Down

0 comments on commit 3c91bbd

Please sign in to comment.