Skip to content

refactor!: Replace internal topics from config with new constants #4348

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions cmd/core-command/res/configuration.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
[Service]
Port = 59882
StartupMsg = "This is the Core Command Microservice"
RequestTimeout = "45s"

[Clients]
[Clients.core-metadata]
Expand All @@ -22,11 +21,6 @@ RequestTimeout = "45s"
Port = 59881

[MessageBus]
[MessageBus.Topics]
DeviceCommandRequestTopicPrefix = "edgex/device/command/request" # for publishing requests to the device service; <device-service>/<device-name>/<command-name>/<method> will be added to this publish topic prefix
CommandRequestTopic = "edgex/core/command/request/#" # for subscribing to internal command requests
CommandQueryRequestTopic = "edgex/core/commandquery/request/#" # for subscribing to internal command query requests
ResponseTopicPrefix="edgex/response" # for subscribing/publishing internal responses (used by MessageBus Request API)
[MessageBus.Optional]
# Default MQTT Specific options that need to be here to enable environment variable overrides of them
ClientId ="core-command"
Expand Down
2 changes: 1 addition & 1 deletion cmd/core-common-config-bootstrapper/res/configuration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ all-services:

Telemetry:
Interval: "30s"
PublishTopicPrefix : "edgex/telemetry" # /<service-name>/<metric-name> will be added to this Publish Topic prefix
Metrics:
# Common Security Service Metrics
SecuritySecretsRequested: false
Expand Down Expand Up @@ -53,6 +52,7 @@ all-services:
Type: "redis"
AuthMode: "usernamepassword" # required for redis MessageBus (secure or insecure).
SecretName: "redisdb"
BaseTopicPrefix: "edgex" # prepended to all topics as "edgex/<additional topic levels>
Optional:
# Default MQTT Specific options that need to be here to enable environment variable overrides of them
Qos: "0" # Quality of Service values are 0 (At most once), 1 (At least once) or 2 (Exactly once)
Expand Down
3 changes: 0 additions & 3 deletions cmd/core-data/res/configuration.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,5 @@ StartupMsg = "This is the Core Data Microservice"
Name = "coredata"

[MessageBus]
[MessageBus.Topics]
PublishTopicPrefix = "edgex/events/core" # /<device-profile-name>/<device-name> will be added to this Publish Topic prefix
SubscribeTopic = "edgex/events/device/#" # required for subscribing to Events from MessageBus
[MessageBus.Optional]
ClientId = "core-data"
2 changes: 0 additions & 2 deletions cmd/core-metadata/res/configuration.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ UoMFile = "./res/uom.toml"
Name = "metadata"

[MessageBus]
[MessageBus.Topics]
PublishTopicPrefix = "edgex/system-events" # /<source>/<type>/<action>/<owner>/<profile> will be added to this Publish Topic prefix
[MessageBus.Optional]
# Default MQTT Specific options that need to be here to enable evnironment variable overrides of them
ClientId ="core-metadata"
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ module github.com/edgexfoundry/edgex-go

require (
github.com/eclipse/paho.mqtt.golang v1.4.2
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.0.0-dev.20
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.0.0-dev.24
github.com/edgexfoundry/go-mod-configuration/v3 v3.0.0-dev.3
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.10
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.7
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.14
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.9
github.com/edgexfoundry/go-mod-secrets/v3 v3.0.0-dev.7
github.com/fxamacker/cbor/v2 v2.4.0
github.com/golang-jwt/jwt/v4 v4.4.3
Expand Down Expand Up @@ -50,10 +50,10 @@ require (
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/mitchellh/consulstructure v0.0.0-20190329231841-56fdc4d2da54 // indirect
github.com/mitchellh/copystructure v1.0.0 // indirect
github.com/mitchellh/copystructure v1.2.0 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/reflectwalk v1.0.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/nats-io/nats.go v1.20.0 // indirect
github.com/nats-io/nkeys v0.3.0 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
Expand Down
20 changes: 10 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6rNA5PM12m4=
github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.0.0-dev.20 h1:jX540Y6p7XIzxP+DYJkW26g32fAqqR/miwvQDPT7oAI=
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.0.0-dev.20/go.mod h1:qL+ZN99O1nK+vzsmWQefnXKEifkHSkxtQTSB0z4BYqs=
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.0.0-dev.24 h1:H9MC0ahbkMw4w1SHeX6/GVR5tC6E+Bz6ZiPECmnzO3c=
github.com/edgexfoundry/go-mod-bootstrap/v3 v3.0.0-dev.24/go.mod h1:iv/czxi4ciFWMgrO+3nnanGfkT2X1QW5L3iCb+deewk=
github.com/edgexfoundry/go-mod-configuration/v3 v3.0.0-dev.3 h1:0Ew4PzLSFJ+sb7AYtvb9m1mRN45Sh0ELU1HdMCel5t8=
github.com/edgexfoundry/go-mod-configuration/v3 v3.0.0-dev.3/go.mod h1:ESOWI4GokQfQ3Bn2hGsdfOVx5idj7QEdCPT/SAQDd9M=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.10 h1:o5yenvmLn8+0AOz0d5GIek011Tt5ZRbvPlgE4VhozEU=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.10/go.mod h1:4lpZUM54ZareGU/yuAJvLEw0BoJ43SvCj1LO+gsKm9c=
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.7 h1:jgDQA/7SENURXQkIX11pNgA/pX9IK9ZULenj/vF17Vw=
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.7/go.mod h1:r6Klfz+QBDx1Z5UV0z70MKdK2/cgHwhtqTm2HFXoWug=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.14 h1:o7CFEIyKn/quin5lrAlUbUu9x1dnecK0tZs5waLhdCc=
github.com/edgexfoundry/go-mod-core-contracts/v3 v3.0.0-dev.14/go.mod h1:4lpZUM54ZareGU/yuAJvLEw0BoJ43SvCj1LO+gsKm9c=
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.9 h1:CUUieXQ8roD4M770GXj1he707V3V9Jiygk302+dwvKk=
github.com/edgexfoundry/go-mod-messaging/v3 v3.0.0-dev.9/go.mod h1:iKBxmZkc7jdOrT99+IR1nyg7PlRgooAQMhZxDh2mTUQ=
github.com/edgexfoundry/go-mod-registry/v3 v3.0.0-dev.3 h1:QgZF9f70Cwpvkjw3tP1aiVGHc+yNFJNzW6hO8pDs3fg=
github.com/edgexfoundry/go-mod-registry/v3 v3.0.0-dev.3/go.mod h1:2w8v0sv+i21nY+DY6JV4PFxsNTuxpGAjlNFlFMTfZkk=
github.com/edgexfoundry/go-mod-secrets/v3 v3.0.0-dev.7 h1:9Mn389IHlgoPgGUpwnLzoe/shylkQI+nXI3E/AfnYDA=
Expand Down Expand Up @@ -171,17 +171,17 @@ github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceT
github.com/mitchellh/cli v1.1.0/go.mod h1:xcISNoH86gajksDmfB23e/pu+B+GeFRMYmoHXxx3xhI=
github.com/mitchellh/consulstructure v0.0.0-20190329231841-56fdc4d2da54 h1:DcITQwl3ymmg7i1XfwpZFs/TPv2PuTwxE8bnuKVtKlk=
github.com/mitchellh/consulstructure v0.0.0-20190329231841-56fdc4d2da54/go.mod h1:dIfpPVUR+ZfkzkDcKnn+oPW1jKeXe4WlNWc7rIXOVxM=
github.com/mitchellh/copystructure v1.0.0 h1:Laisrj+bAB6b/yJwB5Bt3ITZhGJdqmxquMKeZ+mmkFQ=
github.com/mitchellh/copystructure v1.0.0/go.mod h1:SNtv71yrdKgLRyLFxmLdkAbkKEFWgYaq1OVrnRcwhnw=
github.com/mitchellh/copystructure v1.2.0 h1:vpKXTN4ewci03Vljg/q9QvCGUDttBOGBIa15WveJJGw=
github.com/mitchellh/copystructure v1.2.0/go.mod h1:qLl+cE2AmVv+CoeAwDPye/v+N2HKCj9FbZEVFJRxO9s=
github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/go-wordwrap v1.0.0/go.mod h1:ZXFpozHsX6DPmq2I0TCekCxypsnAUbP2oI0UX1GXzOo=
github.com/mitchellh/mapstructure v0.0.0-20160808181253-ca63d7c062ee/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/mitchellh/mapstructure v1.4.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/mitchellh/reflectwalk v1.0.0 h1:9D+8oIskB4VJBN5SFlmc27fSlIBZaov1Wpk/IfikLNY=
github.com/mitchellh/reflectwalk v1.0.0/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zxSIeXaQ=
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
Expand Down
13 changes: 9 additions & 4 deletions internal/core/command/controller/messaging/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,10 @@ func commandRequestHandler(requestTimeout time.Duration, dic *di.Container) mqtt

externalResponseTopic := strings.Join([]string{externalMQTTInfo.Topics[common.ExternalCommandResponseTopicPrefixKey], deviceName, commandName, method}, "/")

internalMessageBusInfo := container.ConfigurationFrom(dic.Get).MessageBus
deviceServiceName, deviceRequestTopic, err := validateRequestTopic(internalMessageBusInfo.Topics[common.DeviceCommandRequestTopicPrefixKey], deviceName, commandName, method, dic)
internalBaseTopic := container.ConfigurationFrom(dic.Get).MessageBus.GetBaseTopicPrefix()
topicPrefix := common.BuildTopic(internalBaseTopic, common.CoreCommandDeviceRequestPublishTopic)

deviceServiceName, deviceRequestTopic, err := validateRequestTopic(topicPrefix, deviceName, commandName, method, dic)
if err != nil {
responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, err.Error())
publishMessage(client, externalResponseTopic, qos, retain, responseEnvelope, lc)
Expand All @@ -135,12 +137,15 @@ func commandRequestHandler(requestTimeout time.Duration, dic *di.Container) mqtt
return
}

internalMessageBus := bootstrapContainer.MessagingClientFrom(dic.Get)
deviceResponseTopicPrefix := common.BuildTopic(internalBaseTopic, common.ResponseTopic, deviceServiceName)

lc.Debugf("Sending Command request to internal MessageBus. Topic: %s, Request-id: %s Correlation-id: %s", deviceRequestTopic, requestEnvelope.RequestID, requestEnvelope.CorrelationID)
lc.Debugf("Expecting response on topic: %s/%s", deviceResponseTopicPrefix, requestEnvelope.RequestID)

internalMessageBus := bootstrapContainer.MessagingClientFrom(dic.Get)

// Request waits for the response and returns it.
response, err := internalMessageBus.Request(requestEnvelope, deviceServiceName, deviceRequestTopic, requestTimeout)
response, err := internalMessageBus.Request(requestEnvelope, deviceRequestTopic, deviceResponseTopicPrefix, requestTimeout)
if err != nil {
errorMessage := fmt.Sprintf("Failed to send DeviceCommand request with internal MessageBus: %v", err)
responseEnvelope := types.NewMessageEnvelopeWithError(requestEnvelope.RequestID, errorMessage)
Expand Down
11 changes: 4 additions & 7 deletions internal/core/command/controller/messaging/external_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"encoding/json"
"errors"
"net/http"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -55,7 +54,6 @@ const (
testExternalCommandRequestTopic = "unittest/external/request/#"
testExternalCommandRequestTopicExample = "unittest/external/request/testDevice/testCommand/get"
testExternalCommandResponseTopicPrefix = "unittest/external/response"
testInternalCommandRequestTopicPrefix = "unittest/internal/request"
)

func TestOnConnectHandler(t *testing.T) {
Expand Down Expand Up @@ -294,9 +292,7 @@ func Test_commandRequestHandler(t *testing.T) {
MaxResultCount: 20,
},
MessageBus: bootstrapConfig.MessageBusInfo{
Topics: map[string]string{
common.DeviceCommandRequestTopicPrefixKey: testInternalCommandRequestTopicPrefix,
},
BaseTopicPrefix: "edgex",
},
ExternalMQTT: bootstrapConfig.ExternalMQTTInfo{
QoS: 0,
Expand Down Expand Up @@ -371,8 +367,9 @@ func Test_commandRequestHandler(t *testing.T) {
return
}

expectedInternalRequestTopic := strings.Join([]string{testInternalCommandRequestTopicPrefix, testDeviceServiceName, testDeviceName, testCommandName, testMethod}, "/")
client.AssertCalled(t, "Request", tt.payload, testDeviceServiceName, expectedInternalRequestTopic, mock.Anything)
expectedInternalRequestTopic := common.BuildTopic(baseTopic, common.CoreCommandDeviceRequestPublishTopic, testDeviceServiceName, testDeviceName, testCommandName, testMethod)
expectedInternalResponseTopicPrefix := common.BuildTopic(baseTopic, common.ResponseTopic, testDeviceServiceName)
client.AssertCalled(t, "Request", tt.payload, expectedInternalRequestTopic, expectedInternalResponseTopicPrefix, mock.Anything)
})
}
}
Expand Down
30 changes: 15 additions & 15 deletions internal/core/command/controller/messaging/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import (
// and forwards them to the appropriate Device Service via internal MessageBus
func SubscribeCommandRequests(ctx context.Context, requestTimeout time.Duration, dic *di.Container) errors.EdgeX {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
messageBusTopics := container.ConfigurationFrom(dic.Get).MessageBus.Topics
requestCommandTopic := messageBusTopics[common.CommandRequestTopicKey]
baseTopic := container.ConfigurationFrom(dic.Get).MessageBus.GetBaseTopicPrefix()
requestCommandTopic := common.BuildTopic(baseTopic, common.CoreCommandRequestSubscribeTopic)

messages := make(chan types.MessageEnvelope)
messageErrors := make(chan error)
Expand All @@ -56,7 +56,7 @@ func SubscribeCommandRequests(ctx context.Context, requestTimeout time.Duration,
case err = <-messageErrors:
lc.Error(err.Error())
case requestEnvelope := <-messages:
processDeviceCommandRequest(messageBus, requestEnvelope, messageBusTopics, requestTimeout, lc, dic)
processDeviceCommandRequest(messageBus, requestEnvelope, baseTopic, requestTimeout, lc, dic)
}
}
}()
Expand All @@ -67,7 +67,7 @@ func SubscribeCommandRequests(ctx context.Context, requestTimeout time.Duration,
func processDeviceCommandRequest(
messageBus messaging.MessageClient,
requestEnvelope types.MessageEnvelope,
messageBusTopics map[string]string,
baseTopic string,
requestTimeout time.Duration,
lc logger.LoggingClient,
dic *di.Container) {
Expand All @@ -82,8 +82,7 @@ func processDeviceCommandRequest(
}

// internal response topic scheme: <ResponseTopicPrefix>/<service-name>/<request-id>
internalResponseTopic := strings.Join([]string{messageBusTopics[common.ResponseTopicPrefixKey], common.CoreCommandServiceKey, requestEnvelope.RequestID}, "/")

internalResponseTopic := common.BuildTopic(baseTopic, common.ResponseTopic, common.CoreCommandServiceKey, requestEnvelope.RequestID)
topicLevels := strings.Split(requestEnvelope.ReceivedTopic, "/")
length := len(topicLevels)
if length < 3 {
Expand Down Expand Up @@ -112,8 +111,9 @@ func processDeviceCommandRequest(
return
}

topicPrefix := common.BuildTopic(baseTopic, common.CoreCommandDeviceRequestPublishTopic)
// internal command request topic scheme: <DeviceRequestTopicPrefix>/<device-service>/<device>/<command-name>/<method>
deviceServiceName, deviceRequestTopic, err := validateRequestTopic(messageBusTopics[common.DeviceCommandRequestTopicPrefixKey], deviceName, commandName, method, dic)
deviceServiceName, deviceRequestTopic, err := validateRequestTopic(topicPrefix, deviceName, commandName, method, dic)
if err != nil {
err = fmt.Errorf("invalid request topic: %s", err.Error())
lc.Error(err.Error())
Expand All @@ -136,9 +136,10 @@ func processDeviceCommandRequest(
return
}

lc.Debugf("Sending Command Device Request to internal MessageBus. Topic: %s, Correlation-id: %s", deviceRequestTopic, requestEnvelope.CorrelationID)
deviceResponseTopicPrefix := common.BuildTopic(baseTopic, common.ResponseTopic, deviceServiceName)

deviceResponseTopicPrefix := strings.Join([]string{messageBusTopics[common.ResponseTopicPrefixKey], deviceServiceName}, "/")
lc.Debugf("Sending Command Device Request to internal MessageBus. Topic: %s, Correlation-id: %s", deviceRequestTopic, requestEnvelope.CorrelationID)
lc.Debugf("Expecting response on topic: %s/%s", deviceResponseTopicPrefix, requestEnvelope.RequestID)

response, err := messageBus.Request(requestEnvelope, deviceRequestTopic, deviceResponseTopicPrefix, requestTimeout)
if err != nil {
Expand All @@ -160,8 +161,8 @@ func processDeviceCommandRequest(
// via internal MessageBus
func SubscribeCommandQueryRequests(ctx context.Context, dic *di.Container) errors.EdgeX {
lc := bootstrapContainer.LoggingClientFrom(dic.Get)
messageBusTopics := container.ConfigurationFrom(dic.Get).MessageBus.Topics
queryRequestTopic := messageBusTopics[common.CommandQueryRequestTopicKey]
baseTopic := container.ConfigurationFrom(dic.Get).MessageBus.GetBaseTopicPrefix()
queryRequestTopic := common.BuildTopic(baseTopic, common.CoreCommandQueryRequestSubscribeTopic)

messages := make(chan types.MessageEnvelope)
messageErrors := make(chan error)
Expand Down Expand Up @@ -190,7 +191,7 @@ func SubscribeCommandQueryRequests(ctx context.Context, dic *di.Container) error
case err = <-messageErrors:
lc.Error(err.Error())
case requestEnvelope := <-messages:
processCommandQueryRequest(messageBus, requestEnvelope, messageBusTopics, lc, dic)
processCommandQueryRequest(messageBus, requestEnvelope, baseTopic, lc, dic)
}
}
}()
Expand All @@ -201,7 +202,7 @@ func SubscribeCommandQueryRequests(ctx context.Context, dic *di.Container) error
func processCommandQueryRequest(
messageBus messaging.MessageClient,
requestEnvelope types.MessageEnvelope,
messageBusTopics map[string]string,
baseTopic string,
lc logger.LoggingClient,
dic *di.Container,
) {
Expand All @@ -228,8 +229,7 @@ func processCommandQueryRequest(
}

// internal response topic scheme: <ResponseTopicPrefix>/<service-name>/<request-id>
internalQueryResponseTopic := strings.Join([]string{messageBusTopics[common.ResponseTopicPrefixKey], common.CoreCommandServiceKey, requestEnvelope.RequestID}, "/")

internalQueryResponseTopic := common.BuildTopic(baseTopic, common.ResponseTopic, common.CoreCommandServiceKey, requestEnvelope.RequestID)
lc.Debugf("Responding to command query request on topic: %s", internalQueryResponseTopic)

err = messageBus.Publish(responseEnvelope, internalQueryResponseTopic)
Expand Down
Loading