Skip to content
This repository has been archived by the owner on Nov 8, 2022. It is now read-only.

Commit

Permalink
Merge pull request #1225 from jcooklin/ib/1222-default-config-values
Browse files Browse the repository at this point in the history
Fixes #1222 and #1228 : Applies config defaults - Metric too ambiguous
  • Loading branch information
jcooklin authored Oct 7, 2016
2 parents a8bbd8d + 92e2486 commit 57e774d
Show file tree
Hide file tree
Showing 13 changed files with 271 additions and 168 deletions.
1 change: 0 additions & 1 deletion control/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -482,7 +482,6 @@ func (p *pluginConfig) getPluginConfigDataNode(pluginType core.PluginType, name
}
}

//todo change to debug
log.WithFields(log.Fields{
"_block_": "getPluginConfigDataNode",
"_module": "config",
Expand Down
12 changes: 12 additions & 0 deletions control/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,12 @@ func (p *pluginControl) getMetricsAndCollectors(requested []core.RequestedMetric
// set config to metric
mt.config = cfg

// apply defaults to the metric that may be present in the plugins
// configpolicy
if pluginCfg := mt.Plugin.ConfigPolicy.Get(mt.Namespace().Strings()); pluginCfg != nil {
mt.config.ApplyDefaults(pluginCfg.Defaults())
}

// loaded plugin which exposes the metric
lp := mt.Plugin
key := lp.Key()
Expand Down Expand Up @@ -911,6 +917,12 @@ func (p *pluginControl) CollectMetrics(id string, allTags map[string]map[string]

// For each available plugin call available plugin using RPC client and wait for response (goroutines)
for pluginKey, pmt := range pluginToMetricMap {
// merge global plugin config into the config for the metric
for _, mt := range pmt.metricTypes {
if mt.Config() != nil {
mt.Config().ReverseMergeInPlace(p.Config.Plugins.getPluginConfigDataNode(core.CollectorPluginType, pmt.plugin.Name(), pmt.plugin.Version()))
}
}

wg.Add(1)

Expand Down
39 changes: 22 additions & 17 deletions control/control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,9 +815,9 @@ func TestMetricConfig(t *testing.T) {
Convey("So metric should be valid with config", func() {
errs := c.subscriptionGroups.validateMetric(m1)
So(errs, ShouldBeNil)
})
Convey("So mock should have name: bob config from defaults", func() {
So(c.Config.Plugins.pluginCache["0"+core.Separator+"mock"+core.Separator+"1"].Table()["name"], ShouldResemble, ctypes.ConfigValueStr{Value: "bob"})
Convey("So mock should have name: bob config from defaults", func() {
So(c.Config.Plugins.pluginCache["0"+core.Separator+"mock"+core.Separator+"1"].Table()["name"], ShouldResemble, ctypes.ConfigValueStr{Value: "bob"})
})
})

c.Stop()
Expand Down Expand Up @@ -1050,10 +1050,10 @@ func TestCollectDynamicMetrics(t *testing.T) {
<-lpe.done
metrics, err := c.metricCatalog.Fetch(core.NewNamespace())
So(err, ShouldBeNil)
So(len(metrics), ShouldEqual, 6)
So(len(metrics), ShouldEqual, 8)
mts, err := c.metricCatalog.GetMetrics(core.NewNamespace("intel", "mock", "*", "baz"), 2)
So(err, ShouldBeNil)
So(len(mts), ShouldEqual, 1)
So(len(mts), ShouldEqual, 2)
m := mts[0]
errs := c.subscriptionGroups.validateMetric(m)
So(errs, ShouldBeNil)
Expand Down Expand Up @@ -1096,7 +1096,7 @@ func TestCollectDynamicMetrics(t *testing.T) {
So(err, ShouldBeNil)
So(hits, ShouldEqual, 0)
So(errs, ShouldBeNil)
So(len(mts), ShouldEqual, 10)
So(len(mts), ShouldEqual, 11)
mts, errs = c.CollectMetrics(taskID, nil)
hits, err = pool.CacheHits(m.namespace.String(), 2, taskID)
So(err, ShouldBeNil)
Expand All @@ -1105,7 +1105,7 @@ func TestCollectDynamicMetrics(t *testing.T) {
// So(hits, ShouldEqual, 1)

So(errs, ShouldBeNil)
So(len(mts), ShouldEqual, 10)
So(len(mts), ShouldEqual, 11)
pool.Unsubscribe(taskID)
pool.SelectAndKill(taskID, "unsubscription event")
So(pool.Count(), ShouldEqual, 0)
Expand Down Expand Up @@ -1215,7 +1215,7 @@ func TestCollectMetrics(t *testing.T) {
<-lpe.done
mts, err := c.MetricCatalog()
So(err, ShouldBeNil)
So(len(mts), ShouldEqual, 4)
So(len(mts), ShouldEqual, 5)

cd := cdata.NewNode()
cd.AddItem("password", ctypes.ConfigValueStr{Value: "testval"})
Expand Down Expand Up @@ -1309,7 +1309,7 @@ func TestCollectNonSpecifiedDynamicMetrics(t *testing.T) {
<-lpe.done
mts, err := c.MetricCatalog()
So(err, ShouldBeNil)
So(len(mts), ShouldEqual, 4)
So(len(mts), ShouldEqual, 5)

cd := cdata.NewNode()

Expand Down Expand Up @@ -1346,16 +1346,19 @@ func TestCollectNonSpecifiedDynamicMetrics(t *testing.T) {
So(len(mts), ShouldBeGreaterThan, len(requested))
// expected 10 metrics "/intel/mock/[host_id]/baz
// for hosts in range (0 - 9)
So(len(mts), ShouldEqual, 10)
So(len(mts), ShouldEqual, 11)
for _, m := range mts {
// ensure the collected metric's namespace starts with /intel/mock/host...
So(m.Namespace().String(), ShouldStartWith, core.NewNamespace("intel", "mock", "host").String())
So(m.Namespace().String(), ShouldStartWith, core.NewNamespace("intel", "mock").String())
So(m.Namespace().String(), ShouldContainSubstring, "baz")

// ensure the collected data coming back is from v1
So(m.Version(), ShouldEqual, 1)
// ensure the collected data is dynamic
isDynamic, _ := m.Namespace().IsDynamic()
So(isDynamic, ShouldBeTrue)
if !strings.Contains(m.Namespace().String(), "all") {
isDynamic, _ := m.Namespace().IsDynamic()
So(isDynamic, ShouldBeTrue)
}
}
}

Expand Down Expand Up @@ -1389,9 +1392,9 @@ func TestCollectSpecifiedDynamicMetrics(t *testing.T) {

mts, err := c.MetricCatalog()
So(err, ShouldBeNil)
// metric catalog should contain the 3 following metrics:
// /intel/mock/foo; /intel/mock/bar; /intel/mock/*/baz
So(len(mts), ShouldEqual, 3)
// metric catalog should contain the 4 following metrics:
// /intel/mock/foo; /intel/mock/bar; /intel/mock/*/baz; /intel/mock/all/baz
So(len(mts), ShouldEqual, 4)

Convey("collection for specified host id - positive", func() {
taskID := "task-01"
Expand Down Expand Up @@ -1991,7 +1994,9 @@ func TestDynamicMetricSubscriptionLoadLessMetrics(t *testing.T) {
Convey("metrics are collected from mock1 and mock2", func() {
// ensure the data coming back is from mock 1 and mock 2
for _, m := range mts2 {
if strings.Contains(m.Namespace().String(), "host") || strings.Contains(m.Namespace().String(), "bar") {
if strings.Contains(m.Namespace().String(), "host") ||
strings.Contains(m.Namespace().String(), "bar") ||
strings.Contains(m.Namespace().String(), "all") {
val, ok := m.Data().(int)
So(ok, ShouldEqual, true)
So(val, ShouldBeGreaterThan, 1000)
Expand Down
12 changes: 12 additions & 0 deletions control/plugin/cpolicy/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,18 @@ func (c *ConfigPolicyNode) HasRules() bool {
return false
}

// Defaults returns a map[string]ctypes.ConfigValue for all of the rules that
// have defaults.
func (c *ConfigPolicyNode) Defaults() map[string]ctypes.ConfigValue {
defaults := map[string]ctypes.ConfigValue{}
for name, rule := range c.rules {
if def := rule.Default(); def != nil {
defaults[name] = def
}
}
return defaults
}

// Validates and returns a processed policy node or nil and error if validation has failed
func (c *ConfigPolicyNode) Process(m map[string]ctypes.ConfigValue) (*map[string]ctypes.ConfigValue, *ProcessingErrors) {
c.mutex.Lock()
Expand Down
2 changes: 1 addition & 1 deletion control/plugin_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (p *pluginManager) LoadPlugin(details *pluginDetails, emitter gomit.Emitter
}

// Update config policy with defaults
cfgNode.ReverseMerge(defaults)
cfgNode = cfgNode.ReverseMerge(defaults)
cp, err = c.GetConfigPolicy()
if err != nil {
pmLogger.WithFields(log.Fields{
Expand Down
105 changes: 59 additions & 46 deletions control/subscription_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,10 +227,10 @@ func (s *subscriptionGroups) ValidateDeps(requested []core.RequestedMetric,
if err != nil {
return []serror.SnapError{serror.New(err)}
}
plg.Config().ReverseMerge(
mergedConfig := plg.Config().ReverseMerge(
s.Config.Plugins.getPluginConfigDataNode(
typ, plg.Name(), plg.Version()))
errs := s.validatePluginSubscription(plg)
errs := s.validatePluginSubscription(plg, mergedConfig)
if len(errs) > 0 {
serrs = append(serrs, errs...)
return serrs
Expand All @@ -239,7 +239,7 @@ func (s *subscriptionGroups) ValidateDeps(requested []core.RequestedMetric,
return
}

func (p *subscriptionGroups) validatePluginSubscription(pl core.SubscribedPlugin) []serror.SnapError {
func (p *subscriptionGroups) validatePluginSubscription(pl core.SubscribedPlugin, mergedConfig *cdata.ConfigDataNode) []serror.SnapError {
var serrs = []serror.SnapError{}
controlLogger.WithFields(log.Fields{
"_block": "validate-plugin-subscription",
Expand All @@ -259,7 +259,7 @@ func (p *subscriptionGroups) validatePluginSubscription(pl core.SubscribedPlugin

if lp.ConfigPolicy != nil {
ncd := lp.ConfigPolicy.Get([]string{""})
_, errs := ncd.Process(pl.Config().Table())
_, errs := ncd.Process(mergedConfig.Table())
if errs != nil && errs.HasErrors() {
for _, e := range errs.Errors() {
se := serror.New(e)
Expand All @@ -273,62 +273,65 @@ func (p *subscriptionGroups) validatePluginSubscription(pl core.SubscribedPlugin

func (s *subscriptionGroups) validateMetric(
metric core.Metric) (serrs []serror.SnapError) {
m, err := s.metricCatalog.GetMetric(metric.Namespace(), metric.Version())
mts, err := s.metricCatalog.GetMetrics(metric.Namespace(), metric.Version())
if err != nil {
serrs = append(serrs, serror.New(err, map[string]interface{}{
"name": metric.Namespace().String(),
"version": metric.Version(),
}))
return serrs
}
for _, m := range mts {

// No metric found return error.
if m == nil {
serrs = append(
serrs, serror.New(
fmt.Errorf("no metric found cannot subscribe: (%s) version(%d)",
metric.Namespace(), metric.Version())))
return serrs
}
// No metric found return error.
if m == nil {
serrs = append(
serrs, serror.New(
fmt.Errorf("no metric found cannot subscribe: (%s) version(%d)",
metric.Namespace(), metric.Version())))
continue
}

m.config = metric.Config()
m.config = metric.Config()

typ, serr := core.ToPluginType(m.Plugin.TypeName())
if serr != nil {
return []serror.SnapError{serror.New(err)}
}
typ, serr := core.ToPluginType(m.Plugin.TypeName())
if serr != nil {
serrs = append(serrs, serror.New(err))
continue
}

// merge global plugin config
if m.config != nil {
m.config.ReverseMerge(
s.Config.Plugins.getPluginConfigDataNode(typ,
m.Plugin.Name(), m.Plugin.Version()))
} else {
m.config = s.Config.Plugins.getPluginConfigDataNode(typ,
m.Plugin.Name(), m.Plugin.Version())
}
// merge global plugin config
if m.config != nil {
m.config.ReverseMergeInPlace(
s.Config.Plugins.getPluginConfigDataNode(typ,
m.Plugin.Name(), m.Plugin.Version()))
} else {
m.config = s.Config.Plugins.getPluginConfigDataNode(typ,
m.Plugin.Name(), m.Plugin.Version())
}

// When a metric is added to the MetricCatalog, the policy of rules defined by the plugin is added to the metric's policy.
// If no rules are defined for a metric, we set the metric's policy to an empty ConfigPolicyNode.
// Checking m.policy for nil will not work, we need to check if rules are nil.
if m.policy.HasRules() {
if m.Config() == nil {
fields := log.Fields{
"metric": m.Namespace(),
"version": m.Version(),
"plugin": m.Plugin.Name(),
// When a metric is added to the MetricCatalog, the policy of rules defined by the plugin is added to the metric's policy.
// If no rules are defined for a metric, we set the metric's policy to an empty ConfigPolicyNode.
// Checking m.policy for nil will not work, we need to check if rules are nil.
if m.policy.HasRules() {
if m.Config() == nil {
fields := log.Fields{
"metric": m.Namespace(),
"version": m.Version(),
"plugin": m.Plugin.Name(),
}
serrs = append(serrs, serror.New(ErrConfigRequiredForMetric, fields))
continue
}
serrs = append(serrs, serror.New(ErrConfigRequiredForMetric, fields))
return serrs
}
ncdTable, errs := m.policy.Process(m.Config().Table())
if errs != nil && errs.HasErrors() {
for _, e := range errs.Errors() {
serrs = append(serrs, serror.New(e))
ncdTable, errs := m.policy.Process(m.Config().Table())
if errs != nil && errs.HasErrors() {
for _, e := range errs.Errors() {
serrs = append(serrs, serror.New(e))
}
continue
}
return serrs
m.config = cdata.FromTable(*ncdTable)
}
m.config = cdata.FromTable(*ncdTable)
}

return serrs
Expand All @@ -342,10 +345,20 @@ func (s *subscriptionGroup) process(id string) (serrs []serror.SnapError) {
"metrics": fmt.Sprintf("%+v", s.requestedMetrics),
}).Debug("gathered collectors")

//add processors and publishers to collectors just gathered
for _, plugin := range s.requestedPlugins {
//add processors and publishers to collectors just gathered
if plugin.TypeName() != core.CollectorPluginType.String() {
plugins = append(plugins, plugin)
// add defaults to plugins (exposed in a plugins ConfigPolicy)
if lp, err := s.pluginManager.get(
fmt.Sprintf("%s:%s:%d",
plugin.TypeName(),
plugin.Name(),
plugin.Version())); err == nil && lp.ConfigPolicy != nil {
if policy := lp.ConfigPolicy.Get([]string{""}); policy != nil && len(policy.Defaults()) > 0 {
plugin.Config().ApplyDefaults(policy.Defaults())
}
}
}
}

Expand Down
Loading

0 comments on commit 57e774d

Please sign in to comment.