From 6288d66aa4adfa1234bb495d6f8af7da2b6b31f7 Mon Sep 17 00:00:00 2001 From: Joel Cooklin Date: Thu, 14 Jul 2016 16:28:05 -0700 Subject: [PATCH] Adds event manager to tribe --- core/tribe_event/tribe_event.go | 41 +++++++++++++++++++++++++++++++++ mgmt/rest/tribe_test.go | 40 +++++++++++++++++++++++++------- mgmt/tribe/tribe.go | 11 +++++++++ 3 files changed, 84 insertions(+), 8 deletions(-) create mode 100644 core/tribe_event/tribe_event.go diff --git a/core/tribe_event/tribe_event.go b/core/tribe_event/tribe_event.go new file mode 100644 index 000000000..5997afd1c --- /dev/null +++ b/core/tribe_event/tribe_event.go @@ -0,0 +1,41 @@ +/* +http://www.apache.org/licenses/LICENSE-2.0.txt + + +Copyright 2015 Intel Corporation + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tribe_event + +import "github.com/intelsdi-x/snap/core" + +const ( + PluginAdded = "Tribe.PluginAdded" +) + +type AddPluginEvent struct { + Agreement struct { + Name string + } + Plugin struct { + Name string + Type core.PluginType + Version int + } +} + +func (e AddPluginEvent) Namespace() string { + return PluginAdded +} diff --git a/mgmt/rest/tribe_test.go b/mgmt/rest/tribe_test.go index ba434befa..fc33d097f 100644 --- a/mgmt/rest/tribe_test.go +++ b/mgmt/rest/tribe_test.go @@ -35,8 +35,10 @@ import ( log "github.com/Sirupsen/logrus" . "github.com/smartystreets/goconvey/convey" + "github.com/intelsdi-x/gomit" "github.com/intelsdi-x/snap/control" "github.com/intelsdi-x/snap/core" + "github.com/intelsdi-x/snap/core/tribe_event" "github.com/intelsdi-x/snap/mgmt/rest/rbody" "github.com/intelsdi-x/snap/mgmt/tribe" "github.com/intelsdi-x/snap/scheduler" @@ -151,7 +153,7 @@ func TestTribeTaskAgreements(t *testing.T) { log.SetLevel(log.WarnLevel) numOfNodes := 5 aName := "agreement99" - mgtPorts, tribePort := startTribes(numOfNodes, "") + mgtPorts, tribePort, lpe := startTribes(numOfNodes, "") Convey("A cluster is started", t, func() { Convey("Members are retrieved", func() { for _, i := range mgtPorts { @@ -206,6 +208,7 @@ func TestTribeTaskAgreements(t *testing.T) { So(resp.Meta.Code, ShouldEqual, 200) So(len(resp.Body.(*rbody.PluginList).LoadedPlugins), ShouldEqual, 1) pluginToUnload := resp.Body.(*rbody.PluginList).LoadedPlugins[0] + <-lpe.pluginAddEvent resp = getAgreement(mgtPorts[0], aName) So(resp.Meta.Code, ShouldEqual, 200) So(len(resp.Body.(*rbody.TribeGetAgreement).Agreement.PluginAgreement.Plugins), ShouldEqual, 1) @@ -278,7 +281,7 @@ func TestTribeTaskAgreements(t *testing.T) { wg.Wait() So(timedOut, ShouldEqual, false) Convey("A new node joins the agreement", func() { - mgtPort, _ := startTribes(1, fmt.Sprintf("127.0.0.1:%d", tribePort)) + mgtPort, _, _ := startTribes(1, fmt.Sprintf("127.0.0.1:%d", tribePort)) j := joinAgreement(mgtPort[0], fmt.Sprintf("member-%d", mgtPort[0]), aName) mgtPorts = append(mgtPorts, mgtPort[0]) So(j.Meta.Code, ShouldEqual, 200) @@ -429,7 +432,7 @@ func TestTribePluginAgreements(t *testing.T) { ) numOfNodes := 5 aName := "agreement1" - mgtPorts, _ := startTribes(numOfNodes, "") + mgtPorts, _, _ := startTribes(numOfNodes, "") Convey("A cluster is started", t, func() { Convey("Members are retrieved", func() { for _, i := range mgtPorts { @@ -663,11 +666,29 @@ func TestTribePluginAgreements(t *testing.T) { }) } +type listenToSeedEvents struct { + pluginAddEvent chan struct{} +} + +func newListenToSeedEvents() *listenToSeedEvents { + return &listenToSeedEvents{ + pluginAddEvent: make(chan struct{}), + } +} + +func (l *listenToSeedEvents) HandleGomitEvent(e gomit.Event) { + switch e.Body.(type) { + case *tribe_event.AddPluginEvent: + l.pluginAddEvent <- struct{}{} + } +} + // returns an array of the mgtports and the tribe port for the last node -func startTribes(count int, seed string) ([]int, int) { +func startTribes(count int, seed string) ([]int, int, *listenToSeedEvents) { var wg sync.WaitGroup var tribePort int var mgtPorts []int + lpe := newListenToSeedEvents() for i := 0; i < count; i++ { mgtPort := getAvailablePort() mgtPorts = append(mgtPorts, mgtPort) @@ -680,14 +701,17 @@ func startTribes(count int, seed string) ([]int, int) { conf.RestAPIPort = mgtPort //conf.MemberlistConfig.PushPullInterval = 5 * time.Second conf.MemberlistConfig.RetransmitMult = conf.MemberlistConfig.RetransmitMult * 2 - if seed == "" { - seed = fmt.Sprintf("%s:%d", "127.0.0.1", tribePort) - } + t, err := tribe.New(conf) if err != nil { panic(err) } + if seed == "" { + seed = fmt.Sprintf("%s:%d", "127.0.0.1", tribePort) + t.EventManager.RegisterHandler("tribe.tests", lpe) + } + c := control.New(control.GetDefaultConfig()) c.RegisterEventHandler("tribe", t) c.Start() @@ -724,7 +748,7 @@ func startTribes(count int, seed string) ([]int, int) { }(mgtPort) } wg.Wait() - return mgtPorts, tribePort + return mgtPorts, tribePort, lpe } var nextPort uint64 = 55234 diff --git a/mgmt/tribe/tribe.go b/mgmt/tribe/tribe.go index 17368a9c4..6dd12a041 100644 --- a/mgmt/tribe/tribe.go +++ b/mgmt/tribe/tribe.go @@ -35,6 +35,7 @@ import ( "github.com/intelsdi-x/snap/core/control_event" "github.com/intelsdi-x/snap/core/scheduler_event" "github.com/intelsdi-x/snap/core/serror" + "github.com/intelsdi-x/snap/core/tribe_event" "github.com/intelsdi-x/snap/mgmt/tribe/agreement" "github.com/intelsdi-x/snap/mgmt/tribe/worker" "github.com/pborman/uuid" @@ -78,6 +79,7 @@ type tribe struct { taskStateResponses map[string]*taskStateQueryResponse members map[string]*agreement.Member tags map[string]string + EventManager *gomit.EventController config *Config pluginCatalog worker.ManagesPlugins @@ -117,6 +119,7 @@ func New(cfg *Config) (*tribe, error) { workerQuitChan: make(chan struct{}), workerWaitGroup: &sync.WaitGroup{}, config: cfg, + EventManager: gomit.NewEventController(), } tribe.broadcasts = &memberlist.TransmitLimitedQueue{ @@ -504,6 +507,14 @@ func (t *tribe) AddPlugin(agreementName string, p agreement.Plugin) error { UUID: uuid.New(), Type: addPluginMsgType, } + defer t.EventManager.Emit(&tribe_event.AddPluginEvent{ + Agreement: struct{ Name string }{agreementName}, + Plugin: struct { + Name string + Type core.PluginType + Version int + }{Name: p.Name(), Type: p.Type_, Version: p.Version_}, + }) if t.handleAddPlugin(msg) { t.broadcast(addPluginMsgType, msg, nil) }