diff --git a/cmd/osm-controller/osm-controller.go b/cmd/osm-controller/osm-controller.go index 17e5ba5b6b..588f5ba40f 100644 --- a/cmd/osm-controller/osm-controller.go +++ b/cmd/osm-controller/osm-controller.go @@ -8,7 +8,6 @@ import ( "os" "path" "strings" - "sync" "github.com/pkg/errors" "github.com/spf13/pflag" @@ -92,12 +91,6 @@ var ( certmanagerIssuerGroup = flags.String("cert-manager-issuer-group", "cert-manager.io", "cert-manager issuer group") ) -type controller struct { - debugServerRunning bool - debugComponents debugger.DebugConfig - debugServer httpserver.DebugServerInterface -} - func init() { flags.StringVarP(&verbosity, "verbosity", "v", "info", "Set log verbosity level") flags.StringVar(&meshName, "mesh-name", "", "OSM mesh name") @@ -258,56 +251,13 @@ func main() { // Expose /debug endpoints and data only if the enableDebugServer flag is enabled debugConfig := debugger.NewDebugConfig(certDebugger, xdsServer, meshCatalog, kubeConfig, kubeClient, cfg, kubernetesClient) - c := controller{ - debugServerRunning: !cfg.IsDebugServerEnabled(), - debugComponents: debugConfig, - debugServer: httpserver.NewDebugHTTPServer(debugConfig, constants.DebugPort), - } - - errs := make(chan error) - go c.configureDebugServer(cfg, errs) - - done := false - for !done { - select { - case <-stop: - done = true - case err := <-errs: - if err != nil { - log.Error().Err(err) - } - } - } + debugServerInterface := httpserver.NewDebugHTTPServer(debugConfig, constants.DebugPort) + httpserver.RegisterDebugServer(debugServerInterface, cfg) + <-stop log.Info().Msg("Goodbye!") } -func (c *controller) configureDebugServer(cfg configurator.Configurator, errs chan<- error) { - //GetAnnouncementsChannel will check ConfigMap every 3 * time.Second - var mutex = &sync.Mutex{} - for range cfg.GetAnnouncementsChannel() { - if c.debugServerRunning && !cfg.IsDebugServerEnabled() { - mutex.Lock() - err := c.debugServer.Stop() - if err == nil { - c.debugServer = nil - } - c.debugServerRunning = false - errs <- errors.Wrap(err, "unable to stop debug server") - mutex.Unlock() - } else if !c.debugServerRunning && cfg.IsDebugServerEnabled() { - mutex.Lock() - if c.debugServer == nil { - c.debugServer = httpserver.NewDebugHTTPServer(c.debugComponents, constants.DebugPort) - } - c.debugServer.Start() - c.debugServerRunning = true - errs <- nil - mutex.Unlock() - } - } -} - func getHTTPHealthProbes() []health.HTTPProbe { return []health.HTTPProbe{ { diff --git a/cmd/osm-controller/osm-controller_test.go b/cmd/osm-controller/osm-controller_test.go index 7711c69b0e..49aa5e3c99 100644 --- a/cmd/osm-controller/osm-controller_test.go +++ b/cmd/osm-controller/osm-controller_test.go @@ -2,12 +2,10 @@ package main import ( "context" - "net/http" "strconv" "testing" "time" - "github.com/golang/mock/gomock" "github.com/pkg/errors" "github.com/stretchr/testify/assert" @@ -20,144 +18,65 @@ import ( "github.com/openservicemesh/osm/pkg/certificate/providers/tresor" "github.com/openservicemesh/osm/pkg/configurator" "github.com/openservicemesh/osm/pkg/constants" - "github.com/openservicemesh/osm/pkg/debugger" + "github.com/openservicemesh/osm/pkg/httpserver" ) const ( - validRoutePath = "/debug/test1" testOSMNamespace = "-test-osm-namespace-" testOSMConfigMapName = "-test-osm-config-map-" ) -func TestConfigureDebugServerStart(t *testing.T) { - assert := assert.New(t) - - // set up a controller - mockCtrl := gomock.NewController(t) - stop := make(chan struct{}) - - kubeClient, _, cfg, err := setupComponents(testOSMNamespace, testOSMConfigMapName, false, stop) - if err != nil { - t.Fatal(err) - } - - fakeDebugServer := FakeDebugServer{0, 0, nil} - con := &controller{ - debugServerRunning: false, - debugComponents: mockDebugConfig(mockCtrl), - debugServer: &fakeDebugServer, - } - - errs := make(chan error) - go con.configureDebugServer(cfg, errs) - +func toggleDebugServer(enable bool, kubeClient *testclient.Clientset) error { updatedConfigMap := v1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ Namespace: testOSMNamespace, Name: testOSMConfigMapName, }, Data: map[string]string{ - "enable_debug_server": "true", + "enable_debug_server": strconv.FormatBool(enable), }, } - _, err = kubeClient.CoreV1().ConfigMaps(testOSMNamespace).Update(context.TODO(), &updatedConfigMap, metav1.UpdateOptions{}) - if err != nil { - t.Fatal(err) - } - - getErrorOrTimeout(assert, errs, 1*time.Second) - close(stop) - assert.Equal(1, fakeDebugServer.startCount) - assert.Equal(0, fakeDebugServer.stopCount) - assert.True(con.debugServerRunning) - assert.NotNil(con.debugServer) + _, err := kubeClient.CoreV1().ConfigMaps(testOSMNamespace).Update(context.TODO(), &updatedConfigMap, metav1.UpdateOptions{}) + return err } -func TestConfigureDebugServerStop(t *testing.T) { +func TestDebugServer(t *testing.T) { assert := assert.New(t) // set up a controller - mockCtrl := gomock.NewController(t) stop := make(chan struct{}) + kubeClient, _, cfg, err := setupComponents(testOSMNamespace, testOSMConfigMapName, false, stop) + assert.NoError(err) - kubeClient, _, cfg, err := setupComponents(testOSMNamespace, testOSMConfigMapName, true, stop) - if err != nil { - t.Fatal(err) - } - - fakeDebugServer := FakeDebugServer{0, 0, nil} - con := &controller{ - debugServerRunning: true, - debugComponents: mockDebugConfig(mockCtrl), - debugServer: &fakeDebugServer, - } - - errs := make(chan error) - go con.configureDebugServer(cfg, errs) - - updatedConfigMap := v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: testOSMNamespace, - Name: testOSMConfigMapName, - }, - Data: map[string]string{ - "enable_debug_server": "false", - }, - } - _, err = kubeClient.CoreV1().ConfigMaps(testOSMNamespace).Update(context.TODO(), &updatedConfigMap, metav1.UpdateOptions{}) - if err != nil { - t.Fatal(err) - } + fakeDebugServer := FakeDebugServer{0, 0, nil, false} + httpserver.RegisterDebugServer(&fakeDebugServer, cfg) - getErrorOrTimeout(assert, errs, 1*time.Second) - close(stop) assert.Equal(0, fakeDebugServer.startCount) - assert.Equal(1, fakeDebugServer.stopCount) - assert.False(con.debugServerRunning) - assert.Nil(con.debugServer) -} - -func TestConfigureDebugServerErr(t *testing.T) { - assert := assert.New(t) - - // set up a controller - mockCtrl := gomock.NewController(t) - stop := make(chan struct{}) - - kubeClient, _, cfg, err := setupComponents(testOSMNamespace, testOSMConfigMapName, true, stop) - if err != nil { - t.Fatal(err) - } - - fakeDebugServer := FakeDebugServer{0, 0, errors.Errorf("Debug server error")} - con := &controller{ - debugServerRunning: true, - debugComponents: mockDebugConfig(mockCtrl), - debugServer: &fakeDebugServer, - } - errs := make(chan error) - go con.configureDebugServer(cfg, errs) - - updatedConfigMap := v1.ConfigMap{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: testOSMNamespace, - Name: testOSMConfigMapName, - }, - Data: map[string]string{ - "enable_debug_server": "false", - }, - } - _, err = kubeClient.CoreV1().ConfigMaps(testOSMNamespace).Update(context.TODO(), &updatedConfigMap, metav1.UpdateOptions{}) - if err != nil { - t.Fatal(err) - } + assert.Equal(0, fakeDebugServer.stopCount) - getErrorOrTimeout(assert, errs, 1*time.Second) - close(stop) - assert.Equal(0, fakeDebugServer.startCount) - assert.Equal(1, fakeDebugServer.stopCount) - assert.False(con.debugServerRunning) - assert.NotNil(con.debugServer) + // Start server + err = toggleDebugServer(true, kubeClient) + assert.NoError(err) + + assert.Eventually(func() bool { + return fakeDebugServer.running == true + }, 2*time.Second, 100*time.Millisecond) + assert.Eventually(func() bool { + return fakeDebugServer.startCount == 1 + }, 2*time.Second, 100*time.Millisecond) + assert.Equal(0, fakeDebugServer.stopCount) // No eventually for an non-expected-to-change value + + // Stop it + err = toggleDebugServer(false, kubeClient) + assert.NoError(err) + + assert.Eventually(func() bool { + return fakeDebugServer.running == false + }, 2*time.Second, 100*time.Millisecond) + assert.Eventually(func() bool { + return fakeDebugServer.stopCount == 1 + }, 2*time.Second, 100*time.Millisecond) + assert.Equal(1, fakeDebugServer.startCount) // No eventually for an non-expected-to-change value } func TestCreateCABundleKubernetesSecret(t *testing.T) { @@ -210,6 +129,7 @@ type FakeDebugServer struct { stopCount int startCount int stopErr error + running bool } func (f *FakeDebugServer) Stop() error { @@ -217,19 +137,13 @@ func (f *FakeDebugServer) Stop() error { if f.stopErr != nil { return errors.Errorf("Debug server error") } + f.running = false return nil } func (f *FakeDebugServer) Start() { f.startCount++ -} - -func mockDebugConfig(mockCtrl *gomock.Controller) *debugger.MockDebugServer { - mockDebugConfig := debugger.NewMockDebugServer(mockCtrl) - mockDebugConfig.EXPECT().GetHandlers().Return(map[string]http.Handler{ - validRoutePath: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}), - }).AnyTimes() - return mockDebugConfig + f.running = true } func setupComponents(namespace, configMapName string, initialDebugServerEnabled bool, stop chan struct{}) (*testclient.Clientset, v1.ConfigMap, configurator.Configurator, error) { @@ -252,11 +166,3 @@ func setupComponents(namespace, configMapName string, initialDebugServerEnabled cfg := configurator.NewConfigurator(kubeClient, stop, namespace, configMapName) return kubeClient, configMap, cfg, nil } - -func getErrorOrTimeout(assert *assert.Assertions, errs <-chan error, timeout time.Duration) { - select { - case <-errs: - case <-time.After(timeout): - assert.Fail("failed to receive error after " + timeout.String()) - } -} diff --git a/go.mod b/go.mod index 09cca984ee..f979242225 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/Azure/go-autorest/autorest/to v0.3.0 github.com/axw/gocov v1.0.0 github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354 // indirect + github.com/cskr/pubsub v1.0.2 github.com/deckarep/golang-set v1.7.1 github.com/docker/docker v1.4.2-0.20200203170920-46ec8731fbce github.com/envoyproxy/go-control-plane v0.9.6 diff --git a/go.sum b/go.sum index 4282307325..cec9f59689 100644 --- a/go.sum +++ b/go.sum @@ -188,6 +188,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:ma github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0= +github.com/cskr/pubsub v1.0.2/go.mod h1:/8MzYXk/NJAz782G8RPkFzXTZVu63VotefPnR9TIRis= github.com/cyphar/filepath-securejoin v0.2.2 h1:jCwT2GTP+PY5nBz3c/YL5PAIbusElVrPujOBSCj8xRg= github.com/cyphar/filepath-securejoin v0.2.2/go.mod h1:FpkQEhXnPnOthhzymB7CGsFk2G9VLXONKD9G7QGMM+4= github.com/daixiang0/gci v0.2.4 h1:BUCKk5nlK2m+kRIsoj+wb/5hazHvHeZieBKWd9Afa8Q= diff --git a/pkg/configurator/client.go b/pkg/configurator/client.go index 73b7584404..116e4d3fa1 100644 --- a/pkg/configurator/client.go +++ b/pkg/configurator/client.go @@ -4,6 +4,7 @@ import ( "fmt" "strconv" + "github.com/cskr/pubsub" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -48,6 +49,9 @@ const ( // serviceCertValidityDurationKey is the key name used to specify the validity duration of service certificates in the ConfigMap serviceCertValidityDurationKey = "service_cert_validity_duration" + + // defaultPubSubChannelSize is the default size of the buffered channel returned to the subscriber + defaultPubSubChannelSize = 128 ) // NewConfigurator implements configurator.Configurator and creates the Kubernetes client to manage namespaces. @@ -70,6 +74,7 @@ func newConfigurator(kubeClient kubernetes.Interface, stop <-chan struct{}, osmN announcements: make(chan a.Announcement), osmNamespace: osmNamespace, osmConfigMapName: osmConfigMapName, + pSub: pubsub.New(defaultPubSubChannelSize), } informerName := "ConfigMap" @@ -128,8 +133,19 @@ type osmConfig struct { ServiceCertValidityDuration string `yaml:"service_cert_validity_duration"` } +// This function captures the Announcements from k8s informer updates, and relays them to the subscribed +// members on the pubsub interface +func (c *Client) publish() { + for { + a := <-c.announcements + log.Debug().Msgf("OSM config publish: %s", a.Type.String()) + c.pSub.Pub(a, a.Type.String()) + } +} + func (c *Client) run(stop <-chan struct{}) { - go c.informer.Run(stop) + go c.publish() // prepare the publish interface + go c.informer.Run(stop) // run the informer synchronization log.Info().Msgf("Started OSM ConfigMap informer - watching for %s", c.getConfigMapCacheKey()) log.Info().Msg("[ConfigMap Client] Waiting for ConfigMap informer's cache to sync") if !cache.WaitForCacheSync(stop, c.informer.HasSynced) { diff --git a/pkg/configurator/client_test.go b/pkg/configurator/client_test.go index 04253e8675..cbae681fb4 100644 --- a/pkg/configurator/client_test.go +++ b/pkg/configurator/client_test.go @@ -11,6 +11,8 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" testclient "k8s.io/client-go/kubernetes/fake" + + "github.com/openservicemesh/osm/pkg/announcements" ) const ( @@ -25,6 +27,10 @@ var _ = Describe("Test OSM ConfigMap parsing", func() { stop := make(<-chan struct{}) cfg := newConfigurator(kubeClient, stop, osmNamespace, osmConfigMapName) + confChannel := cfg.Subscribe( + announcements.ConfigMapAdded, + announcements.ConfigMapDeleted, + announcements.ConfigMapUpdated) configMap := v1.ConfigMap{ ObjectMeta: metav1.ObjectMeta{ @@ -35,7 +41,7 @@ var _ = Describe("Test OSM ConfigMap parsing", func() { if _, err := kubeClient.CoreV1().ConfigMaps(osmNamespace).Create(context.TODO(), &configMap, metav1.CreateOptions{}); err != nil { GinkgoT().Fatalf("[TEST] Error creating ConfigMap %s/%s/: %s", configMap.Namespace, configMap.Name, err.Error()) } - <-cfg.GetAnnouncementsChannel() + <-confChannel Context("Ensure we are able to get reasonable defaults from ConfigMap", func() { diff --git a/pkg/configurator/methods.go b/pkg/configurator/methods.go index 78730c9111..186d3b83fa 100644 --- a/pkg/configurator/methods.go +++ b/pkg/configurator/methods.go @@ -104,11 +104,6 @@ func (c *Client) GetEnvoyLogLevel() string { return constants.DefaultEnvoyLogLevel } -// GetAnnouncementsChannel returns a channel, which is used to announce when changes have been made to the OSM ConfigMap. -func (c *Client) GetAnnouncementsChannel() <-chan announcements.Announcement { - return c.announcements -} - // GetServiceCertValidityPeriod returns the validity duration for service certificates, and a default in case of invalid duration func (c *Client) GetServiceCertValidityPeriod() time.Duration { durationStr := c.getConfigMap().ServiceCertValidityDuration @@ -120,3 +115,14 @@ func (c *Client) GetServiceCertValidityPeriod() time.Duration { return validityDuration } + +// Subscribe returns a channel subscribed to the announcement types passed by the given parameter +func (c *Client) Subscribe(aTypes ...announcements.AnnouncementType) chan interface{} { + // Cast of array of T types, even when T types are equivalent, is forbidden + subTypes := []string{} + for _, v := range aTypes { + subTypes = append(subTypes, string(v)) + } + + return c.pSub.Sub(subTypes...) +} diff --git a/pkg/configurator/methods_test.go b/pkg/configurator/methods_test.go index bc40852044..fbd124f5e8 100644 --- a/pkg/configurator/methods_test.go +++ b/pkg/configurator/methods_test.go @@ -10,6 +10,8 @@ import ( v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" testclient "k8s.io/client-go/kubernetes/fake" + + "github.com/openservicemesh/osm/pkg/announcements" ) var _ = Describe("Test Envoy configuration creation", func() { @@ -41,6 +43,10 @@ var _ = Describe("Test Envoy configuration creation", func() { kubeClient := testclient.NewSimpleClientset() stop := make(chan struct{}) cfg := NewConfigurator(kubeClient, stop, osmNamespace, osmConfigMapName) + confChannel := cfg.Subscribe( + announcements.ConfigMapAdded, + announcements.ConfigMapDeleted, + announcements.ConfigMapUpdated) It("test GetConfigMap", func() { configMap := v1.ConfigMap{ @@ -53,7 +59,7 @@ var _ = Describe("Test Envoy configuration creation", func() { _, err := kubeClient.CoreV1().ConfigMaps(osmNamespace).Create(context.TODO(), &configMap, metav1.CreateOptions{}) Expect(err).ToNot(HaveOccurred()) - <-cfg.GetAnnouncementsChannel() + <-confChannel expectedConfig := &osmConfig{ PermissiveTrafficPolicyMode: false, @@ -77,6 +83,10 @@ var _ = Describe("Test Envoy configuration creation", func() { kubeClient := testclient.NewSimpleClientset() stop := make(chan struct{}) cfg := NewConfigurator(kubeClient, stop, osmNamespace, osmConfigMapName) + confChannel := cfg.Subscribe( + announcements.ConfigMapAdded, + announcements.ConfigMapDeleted, + announcements.ConfigMapUpdated) It("correctly identifies that permissive_traffic_policy_mode is enabled", func() { Expect(cfg.IsPermissiveTrafficPolicyMode()).To(BeFalse()) @@ -93,7 +103,7 @@ var _ = Describe("Test Envoy configuration creation", func() { // Wait for the config map change to propagate to the cache. log.Info().Msg("Waiting for announcement") - <-cfg.GetAnnouncementsChannel() + <-confChannel Expect(cfg.GetOSMNamespace()).To(Equal(osmNamespace)) Expect(err).ToNot(HaveOccurred()) @@ -115,7 +125,7 @@ var _ = Describe("Test Envoy configuration creation", func() { // Wait for the config map change to propagate to the cache. log.Info().Msg("Waiting for announcement") - <-cfg.GetAnnouncementsChannel() + <-confChannel Expect(cfg.GetOSMNamespace()).To(Equal(osmNamespace)) Expect(err).ToNot(HaveOccurred()) @@ -128,6 +138,10 @@ var _ = Describe("Test Envoy configuration creation", func() { kubeClient := testclient.NewSimpleClientset() stop := make(chan struct{}) cfg := NewConfigurator(kubeClient, stop, osmNamespace, osmConfigMapName) + confChannel := cfg.Subscribe( + announcements.ConfigMapAdded, + announcements.ConfigMapDeleted, + announcements.ConfigMapUpdated) It("correctly identifies that egress is enabled", func() { Expect(cfg.IsEgressEnabled()).To(BeFalse()) @@ -143,7 +157,7 @@ var _ = Describe("Test Envoy configuration creation", func() { // Wait for the config map change to propagate to the cache. log.Info().Msg("Waiting for announcement") - <-cfg.GetAnnouncementsChannel() + <-confChannel Expect(cfg.GetOSMNamespace()).To(Equal(osmNamespace)) Expect(err).ToNot(HaveOccurred()) @@ -165,7 +179,7 @@ var _ = Describe("Test Envoy configuration creation", func() { // Wait for the config map change to propagate to the cache. log.Info().Msg("Waiting for announcement") - <-cfg.GetAnnouncementsChannel() + <-confChannel Expect(cfg.GetOSMNamespace()).To(Equal(osmNamespace)) Expect(err).ToNot(HaveOccurred()) @@ -178,6 +192,10 @@ var _ = Describe("Test Envoy configuration creation", func() { kubeClient := testclient.NewSimpleClientset() stop := make(chan struct{}) cfg := NewConfigurator(kubeClient, stop, osmNamespace, osmConfigMapName) + confChannel := cfg.Subscribe( + announcements.ConfigMapAdded, + announcements.ConfigMapDeleted, + announcements.ConfigMapUpdated) It("correctly identifies that the debug server is enabled", func() { Expect(cfg.IsDebugServerEnabled()).To(BeFalse()) @@ -193,7 +211,7 @@ var _ = Describe("Test Envoy configuration creation", func() { // Wait for the config map change to propagate to the cache. log.Info().Msg("Waiting for announcement") - <-cfg.GetAnnouncementsChannel() + <-confChannel Expect(cfg.GetOSMNamespace()).To(Equal(osmNamespace)) Expect(err).ToNot(HaveOccurred()) @@ -206,6 +224,10 @@ var _ = Describe("Test Envoy configuration creation", func() { kubeClient := testclient.NewSimpleClientset() stop := make(chan struct{}) cfg := NewConfigurator(kubeClient, stop, osmNamespace, osmConfigMapName) + confChannel := cfg.Subscribe( + announcements.ConfigMapAdded, + announcements.ConfigMapDeleted, + announcements.ConfigMapUpdated) It("correctly identifies that the config is enabled", func() { Expect(cfg.IsPrometheusScrapingEnabled()).To(BeFalse()) @@ -221,7 +243,7 @@ var _ = Describe("Test Envoy configuration creation", func() { // Wait for the config map change to propagate to the cache. log.Info().Msg("Waiting for announcement") - <-cfg.GetAnnouncementsChannel() + <-confChannel Expect(cfg.GetOSMNamespace()).To(Equal(osmNamespace)) Expect(err).ToNot(HaveOccurred()) @@ -243,7 +265,7 @@ var _ = Describe("Test Envoy configuration creation", func() { // Wait for the config map change to propagate to the cache. log.Info().Msg("Waiting for announcement") - <-cfg.GetAnnouncementsChannel() + <-confChannel Expect(cfg.GetOSMNamespace()).To(Equal(osmNamespace)) Expect(err).ToNot(HaveOccurred()) @@ -256,6 +278,10 @@ var _ = Describe("Test Envoy configuration creation", func() { kubeClient := testclient.NewSimpleClientset() stop := make(chan struct{}) cfg := NewConfigurator(kubeClient, stop, osmNamespace, osmConfigMapName) + confChannel := cfg.Subscribe( + announcements.ConfigMapAdded, + announcements.ConfigMapDeleted, + announcements.ConfigMapUpdated) It("correctly identifies that the config is enabled", func() { Expect(cfg.IsTracingEnabled()).To(BeFalse()) @@ -271,7 +297,7 @@ var _ = Describe("Test Envoy configuration creation", func() { // Wait for the config map change to propagate to the cache. log.Info().Msg("Waiting for announcement") - <-cfg.GetAnnouncementsChannel() + <-confChannel Expect(cfg.GetOSMNamespace()).To(Equal(osmNamespace)) Expect(err).ToNot(HaveOccurred()) @@ -293,7 +319,7 @@ var _ = Describe("Test Envoy configuration creation", func() { // Wait for the config map change to propagate to the cache. log.Info().Msg("Waiting for announcement") - <-cfg.GetAnnouncementsChannel() + <-confChannel Expect(cfg.GetOSMNamespace()).To(Equal(osmNamespace)) Expect(err).ToNot(HaveOccurred()) @@ -308,6 +334,10 @@ var _ = Describe("Test Envoy configuration creation", func() { testInfoEnvoyLogLevel := "info" testDebugEnvoyLogLevel := "debug" cfg := NewConfigurator(kubeClient, stop, osmNamespace, osmConfigMapName) + confChannel := cfg.Subscribe( + announcements.ConfigMapAdded, + announcements.ConfigMapDeleted, + announcements.ConfigMapUpdated) It("correctly identifies that the Envoy log level is error", func() { Expect(cfg.GetEnvoyLogLevel()).To(Equal(testErrorEnvoyLogLevel)) @@ -323,7 +353,7 @@ var _ = Describe("Test Envoy configuration creation", func() { // Wait for the config map change to propagate to the cache. log.Info().Msg("Waiting for announcement") - <-cfg.GetAnnouncementsChannel() + <-confChannel Expect(cfg.GetOSMNamespace()).To(Equal(osmNamespace)) Expect(err).ToNot(HaveOccurred()) @@ -345,8 +375,7 @@ var _ = Describe("Test Envoy configuration creation", func() { // Wait for the config map change to propagate to the cache. log.Info().Msg("Waiting for announcement") - <-cfg.GetAnnouncementsChannel() - + <-confChannel Expect(cfg.GetOSMNamespace()).To(Equal(osmNamespace)) Expect(err).ToNot(HaveOccurred()) @@ -367,7 +396,7 @@ var _ = Describe("Test Envoy configuration creation", func() { // Wait for the config map change to propagate to the cache. log.Info().Msg("Waiting for announcement") - <-cfg.GetAnnouncementsChannel() + <-confChannel Expect(cfg.GetOSMNamespace()).To(Equal(osmNamespace)) Expect(err).ToNot(HaveOccurred()) @@ -380,6 +409,10 @@ var _ = Describe("Test Envoy configuration creation", func() { kubeClient := testclient.NewSimpleClientset() stop := make(chan struct{}) cfg := NewConfigurator(kubeClient, stop, osmNamespace, osmConfigMapName) + confChannel := cfg.Subscribe( + announcements.ConfigMapAdded, + announcements.ConfigMapDeleted, + announcements.ConfigMapUpdated) It("correctly retrieves the default service cert validity duration when an invalid value is specified", func() { defaultConfigMap[serviceCertValidityDurationKey] = "5" // no units, so invalid @@ -393,7 +426,7 @@ var _ = Describe("Test Envoy configuration creation", func() { _, err := kubeClient.CoreV1().ConfigMaps(osmNamespace).Create(context.TODO(), &configMap, metav1.CreateOptions{}) Expect(err).ToNot(HaveOccurred()) - <-cfg.GetAnnouncementsChannel() + <-confChannel Expect(cfg.GetServiceCertValidityPeriod()).To(Equal(time.Duration(24 * time.Hour))) }) @@ -410,7 +443,7 @@ var _ = Describe("Test Envoy configuration creation", func() { _, err := kubeClient.CoreV1().ConfigMaps(osmNamespace).Update(context.TODO(), &configMap, metav1.UpdateOptions{}) Expect(err).ToNot(HaveOccurred()) - <-cfg.GetAnnouncementsChannel() + <-confChannel Expect(cfg.GetServiceCertValidityPeriod()).To(Equal(time.Duration(1 * time.Hour))) }) diff --git a/pkg/configurator/mock_client.go b/pkg/configurator/mock_client.go index 635f755405..e7faa57bd1 100644 --- a/pkg/configurator/mock_client.go +++ b/pkg/configurator/mock_client.go @@ -5,11 +5,10 @@ package configurator import ( + gomock "github.com/golang/mock/gomock" + announcements "github.com/openservicemesh/osm/pkg/announcements" reflect "reflect" time "time" - - gomock "github.com/golang/mock/gomock" - "github.com/openservicemesh/osm/pkg/announcements" ) // MockConfigurator is a mock of Configurator interface @@ -35,20 +34,6 @@ func (m *MockConfigurator) EXPECT() *MockConfiguratorMockRecorder { return m.recorder } -// GetAnnouncementsChannel mocks base method -func (m *MockConfigurator) GetAnnouncementsChannel() <-chan announcements.Announcement { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetAnnouncementsChannel") - ret0, _ := ret[0].(<-chan announcements.Announcement) - return ret0 -} - -// GetAnnouncementsChannel indicates an expected call of GetAnnouncementsChannel -func (mr *MockConfiguratorMockRecorder) GetAnnouncementsChannel() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAnnouncementsChannel", reflect.TypeOf((*MockConfigurator)(nil).GetAnnouncementsChannel)) -} - // GetConfigMap mocks base method func (m *MockConfigurator) GetConfigMap() ([]byte, error) { m.ctrl.T.Helper() @@ -148,6 +133,20 @@ func (mr *MockConfiguratorMockRecorder) GetTracingPort() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetTracingPort", reflect.TypeOf((*MockConfigurator)(nil).GetTracingPort)) } +// IsDebugServerEnabled mocks base method +func (m *MockConfigurator) IsDebugServerEnabled() bool { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "IsDebugServerEnabled") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsDebugServerEnabled indicates an expected call of IsDebugServerEnabled +func (mr *MockConfiguratorMockRecorder) IsDebugServerEnabled() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsDebugServerEnabled", reflect.TypeOf((*MockConfigurator)(nil).IsDebugServerEnabled)) +} + // IsEgressEnabled mocks base method func (m *MockConfigurator) IsEgressEnabled() bool { m.ctrl.T.Helper() @@ -176,14 +175,6 @@ func (mr *MockConfiguratorMockRecorder) IsPermissiveTrafficPolicyMode() *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsPermissiveTrafficPolicyMode", reflect.TypeOf((*MockConfigurator)(nil).IsPermissiveTrafficPolicyMode)) } -// IsDebugServerEnabled mocks base method -func (m *MockConfigurator) IsDebugServerEnabled() bool { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "IsDebugServerEnabled") - ret0, _ := ret[0].(bool) - return ret0 -} - // IsPrometheusScrapingEnabled mocks base method func (m *MockConfigurator) IsPrometheusScrapingEnabled() bool { m.ctrl.T.Helper() @@ -192,13 +183,6 @@ func (m *MockConfigurator) IsPrometheusScrapingEnabled() bool { return ret0 } -// IsDebugServerEnabled determines whether osm debug HTTP server is enabled -func (mr *MockConfiguratorMockRecorder) IsDebugServerEnabled() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsDebugServerEnabled", reflect.TypeOf((*MockConfigurator)(nil).IsDebugServerEnabled)) - -} - // IsPrometheusScrapingEnabled indicates an expected call of IsPrometheusScrapingEnabled func (mr *MockConfiguratorMockRecorder) IsPrometheusScrapingEnabled() *gomock.Call { mr.mock.ctrl.T.Helper() @@ -219,6 +203,24 @@ func (mr *MockConfiguratorMockRecorder) IsTracingEnabled() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsTracingEnabled", reflect.TypeOf((*MockConfigurator)(nil).IsTracingEnabled)) } +// Subscribe mocks base method +func (m *MockConfigurator) Subscribe(arg0 ...announcements.AnnouncementType) chan interface{} { + m.ctrl.T.Helper() + varargs := []interface{}{} + for _, a := range arg0 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Subscribe", varargs...) + ret0, _ := ret[0].(chan interface{}) + return ret0 +} + +// Subscribe indicates an expected call of Subscribe +func (mr *MockConfiguratorMockRecorder) Subscribe(arg0 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockConfigurator)(nil).Subscribe), arg0...) +} + // UseHTTPSIngress mocks base method func (m *MockConfigurator) UseHTTPSIngress() bool { m.ctrl.T.Helper() diff --git a/pkg/configurator/types.go b/pkg/configurator/types.go index 8e600d6274..2c25d69c21 100644 --- a/pkg/configurator/types.go +++ b/pkg/configurator/types.go @@ -3,6 +3,7 @@ package configurator import ( "time" + "github.com/cskr/pubsub" "k8s.io/client-go/tools/cache" "github.com/openservicemesh/osm/pkg/announcements" @@ -21,6 +22,7 @@ type Client struct { informer cache.SharedIndexInformer cache cache.Store cacheSynced chan interface{} + pSub *pubsub.PubSub } // Configurator is the controller interface for K8s namespaces @@ -31,6 +33,9 @@ type Configurator interface { // GetConfigMap returns the ConfigMap in pretty JSON (human readable) GetConfigMap() ([]byte, error) + // Subscribe returns a channel subscribed to the announcement types passed by parameter + Subscribe(...announcements.AnnouncementType) chan interface{} + // IsPermissiveTrafficPolicyMode determines whether we are in "allow-all" mode or SMI policy (block by default) mode IsPermissiveTrafficPolicyMode() bool @@ -61,9 +66,6 @@ type Configurator interface { // GetEnvoyLogLevel returns the envoy log level GetEnvoyLogLevel() string - // GetAnnouncementsChannel returns a channel, which is used to announce when changes have been made to the OSM ConfigMap - GetAnnouncementsChannel() <-chan announcements.Announcement - // GetServiceCertValidityPeriod returns the validity duration for service certificates GetServiceCertValidityPeriod() time.Duration } diff --git a/pkg/httpserver/debugger.go b/pkg/httpserver/debugger.go index 9a8e251bb9..88342ed2e6 100644 --- a/pkg/httpserver/debugger.go +++ b/pkg/httpserver/debugger.go @@ -5,6 +5,8 @@ import ( "fmt" "net/http" + "github.com/openservicemesh/osm/pkg/announcements" + "github.com/openservicemesh/osm/pkg/configurator" "github.com/openservicemesh/osm/pkg/debugger" "github.com/openservicemesh/osm/pkg/kubernetes/events" ) @@ -20,6 +22,44 @@ type DebugServerInterface interface { Start() } +// RegisterDebugServer registers a go routine to listen to configuration and configure debug server as needed +func RegisterDebugServer(dbgServerInterface DebugServerInterface, cfg configurator.Configurator) { + // Subscribe to configuration updates + ch := cfg.Subscribe( + announcements.ConfigMapAdded, + announcements.ConfigMapDeleted, + announcements.ConfigMapUpdated) + + // Run config listener + go func(cfgSubChannel chan interface{}, cf configurator.Configurator, dbgIf DebugServerInterface) { + // Bootstrap after subscribing + dbgSrvRunning := false + if cfg.IsDebugServerEnabled() { + dbgIf.Start() + dbgSrvRunning = true + } + + for { + <-cfgSubChannel + isDbgSrvEnabled := cfg.IsDebugServerEnabled() + + if isDbgSrvEnabled && !dbgSrvRunning { + log.Debug().Msgf("Starting DBG server") + dbgIf.Start() + dbgSrvRunning = true + } else if !isDbgSrvEnabled && dbgSrvRunning { + log.Debug().Msgf("Stopping DBG server") + err := dbgIf.Stop() + if err != nil { + log.Error().Msgf("Error stopping debug server: %v", err) + continue + } + dbgSrvRunning = false + } + } + }(ch, cfg, dbgServerInterface) +} + // NewDebugHTTPServer creates a new API Server for Debug func NewDebugHTTPServer(debugServer debugger.DebugConfig, apiPort int32) DebugServerInterface { return &DebugServer{ @@ -41,7 +81,7 @@ func (d *DebugServer) Start() { }() } -//Stop halts the DebugServer http.server +// Stop halts the DebugServer http.server func (d *DebugServer) Stop() error { ctx, cancel := context.WithTimeout(context.Background(), contextTimeoutDuration) defer cancel()