diff --git a/cmd/route-emitter/main_test.go b/cmd/route-emitter/main_test.go index 7757af4..d3f91cb 100644 --- a/cmd/route-emitter/main_test.go +++ b/cmd/route-emitter/main_test.go @@ -565,11 +565,11 @@ var _ = Describe("Route Emitter", func() { Context("when an lrp is desired", func() { BeforeEach(func() { desiredLRP := getDesiredLRP("some-guid", routerGUID, 5222, 5222) - Expect(bbsClient.DesireLRP(logger, &desiredLRP)).NotTo(HaveOccurred()) + Expect(bbsClient.DesireLRP(logger, "", &desiredLRP)).NotTo(HaveOccurred()) lrpKey := models.NewActualLRPKey("some-guid", 0, domain) instanceKey := models.NewActualLRPInstanceKey("instance-guid", "cell-id") netInfo := models.NewActualLRPNetInfo("some-ip", "container-ip", models.ActualLRPNetInfo_PreferredAddressHost, models.NewPortMapping(62003, 5222)) - Expect(bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed()) + Expect(bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed()) }) It("requests a token from the server", func() { @@ -688,7 +688,7 @@ var _ = Describe("Route Emitter", func() { }) JustBeforeEach(func() { - Expect(bbsClient.UpsertDomain(logger, domain, time.Hour)).To(Succeed()) + Expect(bbsClient.UpsertDomain(logger, "", domain, time.Hour)).To(Succeed()) Eventually(blkChannel).Should(BeSent(struct{}{})) Eventually(runner).Should(gbytes.Say("sync.complete")) }) @@ -696,7 +696,7 @@ var _ = Describe("Route Emitter", func() { Context("then a desired lrp event is received", func() { JustBeforeEach(func() { Eventually(runner).Should(gbytes.Say("succeeded-getting-actual-lrps")) - Expect(bbsClient.DesireLRP(logger, &desiredLRP)).NotTo(HaveOccurred()) + Expect(bbsClient.DesireLRP(logger, "", &desiredLRP)).NotTo(HaveOccurred()) Eventually(runner).Should(gbytes.Say("caching-event")) Eventually(blkChannel).Should(BeSent(struct{}{})) }) @@ -708,7 +708,7 @@ var _ = Describe("Route Emitter", func() { lrpKey = models.NewActualLRPKey(processGUID, 0, domain) instanceKey = models.NewActualLRPInstanceKey("instance-guid", "cell-id") netInfo = models.NewActualLRPNetInfo("some-ip", "container-ip", models.ActualLRPNetInfo_PreferredAddressHost, models.NewPortMapping(5222, 5222)) - Expect(bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed()) + Expect(bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed()) Eventually(runner).Should(gbytes.Say("caching-event")) By("unblocking the sync loop") @@ -753,7 +753,7 @@ var _ = Describe("Route Emitter", func() { }) JustBeforeEach(func() { - Expect(bbsClient.DesireLRP(logger, &desiredLRP)).NotTo(HaveOccurred()) + Expect(bbsClient.DesireLRP(logger, "", &desiredLRP)).NotTo(HaveOccurred()) }) Context("and an instance is started", func() { @@ -764,7 +764,7 @@ var _ = Describe("Route Emitter", func() { }) JustBeforeEach(func() { - Expect(bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed()) + Expect(bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed()) }) It("emits its routes immediately", func() { @@ -858,7 +858,7 @@ var _ = Describe("Route Emitter", func() { update := &models.DesiredLRPUpdate{ Routes: routes, } - err := bbsClient.UpdateDesiredLRP(logger, desiredLRP.ProcessGuid, update) + err := bbsClient.UpdateDesiredLRP(logger, "", desiredLRP.ProcessGuid, update) Expect(err).NotTo(HaveOccurred()) }) @@ -878,7 +878,7 @@ var _ = Describe("Route Emitter", func() { update := &models.DesiredLRPUpdate{ Routes: &models.Routes{}, } - err := bbsClient.UpdateDesiredLRP(logger, desiredLRP.ProcessGuid, update) + err := bbsClient.UpdateDesiredLRP(logger, "", desiredLRP.ProcessGuid, update) Expect(err).NotTo(HaveOccurred()) }) @@ -906,7 +906,7 @@ var _ = Describe("Route Emitter", func() { ProcessGuid: expectedTCPProcessGUID, Index: index, } - err := bbsClient.ClaimActualLRP(logger, &key, &instanceKey) + err := bbsClient.ClaimActualLRP(logger, "", &key, &instanceKey) Expect(err).NotTo(HaveOccurred()) }) @@ -945,7 +945,7 @@ var _ = Describe("Route Emitter", func() { }) JustBeforeEach(func() { - Expect(bbsClient.UpsertDomain(logger, domain, time.Hour)).To(Succeed()) + Expect(bbsClient.UpsertDomain(logger, "", domain, time.Hour)).To(Succeed()) }) It("should emit a route registration", func() { @@ -953,7 +953,7 @@ var _ = Describe("Route Emitter", func() { lrpKey = models.NewActualLRPKey(expectedTCPProcessGUID, 0, domain) instanceKey = models.NewActualLRPInstanceKey("instance-guid", "cell-id") netInfo = models.NewActualLRPNetInfo("some-ip", "container-ip", models.ActualLRPNetInfo_PreferredAddressHost, models.NewPortMapping(5222, 5222)) - Expect(bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed()) + Expect(bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed()) Eventually(runner).Should(gbytes.Say("caching-event")) By("unblocking the sync loop") @@ -984,12 +984,12 @@ var _ = Describe("Route Emitter", func() { JustBeforeEach(func() { ginkgomon.Kill(routingApiProcess, routingAPIInterruptTimeout) desiredLRP := getDesiredLRP("some-guid-1", "some-guid", 1883, 1883) - Expect(bbsClient.DesireLRP(logger, &desiredLRP)).NotTo(HaveOccurred()) + Expect(bbsClient.DesireLRP(logger, "", &desiredLRP)).NotTo(HaveOccurred()) key := models.NewActualLRPKey("some-guid-1", 0, domain) instanceKey := models.NewActualLRPInstanceKey("instance-guid-1", "cell-id") netInfo := models.NewActualLRPNetInfo("some-ip-1", "container-ip-1", models.ActualLRPNetInfo_PreferredAddressHost, models.NewPortMapping(62003, 1883)) - Expect(bbsClient.StartActualLRP(logger, &key, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed()) + Expect(bbsClient.StartActualLRP(logger, "", &key, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed()) }) It("starts an SSE connection to the bbs and continues to try to emit to routing api", func() { @@ -1241,9 +1241,9 @@ var _ = Describe("Route Emitter", func() { }) It("emits routes", func() { - err := bbsClient.DesireLRP(logger, desiredLRP) + err := bbsClient.DesireLRP(logger, "", desiredLRP) Expect(err).NotTo(HaveOccurred()) - err = bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{}) + err = bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{}) Expect(err).NotTo(HaveOccurred()) var msg1, msg2 routingtable.RegistryMessage Eventually(registeredRoutes).Should(Receive(&msg1)) @@ -1325,13 +1325,13 @@ var _ = Describe("Route Emitter", func() { Context("and an lrp with routes is desired", func() { BeforeEach(func() { - err := bbsClient.DesireLRP(logger, desiredLRP) + err := bbsClient.DesireLRP(logger, "", desiredLRP) Expect(err).NotTo(HaveOccurred()) }) Context("and an instance starts", func() { JustBeforeEach(func() { - err := bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{}) + err := bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{}) Expect(err).NotTo(HaveOccurred()) }) @@ -1372,7 +1372,7 @@ var _ = Describe("Route Emitter", func() { sqlRunner.Reset() // Only start actual LRP, do not repopulate Desired - err := bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{}) + err := bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{}) Expect(err).NotTo(HaveOccurred()) }) @@ -1615,7 +1615,7 @@ var _ = Describe("Route Emitter", func() { ProcessGuid: processGuid, Index: index, } - err := bbsClient.ClaimActualLRP(logger, &key, &instanceKey) + err := bbsClient.ClaimActualLRP(logger, "", &key, &instanceKey) Expect(err).NotTo(HaveOccurred()) }) @@ -1628,10 +1628,10 @@ var _ = Describe("Route Emitter", func() { Context("an actual lrp starts without a routed desired lrp", func() { BeforeEach(func() { desiredLRP.Routes = nil - err := bbsClient.DesireLRP(logger, desiredLRP) + err := bbsClient.DesireLRP(logger, "", desiredLRP) Expect(err).NotTo(HaveOccurred()) - err = bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{}) + err = bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{}) Expect(err).NotTo(HaveOccurred()) }) @@ -1640,7 +1640,7 @@ var _ = Describe("Route Emitter", func() { update := &models.DesiredLRPUpdate{ Routes: routes, } - err := bbsClient.UpdateDesiredLRP(logger, desiredLRP.ProcessGuid, update) + err := bbsClient.UpdateDesiredLRP(logger, "", desiredLRP.ProcessGuid, update) Expect(err).NotTo(HaveOccurred()) }) @@ -1789,10 +1789,10 @@ var _ = Describe("Route Emitter", func() { Context("when an lrp with internal routes is desired and an instance starts", func() { BeforeEach(func() { desiredLRP.Routes = newInternalRoutes([]string{"foo1.bar", "foo2.bar"}) - err := bbsClient.DesireLRP(logger, desiredLRP) + err := bbsClient.DesireLRP(logger, "", desiredLRP) Expect(err).NotTo(HaveOccurred()) - err = bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{}) + err = bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{}) Expect(err).NotTo(HaveOccurred()) }) It("does not emit any internal routes", func() { @@ -1834,13 +1834,13 @@ var _ = Describe("Route Emitter", func() { Context("and an lrp with routes is desired", func() { BeforeEach(func() { - err := bbsClient.DesireLRP(logger, desiredLRP) + err := bbsClient.DesireLRP(logger, "", desiredLRP) Expect(err).NotTo(HaveOccurred()) }) Context("and an instance starts", func() { JustBeforeEach(func() { - err := bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{}) + err := bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{}) Expect(err).NotTo(HaveOccurred()) }) @@ -1878,7 +1878,7 @@ var _ = Describe("Route Emitter", func() { Eventually(runner).Should(gbytes.Say("succeeded-getting-desired-lrps")) // Only start actual LRP, do not repopulate Desired - err := bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{}) + err := bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{}) Expect(err).NotTo(HaveOccurred()) }) @@ -1949,7 +1949,7 @@ var _ = Describe("Route Emitter", func() { ProcessGuid: processGuid, Index: index, } - err := bbsClient.ClaimActualLRP(logger, &key, &instanceKey) + err := bbsClient.ClaimActualLRP(logger, "", &key, &instanceKey) Expect(err).NotTo(HaveOccurred()) }) @@ -1962,10 +1962,10 @@ var _ = Describe("Route Emitter", func() { Context("an actual lrp starts without a routed desired lrp", func() { BeforeEach(func() { desiredLRP.Routes = nil - err := bbsClient.DesireLRP(logger, desiredLRP) + err := bbsClient.DesireLRP(logger, "", desiredLRP) Expect(err).NotTo(HaveOccurred()) - err = bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{}) + err = bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{}) Expect(err).NotTo(HaveOccurred()) }) @@ -1974,7 +1974,7 @@ var _ = Describe("Route Emitter", func() { update := &models.DesiredLRPUpdate{ Routes: routes, } - err := bbsClient.UpdateDesiredLRP(logger, desiredLRP.ProcessGuid, update) + err := bbsClient.UpdateDesiredLRP(logger, "", desiredLRP.ProcessGuid, update) Expect(err).NotTo(HaveOccurred()) }) @@ -2065,10 +2065,10 @@ var _ = Describe("Route Emitter", func() { var emitter ifrit.Process BeforeEach(func() { - err := bbsClient.DesireLRP(logger, desiredLRP) + err := bbsClient.DesireLRP(logger, "", desiredLRP) Expect(err).NotTo(HaveOccurred()) - err = bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{}) + err = bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{}) Expect(err).NotTo(HaveOccurred()) }) @@ -2131,7 +2131,7 @@ var _ = Describe("Route Emitter", func() { updateRequest.SetInstances(desiredLRP.Instances) updateRequest.SetAnnotation(desiredLRP.Annotation) - err := bbsClient.UpdateDesiredLRP(logger, processGuid, updateRequest) + err := bbsClient.UpdateDesiredLRP(logger, "", processGuid, updateRequest) Expect(err).NotTo(HaveOccurred()) }) @@ -2205,12 +2205,12 @@ var _ = Describe("Route Emitter", func() { } updateRequest.SetInstances(desiredLRP.Instances) updateRequest.SetAnnotation(desiredLRP.Annotation) - err := bbsClient.UpdateDesiredLRP(logger, processGuid, updateRequest) + err := bbsClient.UpdateDesiredLRP(logger, "", processGuid, updateRequest) Expect(err).NotTo(HaveOccurred()) }) It("immediately emits router.unregister when domain is fresh", func() { - bbsClient.UpsertDomain(logger, domain, 2*time.Second) + bbsClient.UpsertDomain(logger, "", domain, 2*time.Second) Eventually(unregisteredRoutes, msgReceiveTimeout).Should(Receive( MatchRegistryMessage(expectedUnregistrationForRoute1), )) @@ -2220,7 +2220,7 @@ var _ = Describe("Route Emitter", func() { }) It("repeatedly sends unregistration messages specified in UnregistrationSendCount number of times", func() { - bbsClient.UpsertDomain(logger, domain, 2*time.Second) + bbsClient.UpsertDomain(logger, "", domain, 2*time.Second) for i := 0; i < unregistrationSendCount+1; i++ { Eventually(unregisteredRoutes, msgReceiveTimeout).Should(Receive( MatchRegistryMessage(expectedUnregistrationForRoute1), @@ -2266,7 +2266,7 @@ var _ = Describe("Route Emitter", func() { } updateRequest.SetInstances(desiredLRP.Instances) updateRequest.SetAnnotation(desiredLRP.Annotation) - err := bbsClient.UpdateDesiredLRP(logger, processGuid, updateRequest) + err := bbsClient.UpdateDesiredLRP(logger, "", processGuid, updateRequest) Expect(err).NotTo(HaveOccurred()) newDesiredLRP = &models.DesiredLRP{} @@ -2277,7 +2277,7 @@ var _ = Describe("Route Emitter", func() { }) It("sends unregistration messages unless there is re-registration", func() { - bbsClient.UpsertDomain(logger, domain, 2*time.Second) + bbsClient.UpsertDomain(logger, "", domain, 2*time.Second) var receivedMessage routingtable.RegistryMessage unregisteredRouteMessage := func() routingtable.RegistryMessage { @@ -2289,12 +2289,12 @@ var _ = Describe("Route Emitter", func() { MatchRegistryMessage(expectedUnregistrationForRoute1), fmt.Sprintf("Failed to receive expected message, received: %#v, expected: %#v", receivedMessage, expectedUnregistrationForRoute1)) // this will re-register route-1 and route-2 - err := bbsClient.DesireLRP(logger, newDesiredLRP) + err := bbsClient.DesireLRP(logger, "", newDesiredLRP) Expect(err).NotTo(HaveOccurred()) lrpKey := models.NewActualLRPKey("some-other-guid", 0, domain) instanceKey := models.NewActualLRPInstanceKey("instance-guid", "cell-id") netInfo := models.NewActualLRPNetInfo("1.2.3.4", "container-ip", models.ActualLRPNetInfo_PreferredAddressHost, models.NewPortMapping(65100, 8080)) - Expect(bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed()) + Expect(bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed()) // keep reading unregistration messages until route-1 is re-registered done := make(chan struct{}) @@ -2359,8 +2359,8 @@ var _ = Describe("Route Emitter", func() { JustBeforeEach(func() { runner = createEmitterRunner("route-emitter", "cell-id", cfgs...) - Expect(bbsClient.UpsertDomain(logger, domain, time.Hour)).To(Succeed()) - Expect(bbsClient.DesireLRP(logger, desiredLRP)).To(Succeed()) + Expect(bbsClient.UpsertDomain(logger, "", domain, time.Hour)).To(Succeed()) + Expect(bbsClient.DesireLRP(logger, "", desiredLRP)).To(Succeed()) }) It("should refresh the desired lrp and emit a route registration", func() { @@ -2368,7 +2368,7 @@ var _ = Describe("Route Emitter", func() { runner.StartCheck = "succeeded-getting-actual-lrps" emitter = ginkgomon.Invoke(runner) - Expect(bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed()) + Expect(bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed()) Eventually(runner).Should(gbytes.Say("caching-event")) By("unblocking the sync loop") diff --git a/routehandlers/handler.go b/routehandlers/handler.go index 3a4141a..3fecd97 100644 --- a/routehandlers/handler.go +++ b/routehandlers/handler.go @@ -4,6 +4,7 @@ import ( "errors" "code.cloudfoundry.org/bbs/models" + "code.cloudfoundry.org/bbs/trace" loggingclient "code.cloudfoundry.org/diego-logging-client" "code.cloudfoundry.org/lager/v3" "code.cloudfoundry.org/route-emitter/emitter" @@ -54,21 +55,26 @@ func (handler *Handler) HandleEvent(logger lager.Logger, event models.Event) { switch event := event.(type) { case *models.DesiredLRPCreatedEvent: + logger = trace.LoggerWithTraceInfo(logger, event.TraceId) handler.handleDesiredCreate(logger, event.DesiredLrp) case *models.DesiredLRPChangedEvent: + logger = trace.LoggerWithTraceInfo(logger, event.TraceId) err := handler.handleDesiredUpdate(logger, event.Before, event.After) if err != nil { logger.Error("failed-to-handle-desired-update", err) } case *models.DesiredLRPRemovedEvent: + logger = trace.LoggerWithTraceInfo(logger, event.TraceId) handler.handleDesiredDelete(logger, event.DesiredLrp) case *models.ActualLRPInstanceCreatedEvent: + logger = trace.LoggerWithTraceInfo(logger, event.TraceId) if event.ActualLrp == nil { logger.Error("nil-actual-lrp", nil, lager.Data{"event-type": event.EventType()}) return } handler.handleActualCreate(logger, event.ActualLrp) case *models.ActualLRPInstanceChangedEvent: + logger = trace.LoggerWithTraceInfo(logger, event.TraceId) before := event.Before.ToActualLRP(event.ActualLRPKey, event.ActualLRPInstanceKey) after := event.After.ToActualLRP(event.ActualLRPKey, event.ActualLRPInstanceKey) logger.Debug("received-actual-lrp-changed-event", lager.Data{"before": before, "after": after}) @@ -81,6 +87,7 @@ func (handler *Handler) HandleEvent(logger lager.Logger, event models.Event) { logger.Error("failed-to-handle-actual-update", err) } case *models.ActualLRPInstanceRemovedEvent: + logger = trace.LoggerWithTraceInfo(logger, event.TraceId) logger.Debug("received-actual-lrp-instance-removed-event", lager.Data{"lrp": event.ActualLrp}) if event.ActualLrp == nil { logger.Error("nil-actual-lrp", nil, lager.Data{"event-type": event.EventType()}) diff --git a/routehandlers/handler_test.go b/routehandlers/handler_test.go index f55f95e..8427233 100644 --- a/routehandlers/handler_test.go +++ b/routehandlers/handler_test.go @@ -24,7 +24,10 @@ import ( "github.com/onsi/gomega/gbytes" ) -const logGuid = "some-log-guid" +const ( + logGuid = "some-log-guid" + traceId = "7f461654-74d1-1ee5-8367-77d85df2cdab" +) type randomEvent struct { proto.Message @@ -154,7 +157,7 @@ var _ = Describe("Handler", func() { }) JustBeforeEach(func() { - routeHandler.HandleEvent(logger, models.NewDesiredLRPCreatedEvent(desiredLRP)) + routeHandler.HandleEvent(logger, models.NewDesiredLRPCreatedEvent(desiredLRP, traceId)) }) It("should set the routes on the table", func() { @@ -247,7 +250,7 @@ var _ = Describe("Handler", func() { }) JustBeforeEach(func() { - routeHandler.HandleEvent(logger, models.NewDesiredLRPChangedEvent(originalDesiredLRP, changedDesiredLRP)) + routeHandler.HandleEvent(logger, models.NewDesiredLRPChangedEvent(originalDesiredLRP, changedDesiredLRP, traceId)) }) It("should set the routes on the table", func() { @@ -346,7 +349,7 @@ var _ = Describe("Handler", func() { }) JustBeforeEach(func() { - routeHandler.HandleEvent(logger, models.NewDesiredLRPRemovedEvent(desiredLRP)) + routeHandler.HandleEvent(logger, models.NewDesiredLRPRemovedEvent(desiredLRP, "some-trace-id")) }) It("should remove the routes from the table", func() { @@ -392,11 +395,11 @@ var _ = Describe("Handler", func() { }) JustBeforeEach(func() { - routeHandler.HandleEvent(logger, models.NewActualLRPInstanceCreatedEvent(actualLRP)) + routeHandler.HandleEvent(logger, models.NewActualLRPInstanceCreatedEvent(actualLRP, traceId)) }) It("logs an error", func() { - routeHandler.HandleEvent(logger, models.NewActualLRPInstanceCreatedEvent(actualLRP)) + routeHandler.HandleEvent(logger, models.NewActualLRPInstanceCreatedEvent(actualLRP, traceId)) Expect(logger).To(gbytes.Say("nil-actual-lrp")) }) }) @@ -420,7 +423,7 @@ var _ = Describe("Handler", func() { }) JustBeforeEach(func() { - routeHandler.HandleEvent(logger, models.NewActualLRPInstanceCreatedEvent(actualLRP)) + routeHandler.HandleEvent(logger, models.NewActualLRPInstanceCreatedEvent(actualLRP, "some-trace-id")) }) It("should add/update the endpoints on the table", func() { @@ -519,7 +522,7 @@ var _ = Describe("Handler", func() { }) JustBeforeEach(func() { - routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP)) + routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP, "some-trace-id")) }) It("should add/update the endpoint on the table", func() { @@ -556,7 +559,7 @@ var _ = Describe("Handler", func() { beforeActualLRP = nil }) It("logs an error", func() { - routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP)) + routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP, "some-trace-id")) Expect(logger).To(gbytes.Say("nil-actual-lrp")) }) }) @@ -566,7 +569,7 @@ var _ = Describe("Handler", func() { afterActualLRP = nil }) It("logs an error", func() { - routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP)) + routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP, "some-trace-id")) Expect(logger).To(gbytes.Say("nil-actual-lrp")) }) }) @@ -613,7 +616,7 @@ var _ = Describe("Handler", func() { }) JustBeforeEach(func() { - routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP)) + routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP, "some-trace-id")) }) It("should remove the endpoint from the table", func() { @@ -685,7 +688,7 @@ var _ = Describe("Handler", func() { }) JustBeforeEach(func() { - routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP)) + routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP, "some-trace-id")) }) Context("when the resulting LRP presence does not change", func() { @@ -781,7 +784,7 @@ var _ = Describe("Handler", func() { ), State: models.ActualLRPStateClaimed, } - routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP)) + routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(beforeActualLRP, afterActualLRP, "some-trace-id")) }) It("should NOT log the net info", func() { @@ -833,7 +836,7 @@ var _ = Describe("Handler", func() { }) JustBeforeEach(func() { - routeHandler.HandleEvent(logger, models.NewActualLRPInstanceRemovedEvent(actualLRP)) + routeHandler.HandleEvent(logger, models.NewActualLRPInstanceRemovedEvent(actualLRP, "some-trace-id")) }) It("should remove the endpoint from the table", func() { @@ -865,7 +868,7 @@ var _ = Describe("Handler", func() { State: models.ActualLRPStateCrashed, } - routeHandler.HandleEvent(logger, models.NewActualLRPInstanceRemovedEvent(actualLRP)) + routeHandler.HandleEvent(logger, models.NewActualLRPInstanceRemovedEvent(actualLRP, "some-trace-id")) }) It("should NOT log the net info", func() { @@ -896,7 +899,7 @@ var _ = Describe("Handler", func() { }) It("logs an error", func() { - routeHandler.HandleEvent(logger, models.NewActualLRPInstanceRemovedEvent(actualLRP)) + routeHandler.HandleEvent(logger, models.NewActualLRPInstanceRemovedEvent(actualLRP, "some-trace-id")) Expect(logger).To(gbytes.Say("nil-actual-lrp")) }) }) @@ -1163,7 +1166,7 @@ var _ = Describe("Handler", func() { ProcessGuid: "pg-4", Routes: &routes, Instances: 1, - }) + }, "some-trace-id") endpoint4 = routingtable.Endpoint{ InstanceGUID: "ig-4", @@ -1180,7 +1183,7 @@ var _ = Describe("Handler", func() { ActualLRPInstanceKey: models.NewActualLRPInstanceKey(endpoint4.InstanceGUID, "cell-id"), ActualLRPNetInfo: models.NewActualLRPNetInfo(endpoint4.Host, "container-ip-4", models.ActualLRPNetInfo_PreferredAddressHost, models.NewPortMapping(endpoint4.Port, endpoint4.ContainerPort)), State: models.ActualLRPStateRunning, - }) + }, "some-trace-id") cachedEvents := map[string]models.Event{ desiredLRPEvent.Key(): desiredLRPEvent, diff --git a/routehandlers/routing_api_handler_test.go b/routehandlers/routing_api_handler_test.go index 9f749a3..2106295 100644 --- a/routehandlers/routing_api_handler_test.go +++ b/routehandlers/routing_api_handler_test.go @@ -67,7 +67,7 @@ var _ = Describe("RoutingAPIHandler", func() { Describe("HandleDesiredCreate", func() { JustBeforeEach(func() { - routeHandler.HandleEvent(logger, models.NewDesiredLRPCreatedEvent(desiredLRP)) + routeHandler.HandleEvent(logger, models.NewDesiredLRPCreatedEvent(desiredLRP, "some-trace-id")) }) It("invokes AddRoutes on RoutingTable", func() { @@ -111,7 +111,7 @@ var _ = Describe("RoutingAPIHandler", func() { }) JustBeforeEach(func() { - routeHandler.HandleEvent(logger, models.NewDesiredLRPChangedEvent(desiredLRP, after)) + routeHandler.HandleEvent(logger, models.NewDesiredLRPChangedEvent(desiredLRP, after, "some-trace-id")) }) It("invokes UpdateRoutes on RoutingTable", func() { @@ -142,7 +142,7 @@ var _ = Describe("RoutingAPIHandler", func() { fakeRoutingTable.RemoveRoutesReturns(unregistrationEvent, emptyNatsMessages) }) JustBeforeEach(func() { - routeHandler.HandleEvent(logger, models.NewDesiredLRPRemovedEvent(desiredLRP)) + routeHandler.HandleEvent(logger, models.NewDesiredLRPRemovedEvent(desiredLRP, "some-trace-id")) }) It("does not invoke AddRoutes on RoutingTable", func() { @@ -168,7 +168,7 @@ var _ = Describe("RoutingAPIHandler", func() { Describe("HandleActualCreate", func() { JustBeforeEach(func() { - routeHandler.HandleEvent(logger, models.NewActualLRPInstanceCreatedEvent(actualLRP)) + routeHandler.HandleEvent(logger, models.NewActualLRPInstanceCreatedEvent(actualLRP, "some-trace-id")) }) Context("when state is Running", func() { @@ -236,7 +236,7 @@ var _ = Describe("RoutingAPIHandler", func() { ) JustBeforeEach(func() { - routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(actualLRP, afterLRP)) + routeHandler.HandleEvent(logger, models.NewActualLRPInstanceChangedEvent(actualLRP, afterLRP, "some-trace-id")) }) Context("when after state is Running", func() { @@ -366,7 +366,7 @@ var _ = Describe("RoutingAPIHandler", func() { Describe("HandleActualDelete", func() { JustBeforeEach(func() { - routeHandler.HandleEvent(logger, models.NewActualLRPInstanceRemovedEvent(actualLRP)) + routeHandler.HandleEvent(logger, models.NewActualLRPInstanceRemovedEvent(actualLRP, "some-trace-id")) }) Context("when state is Running", func() { @@ -615,7 +615,7 @@ var _ = Describe("RoutingAPIHandler", func() { ProcessGuid: "process-guid-2", Routes: tcpRoutes.RoutingInfo(), Instances: 1, - }) + }, "some-trace-id") actualLRPEvent := models.NewActualLRPInstanceCreatedEvent(&models.ActualLRP{ ActualLRPKey: models.NewActualLRPKey("process-guid-2", 0, "domain"), @@ -628,7 +628,7 @@ var _ = Describe("RoutingAPIHandler", func() { ), State: models.ActualLRPStateRunning, ModificationTag: modificationTag, - }) + }, "some-trace-id") cachedEvents := map[string]models.Event{ desiredLRPEvent.Key(): desiredLRPEvent, diff --git a/routingtable/nats_routing_table_test.go b/routingtable/nats_routing_table_test.go index 9de3cc9..12c08a4 100644 --- a/routingtable/nats_routing_table_test.go +++ b/routingtable/nats_routing_table_test.go @@ -211,7 +211,7 @@ var _ = Describe("RoutingTable", func() { It("should log the added LRP net info", func() { Expect(logger).To(Say( - `"address":"%s".*"ports":\[{"container_port":%d,"host_port":%d,"host_tls_proxy_port":0}\]`, + `"address":"%s".*"ports":\[{"container_port":%d,"host_port":%d}\]`, endpoint1.Host, endpoint1.ContainerPort, endpoint1.Port, @@ -247,7 +247,7 @@ var _ = Describe("RoutingTable", func() { It("should log the removed LRP net info", func() { Expect(logger).To(Say( - `"address":"%s".*"ports":\[{"container_port":%d,"host_port":%d,"host_tls_proxy_port":0}\]`, + `"address":"%s".*"ports":\[{"container_port":%d,"host_port":%d}\]`, endpoint1.Host, endpoint1.ContainerPort, endpoint1.Port, diff --git a/watcher/watcher.go b/watcher/watcher.go index d55b5f7..57dbff3 100644 --- a/watcher/watcher.go +++ b/watcher/watcher.go @@ -187,11 +187,14 @@ func (watcher *Watcher) Run(signals <-chan os.Signal, ready chan<- struct{}) err func (w *Watcher) retrieveDesiredInternal(logger lager.Logger, event models.Event, currentDesireds []*models.DesiredLRP, syncing bool) []*models.DesiredLRP { var err error var actualLRP *models.ActualLRP + var traceId string switch event := event.(type) { case *models.ActualLRPInstanceCreatedEvent: actualLRP = event.ActualLrp + traceId = event.TraceId case *models.ActualLRPInstanceChangedEvent: actualLRP = event.After.ToActualLRP(event.ActualLRPKey, event.ActualLRPInstanceKey) + traceId = event.TraceId default: return nil } @@ -205,7 +208,7 @@ func (w *Watcher) retrieveDesiredInternal(logger lager.Logger, event models.Even } if w.routeHandler.ShouldRefreshDesired(actualLRP) || (syncing && !foundInCurrentDesireds(actualLRP.ProcessGuid, currentDesireds)) { logger.Info("refreshing-desired-lrp-info", lager.Data{"process-guid": actualLRP.ProcessGuid}) - desiredLRPs, err = w.bbsClient.DesiredLRPs(logger, models.DesiredLRPFilter{ + desiredLRPs, err = w.bbsClient.DesiredLRPs(logger, traceId, models.DesiredLRPFilter{ ProcessGuids: []string{actualLRP.ProcessGuid}, }) if err != nil { @@ -257,7 +260,7 @@ func (w *Watcher) sync(logger lager.Logger, ch chan<- *syncEventResult) { defer wg.Done() logger.Debug("getting-actual-lrps") var actualLRPs []*models.ActualLRP - actualLRPs, actualErr = w.bbsClient.ActualLRPs(logger, models.ActualLRPFilter{CellID: w.cellID}) + actualLRPs, actualErr = w.bbsClient.ActualLRPs(logger, "", models.ActualLRPFilter{CellID: w.cellID}) if actualErr != nil { logger.Error("failed-getting-actual-lrps", actualErr) return @@ -295,7 +298,7 @@ func (w *Watcher) sync(logger lager.Logger, ch chan<- *syncEventResult) { defer wg.Done() var domainArray []string logger.Debug("getting-domains") - domainArray, domainsErr = w.bbsClient.Domains(logger) + domainArray, domainsErr = w.bbsClient.Domains(logger, "") if domainsErr != nil { logger.Error("failed-getting-domains", domainsErr) return @@ -358,9 +361,9 @@ func getDesiredLRPs(logger lager.Logger, bbsClient bbs.Client, guids []string) ( logger.Debug("getting-desired-lrps-routing-info", lager.Data{"guids-length": len(guids)}) filter := models.DesiredLRPFilter{ProcessGuids: guids} - desiredLRPs, err := bbsClient.DesiredLRPRoutingInfos(logger, filter) + desiredLRPs, err := bbsClient.DesiredLRPRoutingInfos(logger, "", filter) if err == bbs.EndpointNotFoundErr { - desiredLRPs, err = bbsClient.DesiredLRPs(logger, filter) + desiredLRPs, err = bbsClient.DesiredLRPs(logger, "", filter) } if err != nil { diff --git a/watcher/watcher_integration_test.go b/watcher/watcher_integration_test.go index 5952162..c1de2c5 100644 --- a/watcher/watcher_integration_test.go +++ b/watcher/watcher_integration_test.go @@ -103,7 +103,7 @@ var _ = Describe("Watcher Integration", func() { ) sendEvent := func() { - Eventually(eventCh).Should(BeSent(EventHolder{models.NewActualLRPInstanceRemovedEvent(removedActualLRP)})) + Eventually(eventCh).Should(BeSent(EventHolder{models.NewActualLRPInstanceRemovedEvent(removedActualLRP, "some-trace-id")})) Eventually(logger).Should(gbytes.Say("caching-event")) } @@ -167,7 +167,7 @@ var _ = Describe("Watcher Integration", func() { } } - bbsClient.ActualLRPsStub = func(logger lager.Logger, filter models.ActualLRPFilter) ([]*models.ActualLRP, error) { + bbsClient.ActualLRPsStub = func(logger lager.Logger, traceId string, filter models.ActualLRPFilter) ([]*models.ActualLRP, error) { defer GinkgoRecover() sendEvent() @@ -177,7 +177,7 @@ var _ = Describe("Watcher Integration", func() { }, nil } - bbsClient.DesiredLRPRoutingInfosStub = func(logger lager.Logger, f models.DesiredLRPFilter) ([]*models.DesiredLRP, error) { + bbsClient.DesiredLRPRoutingInfosStub = func(logger lager.Logger, traceId string, f models.DesiredLRPFilter) ([]*models.DesiredLRP, error) { defer GinkgoRecover() return []*models.DesiredLRP{desiredLRP1}, nil } diff --git a/watcher/watcher_test.go b/watcher/watcher_test.go index 179e988..10a90b2 100644 --- a/watcher/watcher_test.go +++ b/watcher/watcher_test.go @@ -153,7 +153,7 @@ var _ = Describe("Watcher", func() { BeforeEach(func() { desiredLRP := getDesiredLRP("process-guid-1", "log-guid-1", 5222, 61000) - event = models.NewDesiredLRPCreatedEvent(desiredLRP) + event = models.NewDesiredLRPCreatedEvent(desiredLRP, "some-trace-id") eventSource.NextReturns(event, nil) }) @@ -172,7 +172,7 @@ var _ = Describe("Watcher", func() { BeforeEach(func() { beforeLRP := getDesiredLRP("process-guid-1", "log-guid-1", 5222, 61000) afterLRP := getDesiredLRP("process-guid-1", "log-guid-1", 5222, 61001) - event = models.NewDesiredLRPChangedEvent(beforeLRP, afterLRP) + event = models.NewDesiredLRPChangedEvent(beforeLRP, afterLRP, "some-trace-id") eventSource.NextReturns(event, nil) }) @@ -190,7 +190,7 @@ var _ = Describe("Watcher", func() { BeforeEach(func() { desiredLRP := getDesiredLRP("process-guid-1", "log-guid-1", 5222, 61000) - event = models.NewDesiredLRPRemovedEvent(desiredLRP) + event = models.NewDesiredLRPRemovedEvent(desiredLRP, "some-trace-id") eventSource.NextReturns(event, nil) }) @@ -208,7 +208,7 @@ var _ = Describe("Watcher", func() { BeforeEach(func() { actualLRP := getActualLRP("process-guid-1", "instance-guid-1", "some-ip", "container-ip", 61000, 5222, false) - event = models.NewActualLRPInstanceRemovedEvent(actualLRP) + event = models.NewActualLRPInstanceRemovedEvent(actualLRP, "some-trace-id") eventSource.NextReturns(event, nil) }) @@ -226,7 +226,7 @@ var _ = Describe("Watcher", func() { BeforeEach(func() { actualLRP := getActualLRP("process-guid-1", "instance-guid-1", "some-ip", "container-ip", 61000, 5222, false) - event = models.NewActualLRPInstanceCreatedEvent(actualLRP) + event = models.NewActualLRPInstanceCreatedEvent(actualLRP, "some-trace-id") eventSource.NextReturns(event, nil) }) @@ -245,7 +245,7 @@ var _ = Describe("Watcher", func() { BeforeEach(func() { beforeLRP := getActualLRP("process-guid-1", "instance-guid-1", "some-ip", "container-ip", 61000, 5222, false) afterLRP := getActualLRP("process-guid-1", "instance-guid-1", "some-ip", "container-ip", 61001, 5222, false) - event = models.NewActualLRPInstanceChangedEvent(beforeLRP, afterLRP) + event = models.NewActualLRPInstanceChangedEvent(beforeLRP, afterLRP, "some-trace-id") eventSource.NextReturns(event, nil) }) @@ -445,7 +445,7 @@ var _ = Describe("Watcher", func() { var sendEvent func() BeforeEach(func() { sendEvent = func() { - Eventually(eventCh).Should(BeSent(EventHolder{models.NewActualLRPInstanceRemovedEvent(actualLRP1)})) + Eventually(eventCh).Should(BeSent(EventHolder{models.NewActualLRPInstanceRemovedEvent(actualLRP1, "some-trace-id")})) } }) @@ -455,7 +455,7 @@ var _ = Describe("Watcher", func() { Describe("bbs events", func() { BeforeEach(func() { - bbsClient.ActualLRPsStub = func(lager.Logger, models.ActualLRPFilter) ([]*models.ActualLRP, error) { + bbsClient.ActualLRPsStub = func(lager.Logger, string, models.ActualLRPFilter) ([]*models.ActualLRP, error) { defer GinkgoRecover() sendEvent() Eventually(logger).Should(gbytes.Say("caching-event")) @@ -471,14 +471,14 @@ var _ = Describe("Watcher", func() { Eventually(routeHandler.SyncCallCount).Should(Equal(1)) _, _, _, _, event := routeHandler.SyncArgsForCall(0) - expectedEvent := models.NewActualLRPInstanceRemovedEvent(actualLRP1) + expectedEvent := models.NewActualLRPInstanceRemovedEvent(actualLRP1, "some-trace-id") Expect(event[actualLRP1.InstanceGuid]).To(Equal(expectedEvent)) }) Context("when an invalid actual lrp created event is cached", func() { BeforeEach(func() { sendEvent = func() { - Eventually(eventCh).Should(BeSent(EventHolder{models.NewActualLRPInstanceCreatedEvent(nil)})) + Eventually(eventCh).Should(BeSent(EventHolder{models.NewActualLRPInstanceCreatedEvent(nil, "some-trace-id")})) } }) @@ -490,7 +490,7 @@ var _ = Describe("Watcher", func() { Context("when an invalid actual lrp change event is cached", func() { BeforeEach(func() { sendEvent = func() { - Eventually(eventCh).Should(BeSent(EventHolder{models.NewActualLRPInstanceChangedEvent(nil, nil)})) + Eventually(eventCh).Should(BeSent(EventHolder{models.NewActualLRPInstanceChangedEvent(nil, nil, "some-trace-id")})) } }) @@ -507,7 +507,7 @@ var _ = Describe("Watcher", func() { BeforeEach(func() { unblock = make(chan struct{}) - bbsClient.ActualLRPsStub = func(lager.Logger, models.ActualLRPFilter) ([]*models.ActualLRP, error) { + bbsClient.ActualLRPsStub = func(lager.Logger, string, models.ActualLRPFilter) ([]*models.ActualLRP, error) { <-unblock return nil, nil } @@ -535,7 +535,7 @@ var _ = Describe("Watcher", func() { BeforeEach(func() { errCh = make(chan error, 1) errCh <- errors.New("bam") - bbsClient.ActualLRPsStub = func(lager.Logger, models.ActualLRPFilter) ([]*models.ActualLRP, error) { + bbsClient.ActualLRPsStub = func(lager.Logger, string, models.ActualLRPFilter) ([]*models.ActualLRP, error) { return []*models.ActualLRP{}, <-errCh } }) @@ -555,7 +555,7 @@ var _ = Describe("Watcher", func() { Context("when one of the actual lrps is invalid", func() { BeforeEach(func() { - bbsClient.ActualLRPsStub = func(lager.Logger, models.ActualLRPFilter) ([]*models.ActualLRP, error) { + bbsClient.ActualLRPsStub = func(lager.Logger, string, models.ActualLRPFilter) ([]*models.ActualLRP, error) { return []*models.ActualLRP{actualLRP1, &models.ActualLRP{}, actualLRP2}, nil } }) @@ -576,7 +576,7 @@ var _ = Describe("Watcher", func() { errCh = make(chan error, 1) errCh <- errors.New("bam") - bbsClient.DesiredLRPRoutingInfosStub = func(lager.Logger, models.DesiredLRPFilter) ([]*models.DesiredLRP, error) { + bbsClient.DesiredLRPRoutingInfosStub = func(lager.Logger, string, models.DesiredLRPFilter) ([]*models.DesiredLRP, error) { return []*models.DesiredLRP{}, <-errCh } }) @@ -603,7 +603,7 @@ var _ = Describe("Watcher", func() { errCh = make(chan error, 1) errCh <- errors.New("bam") - bbsClient.DomainsStub = func(lager.Logger) ([]string, error) { + bbsClient.DomainsStub = func(lager.Logger, string) ([]string, error) { return []string{}, <-errCh } }) @@ -627,7 +627,7 @@ var _ = Describe("Watcher", func() { Context("when the routing_info BBS endpoint is not there", func() { BeforeEach(func() { - bbsClient.ActualLRPsStub = func(logger lager.Logger, f models.ActualLRPFilter) ([]*models.ActualLRP, error) { + bbsClient.ActualLRPsStub = func(logger lager.Logger, traceId string, f models.ActualLRPFilter) ([]*models.ActualLRP, error) { clock.IncrementBySeconds(1) return []*models.ActualLRP{ @@ -637,11 +637,11 @@ var _ = Describe("Watcher", func() { }, nil } - bbsClient.DesiredLRPRoutingInfosStub = func(logger lager.Logger, f models.DesiredLRPFilter) ([]*models.DesiredLRP, error) { + bbsClient.DesiredLRPRoutingInfosStub = func(logger lager.Logger, traceId string, f models.DesiredLRPFilter) ([]*models.DesiredLRP, error) { return []*models.DesiredLRP{}, bbs.EndpointNotFoundErr } - bbsClient.DesiredLRPsStub = func(logger lager.Logger, f models.DesiredLRPFilter) ([]*models.DesiredLRP, error) { + bbsClient.DesiredLRPsStub = func(logger lager.Logger, traceId string, f models.DesiredLRPFilter) ([]*models.DesiredLRP, error) { return []*models.DesiredLRP{desiredLRP1, desiredLRP2}, nil } }) @@ -682,14 +682,15 @@ var _ = Describe("Watcher", func() { It("gets all the desired lrps", func() { Eventually(bbsClient.DesiredLRPRoutingInfosCallCount).Should(Equal(1)) Eventually(bbsClient.DesiredLRPsCallCount).Should(Equal(1)) - _, filter := bbsClient.DesiredLRPRoutingInfosArgsForCall(0) + _, traceId, filter := bbsClient.DesiredLRPRoutingInfosArgsForCall(0) + Expect(traceId).To(BeEmpty()) Expect(filter.ProcessGuids).To(BeEmpty()) }) }) Context("when desired lrps are retrieved", func() { BeforeEach(func() { - bbsClient.ActualLRPsStub = func(logger lager.Logger, f models.ActualLRPFilter) ([]*models.ActualLRP, error) { + bbsClient.ActualLRPsStub = func(logger lager.Logger, traceId string, f models.ActualLRPFilter) ([]*models.ActualLRP, error) { clock.IncrementBySeconds(1) return []*models.ActualLRP{ @@ -699,7 +700,7 @@ var _ = Describe("Watcher", func() { }, nil } - bbsClient.DesiredLRPRoutingInfosStub = func(logger lager.Logger, f models.DesiredLRPFilter) ([]*models.DesiredLRP, error) { + bbsClient.DesiredLRPRoutingInfosStub = func(logger lager.Logger, traceID string, f models.DesiredLRPFilter) ([]*models.DesiredLRP, error) { defer GinkgoRecover() return []*models.DesiredLRP{desiredLRP1, desiredLRP2}, nil @@ -741,7 +742,8 @@ var _ = Describe("Watcher", func() { It("gets all the desired lrps", func() { Eventually(bbsClient.DesiredLRPRoutingInfosCallCount).Should(Equal(1)) - _, filter := bbsClient.DesiredLRPRoutingInfosArgsForCall(0) + _, traceId, filter := bbsClient.DesiredLRPRoutingInfosArgsForCall(0) + Expect(traceId).To(BeEmpty()) Expect(filter.ProcessGuids).To(BeEmpty()) }) }) @@ -754,7 +756,7 @@ var _ = Describe("Watcher", func() { Context("when the cell has actual lrps running", func() { BeforeEach(func() { - bbsClient.ActualLRPsStub = func(lager.Logger, models.ActualLRPFilter) ([]*models.ActualLRP, error) { + bbsClient.ActualLRPsStub = func(lager.Logger, string, models.ActualLRPFilter) ([]*models.ActualLRP, error) { clock.IncrementBySeconds(1) return []*models.ActualLRP{ @@ -762,7 +764,7 @@ var _ = Describe("Watcher", func() { }, nil } - bbsClient.DesiredLRPRoutingInfosStub = func(lager.Logger, models.DesiredLRPFilter) ([]*models.DesiredLRP, error) { + bbsClient.DesiredLRPRoutingInfosStub = func(lager.Logger, string, models.DesiredLRPFilter) ([]*models.DesiredLRP, error) { return []*models.DesiredLRP{desiredLRP2}, nil } }) @@ -782,13 +784,15 @@ var _ = Describe("Watcher", func() { It("fetches actual lrps that match the cell id", func() { Eventually(bbsClient.ActualLRPsCallCount).Should(Equal(1)) - _, filter := bbsClient.ActualLRPsArgsForCall(0) + _, traceId, filter := bbsClient.ActualLRPsArgsForCall(0) + Expect(traceId).To(BeEmpty()) Expect(filter.CellID).To(Equal(cellID)) }) It("fetches desired lrp scheduling info that match the cell id", func() { Eventually(bbsClient.DesiredLRPRoutingInfosCallCount).Should(Equal(1)) - _, filter := bbsClient.DesiredLRPRoutingInfosArgsForCall(0) + _, traceId, filter := bbsClient.DesiredLRPRoutingInfosArgsForCall(0) + Expect(traceId).To(BeEmpty()) Expect(filter.ProcessGuids).To(ConsistOf(actualLRP2.ProcessGuid)) }) }) @@ -804,10 +808,11 @@ var _ = Describe("Watcher", func() { Eventually(eventCh).Should(BeSent(EventHolder{models.NewActualLRPInstanceChangedEvent( beforeActualLRP3, actualLRP3, + "some-trace-id", )})) } - bbsClient.DesiredLRPsStub = func(_ lager.Logger, f models.DesiredLRPFilter) ([]*models.DesiredLRP, error) { + bbsClient.DesiredLRPsStub = func(_ lager.Logger, _ string, f models.DesiredLRPFilter) ([]*models.DesiredLRP, error) { defer GinkgoRecover() if len(f.ProcessGuids) == 1 && f.ProcessGuids[0] == "pg-3" { return []*models.DesiredLRP{desiredLRP3}, nil @@ -822,7 +827,7 @@ var _ = Describe("Watcher", func() { BeforeEach(func() { cellID = "cell-id" - bbsClient.ActualLRPsStub = func(logger lager.Logger, f models.ActualLRPFilter) ([]*models.ActualLRP, error) { + bbsClient.ActualLRPsStub = func(logger lager.Logger, _ string, f models.ActualLRPFilter) ([]*models.ActualLRP, error) { clock.IncrementBySeconds(1) return []*models.ActualLRP{actualLRP1}, nil } @@ -836,7 +841,7 @@ var _ = Describe("Watcher", func() { Context("when an invalid actual lrp created event is received", func() { BeforeEach(func() { sendEvent = func() { - Eventually(eventCh).Should(BeSent(EventHolder{models.NewActualLRPInstanceCreatedEvent(nil)})) + Eventually(eventCh).Should(BeSent(EventHolder{models.NewActualLRPInstanceCreatedEvent(nil, "some-trace-id")})) } }) @@ -848,7 +853,7 @@ var _ = Describe("Watcher", func() { Context("when an invalid actual lrp change event is received", func() { BeforeEach(func() { sendEvent = func() { - Eventually(eventCh).Should(BeSent(EventHolder{models.NewActualLRPInstanceChangedEvent(nil, nil)})) + Eventually(eventCh).Should(BeSent(EventHolder{models.NewActualLRPInstanceChangedEvent(nil, nil, "some-trace-id")})) } }) @@ -861,7 +866,8 @@ var _ = Describe("Watcher", func() { Eventually(bbsClient.DesiredLRPRoutingInfosCallCount).Should(Equal(1)) Eventually(bbsClient.DesiredLRPsCallCount).Should(Equal(1)) - _, filter := bbsClient.DesiredLRPsArgsForCall(0) + _, traceId, filter := bbsClient.DesiredLRPsArgsForCall(0) + Expect(traceId).To(Equal("some-trace-id")) Expect(filter.ProcessGuids).To(HaveLen(1)) Expect(filter.ProcessGuids).To(ConsistOf(actualLRP3.ProcessGuid)) @@ -877,7 +883,7 @@ var _ = Describe("Watcher", func() { Context("and the event is cached", func() { BeforeEach(func() { - bbsClient.ActualLRPsStub = func(lager.Logger, models.ActualLRPFilter) ([]*models.ActualLRP, error) { + bbsClient.ActualLRPsStub = func(lager.Logger, string, models.ActualLRPFilter) ([]*models.ActualLRP, error) { clock.IncrementBySeconds(1) defer GinkgoRecover() sendEvent() @@ -890,7 +896,8 @@ var _ = Describe("Watcher", func() { Eventually(bbsClient.DesiredLRPRoutingInfosCallCount).Should(Equal(1)) Eventually(bbsClient.DesiredLRPsCallCount).Should(Equal(1)) - _, filter := bbsClient.DesiredLRPsArgsForCall(0) + _, traceId, filter := bbsClient.DesiredLRPsArgsForCall(0) + Expect(traceId).To(Equal("some-trace-id")) Expect(filter.ProcessGuids).To(HaveLen(1)) Expect(filter.ProcessGuids).To(ConsistOf(actualLRP3.ProcessGuid)) @@ -903,7 +910,7 @@ var _ = Describe("Watcher", func() { Context("and fetching desired scheduling info fails", func() { BeforeEach(func() { - bbsClient.DesiredLRPRoutingInfosStub = func(l lager.Logger, f models.DesiredLRPFilter) ([]*models.DesiredLRP, error) { + bbsClient.DesiredLRPRoutingInfosStub = func(l lager.Logger, _ string, f models.DesiredLRPFilter) ([]*models.DesiredLRP, error) { defer GinkgoRecover() if len(f.ProcessGuids) == 1 && f.ProcessGuids[0] == "pg-3" { return nil, errors.New("boom!") @@ -923,13 +930,13 @@ var _ = Describe("Watcher", func() { Context("when fetching desired scheduling info fails", func() { BeforeEach(func() { - bbsClient.ActualLRPsStub = func(lager.Logger, models.ActualLRPFilter) ([]*models.ActualLRP, error) { + bbsClient.ActualLRPsStub = func(lager.Logger, string, models.ActualLRPFilter) ([]*models.ActualLRP, error) { defer GinkgoRecover() sendEvent() Eventually(logger).Should(gbytes.Say("caching-event")) return []*models.ActualLRP{actualLRP1}, nil } - bbsClient.DesiredLRPRoutingInfosStub = func(l lager.Logger, f models.DesiredLRPFilter) ([]*models.DesiredLRP, error) { + bbsClient.DesiredLRPRoutingInfosStub = func(l lager.Logger, _ string, f models.DesiredLRPFilter) ([]*models.DesiredLRP, error) { defer GinkgoRecover() if len(f.ProcessGuids) == 1 && f.ProcessGuids[0] == "pg-3" { return nil, errors.New("boom!") @@ -949,7 +956,7 @@ var _ = Describe("Watcher", func() { Context("when there are no running actual lrps on the cell", func() { BeforeEach(func() { - bbsClient.ActualLRPsStub = func(logger lager.Logger, f models.ActualLRPFilter) ([]*models.ActualLRP, error) { + bbsClient.ActualLRPsStub = func(logger lager.Logger, _ string, f models.ActualLRPFilter) ([]*models.ActualLRP, error) { return []*models.ActualLRP{}, nil } }) @@ -970,6 +977,7 @@ var _ = Describe("Watcher", func() { Eventually(eventCh).Should(BeSent(EventHolder{models.NewActualLRPInstanceCreatedEvent( actualLRP4, + "some-trace-id", )})) })