Skip to content

Commit

Permalink
Pass trace ID from event to bbs client
Browse files Browse the repository at this point in the history
Signed-off-by: Maria Shaldybin <mariash@vmware.com>
  • Loading branch information
notrepo05 authored and mariash committed May 4, 2023
1 parent df496f4 commit fb65ae7
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 75 deletions.
90 changes: 45 additions & 45 deletions cmd/route-emitter/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -565,11 +565,11 @@ var _ = Describe("Route Emitter", func() {
Context("when an lrp is desired", func() {
BeforeEach(func() {
desiredLRP := getDesiredLRP("some-guid", routerGUID, 5222, 5222)
Expect(bbsClient.DesireLRP(logger, &desiredLRP)).NotTo(HaveOccurred())
Expect(bbsClient.DesireLRP(logger, "", &desiredLRP)).NotTo(HaveOccurred())
lrpKey := models.NewActualLRPKey("some-guid", 0, domain)
instanceKey := models.NewActualLRPInstanceKey("instance-guid", "cell-id")
netInfo := models.NewActualLRPNetInfo("some-ip", "container-ip", models.ActualLRPNetInfo_PreferredAddressHost, models.NewPortMapping(62003, 5222))
Expect(bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed())
Expect(bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed())
})

It("requests a token from the server", func() {
Expand Down Expand Up @@ -688,15 +688,15 @@ var _ = Describe("Route Emitter", func() {
})

JustBeforeEach(func() {
Expect(bbsClient.UpsertDomain(logger, domain, time.Hour)).To(Succeed())
Expect(bbsClient.UpsertDomain(logger, "", domain, time.Hour)).To(Succeed())
Eventually(blkChannel).Should(BeSent(struct{}{}))
Eventually(runner).Should(gbytes.Say("sync.complete"))
})

Context("then a desired lrp event is received", func() {
JustBeforeEach(func() {
Eventually(runner).Should(gbytes.Say("succeeded-getting-actual-lrps"))
Expect(bbsClient.DesireLRP(logger, &desiredLRP)).NotTo(HaveOccurred())
Expect(bbsClient.DesireLRP(logger, "", &desiredLRP)).NotTo(HaveOccurred())
Eventually(runner).Should(gbytes.Say("caching-event"))
Eventually(blkChannel).Should(BeSent(struct{}{}))
})
Expand All @@ -708,7 +708,7 @@ var _ = Describe("Route Emitter", func() {
lrpKey = models.NewActualLRPKey(processGUID, 0, domain)
instanceKey = models.NewActualLRPInstanceKey("instance-guid", "cell-id")
netInfo = models.NewActualLRPNetInfo("some-ip", "container-ip", models.ActualLRPNetInfo_PreferredAddressHost, models.NewPortMapping(5222, 5222))
Expect(bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed())
Expect(bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed())
Eventually(runner).Should(gbytes.Say("caching-event"))

By("unblocking the sync loop")
Expand Down Expand Up @@ -753,7 +753,7 @@ var _ = Describe("Route Emitter", func() {
})

JustBeforeEach(func() {
Expect(bbsClient.DesireLRP(logger, &desiredLRP)).NotTo(HaveOccurred())
Expect(bbsClient.DesireLRP(logger, "", &desiredLRP)).NotTo(HaveOccurred())
})

Context("and an instance is started", func() {
Expand All @@ -764,7 +764,7 @@ var _ = Describe("Route Emitter", func() {
})

JustBeforeEach(func() {
Expect(bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed())
Expect(bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed())
})

It("emits its routes immediately", func() {
Expand Down Expand Up @@ -858,7 +858,7 @@ var _ = Describe("Route Emitter", func() {
update := &models.DesiredLRPUpdate{
Routes: routes,
}
err := bbsClient.UpdateDesiredLRP(logger, desiredLRP.ProcessGuid, update)
err := bbsClient.UpdateDesiredLRP(logger, "", desiredLRP.ProcessGuid, update)
Expect(err).NotTo(HaveOccurred())
})

Expand All @@ -878,7 +878,7 @@ var _ = Describe("Route Emitter", func() {
update := &models.DesiredLRPUpdate{
Routes: &models.Routes{},
}
err := bbsClient.UpdateDesiredLRP(logger, desiredLRP.ProcessGuid, update)
err := bbsClient.UpdateDesiredLRP(logger, "", desiredLRP.ProcessGuid, update)
Expect(err).NotTo(HaveOccurred())
})

Expand Down Expand Up @@ -906,7 +906,7 @@ var _ = Describe("Route Emitter", func() {
ProcessGuid: expectedTCPProcessGUID,
Index: index,
}
err := bbsClient.ClaimActualLRP(logger, &key, &instanceKey)
err := bbsClient.ClaimActualLRP(logger, "", &key, &instanceKey)
Expect(err).NotTo(HaveOccurred())
})

Expand Down Expand Up @@ -945,15 +945,15 @@ var _ = Describe("Route Emitter", func() {
})

JustBeforeEach(func() {
Expect(bbsClient.UpsertDomain(logger, domain, time.Hour)).To(Succeed())
Expect(bbsClient.UpsertDomain(logger, "", domain, time.Hour)).To(Succeed())
})

It("should emit a route registration", func() {
By("waiting for the sync loop to start")
lrpKey = models.NewActualLRPKey(expectedTCPProcessGUID, 0, domain)
instanceKey = models.NewActualLRPInstanceKey("instance-guid", "cell-id")
netInfo = models.NewActualLRPNetInfo("some-ip", "container-ip", models.ActualLRPNetInfo_PreferredAddressHost, models.NewPortMapping(5222, 5222))
Expect(bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed())
Expect(bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed())
Eventually(runner).Should(gbytes.Say("caching-event"))

By("unblocking the sync loop")
Expand Down Expand Up @@ -984,12 +984,12 @@ var _ = Describe("Route Emitter", func() {
JustBeforeEach(func() {
ginkgomon.Kill(routingApiProcess, routingAPIInterruptTimeout)
desiredLRP := getDesiredLRP("some-guid-1", "some-guid", 1883, 1883)
Expect(bbsClient.DesireLRP(logger, &desiredLRP)).NotTo(HaveOccurred())
Expect(bbsClient.DesireLRP(logger, "", &desiredLRP)).NotTo(HaveOccurred())

key := models.NewActualLRPKey("some-guid-1", 0, domain)
instanceKey := models.NewActualLRPInstanceKey("instance-guid-1", "cell-id")
netInfo := models.NewActualLRPNetInfo("some-ip-1", "container-ip-1", models.ActualLRPNetInfo_PreferredAddressHost, models.NewPortMapping(62003, 1883))
Expect(bbsClient.StartActualLRP(logger, &key, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed())
Expect(bbsClient.StartActualLRP(logger, "", &key, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed())
})

It("starts an SSE connection to the bbs and continues to try to emit to routing api", func() {
Expand Down Expand Up @@ -1241,9 +1241,9 @@ var _ = Describe("Route Emitter", func() {
})

It("emits routes", func() {
err := bbsClient.DesireLRP(logger, desiredLRP)
err := bbsClient.DesireLRP(logger, "", desiredLRP)
Expect(err).NotTo(HaveOccurred())
err = bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})
err = bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})
Expect(err).NotTo(HaveOccurred())
var msg1, msg2 routingtable.RegistryMessage
Eventually(registeredRoutes).Should(Receive(&msg1))
Expand Down Expand Up @@ -1325,13 +1325,13 @@ var _ = Describe("Route Emitter", func() {

Context("and an lrp with routes is desired", func() {
BeforeEach(func() {
err := bbsClient.DesireLRP(logger, desiredLRP)
err := bbsClient.DesireLRP(logger, "", desiredLRP)
Expect(err).NotTo(HaveOccurred())
})

Context("and an instance starts", func() {
JustBeforeEach(func() {
err := bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})
err := bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})
Expect(err).NotTo(HaveOccurred())
})

Expand Down Expand Up @@ -1372,7 +1372,7 @@ var _ = Describe("Route Emitter", func() {
sqlRunner.Reset()

// Only start actual LRP, do not repopulate Desired
err := bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})
err := bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})
Expect(err).NotTo(HaveOccurred())
})

Expand Down Expand Up @@ -1615,7 +1615,7 @@ var _ = Describe("Route Emitter", func() {
ProcessGuid: processGuid,
Index: index,
}
err := bbsClient.ClaimActualLRP(logger, &key, &instanceKey)
err := bbsClient.ClaimActualLRP(logger, "", &key, &instanceKey)
Expect(err).NotTo(HaveOccurred())
})

Expand All @@ -1628,10 +1628,10 @@ var _ = Describe("Route Emitter", func() {
Context("an actual lrp starts without a routed desired lrp", func() {
BeforeEach(func() {
desiredLRP.Routes = nil
err := bbsClient.DesireLRP(logger, desiredLRP)
err := bbsClient.DesireLRP(logger, "", desiredLRP)
Expect(err).NotTo(HaveOccurred())

err = bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})
err = bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})
Expect(err).NotTo(HaveOccurred())
})

Expand All @@ -1640,7 +1640,7 @@ var _ = Describe("Route Emitter", func() {
update := &models.DesiredLRPUpdate{
Routes: routes,
}
err := bbsClient.UpdateDesiredLRP(logger, desiredLRP.ProcessGuid, update)
err := bbsClient.UpdateDesiredLRP(logger, "", desiredLRP.ProcessGuid, update)
Expect(err).NotTo(HaveOccurred())
})

Expand Down Expand Up @@ -1789,10 +1789,10 @@ var _ = Describe("Route Emitter", func() {
Context("when an lrp with internal routes is desired and an instance starts", func() {
BeforeEach(func() {
desiredLRP.Routes = newInternalRoutes([]string{"foo1.bar", "foo2.bar"})
err := bbsClient.DesireLRP(logger, desiredLRP)
err := bbsClient.DesireLRP(logger, "", desiredLRP)
Expect(err).NotTo(HaveOccurred())

err = bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})
err = bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})
Expect(err).NotTo(HaveOccurred())
})
It("does not emit any internal routes", func() {
Expand Down Expand Up @@ -1834,13 +1834,13 @@ var _ = Describe("Route Emitter", func() {

Context("and an lrp with routes is desired", func() {
BeforeEach(func() {
err := bbsClient.DesireLRP(logger, desiredLRP)
err := bbsClient.DesireLRP(logger, "", desiredLRP)
Expect(err).NotTo(HaveOccurred())
})

Context("and an instance starts", func() {
JustBeforeEach(func() {
err := bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})
err := bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})
Expect(err).NotTo(HaveOccurred())
})

Expand Down Expand Up @@ -1878,7 +1878,7 @@ var _ = Describe("Route Emitter", func() {
Eventually(runner).Should(gbytes.Say("succeeded-getting-desired-lrps"))

// Only start actual LRP, do not repopulate Desired
err := bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})
err := bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})
Expect(err).NotTo(HaveOccurred())
})

Expand Down Expand Up @@ -1949,7 +1949,7 @@ var _ = Describe("Route Emitter", func() {
ProcessGuid: processGuid,
Index: index,
}
err := bbsClient.ClaimActualLRP(logger, &key, &instanceKey)
err := bbsClient.ClaimActualLRP(logger, "", &key, &instanceKey)
Expect(err).NotTo(HaveOccurred())
})

Expand All @@ -1962,10 +1962,10 @@ var _ = Describe("Route Emitter", func() {
Context("an actual lrp starts without a routed desired lrp", func() {
BeforeEach(func() {
desiredLRP.Routes = nil
err := bbsClient.DesireLRP(logger, desiredLRP)
err := bbsClient.DesireLRP(logger, "", desiredLRP)
Expect(err).NotTo(HaveOccurred())

err = bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})
err = bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})
Expect(err).NotTo(HaveOccurred())
})

Expand All @@ -1974,7 +1974,7 @@ var _ = Describe("Route Emitter", func() {
update := &models.DesiredLRPUpdate{
Routes: routes,
}
err := bbsClient.UpdateDesiredLRP(logger, desiredLRP.ProcessGuid, update)
err := bbsClient.UpdateDesiredLRP(logger, "", desiredLRP.ProcessGuid, update)
Expect(err).NotTo(HaveOccurred())
})

Expand Down Expand Up @@ -2065,10 +2065,10 @@ var _ = Describe("Route Emitter", func() {
var emitter ifrit.Process

BeforeEach(func() {
err := bbsClient.DesireLRP(logger, desiredLRP)
err := bbsClient.DesireLRP(logger, "", desiredLRP)
Expect(err).NotTo(HaveOccurred())

err = bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})
err = bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})
Expect(err).NotTo(HaveOccurred())
})

Expand Down Expand Up @@ -2131,7 +2131,7 @@ var _ = Describe("Route Emitter", func() {
updateRequest.SetInstances(desiredLRP.Instances)
updateRequest.SetAnnotation(desiredLRP.Annotation)

err := bbsClient.UpdateDesiredLRP(logger, processGuid, updateRequest)
err := bbsClient.UpdateDesiredLRP(logger, "", processGuid, updateRequest)
Expect(err).NotTo(HaveOccurred())
})

Expand Down Expand Up @@ -2205,12 +2205,12 @@ var _ = Describe("Route Emitter", func() {
}
updateRequest.SetInstances(desiredLRP.Instances)
updateRequest.SetAnnotation(desiredLRP.Annotation)
err := bbsClient.UpdateDesiredLRP(logger, processGuid, updateRequest)
err := bbsClient.UpdateDesiredLRP(logger, "", processGuid, updateRequest)
Expect(err).NotTo(HaveOccurred())
})

It("immediately emits router.unregister when domain is fresh", func() {
bbsClient.UpsertDomain(logger, domain, 2*time.Second)
bbsClient.UpsertDomain(logger, "", domain, 2*time.Second)
Eventually(unregisteredRoutes, msgReceiveTimeout).Should(Receive(
MatchRegistryMessage(expectedUnregistrationForRoute1),
))
Expand All @@ -2220,7 +2220,7 @@ var _ = Describe("Route Emitter", func() {
})

It("repeatedly sends unregistration messages specified in UnregistrationSendCount number of times", func() {
bbsClient.UpsertDomain(logger, domain, 2*time.Second)
bbsClient.UpsertDomain(logger, "", domain, 2*time.Second)
for i := 0; i < unregistrationSendCount+1; i++ {
Eventually(unregisteredRoutes, msgReceiveTimeout).Should(Receive(
MatchRegistryMessage(expectedUnregistrationForRoute1),
Expand Down Expand Up @@ -2266,7 +2266,7 @@ var _ = Describe("Route Emitter", func() {
}
updateRequest.SetInstances(desiredLRP.Instances)
updateRequest.SetAnnotation(desiredLRP.Annotation)
err := bbsClient.UpdateDesiredLRP(logger, processGuid, updateRequest)
err := bbsClient.UpdateDesiredLRP(logger, "", processGuid, updateRequest)
Expect(err).NotTo(HaveOccurred())

newDesiredLRP = &models.DesiredLRP{}
Expand All @@ -2277,7 +2277,7 @@ var _ = Describe("Route Emitter", func() {
})

It("sends unregistration messages unless there is re-registration", func() {
bbsClient.UpsertDomain(logger, domain, 2*time.Second)
bbsClient.UpsertDomain(logger, "", domain, 2*time.Second)

var receivedMessage routingtable.RegistryMessage
unregisteredRouteMessage := func() routingtable.RegistryMessage {
Expand All @@ -2289,12 +2289,12 @@ var _ = Describe("Route Emitter", func() {
MatchRegistryMessage(expectedUnregistrationForRoute1),
fmt.Sprintf("Failed to receive expected message, received: %#v, expected: %#v", receivedMessage, expectedUnregistrationForRoute1))
// this will re-register route-1 and route-2
err := bbsClient.DesireLRP(logger, newDesiredLRP)
err := bbsClient.DesireLRP(logger, "", newDesiredLRP)
Expect(err).NotTo(HaveOccurred())
lrpKey := models.NewActualLRPKey("some-other-guid", 0, domain)
instanceKey := models.NewActualLRPInstanceKey("instance-guid", "cell-id")
netInfo := models.NewActualLRPNetInfo("1.2.3.4", "container-ip", models.ActualLRPNetInfo_PreferredAddressHost, models.NewPortMapping(65100, 8080))
Expect(bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed())
Expect(bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed())

// keep reading unregistration messages until route-1 is re-registered
done := make(chan struct{})
Expand Down Expand Up @@ -2359,16 +2359,16 @@ var _ = Describe("Route Emitter", func() {

JustBeforeEach(func() {
runner = createEmitterRunner("route-emitter", "cell-id", cfgs...)
Expect(bbsClient.UpsertDomain(logger, domain, time.Hour)).To(Succeed())
Expect(bbsClient.DesireLRP(logger, desiredLRP)).To(Succeed())
Expect(bbsClient.UpsertDomain(logger, "", domain, time.Hour)).To(Succeed())
Expect(bbsClient.DesireLRP(logger, "", desiredLRP)).To(Succeed())
})

It("should refresh the desired lrp and emit a route registration", func() {
By("waiting for the sync loop to start")
runner.StartCheck = "succeeded-getting-actual-lrps"
emitter = ginkgomon.Invoke(runner)

Expect(bbsClient.StartActualLRP(logger, &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed())
Expect(bbsClient.StartActualLRP(logger, "", &lrpKey, &instanceKey, &netInfo, []*models.ActualLRPInternalRoute{}, map[string]string{})).To(Succeed())
Eventually(runner).Should(gbytes.Say("caching-event"))

By("unblocking the sync loop")
Expand Down
4 changes: 2 additions & 2 deletions routingtable/nats_routing_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ var _ = Describe("RoutingTable", func() {

It("should log the added LRP net info", func() {
Expect(logger).To(Say(
`"address":"%s".*"ports":\[{"container_port":%d,"host_port":%d,"host_tls_proxy_port":0}\]`,
`"address":"%s".*"ports":\[{"container_port":%d,"host_port":%d}\]`,
endpoint1.Host,
endpoint1.ContainerPort,
endpoint1.Port,
Expand Down Expand Up @@ -247,7 +247,7 @@ var _ = Describe("RoutingTable", func() {

It("should log the removed LRP net info", func() {
Expect(logger).To(Say(
`"address":"%s".*"ports":\[{"container_port":%d,"host_port":%d,"host_tls_proxy_port":0}\]`,
`"address":"%s".*"ports":\[{"container_port":%d,"host_port":%d}\]`,
endpoint1.Host,
endpoint1.ContainerPort,
endpoint1.Port,
Expand Down
Loading

0 comments on commit fb65ae7

Please sign in to comment.