Skip to content
This repository has been archived by the owner on Jul 11, 2023. It is now read-only.

Commit

Permalink
configurator: clean private pubsub interface in favour of global one (#…
Browse files Browse the repository at this point in the history
…2219)

Configurator doesn't need to use a private pubsub interface; we
implemented the global one just after adding this one, which is no
longer needed.
  • Loading branch information
eduser25 authored Dec 17, 2020
1 parent a7666fb commit b24f86b
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 67 deletions.
16 changes: 0 additions & 16 deletions pkg/configurator/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"fmt"
"strconv"

"github.com/cskr/pubsub"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
Expand Down Expand Up @@ -49,9 +48,6 @@ const (

// serviceCertValidityDurationKey is the key name used to specify the validity duration of service certificates in the ConfigMap
serviceCertValidityDurationKey = "service_cert_validity_duration"

// defaultPubSubChannelSize is the default size of the buffered channel returned to the subscriber
defaultPubSubChannelSize = 128
)

// NewConfigurator implements configurator.Configurator and creates the Kubernetes client to manage namespaces.
Expand All @@ -74,7 +70,6 @@ func newConfigurator(kubeClient kubernetes.Interface, stop <-chan struct{}, osmN
announcements: make(chan a.Announcement),
osmNamespace: osmNamespace,
osmConfigMapName: osmConfigMapName,
pSub: pubsub.New(defaultPubSubChannelSize),
}

informerName := "ConfigMap"
Expand Down Expand Up @@ -133,18 +128,7 @@ type osmConfig struct {
ServiceCertValidityDuration string `yaml:"service_cert_validity_duration"`
}

// This function captures the Announcements from k8s informer updates, and relays them to the subscribed
// members on the pubsub interface
func (c *Client) publish() {
for {
a := <-c.announcements
log.Debug().Msgf("OSM config publish: %s", a.Type.String())
c.pSub.Pub(a, a.Type.String())
}
}

func (c *Client) run(stop <-chan struct{}) {
go c.publish() // prepare the publish interface
go c.informer.Run(stop) // run the informer synchronization
log.Info().Msgf("Started OSM ConfigMap informer - watching for %s", c.getConfigMapCacheKey())
log.Info().Msg("[ConfigMap Client] Waiting for ConfigMap informer's cache to sync")
Expand Down
6 changes: 5 additions & 1 deletion pkg/configurator/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
testclient "k8s.io/client-go/kubernetes/fake"

"github.com/openservicemesh/osm/pkg/announcements"
"github.com/openservicemesh/osm/pkg/kubernetes/events"
)

const (
Expand All @@ -27,10 +28,13 @@ var _ = Describe("Test OSM ConfigMap parsing", func() {

stop := make(<-chan struct{})
cfg := newConfigurator(kubeClient, stop, osmNamespace, osmConfigMapName)
confChannel := cfg.Subscribe(
Expect(cfg).ToNot(BeNil())

confChannel := events.GetPubSubInstance().Subscribe(
announcements.ConfigMapAdded,
announcements.ConfigMapDeleted,
announcements.ConfigMapUpdated)
defer events.GetPubSubInstance().Unsub(confChannel)

configMap := v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Expand Down
12 changes: 0 additions & 12 deletions pkg/configurator/methods.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"time"

"github.com/openservicemesh/osm/pkg/announcements"
"github.com/openservicemesh/osm/pkg/constants"
)

Expand Down Expand Up @@ -115,14 +114,3 @@ func (c *Client) GetServiceCertValidityPeriod() time.Duration {

return validityDuration
}

// Subscribe returns a channel subscribed to the announcement types passed by the given parameter
func (c *Client) Subscribe(aTypes ...announcements.AnnouncementType) chan interface{} {
// Cast of array of T types, even when T types are equivalent, is forbidden
subTypes := []string{}
for _, v := range aTypes {
subTypes = append(subTypes, string(v))
}

return c.pSub.Sub(subTypes...)
}
128 changes: 96 additions & 32 deletions pkg/configurator/methods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
testclient "k8s.io/client-go/kubernetes/fake"

"github.com/openservicemesh/osm/pkg/announcements"
"github.com/openservicemesh/osm/pkg/kubernetes/events"
)

var _ = Describe("Test Envoy configuration creation", func() {
Expand Down Expand Up @@ -43,10 +44,18 @@ var _ = Describe("Test Envoy configuration creation", func() {
kubeClient := testclient.NewSimpleClientset()
stop := make(chan struct{})
cfg := NewConfigurator(kubeClient, stop, osmNamespace, osmConfigMapName)
confChannel := cfg.Subscribe(
announcements.ConfigMapAdded,
announcements.ConfigMapDeleted,
announcements.ConfigMapUpdated)
var confChannel chan interface{}

BeforeEach(func() {
confChannel = events.GetPubSubInstance().Subscribe(
announcements.ConfigMapAdded,
announcements.ConfigMapDeleted,
announcements.ConfigMapUpdated)
})

AfterEach(func() {
events.GetPubSubInstance().Unsub(confChannel)
})

It("test GetConfigMap", func() {
configMap := v1.ConfigMap{
Expand Down Expand Up @@ -83,10 +92,18 @@ var _ = Describe("Test Envoy configuration creation", func() {
kubeClient := testclient.NewSimpleClientset()
stop := make(chan struct{})
cfg := NewConfigurator(kubeClient, stop, osmNamespace, osmConfigMapName)
confChannel := cfg.Subscribe(
announcements.ConfigMapAdded,
announcements.ConfigMapDeleted,
announcements.ConfigMapUpdated)
var confChannel chan interface{}

BeforeEach(func() {
confChannel = events.GetPubSubInstance().Subscribe(
announcements.ConfigMapAdded,
announcements.ConfigMapDeleted,
announcements.ConfigMapUpdated)
})

AfterEach(func() {
events.GetPubSubInstance().Unsub(confChannel)
})

It("correctly identifies that permissive_traffic_policy_mode is enabled", func() {
Expect(cfg.IsPermissiveTrafficPolicyMode()).To(BeFalse())
Expand Down Expand Up @@ -138,10 +155,18 @@ var _ = Describe("Test Envoy configuration creation", func() {
kubeClient := testclient.NewSimpleClientset()
stop := make(chan struct{})
cfg := NewConfigurator(kubeClient, stop, osmNamespace, osmConfigMapName)
confChannel := cfg.Subscribe(
announcements.ConfigMapAdded,
announcements.ConfigMapDeleted,
announcements.ConfigMapUpdated)
var confChannel chan interface{}

BeforeEach(func() {
confChannel = events.GetPubSubInstance().Subscribe(
announcements.ConfigMapAdded,
announcements.ConfigMapDeleted,
announcements.ConfigMapUpdated)
})

AfterEach(func() {
events.GetPubSubInstance().Unsub(confChannel)
})

It("correctly identifies that egress is enabled", func() {
Expect(cfg.IsEgressEnabled()).To(BeFalse())
Expand Down Expand Up @@ -192,10 +217,18 @@ var _ = Describe("Test Envoy configuration creation", func() {
kubeClient := testclient.NewSimpleClientset()
stop := make(chan struct{})
cfg := NewConfigurator(kubeClient, stop, osmNamespace, osmConfigMapName)
confChannel := cfg.Subscribe(
announcements.ConfigMapAdded,
announcements.ConfigMapDeleted,
announcements.ConfigMapUpdated)
var confChannel chan interface{}

BeforeEach(func() {
confChannel = events.GetPubSubInstance().Subscribe(
announcements.ConfigMapAdded,
announcements.ConfigMapDeleted,
announcements.ConfigMapUpdated)
})

AfterEach(func() {
events.GetPubSubInstance().Unsub(confChannel)
})

It("correctly identifies that the debug server is enabled", func() {
Expect(cfg.IsDebugServerEnabled()).To(BeFalse())
Expand Down Expand Up @@ -224,10 +257,17 @@ var _ = Describe("Test Envoy configuration creation", func() {
kubeClient := testclient.NewSimpleClientset()
stop := make(chan struct{})
cfg := NewConfigurator(kubeClient, stop, osmNamespace, osmConfigMapName)
confChannel := cfg.Subscribe(
announcements.ConfigMapAdded,
announcements.ConfigMapDeleted,
announcements.ConfigMapUpdated)
var confChannel chan interface{}
BeforeEach(func() {
confChannel = events.GetPubSubInstance().Subscribe(
announcements.ConfigMapAdded,
announcements.ConfigMapDeleted,
announcements.ConfigMapUpdated)
})

AfterEach(func() {
events.GetPubSubInstance().Unsub(confChannel)
})

It("correctly identifies that the config is enabled", func() {
Expect(cfg.IsPrometheusScrapingEnabled()).To(BeFalse())
Expand Down Expand Up @@ -278,10 +318,18 @@ var _ = Describe("Test Envoy configuration creation", func() {
kubeClient := testclient.NewSimpleClientset()
stop := make(chan struct{})
cfg := NewConfigurator(kubeClient, stop, osmNamespace, osmConfigMapName)
confChannel := cfg.Subscribe(
announcements.ConfigMapAdded,
announcements.ConfigMapDeleted,
announcements.ConfigMapUpdated)
var confChannel chan interface{}

BeforeEach(func() {
confChannel = events.GetPubSubInstance().Subscribe(
announcements.ConfigMapAdded,
announcements.ConfigMapDeleted,
announcements.ConfigMapUpdated)
})

AfterEach(func() {
events.GetPubSubInstance().Unsub(confChannel)
})

It("correctly identifies that the config is enabled", func() {
Expect(cfg.IsTracingEnabled()).To(BeFalse())
Expand Down Expand Up @@ -334,10 +382,18 @@ var _ = Describe("Test Envoy configuration creation", func() {
testInfoEnvoyLogLevel := "info"
testDebugEnvoyLogLevel := "debug"
cfg := NewConfigurator(kubeClient, stop, osmNamespace, osmConfigMapName)
confChannel := cfg.Subscribe(
announcements.ConfigMapAdded,
announcements.ConfigMapDeleted,
announcements.ConfigMapUpdated)
var confChannel chan interface{}

BeforeEach(func() {
confChannel = events.GetPubSubInstance().Subscribe(
announcements.ConfigMapAdded,
announcements.ConfigMapDeleted,
announcements.ConfigMapUpdated)
})

AfterEach(func() {
events.GetPubSubInstance().Unsub(confChannel)
})

It("correctly identifies that the Envoy log level is error", func() {
Expect(cfg.GetEnvoyLogLevel()).To(Equal(testErrorEnvoyLogLevel))
Expand Down Expand Up @@ -409,10 +465,18 @@ var _ = Describe("Test Envoy configuration creation", func() {
kubeClient := testclient.NewSimpleClientset()
stop := make(chan struct{})
cfg := NewConfigurator(kubeClient, stop, osmNamespace, osmConfigMapName)
confChannel := cfg.Subscribe(
announcements.ConfigMapAdded,
announcements.ConfigMapDeleted,
announcements.ConfigMapUpdated)
var confChannel chan interface{}

BeforeEach(func() {
confChannel = events.GetPubSubInstance().Subscribe(
announcements.ConfigMapAdded,
announcements.ConfigMapDeleted,
announcements.ConfigMapUpdated)
})

AfterEach(func() {
events.GetPubSubInstance().Unsub(confChannel)
})

It("correctly retrieves the default service cert validity duration when an invalid value is specified", func() {
defaultConfigMap[serviceCertValidityDurationKey] = "5" // no units, so invalid
Expand Down
5 changes: 0 additions & 5 deletions pkg/configurator/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package configurator
import (
"time"

"github.com/cskr/pubsub"
"k8s.io/client-go/tools/cache"

"github.com/openservicemesh/osm/pkg/announcements"
Expand All @@ -22,7 +21,6 @@ type Client struct {
informer cache.SharedIndexInformer
cache cache.Store
cacheSynced chan interface{}
pSub *pubsub.PubSub
}

// Configurator is the controller interface for K8s namespaces
Expand All @@ -33,9 +31,6 @@ type Configurator interface {
// GetConfigMap returns the ConfigMap in pretty JSON (human readable)
GetConfigMap() ([]byte, error)

// Subscribe returns a channel subscribed to the announcement types passed by parameter
Subscribe(...announcements.AnnouncementType) chan interface{}

// IsPermissiveTrafficPolicyMode determines whether we are in "allow-all" mode or SMI policy (block by default) mode
IsPermissiveTrafficPolicyMode() bool

Expand Down
2 changes: 1 addition & 1 deletion pkg/httpserver/debugger.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ type DebugServerInterface interface {
// RegisterDebugServer registers a go routine to listen to configuration and configure debug server as needed
func RegisterDebugServer(dbgServerInterface DebugServerInterface, cfg configurator.Configurator) {
// Subscribe to configuration updates
ch := cfg.Subscribe(
ch := events.GetPubSubInstance().Subscribe(
announcements.ConfigMapAdded,
announcements.ConfigMapDeleted,
announcements.ConfigMapUpdated)
Expand Down

0 comments on commit b24f86b

Please sign in to comment.