diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 1d2467d2643..c017a21c5b7 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -68,6 +68,7 @@ https://github.com/elastic/beats/compare/v6.4.0...master[Check the HEAD diff] - Do not panic when no tokenizer string is configured for a dissect processor. {issue}8895[8895] - Start autodiscover consumers before producers. {pull}7926[7926] - The setup command will not fail if no dashboard is available to import. {pull}8977[8977] +- Fix central management configurations reload when a configuration is removed in Kibana. {issue}9010[9010] *Auditbeat* diff --git a/libbeat/common/reload/reload.go b/libbeat/common/reload/reload.go index 1f8b3ab4a73..0e6933889e3 100644 --- a/libbeat/common/reload/reload.go +++ b/libbeat/common/reload/reload.go @@ -110,6 +110,23 @@ func (r *Registry) MustRegisterList(name string, list ReloadableList) { } } +// GetRegisteredNames returns the list of names registered +func (r *Registry) GetRegisteredNames() []string { + r.RLock() + defer r.RUnlock() + var names []string + + for name := range r.confs { + names = append(names, name) + } + + for name := range r.confsLists { + names = append(names, name) + } + + return names +} + // GetReloadable returns the reloadable object with the given name, nil if not found func (r *Registry) GetReloadable(name string) Reloadable { r.RLock() diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index 908f5619fdd..fb811893aa5 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -153,8 +153,10 @@ func makeWorkQueue() workQueue { func (c *outputController) Reload(cfg *reload.ConfigWithMeta) error { outputCfg := common.ConfigNamespace{} - if err := cfg.Config.Unpack(&outputCfg); err != nil { - return err + if cfg != nil { + if err := cfg.Config.Unpack(&outputCfg); err != nil { + return err + } } output, err := loadOutput(c.beat, c.monitors, outputCfg) diff --git a/x-pack/libbeat/management/manager.go b/x-pack/libbeat/management/manager.go index 084418b2db5..9c494fd8996 100644 --- a/x-pack/libbeat/management/manager.go +++ b/x-pack/libbeat/management/manager.go @@ -181,9 +181,24 @@ func (cm *ConfigManager) fetch() bool { func (cm *ConfigManager) apply() { configOK := true + + missing := map[string]bool{} + for _, name := range cm.registry.GetRegisteredNames() { + missing[name] = true + } + + // Reload configs for _, b := range cm.cache.Configs { err := cm.reload(b.Type, b.Blocks) configOK = configOK && err == nil + missing[b.Type] = false + } + + // Unset missing configs + for name := range missing { + if missing[name] { + cm.reload(name, []*api.ConfigBlock{}) + } } if !configOK { @@ -199,15 +214,20 @@ func (cm *ConfigManager) reload(t string, blocks []*api.ConfigBlock) error { if obj := cm.registry.GetReloadable(t); obj != nil { // Single object - if len(blocks) != 1 { + if len(blocks) > 1 { err := fmt.Errorf("got an invalid number of configs for %s: %d, expected: 1", t, len(blocks)) cm.logger.Error(err) return err } - config, err := blocks[0].ConfigWithMeta() - if err != nil { - cm.logger.Error(err) - return err + + var config *reload.ConfigWithMeta + var err error + if len(blocks) == 1 { + config, err = blocks[0].ConfigWithMeta() + if err != nil { + cm.logger.Error(err) + return err + } } if err := obj.Reload(config); err != nil { diff --git a/x-pack/libbeat/management/manager_test.go b/x-pack/libbeat/management/manager_test.go index a7985299cb1..39c3a1f9e51 100644 --- a/x-pack/libbeat/management/manager_test.go +++ b/x-pack/libbeat/management/manager_test.go @@ -100,3 +100,68 @@ func TestConfigManager(t *testing.T) { manager.Stop() os.Remove(paths.Resolve(paths.Data, "management.yml")) } + +func TestRemoveItems(t *testing.T) { + registry := reload.NewRegistry() + id, err := uuid.NewV4() + if err != nil { + t.Fatalf("error while generating id: %v", err) + } + accessToken := "footoken" + reloadable := reloadable{ + reloaded: make(chan *reload.ConfigWithMeta, 1), + } + registry.MustRegister("test.blocks", &reloadable) + + mux := http.NewServeMux() + i := 0 + responses := []string{ + // Initial load + `{"configuration_blocks":[{"type":"test.blocks","config":{"module":"apache2"}}]}`, + + // Return no blocks + `{"configuration_blocks":[]}`, + } + mux.Handle("/", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, responses[i]) + i++ + })) + + server := httptest.NewServer(mux) + + c, err := api.ConfigFromURL(server.URL) + if err != nil { + t.Fatal(err) + } + + config := &Config{ + Enabled: true, + Period: 100 * time.Millisecond, + Kibana: c, + AccessToken: accessToken, + } + + manager, err := NewConfigManagerWithConfig(config, registry, id) + if err != nil { + t.Fatal(err) + } + + manager.Start() + + // On first reload we will get apache2 module + config1 := <-reloadable.reloaded + assert.Equal(t, &reload.ConfigWithMeta{ + Config: common.MustNewConfigFrom(map[string]interface{}{ + "module": "apache2", + }), + }, config1) + + // Get a nil config, even if the block is not part of the payload + config2 := <-reloadable.reloaded + var nilConfig *reload.ConfigWithMeta + assert.Equal(t, nilConfig, config2) + + // Cleanup + manager.Stop() + os.Remove(paths.Resolve(paths.Data, "management.yml")) +}