Skip to content

Commit

Permalink
pkg/configurator: introducing pubsub for config updates (openservicem…
Browse files Browse the repository at this point in the history
…esh#2050)

* pkg/configurator: introducing pubsub for config updates

This commit introduces a configuration interface to subscribe to
announcement types for configurator.

This interfaces provides a simple way for multiple remote
packages/modules/goroutines to subscribe to events of their interest.

The reasoning is to prioritize decoupling between modules so much
they can handle their own logic, as opposed to having a single
thread/goroutine handle all configuration updates, thus triggering
remote functions on the appropriate modules upon config changes.

The GetAnnouncementChannel() interface has been removed, and instead a
middleman routine in configurator has been spawned, which will be the one
to multiplex announcement channel updates through the publish interface.

Some cleanup has happened for DebugServer, which was consuming the cfg channel
interface on osm-controller. Has been moved to its relevant package and
is already working under pub/sub model.

* Update pkg/configurator/types.go

* Apply suggestions from code review

Co-authored-by: Shashank Ram <shashank08@gmail.com>

Co-authored-by: Shashank Ram <shashank08@gmail.com>
  • Loading branch information
eduser25 and shashankram authored Nov 16, 2020
1 parent f362b18 commit 9ea8a6f
Show file tree
Hide file tree
Showing 11 changed files with 207 additions and 243 deletions.
56 changes: 3 additions & 53 deletions cmd/osm-controller/osm-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"os"
"path"
"strings"
"sync"

"github.com/pkg/errors"
"github.com/spf13/pflag"
Expand Down Expand Up @@ -92,12 +91,6 @@ var (
certmanagerIssuerGroup = flags.String("cert-manager-issuer-group", "cert-manager.io", "cert-manager issuer group")
)

type controller struct {
debugServerRunning bool
debugComponents debugger.DebugConfig
debugServer httpserver.DebugServerInterface
}

func init() {
flags.StringVarP(&verbosity, "verbosity", "v", "info", "Set log verbosity level")
flags.StringVar(&meshName, "mesh-name", "", "OSM mesh name")
Expand Down Expand Up @@ -258,56 +251,13 @@ func main() {

// Expose /debug endpoints and data only if the enableDebugServer flag is enabled
debugConfig := debugger.NewDebugConfig(certDebugger, xdsServer, meshCatalog, kubeConfig, kubeClient, cfg, kubernetesClient)
c := controller{
debugServerRunning: !cfg.IsDebugServerEnabled(),
debugComponents: debugConfig,
debugServer: httpserver.NewDebugHTTPServer(debugConfig, constants.DebugPort),
}

errs := make(chan error)
go c.configureDebugServer(cfg, errs)

done := false
for !done {
select {
case <-stop:
done = true
case err := <-errs:
if err != nil {
log.Error().Err(err)
}
}
}
debugServerInterface := httpserver.NewDebugHTTPServer(debugConfig, constants.DebugPort)
httpserver.RegisterDebugServer(debugServerInterface, cfg)

<-stop
log.Info().Msg("Goodbye!")
}

func (c *controller) configureDebugServer(cfg configurator.Configurator, errs chan<- error) {
//GetAnnouncementsChannel will check ConfigMap every 3 * time.Second
var mutex = &sync.Mutex{}
for range cfg.GetAnnouncementsChannel() {
if c.debugServerRunning && !cfg.IsDebugServerEnabled() {
mutex.Lock()
err := c.debugServer.Stop()
if err == nil {
c.debugServer = nil
}
c.debugServerRunning = false
errs <- errors.Wrap(err, "unable to stop debug server")
mutex.Unlock()
} else if !c.debugServerRunning && cfg.IsDebugServerEnabled() {
mutex.Lock()
if c.debugServer == nil {
c.debugServer = httpserver.NewDebugHTTPServer(c.debugComponents, constants.DebugPort)
}
c.debugServer.Start()
c.debugServerRunning = true
errs <- nil
mutex.Unlock()
}
}
}

func getHTTPHealthProbes() []health.HTTPProbe {
return []health.HTTPProbe{
{
Expand Down
168 changes: 37 additions & 131 deletions cmd/osm-controller/osm-controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,10 @@ package main

import (
"context"
"net/http"
"strconv"
"testing"
"time"

"github.com/golang/mock/gomock"
"github.com/pkg/errors"

"github.com/stretchr/testify/assert"
Expand All @@ -20,144 +18,65 @@ import (
"github.com/openservicemesh/osm/pkg/certificate/providers/tresor"
"github.com/openservicemesh/osm/pkg/configurator"
"github.com/openservicemesh/osm/pkg/constants"
"github.com/openservicemesh/osm/pkg/debugger"
"github.com/openservicemesh/osm/pkg/httpserver"
)

const (
validRoutePath = "/debug/test1"
testOSMNamespace = "-test-osm-namespace-"
testOSMConfigMapName = "-test-osm-config-map-"
)

func TestConfigureDebugServerStart(t *testing.T) {
assert := assert.New(t)

// set up a controller
mockCtrl := gomock.NewController(t)
stop := make(chan struct{})

kubeClient, _, cfg, err := setupComponents(testOSMNamespace, testOSMConfigMapName, false, stop)
if err != nil {
t.Fatal(err)
}

fakeDebugServer := FakeDebugServer{0, 0, nil}
con := &controller{
debugServerRunning: false,
debugComponents: mockDebugConfig(mockCtrl),
debugServer: &fakeDebugServer,
}

errs := make(chan error)
go con.configureDebugServer(cfg, errs)

func toggleDebugServer(enable bool, kubeClient *testclient.Clientset) error {
updatedConfigMap := v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: testOSMNamespace,
Name: testOSMConfigMapName,
},
Data: map[string]string{
"enable_debug_server": "true",
"enable_debug_server": strconv.FormatBool(enable),
},
}
_, err = kubeClient.CoreV1().ConfigMaps(testOSMNamespace).Update(context.TODO(), &updatedConfigMap, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
}

getErrorOrTimeout(assert, errs, 1*time.Second)
close(stop)
assert.Equal(1, fakeDebugServer.startCount)
assert.Equal(0, fakeDebugServer.stopCount)
assert.True(con.debugServerRunning)
assert.NotNil(con.debugServer)
_, err := kubeClient.CoreV1().ConfigMaps(testOSMNamespace).Update(context.TODO(), &updatedConfigMap, metav1.UpdateOptions{})
return err
}

func TestConfigureDebugServerStop(t *testing.T) {
func TestDebugServer(t *testing.T) {
assert := assert.New(t)

// set up a controller
mockCtrl := gomock.NewController(t)
stop := make(chan struct{})
kubeClient, _, cfg, err := setupComponents(testOSMNamespace, testOSMConfigMapName, false, stop)
assert.NoError(err)

kubeClient, _, cfg, err := setupComponents(testOSMNamespace, testOSMConfigMapName, true, stop)
if err != nil {
t.Fatal(err)
}

fakeDebugServer := FakeDebugServer{0, 0, nil}
con := &controller{
debugServerRunning: true,
debugComponents: mockDebugConfig(mockCtrl),
debugServer: &fakeDebugServer,
}

errs := make(chan error)
go con.configureDebugServer(cfg, errs)

updatedConfigMap := v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: testOSMNamespace,
Name: testOSMConfigMapName,
},
Data: map[string]string{
"enable_debug_server": "false",
},
}
_, err = kubeClient.CoreV1().ConfigMaps(testOSMNamespace).Update(context.TODO(), &updatedConfigMap, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
}
fakeDebugServer := FakeDebugServer{0, 0, nil, false}
httpserver.RegisterDebugServer(&fakeDebugServer, cfg)

getErrorOrTimeout(assert, errs, 1*time.Second)
close(stop)
assert.Equal(0, fakeDebugServer.startCount)
assert.Equal(1, fakeDebugServer.stopCount)
assert.False(con.debugServerRunning)
assert.Nil(con.debugServer)
}

func TestConfigureDebugServerErr(t *testing.T) {
assert := assert.New(t)

// set up a controller
mockCtrl := gomock.NewController(t)
stop := make(chan struct{})

kubeClient, _, cfg, err := setupComponents(testOSMNamespace, testOSMConfigMapName, true, stop)
if err != nil {
t.Fatal(err)
}

fakeDebugServer := FakeDebugServer{0, 0, errors.Errorf("Debug server error")}
con := &controller{
debugServerRunning: true,
debugComponents: mockDebugConfig(mockCtrl),
debugServer: &fakeDebugServer,
}
errs := make(chan error)
go con.configureDebugServer(cfg, errs)

updatedConfigMap := v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: testOSMNamespace,
Name: testOSMConfigMapName,
},
Data: map[string]string{
"enable_debug_server": "false",
},
}
_, err = kubeClient.CoreV1().ConfigMaps(testOSMNamespace).Update(context.TODO(), &updatedConfigMap, metav1.UpdateOptions{})
if err != nil {
t.Fatal(err)
}
assert.Equal(0, fakeDebugServer.stopCount)

getErrorOrTimeout(assert, errs, 1*time.Second)
close(stop)
assert.Equal(0, fakeDebugServer.startCount)
assert.Equal(1, fakeDebugServer.stopCount)
assert.False(con.debugServerRunning)
assert.NotNil(con.debugServer)
// Start server
err = toggleDebugServer(true, kubeClient)
assert.NoError(err)

assert.Eventually(func() bool {
return fakeDebugServer.running == true
}, 2*time.Second, 100*time.Millisecond)
assert.Eventually(func() bool {
return fakeDebugServer.startCount == 1
}, 2*time.Second, 100*time.Millisecond)
assert.Equal(0, fakeDebugServer.stopCount) // No eventually for an non-expected-to-change value

// Stop it
err = toggleDebugServer(false, kubeClient)
assert.NoError(err)

assert.Eventually(func() bool {
return fakeDebugServer.running == false
}, 2*time.Second, 100*time.Millisecond)
assert.Eventually(func() bool {
return fakeDebugServer.stopCount == 1
}, 2*time.Second, 100*time.Millisecond)
assert.Equal(1, fakeDebugServer.startCount) // No eventually for an non-expected-to-change value
}

func TestCreateCABundleKubernetesSecret(t *testing.T) {
Expand Down Expand Up @@ -210,26 +129,21 @@ type FakeDebugServer struct {
stopCount int
startCount int
stopErr error
running bool
}

func (f *FakeDebugServer) Stop() error {
f.stopCount++
if f.stopErr != nil {
return errors.Errorf("Debug server error")
}
f.running = false
return nil
}

func (f *FakeDebugServer) Start() {
f.startCount++
}

func mockDebugConfig(mockCtrl *gomock.Controller) *debugger.MockDebugServer {
mockDebugConfig := debugger.NewMockDebugServer(mockCtrl)
mockDebugConfig.EXPECT().GetHandlers().Return(map[string]http.Handler{
validRoutePath: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}),
}).AnyTimes()
return mockDebugConfig
f.running = true
}

func setupComponents(namespace, configMapName string, initialDebugServerEnabled bool, stop chan struct{}) (*testclient.Clientset, v1.ConfigMap, configurator.Configurator, error) {
Expand All @@ -252,11 +166,3 @@ func setupComponents(namespace, configMapName string, initialDebugServerEnabled
cfg := configurator.NewConfigurator(kubeClient, stop, namespace, configMapName)
return kubeClient, configMap, cfg, nil
}

func getErrorOrTimeout(assert *assert.Assertions, errs <-chan error, timeout time.Duration) {
select {
case <-errs:
case <-time.After(timeout):
assert.Fail("failed to receive error after " + timeout.String())
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ require (
github.com/Azure/go-autorest/autorest/to v0.3.0
github.com/axw/gocov v1.0.0
github.com/cncf/udpa/go v0.0.0-20200629203442-efcf912fb354 // indirect
github.com/cskr/pubsub v1.0.2
github.com/deckarep/golang-set v1.7.1
github.com/docker/docker v1.4.2-0.20200203170920-46ec8731fbce
github.com/envoyproxy/go-control-plane v0.9.6
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,8 @@ github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:ma
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cskr/pubsub v1.0.2 h1:vlOzMhl6PFn60gRlTQQsIfVwaPB/B/8MziK8FhEPt/0=
github.com/cskr/pubsub v1.0.2/go.mod h1:/8MzYXk/NJAz782G8RPkFzXTZVu63VotefPnR9TIRis=
github.com/cyphar/filepath-securejoin v0.2.2 h1:jCwT2GTP+PY5nBz3c/YL5PAIbusElVrPujOBSCj8xRg=
github.com/cyphar/filepath-securejoin v0.2.2/go.mod h1:FpkQEhXnPnOthhzymB7CGsFk2G9VLXONKD9G7QGMM+4=
github.com/daixiang0/gci v0.2.4 h1:BUCKk5nlK2m+kRIsoj+wb/5hazHvHeZieBKWd9Afa8Q=
Expand Down
18 changes: 17 additions & 1 deletion pkg/configurator/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"strconv"

"github.com/cskr/pubsub"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
Expand Down Expand Up @@ -48,6 +49,9 @@ const (

// serviceCertValidityDurationKey is the key name used to specify the validity duration of service certificates in the ConfigMap
serviceCertValidityDurationKey = "service_cert_validity_duration"

// defaultPubSubChannelSize is the default size of the buffered channel returned to the subscriber
defaultPubSubChannelSize = 128
)

// NewConfigurator implements configurator.Configurator and creates the Kubernetes client to manage namespaces.
Expand All @@ -70,6 +74,7 @@ func newConfigurator(kubeClient kubernetes.Interface, stop <-chan struct{}, osmN
announcements: make(chan a.Announcement),
osmNamespace: osmNamespace,
osmConfigMapName: osmConfigMapName,
pSub: pubsub.New(defaultPubSubChannelSize),
}

informerName := "ConfigMap"
Expand Down Expand Up @@ -128,8 +133,19 @@ type osmConfig struct {
ServiceCertValidityDuration string `yaml:"service_cert_validity_duration"`
}

// This function captures the Announcements from k8s informer updates, and relays them to the subscribed
// members on the pubsub interface
func (c *Client) publish() {
for {
a := <-c.announcements
log.Debug().Msgf("OSM config publish: %s", a.Type.String())
c.pSub.Pub(a, a.Type.String())
}
}

func (c *Client) run(stop <-chan struct{}) {
go c.informer.Run(stop)
go c.publish() // prepare the publish interface
go c.informer.Run(stop) // run the informer synchronization
log.Info().Msgf("Started OSM ConfigMap informer - watching for %s", c.getConfigMapCacheKey())
log.Info().Msg("[ConfigMap Client] Waiting for ConfigMap informer's cache to sync")
if !cache.WaitForCacheSync(stop, c.informer.HasSynced) {
Expand Down
Loading

0 comments on commit 9ea8a6f

Please sign in to comment.