Skip to content

Commit

Permalink
Support distributed tracing (#24)
Browse files (browse the repository at this point in the history)
* Pass trace ID from event to bbs client

Signed-off-by: Maria Shaldybin <mariash@vmware.com>

* Log trace ID when handling events

---------

Signed-off-by: Maria Shaldybin <mariash@vmware.com>
Co-authored-by: Nick Rohn <nrohn@vmware.com>
  • Loading branch information
mariash and notrepo05 authored May 23, 2023
1 parent 7076d7e commit 912ced0
Show file tree
Hide file tree
Showing 8 changed files with 140 additions and 119 deletions.
90 changes: 45 additions & 45 deletions cmd/route-emitter/main_test.go

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions routehandlers/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"

"code.cloudfoundry.org/bbs/models"
"code.cloudfoundry.org/bbs/trace"
loggingclient "code.cloudfoundry.org/diego-logging-client"
"code.cloudfoundry.org/lager/v3"
"code.cloudfoundry.org/route-emitter/emitter"
Expand Down Expand Up @@ -54,21 +55,26 @@ func (handler *Handler) HandleEvent(logger lager.Logger, event models.Event) {

switch event := event.(type) {
case *models.DesiredLRPCreatedEvent:
logger = trace.LoggerWithTraceInfo(logger, event.TraceId)
handler.handleDesiredCreate(logger, event.DesiredLrp)
case *models.DesiredLRPChangedEvent:
logger = trace.LoggerWithTraceInfo(logger, event.TraceId)
err := handler.handleDesiredUpdate(logger, event.Before, event.After)
if err != nil {
logger.Error("failed-to-handle-desired-update", err)
}
case *models.DesiredLRPRemovedEvent:
logger = trace.LoggerWithTraceInfo(logger, event.TraceId)
handler.handleDesiredDelete(logger, event.DesiredLrp)
case *models.ActualLRPInstanceCreatedEvent:
logger = trace.LoggerWithTraceInfo(logger, event.TraceId)
if event.ActualLrp == nil {
logger.Error("nil-actual-lrp", nil, lager.Data{"event-type": event.EventType()})
return
}
handler.handleActualCreate(logger, event.ActualLrp)
case *models.ActualLRPInstanceChangedEvent:
logger = trace.LoggerWithTraceInfo(logger, event.TraceId)
before := event.Before.ToActualLRP(event.ActualLRPKey, event.ActualLRPInstanceKey)
after := event.After.ToActualLRP(event.ActualLRPKey, event.ActualLRPInstanceKey)
logger.Debug("received-actual-lrp-changed-event", lager.Data{"before": before, "after": after})
Expand All @@ -81,6 +87,7 @@ func (handler *Handler) HandleEvent(logger lager.Logger, event models.Event) {
logger.Error("failed-to-handle-actual-update", err)
}
case *models.ActualLRPInstanceRemovedEvent:
logger = trace.LoggerWithTraceInfo(logger, event.TraceId)
logger.Debug("received-actual-lrp-instance-removed-event", lager.Data{"lrp": event.ActualLrp})
if event.ActualLrp == nil {
logger.Error("nil-actual-lrp", nil, lager.Data{"event-type": event.EventType()})
Expand Down
39 changes: 21 additions & 18 deletions routehandlers/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ import (
"github.com/onsi/gomega/gbytes"
)

const logGuid = "some-log-guid"
const (
logGuid = "some-log-guid"
traceId = "7f461654-74d1-1ee5-8367-77d85df2cdab"
)

type randomEvent struct {
proto.Message
Expand Down Expand Up @@ -154,7 +157,7 @@ var _ = Describe("Handler", func() {
})

JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewDesiredLRPCreatedEvent(desiredLRP))
routeHandler.HandleEvent(logger, models.NewDesiredLRPCreatedEvent(desiredLRP, traceId))
})

It("should set the routes on the table", func() {
Expand Down Expand Up @@ -247,7 +250,7 @@ var _ = Describe("Handler", func() {
})

JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewDesiredLRPChangedEvent(originalDesiredLRP, changedDesiredLRP))
routeHandler.HandleEvent(logger, models.NewDesiredLRPChangedEvent(originalDesiredLRP, changedDesiredLRP, traceId))
})

It("should set the routes on the table", func() {
Expand Down Expand Up @@ -346,7 +349,7 @@ var _ = Describe("Handler", func() {
})

JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewDesiredLRPRemovedEvent(desiredLRP))
routeHandler.HandleEvent(logger, models.NewDesiredLRPRemovedEvent(desiredLRP, "some-trace-id"))
})

It("should remove the routes from the table", func() {
Expand Down Expand Up @@ -392,11 +395,11 @@ var _ = Describe("Handler", func() {
})

JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceCreatedEvent(actualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceCreatedEvent(actualLRP, traceId))
})

It("logs an error", func() {
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceCreatedEvent(actualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceCreatedEvent(actualLRP, traceId))
Expect(logger).To(gbytes.Say("nil-actual-lrp"))
})
})
Expand All @@ -420,7 +423,7 @@ var _ = Describe("Handler", func() {
})

JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceCreatedEvent(actualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceCreatedEvent(actualLRP, "some-trace-id"))
})

It("should add/update the endpoints on the table", func() {
Expand Down Expand Up @@ -519,7 +522,7 @@ var _ = Describe("Handler", func() {
})

JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP, "some-trace-id"))
})

It("should add/update the endpoint on the table", func() {
Expand Down Expand Up @@ -556,7 +559,7 @@ var _ = Describe("Handler", func() {
beforeActualLRP = nil
})
It("logs an error", func() {
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP, "some-trace-id"))
Expect(logger).To(gbytes.Say("nil-actual-lrp"))
})
})
Expand All @@ -566,7 +569,7 @@ var _ = Describe("Handler", func() {
afterActualLRP = nil
})
It("logs an error", func() {
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP, "some-trace-id"))
Expect(logger).To(gbytes.Say("nil-actual-lrp"))
})
})
Expand Down Expand Up @@ -613,7 +616,7 @@ var _ = Describe("Handler", func() {
})

JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP, "some-trace-id"))
})

It("should remove the endpoint from the table", func() {
Expand Down Expand Up @@ -685,7 +688,7 @@ var _ = Describe("Handler", func() {
})

JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP, "some-trace-id"))
})

Context("when the resulting LRP presence does not change", func() {
Expand Down Expand Up @@ -781,7 +784,7 @@ var _ = Describe("Handler", func() {
),
State: models.ActualLRPStateClaimed,
}
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP, "some-trace-id"))
})

It("should NOT log the net info", func() {
Expand Down Expand Up @@ -833,7 +836,7 @@ var _ = Describe("Handler", func() {
})

JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceRemovedEvent(actualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceRemovedEvent(actualLRP, "some-trace-id"))
})

It("should remove the endpoint from the table", func() {
Expand Down Expand Up @@ -865,7 +868,7 @@ var _ = Describe("Handler", func() {
State: models.ActualLRPStateCrashed,
}

routeHandler.HandleEvent(logger, models.NewActualLRPInstanceRemovedEvent(actualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceRemovedEvent(actualLRP, "some-trace-id"))
})

It("should NOT log the net info", func() {
Expand Down Expand Up @@ -896,7 +899,7 @@ var _ = Describe("Handler", func() {
})

It("logs an error", func() {
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceRemovedEvent(actualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceRemovedEvent(actualLRP, "some-trace-id"))
Expect(logger).To(gbytes.Say("nil-actual-lrp"))
})
})
Expand Down Expand Up @@ -1163,7 +1166,7 @@ var _ = Describe("Handler", func() {
ProcessGuid: "pg-4",
Routes: &routes,
Instances: 1,
})
}, "some-trace-id")

endpoint4 = routingtable.Endpoint{
InstanceGUID: "ig-4",
Expand All @@ -1180,7 +1183,7 @@ var _ = Describe("Handler", func() {
ActualLRPInstanceKey: models.NewActualLRPInstanceKey(endpoint4.InstanceGUID, "cell-id"),
ActualLRPNetInfo: models.NewActualLRPNetInfo(endpoint4.Host, "container-ip-4", models.ActualLRPNetInfo_PreferredAddressHost, models.NewPortMapping(endpoint4.Port, endpoint4.ContainerPort)),
State: models.ActualLRPStateRunning,
})
}, "some-trace-id")

cachedEvents := map[string]models.Event{
desiredLRPEvent.Key(): desiredLRPEvent,
Expand Down
16 changes: 8 additions & 8 deletions routehandlers/routing_api_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ var _ = Describe("RoutingAPIHandler", func() {

Describe("HandleDesiredCreate", func() {
JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewDesiredLRPCreatedEvent(desiredLRP))
routeHandler.HandleEvent(logger, models.NewDesiredLRPCreatedEvent(desiredLRP, "some-trace-id"))
})

It("invokes AddRoutes on RoutingTable", func() {
Expand Down Expand Up @@ -111,7 +111,7 @@ var _ = Describe("RoutingAPIHandler", func() {
})

JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewDesiredLRPChangedEvent(desiredLRP, after))
routeHandler.HandleEvent(logger, models.NewDesiredLRPChangedEvent(desiredLRP, after, "some-trace-id"))
})

It("invokes UpdateRoutes on RoutingTable", func() {
Expand Down Expand Up @@ -142,7 +142,7 @@ var _ = Describe("RoutingAPIHandler", func() {
fakeRoutingTable.RemoveRoutesReturns(unregistrationEvent, emptyNatsMessages)
})
JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewDesiredLRPRemovedEvent(desiredLRP))
routeHandler.HandleEvent(logger, models.NewDesiredLRPRemovedEvent(desiredLRP, "some-trace-id"))
})

It("does not invoke AddRoutes on RoutingTable", func() {
Expand All @@ -168,7 +168,7 @@ var _ = Describe("RoutingAPIHandler", func() {

Describe("HandleActualCreate", func() {
JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceCreatedEvent(actualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceCreatedEvent(actualLRP, "some-trace-id"))
})

Context("when state is Running", func() {
Expand Down Expand Up @@ -236,7 +236,7 @@ var _ = Describe("RoutingAPIHandler", func() {
)

JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(actualLRP, afterLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(actualLRP, afterLRP, "some-trace-id"))
})

Context("when after state is Running", func() {
Expand Down Expand Up @@ -366,7 +366,7 @@ var _ = Describe("RoutingAPIHandler", func() {

Describe("HandleActualDelete", func() {
JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceRemovedEvent(actualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceRemovedEvent(actualLRP, "some-trace-id"))
})

Context("when state is Running", func() {
Expand Down Expand Up @@ -615,7 +615,7 @@ var _ = Describe("RoutingAPIHandler", func() {
ProcessGuid: "process-guid-2",
Routes: tcpRoutes.RoutingInfo(),
Instances: 1,
})
}, "some-trace-id")

actualLRPEvent := models.NewActualLRPInstanceCreatedEvent(&models.ActualLRP{
ActualLRPKey: models.NewActualLRPKey("process-guid-2", 0, "domain"),
Expand All @@ -628,7 +628,7 @@ var _ = Describe("RoutingAPIHandler", func() {
),
State: models.ActualLRPStateRunning,
ModificationTag: modificationTag,
})
}, "some-trace-id")

cachedEvents := map[string]models.Event{
desiredLRPEvent.Key(): desiredLRPEvent,
Expand Down
4 changes: 2 additions & 2 deletions routingtable/nats_routing_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ var _ = Describe("RoutingTable", func() {

It("should log the added LRP net info", func() {
Expect(logger).To(Say(
`"address":"%s".*"ports":\[{"container_port":%d,"host_port":%d,"host_tls_proxy_port":0}\]`,
`"address":"%s".*"ports":\[{"container_port":%d,"host_port":%d}\]`,
endpoint1.Host,
endpoint1.ContainerPort,
endpoint1.Port,
Expand Down Expand Up @@ -247,7 +247,7 @@ var _ = Describe("RoutingTable", func() {

It("should log the removed LRP net info", func() {
Expect(logger).To(Say(
`"address":"%s".*"ports":\[{"container_port":%d,"host_port":%d,"host_tls_proxy_port":0}\]`,
`"address":"%s".*"ports":\[{"container_port":%d,"host_port":%d}\]`,
endpoint1.Host,
endpoint1.ContainerPort,
endpoint1.Port,
Expand Down
13 changes: 8 additions & 5 deletions watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,14 @@ func (watcher *Watcher) Run(signals <-chan os.Signal, ready chan<- struct{}) err
func (w *Watcher) retrieveDesiredInternal(logger lager.Logger, event models.Event, currentDesireds []*models.DesiredLRP, syncing bool) []*models.DesiredLRP {
var err error
var actualLRP *models.ActualLRP
var traceId string
switch event := event.(type) {
case *models.ActualLRPInstanceCreatedEvent:
actualLRP = event.ActualLrp
traceId = event.TraceId
case *models.ActualLRPInstanceChangedEvent:
actualLRP = event.After.ToActualLRP(event.ActualLRPKey, event.ActualLRPInstanceKey)
traceId = event.TraceId
default:
return nil
}
Expand All @@ -205,7 +208,7 @@ func (w *Watcher) retrieveDesiredInternal(logger lager.Logger, event models.Even
}
if w.routeHandler.ShouldRefreshDesired(actualLRP) || (syncing && !foundInCurrentDesireds(actualLRP.ProcessGuid, currentDesireds)) {
logger.Info("refreshing-desired-lrp-info", lager.Data{"process-guid": actualLRP.ProcessGuid})
desiredLRPs, err = w.bbsClient.DesiredLRPs(logger, models.DesiredLRPFilter{
desiredLRPs, err = w.bbsClient.DesiredLRPs(logger, traceId, models.DesiredLRPFilter{
ProcessGuids: []string{actualLRP.ProcessGuid},
})
if err != nil {
Expand Down Expand Up @@ -257,7 +260,7 @@ func (w *Watcher) sync(logger lager.Logger, ch chan<- *syncEventResult) {
defer wg.Done()
logger.Debug("getting-actual-lrps")
var actualLRPs []*models.ActualLRP
actualLRPs, actualErr = w.bbsClient.ActualLRPs(logger, models.ActualLRPFilter{CellID: w.cellID})
actualLRPs, actualErr = w.bbsClient.ActualLRPs(logger, "", models.ActualLRPFilter{CellID: w.cellID})
if actualErr != nil {
logger.Error("failed-getting-actual-lrps", actualErr)
return
Expand Down Expand Up @@ -295,7 +298,7 @@ func (w *Watcher) sync(logger lager.Logger, ch chan<- *syncEventResult) {
defer wg.Done()
var domainArray []string
logger.Debug("getting-domains")
domainArray, domainsErr = w.bbsClient.Domains(logger)
domainArray, domainsErr = w.bbsClient.Domains(logger, "")
if domainsErr != nil {
logger.Error("failed-getting-domains", domainsErr)
return
Expand Down Expand Up @@ -358,9 +361,9 @@ func getDesiredLRPs(logger lager.Logger, bbsClient bbs.Client, guids []string) (
logger.Debug("getting-desired-lrps-routing-info", lager.Data{"guids-length": len(guids)})
filter := models.DesiredLRPFilter{ProcessGuids: guids}

desiredLRPs, err := bbsClient.DesiredLRPRoutingInfos(logger, filter)
desiredLRPs, err := bbsClient.DesiredLRPRoutingInfos(logger, "", filter)
if err == bbs.EndpointNotFoundErr {
desiredLRPs, err = bbsClient.DesiredLRPs(logger, filter)
desiredLRPs, err = bbsClient.DesiredLRPs(logger, "", filter)
}

if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions watcher/watcher_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ var _ = Describe("Watcher Integration", func() {
)

sendEvent := func() {
Eventually(eventCh).Should(BeSent(EventHolder{models.NewActualLRPInstanceRemovedEvent(removedActualLRP)}))
Eventually(eventCh).Should(BeSent(EventHolder{models.NewActualLRPInstanceRemovedEvent(removedActualLRP, "some-trace-id")}))
Eventually(logger).Should(gbytes.Say("caching-event"))
}

Expand Down Expand Up @@ -167,7 +167,7 @@ var _ = Describe("Watcher Integration", func() {
}
}

bbsClient.ActualLRPsStub = func(logger lager.Logger, filter models.ActualLRPFilter) ([]*models.ActualLRP, error) {
bbsClient.ActualLRPsStub = func(logger lager.Logger, traceId string, filter models.ActualLRPFilter) ([]*models.ActualLRP, error) {
defer GinkgoRecover()

sendEvent()
Expand All @@ -177,7 +177,7 @@ var _ = Describe("Watcher Integration", func() {
}, nil
}

bbsClient.DesiredLRPRoutingInfosStub = func(logger lager.Logger, f models.DesiredLRPFilter) ([]*models.DesiredLRP, error) {
bbsClient.DesiredLRPRoutingInfosStub = func(logger lager.Logger, traceId string, f models.DesiredLRPFilter) ([]*models.DesiredLRP, error) {
defer GinkgoRecover()
return []*models.DesiredLRP{desiredLRP1}, nil
}
Expand Down
Loading

0 comments on commit 912ced0

Please sign in to comment.