Skip to content

Commit

Permalink
feat(command): implement MessagingRouter
Browse files Browse the repository at this point in the history
Save active requests in two maps using requestId as keys.
This allows command service to know where to route the response
(to external MQTT or internal MessageBus)

Signed-off-by: Chris Hung <chris@iotechsys.com>
  • Loading branch information
Chris Hung committed Sep 30, 2022
1 parent 2588d96 commit 91c61b8
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 35 deletions.
10 changes: 6 additions & 4 deletions internal/core/command/controller/messaging/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const (
ResponseTopic = "ResponseTopic"
)

func OnConnectHandler(dic *di.Container) mqtt.OnConnectHandler {
func OnConnectHandler(router MessagingRouter, dic *di.Container) mqtt.OnConnectHandler {
return func(client mqtt.Client) {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
config := container.ConfigurationFrom(dic.Get)
Expand All @@ -50,7 +50,7 @@ func OnConnectHandler(dic *di.Container) mqtt.OnConnectHandler {
}

requestCommandTopic := externalTopics[RequestCommandTopic]
if token := client.Subscribe(requestCommandTopic, qos, commandRequestHandler(dic)); token.Wait() && token.Error() != nil {
if token := client.Subscribe(requestCommandTopic, qos, commandRequestHandler(router, dic)); token.Wait() && token.Error() != nil {
lc.Errorf("could not subscribe to topic '%s': %s", responseQueryTopic, token.Error().Error())
return
}
Expand Down Expand Up @@ -134,7 +134,7 @@ func commandQueryHandler(responseTopic string, qos byte, retain bool, dic *di.Co
}
}

func commandRequestHandler(dic *di.Container) mqtt.MessageHandler {
func commandRequestHandler(router MessagingRouter, dic *di.Container) mqtt.MessageHandler {
return func(client mqtt.Client, message mqtt.Message) {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
messageBusInfo := container.ConfigurationFrom(dic.Get).MessageQueue
Expand All @@ -151,7 +151,7 @@ func commandRequestHandler(dic *di.Container) mqtt.MessageHandler {
// expected command request topic scheme: #/<device>/<command-name>/<method>
topicLevels := strings.Split(message.Topic(), "/")
length := len(topicLevels)
if length <= 3 {
if length < 3 {
lc.Error("Failed to parse and construct response topic scheme, expected request topic scheme: '#/<device-name>/<command-name>/<method>")
lc.Warn("Not publishing error message back due to insufficient information on response topic")
return
Expand Down Expand Up @@ -207,6 +207,8 @@ func commandRequestHandler(dic *di.Container) mqtt.MessageHandler {
publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc)
return
}

router.SetResponseTopic(requestEnvelope.RequestID, externalResponseTopic, true)
}
}

Expand Down
29 changes: 16 additions & 13 deletions internal/core/command/controller/messaging/external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ import (
commonDTO "github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/common"
"github.com/edgexfoundry/go-mod-core-contracts/v2/dtos/responses"
edgexErr "github.com/edgexfoundry/go-mod-core-contracts/v2/errors"
"github.com/edgexfoundry/go-mod-messaging/v2/messaging/mocks"
internalMessagingMocks "github.com/edgexfoundry/go-mod-messaging/v2/messaging/mocks"
"github.com/edgexfoundry/go-mod-messaging/v2/pkg/types"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/edgexfoundry/edgex-go/internal/core/command/config"
"github.com/edgexfoundry/edgex-go/internal/core/command/container"
mqttMocks "github.com/edgexfoundry/edgex-go/internal/core/command/controller/messaging/mocks"
"github.com/edgexfoundry/edgex-go/internal/core/command/controller/messaging/mocks"
)

const (
Expand All @@ -57,6 +57,7 @@ const (
)

func TestOnConnectHandler(t *testing.T) {
mockRouter := &mocks.MessagingRouter{}
lc := &lcMocks.LoggingClient{}
lc.On("Errorf", mock.Anything, mock.Anything, mock.Anything).Return(nil)
dic := di.NewContainer(di.ServiceConstructorMap{
Expand Down Expand Up @@ -91,19 +92,19 @@ func TestOnConnectHandler(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
token := &mqttMocks.Token{}
token := &mocks.Token{}
token.On("Wait").Return(true)
if tt.expectedSucceed {
token.On("Error").Return(nil)
} else {
token.On("Error").Return(errors.New("error"))
}

client := &mqttMocks.Client{}
client := &mocks.Client{}
client.On("Subscribe", testQueryRequestTopic, byte(0), mock.Anything).Return(token)
client.On("Subscribe", testExternalCommandRequestTopic, byte(0), mock.Anything).Return(token)

fn := OnConnectHandler(dic)
fn := OnConnectHandler(mockRouter, dic)
fn(client)

if tt.expectedSucceed {
Expand Down Expand Up @@ -205,15 +206,15 @@ func Test_commandQueryHandler(t *testing.T) {
payloadBytes, err := json.Marshal(tt.payload)
require.NoError(t, err)

message := &mqttMocks.Message{}
message := &mocks.Message{}
message.On("Payload").Return(payloadBytes)
message.On("Topic").Return(tt.requestQueryTopic)

token := &mqttMocks.Token{}
token := &mocks.Token{}
token.On("Wait").Return(true)
token.On("Error").Return(nil)

mqttClient := &mqttMocks.Client{}
mqttClient := &mocks.Client{}
mqttClient.On("Publish", testQueryResponseTopic, byte(0), true, mock.Anything).Return(token)

fn := commandQueryHandler(testQueryResponseTopic, 0, true, dic)
Expand Down Expand Up @@ -262,6 +263,8 @@ func Test_commandRequestHandler(t *testing.T) {
},
}

mockRouter := &mocks.MessagingRouter{}
mockRouter.On("SetResponseTopic", mock.Anything, mock.Anything, mock.Anything).Return(nil)
lc := &lcMocks.LoggingClient{}
lc.On("Error", mock.Anything).Return(nil)
lc.On("Errorf", mock.Anything, mock.Anything).Return(nil)
Expand All @@ -274,7 +277,7 @@ func Test_commandRequestHandler(t *testing.T) {
dsc := &clientMocks.DeviceServiceClient{}
dsc.On("DeviceServiceByName", context.Background(), testDeviceServiceName).Return(deviceServiceResponse, nil)
dsc.On("DeviceServiceByName", context.Background(), unknownService).Return(responses.DeviceServiceResponse{}, edgexErr.NewCommonEdgeX(edgexErr.KindEntityDoesNotExist, "unknown device service", nil))
client := &mocks.MessageClient{}
client := &internalMessagingMocks.MessageClient{}
client.On("Publish", mock.Anything, mock.Anything).Return(nil)
dic := di.NewContainer(di.ServiceConstructorMap{
container.ConfigurationName: func(get di.Get) interface{} {
Expand Down Expand Up @@ -338,18 +341,18 @@ func Test_commandRequestHandler(t *testing.T) {
payloadBytes, err := json.Marshal(tt.payload)
require.NoError(t, err)

message := &mqttMocks.Message{}
message := &mocks.Message{}
message.On("Payload").Return(payloadBytes)
message.On("Topic").Return(tt.commandRequestTopic)

token := &mqttMocks.Token{}
token := &mocks.Token{}
token.On("Wait").Return(true)
token.On("Error").Return(nil)

mqttClient := &mqttMocks.Client{}
mqttClient := &mocks.Client{}
mqttClient.On("Publish", mock.Anything, byte(0), true, mock.Anything).Return(token)

fn := commandRequestHandler(dic)
fn := commandRequestHandler(mockRouter, dic)
fn(mqttClient, message)
if tt.expectedError {
if tt.expectedPublishError {
Expand Down
25 changes: 9 additions & 16 deletions internal/core/command/controller/messaging/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ package messaging

import (
"context"
"strings"

bootstrapContainer "github.com/edgexfoundry/go-mod-bootstrap/v2/bootstrap/container"
"github.com/edgexfoundry/go-mod-bootstrap/v2/di"
Expand All @@ -17,11 +16,10 @@ import (
"github.com/edgexfoundry/edgex-go/internal/core/command/container"
)

func SubscribeCommandResponses(ctx context.Context, dic *di.Container) errors.EdgeX {
func SubscribeCommandResponses(ctx context.Context, router MessagingRouter, dic *di.Container) errors.EdgeX {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
messageBusInfo := container.ConfigurationFrom(dic.Get).MessageQueue
internalResponseTopic := messageBusInfo.Internal.Topics[ResponseTopic]
externalResponseTopicPrefix := messageBusInfo.External.Topics[ResponseCommandTopicPrefix]

messages := make(chan types.MessageEnvelope)
messageErrors := make(chan error)
Expand All @@ -47,25 +45,20 @@ func SubscribeCommandResponses(ctx context.Context, dic *di.Container) errors.Ed
case <-ctx.Done():
lc.Infof("Exiting waiting for MessageBus '%s' topic messages", internalResponseTopic)
return
case err := <-messageErrors:
case err = <-messageErrors:
lc.Error(err.Error())
case msgEnvelope := <-messages:
lc.Debugf("Command response received on message queue. Topic: %s, Correlation-id: %s ", internalResponseTopic, msgEnvelope.CorrelationID)
lc.Debugf("Command response received on message queue. Topic: %s, Correlation-id: %s ", msgEnvelope.ReceivedTopic, msgEnvelope.CorrelationID)

// expected internal command response topic scheme: #/<service-name>/<device-name>/<command-name>/<method>
topicLevels := strings.Split(msgEnvelope.ReceivedTopic, "/")
length := len(topicLevels)
if length < 4 {
lc.Error("Failed to parse and construct command response topic scheme, expected request topic scheme: '#/<service-name>/<device-name>/<command-name>/<method>'")
responseTopic, external, err := router.ResponseTopic(msgEnvelope.RequestID)
if err != nil {
lc.Errorf("Received RequestEnvelope with unknown RequestId %s", msgEnvelope.RequestID)
continue
}

// expected external command response topic scheme: #/<device-name>/<command-name>/<method>
deviceName := topicLevels[length-3]
commandName := topicLevels[length-2]
method := topicLevels[length-1]
externalResponseTopic := strings.Join([]string{externalResponseTopicPrefix, deviceName, commandName, method}, "/")
publishMessage(externalMQTT, externalResponseTopic, qos, retain, msgEnvelope, lc)
if external {
publishMessage(externalMQTT, responseTopic, qos, retain, msgEnvelope, lc)
}
}
}
}()
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

65 changes: 65 additions & 0 deletions internal/core/command/controller/messaging/router.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
//
// Copyright (C) 2022 IOTech Ltd
//
// SPDX-License-Identifier: Apache-2.0

package messaging

import (
"errors"
"sync"
)

// MessagingRouter defines interface for Command Service to know
// where to route the receiving device command response.
type MessagingRouter interface {
// ResponseTopic returns the responseTopicPrefix by requestId, and a boolean value
// indicates its original source(external MQTT or internal MessageBus).
ResponseTopic(requestId string) (string, bool, error)
// SetResponseTopic sets the responseTopicPrefix with RequestId as the key
SetResponseTopic(requestId string, topic string, external bool)
}

func NewMessagingRouter() MessagingRouter {
return &router{
internalCommandRequestMap: make(map[string]string),
externalCommandRequestMap: make(map[string]string),
}
}

type router struct {
mutex sync.Mutex
internalCommandRequestMap map[string]string
externalCommandRequestMap map[string]string
}

func (r *router) ResponseTopic(requestId string) (string, bool, error) {
r.mutex.Lock()
defer r.mutex.Unlock()

topic, ok := r.externalCommandRequestMap[requestId]
if ok {
delete(r.externalCommandRequestMap, requestId)
return topic, true, nil
}

topic, ok = r.internalCommandRequestMap[requestId]
if ok {
delete(r.internalCommandRequestMap, requestId)
return topic, false, nil
}

return "", false, errors.New("requestId not found")
}

func (r *router) SetResponseTopic(requestId string, topic string, external bool) {
r.mutex.Lock()
defer r.mutex.Unlock()

if external {
r.externalCommandRequestMap[requestId] = topic
return
}

r.internalCommandRequestMap[requestId] = topic
}
5 changes: 3 additions & 2 deletions internal/core/command/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,14 @@ func Main(ctx context.Context, cancel context.CancelFunc, router *mux.Router) {
func MessageBusBootstrapHandler(ctx context.Context, wg *sync.WaitGroup, startupTimer startup.Timer, dic *di.Container) bool {
configuration := container.ConfigurationFrom(dic.Get)
if configuration.MessageQueue.Required {
router := messaging.NewMessagingRouter()
if !handlers.MessagingBootstrapHandler(ctx, wg, startupTimer, dic) {
return false
}
if !handlers.NewExternalMQTT(messaging.OnConnectHandler(dic)).BootstrapHandler(ctx, wg, startupTimer, dic) {
if !handlers.NewExternalMQTT(messaging.OnConnectHandler(router, dic)).BootstrapHandler(ctx, wg, startupTimer, dic) {
return false
}
if err := messaging.SubscribeCommandResponses(ctx, dic); err != nil {
if err := messaging.SubscribeCommandResponses(ctx, router, dic); err != nil {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
lc.Errorf("Failed to subscribe commands from message bus, %v", err)
return false
Expand Down

0 comments on commit 91c61b8

Please sign in to comment.