Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

promote to master #43

Merged
merged 12 commits into from
Mar 31, 2019
3 changes: 2 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
language: go

go:
- 1.10.x
# 1.11.x first, so coverave is sent as soon as first build finishes.
- 1.11.x
- 1.10.x
- master

os:
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ Lightning fast, lightweight, simple and fun to develop with. Also easy, very eas
![](https://img.shields.io/badge/performance---42%25-red.svg)
-->

**Website**: [tbd](https://gomicro.services)
**Website**: [gomicro.services](http://gomicro.services)

**Documentation**: [tbd](https://gomicro.services/docs)
**Documentation**: [Docs](http://gomicro.services/docs/)

Status: In development !

Expand Down
16 changes: 10 additions & 6 deletions broker/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,16 @@ func (broker *ServiceBroker) LocalBus() *bus.Emitter {

// stopService stop the service.
func (broker *ServiceBroker) stopService(svc *service.Service) {
broker.middlewares.CallHandlers("serviceStoping", svc)
broker.middlewares.CallHandlers("serviceStopping", svc)
svc.Stop(broker.rootContext.ChildActionContext("service.stop", payload.New(nil)))
broker.middlewares.CallHandlers("serviceStoped", svc)
broker.middlewares.CallHandlers("serviceStopped", svc)
}

// startService start a service.
func (broker *ServiceBroker) startService(svc *service.Service) {

broker.logger.Debug("Broker - startService() - fullname: ", svc.FullName())

broker.middlewares.CallHandlers("serviceStarting", svc)

broker.waitForDependencies(svc)
Expand Down Expand Up @@ -184,6 +186,8 @@ func (broker *ServiceBroker) createBrokerLogger() *log.Entry {

// addService internal addService .. adds one service.Service instance to broker.services list.
func (broker *ServiceBroker) addService(svc *service.Service) {
broker.logger.Debug("Broker - addService() - fullname: ", svc.FullName(), " # actions: ", len(svc.Actions()), " # events: ", len(svc.Events()))

svc.SetNodeID(broker.localNode.GetID())
broker.services = append(broker.services, svc)
if broker.started {
Expand Down Expand Up @@ -229,9 +233,9 @@ func (broker *ServiceBroker) Start() {
}

func (broker *ServiceBroker) Stop() {
broker.logger.Info("Broker -> Stoping...")
broker.logger.Info("Broker -> Stopping...")

broker.middlewares.CallHandlers("brokerStoping", broker.delegates)
broker.middlewares.CallHandlers("brokerStopping", broker.delegates)

for _, service := range broker.services {
broker.stopService(service)
Expand All @@ -240,9 +244,9 @@ func (broker *ServiceBroker) Stop() {
broker.registry.Stop()

broker.started = false
broker.broadcastLocal("$broker.stoped")
broker.broadcastLocal("$broker.stopped")

broker.middlewares.CallHandlers("brokerStoped", broker.delegates)
broker.middlewares.CallHandlers("brokerStopped", broker.delegates)
}

type callPair struct {
Expand Down
10 changes: 5 additions & 5 deletions broker/broker_internals_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,9 +303,9 @@ var _ = Describe("Broker Internals", func() {
time.Sleep(time.Second)
counters.Clear()

Expect(snap.SnapshotMulti("stormBroker-stoped-aquaBroker-KnownNodes", aquaBroker.registry.KnownNodes())).Should(Succeed())
Expect(snap.SnapshotMulti("stormBroker-stoped-visualBroker-KnownNodes", visualBroker.registry.KnownNodes())).Should(Succeed())
Expect(snap.SnapshotMulti("stormBroker-stoped-soundsBroker-KnownNodes", soundsBroker.registry.KnownNodes())).Should(Succeed())
Expect(snap.SnapshotMulti("stormBroker-stopped-aquaBroker-KnownNodes", aquaBroker.registry.KnownNodes())).Should(Succeed())
Expect(snap.SnapshotMulti("stormBroker-stopped-visualBroker-KnownNodes", visualBroker.registry.KnownNodes())).Should(Succeed())
Expect(snap.SnapshotMulti("stormBroker-stopped-soundsBroker-KnownNodes", soundsBroker.registry.KnownNodes())).Should(Succeed())

aquaBroker.Broadcast("music.tone", "broad< aqua 1 >cast")

Expand All @@ -319,8 +319,8 @@ var _ = Describe("Broker Internals", func() {

counters.Clear()

Expect(snap.SnapshotMulti("soundsBroker-Stoped-aquaBroker-KnownNodes", aquaBroker.registry.KnownNodes())).Should(Succeed())
Expect(snap.SnapshotMulti("soundsBroker-Stoped-visualBroker-KnownNodes", visualBroker.registry.KnownNodes())).Should(Succeed())
Expect(snap.SnapshotMulti("soundsBroker-Stopped-aquaBroker-KnownNodes", aquaBroker.registry.KnownNodes())).Should(Succeed())
Expect(snap.SnapshotMulti("soundsBroker-Stopped-visualBroker-KnownNodes", visualBroker.registry.KnownNodes())).Should(Succeed())

aquaBroker.Broadcast("music.tone", "broad< aqua 2 >cast")
time.Sleep(time.Second)
Expand Down
2 changes: 1 addition & 1 deletion context/contextFactory.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (context *Context) Emit(eventName string, params interface{}, groups ...str
// Broadcast : Broadcast an event for all local & remote services
func (context *Context) Broadcast(eventName string, params interface{}, groups ...string) {
newContext := context.ChildEventContext(eventName, payload.New(params), groups, true)
context.broker.EmitEvent(newContext)
context.broker.BroadcastEvent(newContext)
}

func (context *Context) ActionName() string {
Expand Down
5 changes: 5 additions & 0 deletions examples/standalone/math.service.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ var mathService = moleculer.Service{
{
Name: "add",
Handler: func(ctx moleculer.Context, params moleculer.Payload) interface{} {

email := "sergio@hotmail.com"
r := <-ctx.Call("profile.search", email)
fmt.Printf(r)

return params.Get("a").Int() + params.Get("b").Int()
},
},
Expand Down
2 changes: 1 addition & 1 deletion middleware/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func Dispatcher(logger *log.Entry) *Dispatch {
return &Dispatch{handlers, logger}
}

var validHandlers = []string{"Config", "brokerStoping", "brokerStoped", "brokerStarting", "brokerStarted", "serviceStoping", "serviceStoped", "serviceStarting", "serviceStarted", "beforeLocalAction", "afterLocalAction", "beforeRemoteAction", "afterRemoteAction"}
var validHandlers = []string{"Config", "brokerStopping", "brokerStopped", "brokerStarting", "brokerStarted", "serviceStopping", "serviceStopped", "serviceStarting", "serviceStarted", "beforeLocalAction", "afterLocalAction", "beforeRemoteAction", "afterRemoteAction"}

// validHandler check if the name of handlers midlewares are tryignt o register exists!
func (dispatch *Dispatch) validHandler(name string) bool {
Expand Down
6 changes: 3 additions & 3 deletions moleculer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type Payload interface {
MapArray() []map[string]interface{}
RawMap() map[string]interface{}
Bson() bson.M
BsonArray() []bson.M
BsonArray() bson.A
Map() map[string]Payload
Exists() bool
IsError() bool
Expand Down Expand Up @@ -132,7 +132,7 @@ type Config struct {
WaitForNeighboursInterval time.Duration
Created func()
Started func()
Stoped func()
Stopped func()
}

var DefaultConfig = Config{
Expand All @@ -151,7 +151,7 @@ var DefaultConfig = Config{
DisableInternalMiddlewares: false,
Created: func() {},
Started: func() {},
Stoped: func() {},
Stopped: func() {},
MaxCallLevel: 100,
RetryPolicy: RetryPolicy{
Enabled: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
(string) (len=5) "level": (int) 2
},
(string) (len=5) "level": (int) 1,
(string) (len=7) "subList": ([]primitive.M) (len=1) {
(string) (len=7) "subList": (primitive.A) (len=1) {
(primitive.M) (len=2) {
(string) (len=4) "name": (string) (len=28) "sub item inside custom array",
(string) (len=5) "level": (int) 3
Expand Down
30 changes: 18 additions & 12 deletions payload/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,20 +347,21 @@ func (rawPayload *RawPayload) RawMap() map[string]interface{} {
return nil
}

// TODO refactor out as a transformer.. just not depend on bson.
func (raw *RawPayload) Bson() bson.M {
valueType := GetValueType(&raw.source)
if valueType == "primitive.M" {
if GetValueType(&raw.source) == "primitive.M" {
return raw.source.(bson.M)
}
if raw.IsMap() {
bm := bson.M{}
raw.ForEach(func(key interface{}, value moleculer.Payload) bool {
skey := key.(string)
if value.IsArray() {
bm[key.(string)] = value.BsonArray()
bm[skey] = value.BsonArray()
} else if value.IsMap() {
bm[key.(string)] = value.Bson()
bm[skey] = value.Bson()
} else {
bm[key.(string)] = value.Value()
bm[skey] = value.Value()
}
return true
})
Expand All @@ -369,18 +370,23 @@ func (raw *RawPayload) Bson() bson.M {
return nil
}

func (raw *RawPayload) BsonArray() []bson.M {
valueType := GetValueType(&raw.source)
if valueType == "[]primitive.M" {
return raw.source.([]bson.M)
func (raw *RawPayload) BsonArray() bson.A {
if GetValueType(&raw.source) == "[]primitive.A" {
return raw.source.(bson.A)
}
if raw.IsArray() {
bm := make([]bson.M, raw.Len())
ba := make(bson.A, raw.Len())
raw.ForEach(func(index interface{}, value moleculer.Payload) bool {
bm[index.(int)] = value.Bson()
if value.IsMap() {
ba[index.(int)] = value.Bson()
} else if value.IsArray() {
ba[index.(int)] = value.BsonArray()
} else {
ba[index.(int)] = value.Value()
}
return true
})
return bm
return ba
}
return nil
}
Expand Down
40 changes: 23 additions & 17 deletions registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ type ServiceRegistry struct {
events *EventCatalog
broker *moleculer.BrokerDelegates
strategy strategy.Strategy
stoping bool
stopping bool
heartbeatFrequency time.Duration
heartbeatTimeout time.Duration
offlineCheckFrequency time.Duration
Expand Down Expand Up @@ -73,7 +73,7 @@ func CreateRegistry(broker *moleculer.BrokerDelegates) *ServiceRegistry {
heartbeatFrequency: config.HeartbeatFrequency,
heartbeatTimeout: config.HeartbeatTimeout,
offlineCheckFrequency: config.OfflineCheckFrequency,
stoping: false,
stopping: false,
nodeReceivedMutex: &sync.Mutex{},
}

Expand Down Expand Up @@ -118,8 +118,8 @@ func (registry *ServiceRegistry) setupMessageHandlers() {
}

func (registry *ServiceRegistry) Stop() {
registry.logger.Debug("Registry Stoping...")
registry.stoping = true
registry.logger.Debug("Registry Stopping...")
registry.stopping = true
<-registry.transit.Disconnect()
registry.logger.Debug("Transit Disconnected -> Registry Full Stop!")

Expand All @@ -132,7 +132,7 @@ func (registry *ServiceRegistry) LocalServices() []*service.Service {
// Start : start the registry background processes.
func (registry *ServiceRegistry) Start() {
registry.logger.Debug("Registry Start() ")
registry.stoping = false
registry.stopping = false
connected := <-registry.transit.Connect()
if !connected {
panic(errors.New("Could not connect to the transit. Check logs for more details."))
Expand Down Expand Up @@ -164,8 +164,8 @@ func (registry *ServiceRegistry) ServiceForAction(name string) *service.Service
func (registry *ServiceRegistry) HandleRemoteEvent(context moleculer.BrokerContext) {
name := context.EventName()
groups := context.Groups()
if registry.stoping {
registry.logger.Error("HandleRemoteEvent() - registry is stoping. Discarding event -> name: ", name, " groups: ", groups)
if registry.stopping {
registry.logger.Error("HandleRemoteEvent() - registry is stopping. Discarding event -> name: ", name, " groups: ", groups)
return
}
broadcast := context.IsBroadcast()
Expand Down Expand Up @@ -282,9 +282,9 @@ func (registry *ServiceRegistry) invokeRemoteAction(context moleculer.BrokerCont
go func() {
actionResult := <-registry.transit.Request(context)
registry.logger.Trace("remote request done! action: ", context.ActionName(), " results: ", actionResult)
if registry.stoping {
registry.logger.Error("invokeRemoteAction() - registry is stoping. Discarding action result -> name: ", context.ActionName())
result <- payload.New(errors.New("can't complete request! registry stoping..."))
if registry.stopping {
registry.logger.Error("invokeRemoteAction() - registry is stopping. Discarding action result -> name: ", context.ActionName())
result <- payload.New(errors.New("can't complete request! registry stopping..."))
} else {
result <- actionResult
}
Expand Down Expand Up @@ -337,10 +337,10 @@ func (registry *ServiceRegistry) checkOfflineNodes() {
}
}

// loopWhileAlive : can the delegate runction in the given frequency and stop whe the registry is stoping
// loopWhileAlive : can the delegate runction in the given frequency and stop whe the registry is stopping
func (registry *ServiceRegistry) loopWhileAlive(frequency time.Duration, delegate func()) {
for {
if registry.stoping {
if registry.stopping {
break
}
delegate()
Expand All @@ -350,8 +350,8 @@ func (registry *ServiceRegistry) loopWhileAlive(frequency time.Duration, delegat

func (registry *ServiceRegistry) filterMessages(handler func(message moleculer.Payload)) func(message moleculer.Payload) {
return func(message moleculer.Payload) {
if registry.stoping {
registry.logger.Warn("filterMessages() - registry is stoping. Discarding message: ", message)
if registry.stopping {
registry.logger.Warn("filterMessages() - registry is stopping. Discarding message: ", message)
return
}
if message.Get("sender").Exists() && message.Get("sender").String() == registry.localNode.GetID() {
Expand Down Expand Up @@ -470,15 +470,21 @@ func (registry *ServiceRegistry) subscribeInternalEvent(event service.Event) {
// it will create endpoints for all service actions.
func (registry *ServiceRegistry) AddLocalService(service *service.Service) {
if registry.services.Find(service.Name(), service.Version(), registry.localNode.GetID()) {
registry.logger.Trace("registry - AddLocalService() - Service already registered, will ignore.. service fullName: ", service.FullName())
return
}
registry.logger.Debug("AddLocalService() nodeID: ", service.NodeID(), " service.fullname: ", service.FullName())

registry.services.Add(service)
for _, action := range service.Actions() {

actions := service.Actions()
events := service.Events()

registry.logger.Debug("registry AddLocalService() nodeID: ", service.NodeID(), " service.fullname: ", service.FullName(), " # actions: ", len(actions), " # events: ", len(events))

for _, action := range actions {
registry.actions.Add(action, service, true)
}
for _, event := range service.Events() {
for _, event := range events {
if strings.Index(event.Name(), "$") == 0 {
registry.subscribeInternalEvent(event)
} else {
Expand Down
2 changes: 1 addition & 1 deletion registry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ var _ = Describe("Registry", func() {

Expect(func() {
<-scannerBroker.Call("scanner.scan", scanText)
}).Should(Panic()) //broker B is stoped ... so it should panic
}).Should(Panic()) //broker B is stopped ... so it should panic
})
})
})
14 changes: 10 additions & 4 deletions serializer/jsonSerializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,13 +337,19 @@ func (payload JSONPayload) FloatArray() []float64 {
return nil
}

func (jp JSONPayload) BsonArray() []bson.M {
func (jp JSONPayload) BsonArray() bson.A {
if jp.IsArray() {
bm := make([]bson.M, jp.Len())
ba := make(bson.A, jp.Len())
for index, value := range jp.Array() {
bm[index] = value.Bson()
if value.IsMap() {
ba[index] = value.Bson()
} else if value.IsArray() {
ba[index] = value.BsonArray()
} else {
ba[index] = value.Value()
}
}
return bm
return ba
}
return nil
}
Expand Down
7 changes: 1 addition & 6 deletions service/.snapshots/service-glob--func1-12
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,12 @@
Description: (string) ""
}
},
Events: ([]moleculer.Event) (len=3) {
Events: ([]moleculer.Event) (len=2) {
(moleculer.Event) {
Name: (string) (len=12) "moon.isClose",
Group: (string) "",
Handler: (moleculer.EventHandler) <moleculer.EventHandler Value>
},
(moleculer.Event) {
Name: (string) (len=13) "earth.rotates",
Group: (string) "",
Handler: (moleculer.EventHandler) <moleculer.EventHandler Value>
},
(moleculer.Event) {
Name: (string) (len=13) "earth.rotates",
Group: (string) "",
Expand Down
Loading