Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support distributed tracing #24

Merged
merged 2 commits into from
May 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 45 additions & 45 deletions cmd/route-emitter/main_test.go

Large diffs are not rendered by default.

7 changes: 7 additions & 0 deletions routehandlers/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"

"code.cloudfoundry.org/bbs/models"
"code.cloudfoundry.org/bbs/trace"
loggingclient "code.cloudfoundry.org/diego-logging-client"
"code.cloudfoundry.org/lager/v3"
"code.cloudfoundry.org/route-emitter/emitter"
Expand Down Expand Up @@ -54,21 +55,26 @@ func (handler *Handler) HandleEvent(logger lager.Logger, event models.Event) {

switch event := event.(type) {
case *models.DesiredLRPCreatedEvent:
logger = trace.LoggerWithTraceInfo(logger, event.TraceId)
handler.handleDesiredCreate(logger, event.DesiredLrp)
case *models.DesiredLRPChangedEvent:
logger = trace.LoggerWithTraceInfo(logger, event.TraceId)
err := handler.handleDesiredUpdate(logger, event.Before, event.After)
if err != nil {
logger.Error("failed-to-handle-desired-update", err)
}
case *models.DesiredLRPRemovedEvent:
logger = trace.LoggerWithTraceInfo(logger, event.TraceId)
handler.handleDesiredDelete(logger, event.DesiredLrp)
case *models.ActualLRPInstanceCreatedEvent:
logger = trace.LoggerWithTraceInfo(logger, event.TraceId)
if event.ActualLrp == nil {
logger.Error("nil-actual-lrp", nil, lager.Data{"event-type": event.EventType()})
return
}
handler.handleActualCreate(logger, event.ActualLrp)
case *models.ActualLRPInstanceChangedEvent:
logger = trace.LoggerWithTraceInfo(logger, event.TraceId)
before := event.Before.ToActualLRP(event.ActualLRPKey, event.ActualLRPInstanceKey)
after := event.After.ToActualLRP(event.ActualLRPKey, event.ActualLRPInstanceKey)
logger.Debug("received-actual-lrp-changed-event", lager.Data{"before": before, "after": after})
Expand All @@ -81,6 +87,7 @@ func (handler *Handler) HandleEvent(logger lager.Logger, event models.Event) {
logger.Error("failed-to-handle-actual-update", err)
}
case *models.ActualLRPInstanceRemovedEvent:
logger = trace.LoggerWithTraceInfo(logger, event.TraceId)
logger.Debug("received-actual-lrp-instance-removed-event", lager.Data{"lrp": event.ActualLrp})
if event.ActualLrp == nil {
logger.Error("nil-actual-lrp", nil, lager.Data{"event-type": event.EventType()})
Expand Down
39 changes: 21 additions & 18 deletions routehandlers/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ import (
"github.com/onsi/gomega/gbytes"
)

const logGuid = "some-log-guid"
const (
logGuid = "some-log-guid"
traceId = "7f461654-74d1-1ee5-8367-77d85df2cdab"
)

type randomEvent struct {
proto.Message
Expand Down Expand Up @@ -154,7 +157,7 @@ var _ = Describe("Handler", func() {
})

JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewDesiredLRPCreatedEvent(desiredLRP))
routeHandler.HandleEvent(logger, models.NewDesiredLRPCreatedEvent(desiredLRP, traceId))
})

It("should set the routes on the table", func() {
Expand Down Expand Up @@ -247,7 +250,7 @@ var _ = Describe("Handler", func() {
})

JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewDesiredLRPChangedEvent(originalDesiredLRP, changedDesiredLRP))
routeHandler.HandleEvent(logger, models.NewDesiredLRPChangedEvent(originalDesiredLRP, changedDesiredLRP, traceId))
})

It("should set the routes on the table", func() {
Expand Down Expand Up @@ -346,7 +349,7 @@ var _ = Describe("Handler", func() {
})

JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewDesiredLRPRemovedEvent(desiredLRP))
routeHandler.HandleEvent(logger, models.NewDesiredLRPRemovedEvent(desiredLRP, "some-trace-id"))
})

It("should remove the routes from the table", func() {
Expand Down Expand Up @@ -392,11 +395,11 @@ var _ = Describe("Handler", func() {
})

JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceCreatedEvent(actualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceCreatedEvent(actualLRP, traceId))
})

It("logs an error", func() {
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceCreatedEvent(actualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceCreatedEvent(actualLRP, traceId))
Expect(logger).To(gbytes.Say("nil-actual-lrp"))
})
})
Expand All @@ -420,7 +423,7 @@ var _ = Describe("Handler", func() {
})

JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceCreatedEvent(actualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceCreatedEvent(actualLRP, "some-trace-id"))
})

It("should add/update the endpoints on the table", func() {
Expand Down Expand Up @@ -519,7 +522,7 @@ var _ = Describe("Handler", func() {
})

JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP, "some-trace-id"))
})

It("should add/update the endpoint on the table", func() {
Expand Down Expand Up @@ -556,7 +559,7 @@ var _ = Describe("Handler", func() {
beforeActualLRP = nil
})
It("logs an error", func() {
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP, "some-trace-id"))
Expect(logger).To(gbytes.Say("nil-actual-lrp"))
})
})
Expand All @@ -566,7 +569,7 @@ var _ = Describe("Handler", func() {
afterActualLRP = nil
})
It("logs an error", func() {
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP, "some-trace-id"))
Expect(logger).To(gbytes.Say("nil-actual-lrp"))
})
})
Expand Down Expand Up @@ -613,7 +616,7 @@ var _ = Describe("Handler", func() {
})

JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP, "some-trace-id"))
})

It("should remove the endpoint from the table", func() {
Expand Down Expand Up @@ -685,7 +688,7 @@ var _ = Describe("Handler", func() {
})

JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP, "some-trace-id"))
})

Context("when the resulting LRP presence does not change", func() {
Expand Down Expand Up @@ -781,7 +784,7 @@ var _ = Describe("Handler", func() {
),
State: models.ActualLRPStateClaimed,
}
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP, "some-trace-id"))
})

It("should NOT log the net info", func() {
Expand Down Expand Up @@ -833,7 +836,7 @@ var _ = Describe("Handler", func() {
})

JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceRemovedEvent(actualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceRemovedEvent(actualLRP, "some-trace-id"))
})

It("should remove the endpoint from the table", func() {
Expand Down Expand Up @@ -865,7 +868,7 @@ var _ = Describe("Handler", func() {
State: models.ActualLRPStateCrashed,
}

routeHandler.HandleEvent(logger, models.NewActualLRPInstanceRemovedEvent(actualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceRemovedEvent(actualLRP, "some-trace-id"))
})

It("should NOT log the net info", func() {
Expand Down Expand Up @@ -896,7 +899,7 @@ var _ = Describe("Handler", func() {
})

It("logs an error", func() {
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceRemovedEvent(actualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceRemovedEvent(actualLRP, "some-trace-id"))
Expect(logger).To(gbytes.Say("nil-actual-lrp"))
})
})
Expand Down Expand Up @@ -1163,7 +1166,7 @@ var _ = Describe("Handler", func() {
ProcessGuid: "pg-4",
Routes: &routes,
Instances: 1,
})
}, "some-trace-id")

endpoint4 = routingtable.Endpoint{
InstanceGUID: "ig-4",
Expand All @@ -1180,7 +1183,7 @@ var _ = Describe("Handler", func() {
ActualLRPInstanceKey: models.NewActualLRPInstanceKey(endpoint4.InstanceGUID, "cell-id"),
ActualLRPNetInfo: models.NewActualLRPNetInfo(endpoint4.Host, "container-ip-4", models.ActualLRPNetInfo_PreferredAddressHost, models.NewPortMapping(endpoint4.Port, endpoint4.ContainerPort)),
State: models.ActualLRPStateRunning,
})
}, "some-trace-id")

cachedEvents := map[string]models.Event{
desiredLRPEvent.Key(): desiredLRPEvent,
Expand Down
16 changes: 8 additions & 8 deletions routehandlers/routing_api_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ var _ = Describe("RoutingAPIHandler", func() {

Describe("HandleDesiredCreate", func() {
JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewDesiredLRPCreatedEvent(desiredLRP))
routeHandler.HandleEvent(logger, models.NewDesiredLRPCreatedEvent(desiredLRP, "some-trace-id"))
})

It("invokes AddRoutes on RoutingTable", func() {
Expand Down Expand Up @@ -111,7 +111,7 @@ var _ = Describe("RoutingAPIHandler", func() {
})

JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewDesiredLRPChangedEvent(desiredLRP, after))
routeHandler.HandleEvent(logger, models.NewDesiredLRPChangedEvent(desiredLRP, after, "some-trace-id"))
})

It("invokes UpdateRoutes on RoutingTable", func() {
Expand Down Expand Up @@ -142,7 +142,7 @@ var _ = Describe("RoutingAPIHandler", func() {
fakeRoutingTable.RemoveRoutesReturns(unregistrationEvent, emptyNatsMessages)
})
JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewDesiredLRPRemovedEvent(desiredLRP))
routeHandler.HandleEvent(logger, models.NewDesiredLRPRemovedEvent(desiredLRP, "some-trace-id"))
})

It("does not invoke AddRoutes on RoutingTable", func() {
Expand All @@ -168,7 +168,7 @@ var _ = Describe("RoutingAPIHandler", func() {

Describe("HandleActualCreate", func() {
JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceCreatedEvent(actualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceCreatedEvent(actualLRP, "some-trace-id"))
})

Context("when state is Running", func() {
Expand Down Expand Up @@ -236,7 +236,7 @@ var _ = Describe("RoutingAPIHandler", func() {
)

JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(actualLRP, afterLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(actualLRP, afterLRP, "some-trace-id"))
})

Context("when after state is Running", func() {
Expand Down Expand Up @@ -366,7 +366,7 @@ var _ = Describe("RoutingAPIHandler", func() {

Describe("HandleActualDelete", func() {
JustBeforeEach(func() {
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceRemovedEvent(actualLRP))
routeHandler.HandleEvent(logger, models.NewActualLRPInstanceRemovedEvent(actualLRP, "some-trace-id"))
})

Context("when state is Running", func() {
Expand Down Expand Up @@ -615,7 +615,7 @@ var _ = Describe("RoutingAPIHandler", func() {
ProcessGuid: "process-guid-2",
Routes: tcpRoutes.RoutingInfo(),
Instances: 1,
})
}, "some-trace-id")

actualLRPEvent := models.NewActualLRPInstanceCreatedEvent(&models.ActualLRP{
ActualLRPKey: models.NewActualLRPKey("process-guid-2", 0, "domain"),
Expand All @@ -628,7 +628,7 @@ var _ = Describe("RoutingAPIHandler", func() {
),
State: models.ActualLRPStateRunning,
ModificationTag: modificationTag,
})
}, "some-trace-id")

cachedEvents := map[string]models.Event{
desiredLRPEvent.Key(): desiredLRPEvent,
Expand Down
4 changes: 2 additions & 2 deletions routingtable/nats_routing_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ var _ = Describe("RoutingTable", func() {

It("should log the added LRP net info", func() {
Expect(logger).To(Say(
`"address":"%s".*"ports":\[{"container_port":%d,"host_port":%d,"host_tls_proxy_port":0}\]`,
`"address":"%s".*"ports":\[{"container_port":%d,"host_port":%d}\]`,
endpoint1.Host,
endpoint1.ContainerPort,
endpoint1.Port,
Expand Down Expand Up @@ -247,7 +247,7 @@ var _ = Describe("RoutingTable", func() {

It("should log the removed LRP net info", func() {
Expect(logger).To(Say(
`"address":"%s".*"ports":\[{"container_port":%d,"host_port":%d,"host_tls_proxy_port":0}\]`,
`"address":"%s".*"ports":\[{"container_port":%d,"host_port":%d}\]`,
endpoint1.Host,
endpoint1.ContainerPort,
endpoint1.Port,
Expand Down
13 changes: 8 additions & 5 deletions watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,14 @@ func (watcher *Watcher) Run(signals <-chan os.Signal, ready chan<- struct{}) err
func (w *Watcher) retrieveDesiredInternal(logger lager.Logger, event models.Event, currentDesireds []*models.DesiredLRP, syncing bool) []*models.DesiredLRP {
var err error
var actualLRP *models.ActualLRP
var traceId string
switch event := event.(type) {
case *models.ActualLRPInstanceCreatedEvent:
actualLRP = event.ActualLrp
traceId = event.TraceId
case *models.ActualLRPInstanceChangedEvent:
actualLRP = event.After.ToActualLRP(event.ActualLRPKey, event.ActualLRPInstanceKey)
traceId = event.TraceId
default:
return nil
}
Expand All @@ -205,7 +208,7 @@ func (w *Watcher) retrieveDesiredInternal(logger lager.Logger, event models.Even
}
if w.routeHandler.ShouldRefreshDesired(actualLRP) || (syncing && !foundInCurrentDesireds(actualLRP.ProcessGuid, currentDesireds)) {
logger.Info("refreshing-desired-lrp-info", lager.Data{"process-guid": actualLRP.ProcessGuid})
desiredLRPs, err = w.bbsClient.DesiredLRPs(logger, models.DesiredLRPFilter{
desiredLRPs, err = w.bbsClient.DesiredLRPs(logger, traceId, models.DesiredLRPFilter{
ProcessGuids: []string{actualLRP.ProcessGuid},
})
if err != nil {
Expand Down Expand Up @@ -257,7 +260,7 @@ func (w *Watcher) sync(logger lager.Logger, ch chan<- *syncEventResult) {
defer wg.Done()
logger.Debug("getting-actual-lrps")
var actualLRPs []*models.ActualLRP
actualLRPs, actualErr = w.bbsClient.ActualLRPs(logger, models.ActualLRPFilter{CellID: w.cellID})
actualLRPs, actualErr = w.bbsClient.ActualLRPs(logger, "", models.ActualLRPFilter{CellID: w.cellID})
if actualErr != nil {
logger.Error("failed-getting-actual-lrps", actualErr)
return
Expand Down Expand Up @@ -295,7 +298,7 @@ func (w *Watcher) sync(logger lager.Logger, ch chan<- *syncEventResult) {
defer wg.Done()
var domainArray []string
logger.Debug("getting-domains")
domainArray, domainsErr = w.bbsClient.Domains(logger)
domainArray, domainsErr = w.bbsClient.Domains(logger, "")
if domainsErr != nil {
logger.Error("failed-getting-domains", domainsErr)
return
Expand Down Expand Up @@ -358,9 +361,9 @@ func getDesiredLRPs(logger lager.Logger, bbsClient bbs.Client, guids []string) (
logger.Debug("getting-desired-lrps-routing-info", lager.Data{"guids-length": len(guids)})
filter := models.DesiredLRPFilter{ProcessGuids: guids}

desiredLRPs, err := bbsClient.DesiredLRPRoutingInfos(logger, filter)
desiredLRPs, err := bbsClient.DesiredLRPRoutingInfos(logger, "", filter)
if err == bbs.EndpointNotFoundErr {
desiredLRPs, err = bbsClient.DesiredLRPs(logger, filter)
desiredLRPs, err = bbsClient.DesiredLRPs(logger, "", filter)
}

if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions watcher/watcher_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ var _ = Describe("Watcher Integration", func() {
)

sendEvent := func() {
Eventually(eventCh).Should(BeSent(EventHolder{models.NewActualLRPInstanceRemovedEvent(removedActualLRP)}))
Eventually(eventCh).Should(BeSent(EventHolder{models.NewActualLRPInstanceRemovedEvent(removedActualLRP, "some-trace-id")}))
Eventually(logger).Should(gbytes.Say("caching-event"))
}

Expand Down Expand Up @@ -167,7 +167,7 @@ var _ = Describe("Watcher Integration", func() {
}
}

bbsClient.ActualLRPsStub = func(logger lager.Logger, filter models.ActualLRPFilter) ([]*models.ActualLRP, error) {
bbsClient.ActualLRPsStub = func(logger lager.Logger, traceId string, filter models.ActualLRPFilter) ([]*models.ActualLRP, error) {
defer GinkgoRecover()

sendEvent()
Expand All @@ -177,7 +177,7 @@ var _ = Describe("Watcher Integration", func() {
}, nil
}

bbsClient.DesiredLRPRoutingInfosStub = func(logger lager.Logger, f models.DesiredLRPFilter) ([]*models.DesiredLRP, error) {
bbsClient.DesiredLRPRoutingInfosStub = func(logger lager.Logger, traceId string, f models.DesiredLRPFilter) ([]*models.DesiredLRP, error) {
defer GinkgoRecover()
return []*models.DesiredLRP{desiredLRP1}, nil
}
Expand Down
Loading