Skip to content

Commit

Permalink
fix upstream status flicker and constant updates (kgateway-dev#10384)
Browse files Browse the repository at this point in the history
When only Kube GW proxies are present, we still rely on the edge translator_syncer for extension syncing.
The edge translator will mark Upstreams & UpstreamGroups as Accepted
then perform xds translation where status may be changed to e.g. Rejected if there is an error.

However, in the case where there are no edge proxies, 
translation doesn't actually occur, so any actual errors on the Upstream are never encountered,
thus the status is never set to Rejected.
We end up in a scenario where the Kube GW syncer (correctly) reports Rejected status
while the Edge syncer reports Accepted and they will fight each other indefinitely.

This changes the edge translator_syncer to no longer mark Upstream[Group]s as Accepted unless it will also perform translation.

track obj status in krt collections
    
the status reporter compares the desired status with the
existing status in the solo-kit object to determine if it
should actually UPDATE the resource.

the current proxy_syncer will do a once per second status sync
and relies on this status comparison to be functional to prevent
endless object UPDATEs.

this commit fixes the solo-kit objects (really wrappers) in the
krt collections to contain the status so an accurate comparison
can take place.
  • Loading branch information
lgadban authored Nov 25, 2024
1 parent aac1d58 commit 7af653c
Show file tree
Hide file tree
Showing 21 changed files with 181 additions and 66 deletions.
16 changes: 16 additions & 0 deletions changelog/v1.18.0-rc2/fix-us-status-flicker.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
changelog:
- type: NON_USER_FACING
description: >-
Fix flicker on Upstream status when kube gw syncer rejects a resource and no edge proxies are present
issueLink: https://github.com/solo-io/solo-projects/issues/7243
resolvesIssue: true
- type: NON_USER_FACING
description: >-
Fix missing status on krt objects resulting in continuous status updates (and webhook hits)
issueLink: https://github.com/solo-io/solo-projects/issues/7257
resolvesIssue: true
- type: BREAKING_CHANGE
description: >-
Upstreams and UpstreamGroups no longer get Accepted status by default. If they have not gone through translation they will have an empty status field.
issueLink: https://github.com/solo-io/gloo/issues/10401
resolvesIssue: true
2 changes: 2 additions & 0 deletions projects/gateway2/proxy_syncer/proxy_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ func (s *ProxySyncer) Init(ctx context.Context, dbg *krt.DebugHandler) error {
Namespace: u.GetNamespace(),
}
glooUs.SetMetadata(&md)
glooUs.NamespacedStatuses = &u.Status
us := &krtcollections.UpstreamWrapper{Inner: glooUs}
return us
}, krt.WithName("GlooUpstreams"), withDebug)
Expand Down Expand Up @@ -715,6 +716,7 @@ func (s *ProxySyncer) translateProxy(
Namespace: kac.GetNamespace(),
}
gac.SetMetadata(&md)
gac.NamespacedStatuses = &kac.Status
acfgs = append(acfgs, gac)
}
latestSnap.AuthConfigs = acfgs
Expand Down
5 changes: 4 additions & 1 deletion projects/gateway2/setup/ggv2setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,10 @@ func (g *genericStatusReporter) WriteReports(ctx context.Context, resourceErrs r
resourceStatus := g.statusClient.GetStatus(resource)

if status.Equal(resourceStatus) {
logger.Debugf("skipping report for %v as it has not changed", resource.GetMetadata().Ref())
// TODO: find a way to log this but it is noisy currently due to once per second status sync
// see: projects/gateway2/proxy_syncer/kube_gw_translator_syncer.go#syncStatus(...)
// and its call site in projects/gateway2/proxy_syncer/proxy_syncer.go
// logger.Debugf("skipping report for %v as it has not changed", resource.GetMetadata().Ref())
continue
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ func (p *plugin) ApplyStatusPlugin(ctx context.Context, statusCtx *plugins.Statu
roObj.Spec.GetMetadata().Name = roObj.GetName()
roObj.Spec.GetMetadata().Namespace = roObj.GetNamespace()
roObjSk := &roObj.Spec
roObjSk.NamespacedStatuses = &roObj.Status

// mark this object to be processed
routeOptionReport.Accept(roObjSk)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ func (p *plugin) ApplyStatusPlugin(ctx context.Context, statusCtx *plugins.Statu
vhOptObj.Spec.GetMetadata().Name = vhOptObj.GetName()
vhOptObj.Spec.GetMetadata().Namespace = vhOptObj.GetNamespace()
vhOptObjSk := &vhOptObj.Spec
vhOptObjSk.NamespacedStatuses = &vhOptObj.Status

// mark this object to be processed
virtualHostOptionReport.Accept(vhOptObjSk)
Expand Down
9 changes: 7 additions & 2 deletions projects/gloo/pkg/syncer/envoy_translator_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,17 @@ func (s *translatorSyncer) syncEnvoy(ctx context.Context, snap *v1snap.ApiSnapsh
}
}

allReports.Accept(snap.Upstreams.AsInputResources()...)
allReports.Accept(snap.UpstreamGroups.AsInputResources()...)
// Only mark non-kube gateways as accepted
// Regardless, kube gw proxies are filtered out of these reports before reporting in translator_syncer.go
allReports.Accept(nonKubeProxies.AsInputResources()...)

// mark Upstream[Group]s as Accepted initially, but only if we have at least 1 edge proxy;
// otherwise, we won't actually translate them, and so if there is an error, we will incorrectly report Accepted
if len(nonKubeProxies) > 0 {
allReports.Accept(snap.Upstreams.AsInputResources()...)
allReports.Accept(snap.UpstreamGroups.AsInputResources()...)
}

// sync non-kube gw proxies
for _, proxy := range nonKubeProxies {
proxyCtx := ctx
Expand Down
4 changes: 3 additions & 1 deletion test/e2e/aws_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,9 @@ var _ = Describe("AWS Lambda", func() {
Expect(err).NotTo(HaveOccurred())

// wait for the upstream to be created
helpers.EventuallyResourceAccepted(func() (resources.InputResource, error) {
// Upstreams no longer report status if they have not been translated at all to avoid conflicting with
// other syncers that have translated them, so we can only detect that the objects exist here
helpers.EventuallyResourceExists(func() (resources.Resource, error) {
return testClients.UpstreamClient.Read(upstream.Metadata.Namespace, upstream.Metadata.Name, clients.ReadOpts{})
}, "30s", "1s")
}
Expand Down
13 changes: 13 additions & 0 deletions test/helpers/input_resources.go → test/helpers/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,19 @@ const (
defaultEventuallyPollingInterval = 1 * time.Second
)

type ResourceGetter func() (resources.Resource, error)

func EventuallyResourceExists(getter ResourceGetter, intervals ...interface{}) {
timeoutInterval, pollingInterval := getTimeoutAndPollingIntervalsOrDefault(intervals...)
gomega.Eventually(func() (bool, error) {
_, err := getter()
if err != nil {
return false, err
}
return true, nil
}, timeoutInterval, pollingInterval).Should(gomega.BeTrue())
}

type InputResourceGetter func() (resources.InputResource, error)
type InputResourceListGetter func() (resources.InputResourceList, error)

Expand Down
4 changes: 3 additions & 1 deletion test/kube2e/gateway/gateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1290,7 +1290,9 @@ var _ = Describe("Kube2e: gateway", func() {

upstreamName = kubernetesplugin.UpstreamName(testHelper.InstallNamespace, service.Name, 5678)
// wait for upstream to get created by discovery
helpers.EventuallyResourceAccepted(func() (resources.InputResource, error) {
// Upstreams no longer report status if they have not been translated at all to avoid conflicting with
// other syncers that have translated them, so we can only detect that the objects exist here
helpers.EventuallyResourceExists(func() (resources.Resource, error) {
return resourceClientset.UpstreamClient().Read(testHelper.InstallNamespace, upstreamName, clients.ReadOpts{Ctx: ctx})
})
// add subset spec to upstream
Expand Down
10 changes: 7 additions & 3 deletions test/kube2e/gloo/happypath_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,9 @@ var _ = Describe("Happy path", func() {
err := envoyInstance.RunWithRole(role, testClients.GlooPort)
Expect(err).NotTo(HaveOccurred())

testhelpers.EventuallyResourceAccepted(func() (resources.InputResource, error) {
// Upstreams no longer report status if they have not been translated at all to avoid conflicting with
// other syncers that have translated them, so we can only detect that the objects exist here
testhelpers.EventuallyResourceExists(func() (resources.Resource, error) {
return getUpstream()
}, "20s", ".5s")
})
Expand Down Expand Up @@ -239,9 +241,11 @@ var _ = Describe("Happy path", func() {
})

It("watch all namespaces", func() {
testhelpers.EventuallyResourceAccepted(func() (resources.InputResource, error) {
// Upstreams no longer report status if they have not been translated at all to avoid conflicting with
// other syncers that have translated them, so we can only detect that the objects exist here
testhelpers.EventuallyResourceExists(func() (resources.Resource, error) {
return getUpstream()
})
}, "20s", ".5s")

up, err := getUpstream()
Expect(err).NotTo(HaveOccurred())
Expand Down
15 changes: 11 additions & 4 deletions test/kubernetes/e2e/features/basicrouting/edge_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,21 @@ func (s *edgeBasicRoutingSuite) TestBasicVirtualServiceRouting() {
})

// Upstream is only rejected when the upstream plugin is run when a valid cluster is present
// Upstreams no longer report status if they have not been translated at all to avoid conflicting with
// other syncers that have translated them, so we can only detect that the objects exist here
err = s.testInstallation.Actions.Kubectl().ApplyFile(s.ctx, ossvalidation.ExampleUpstream, "-n", s.testInstallation.Metadata.InstallNamespace)
s.Assert().NoError(err, "can apply valid upstream")
s.testInstallation.Assertions.EventuallyResourceStatusMatchesState(
func() (resources.InputResource, error) {
s.testInstallation.Assertions.EventuallyResourceExists(
func() (resources.Resource, error) {
return s.testInstallation.ResourceClients.UpstreamClient().Read(s.testInstallation.Metadata.InstallNamespace, ossvalidation.ExampleUpstreamName, clients.ReadOpts{Ctx: s.ctx})
},
core.Status_Accepted,
defaults.GlooReporter,
)
// we need to make sure Gloo has had a chance to process it
s.testInstallation.Assertions.ConsistentlyResourceExists(
s.ctx,
func() (resources.Resource, error) {
return s.testInstallation.ResourceClients.UpstreamClient().Read(s.testInstallation.Metadata.InstallNamespace, "nginx-upstream", clients.ReadOpts{Ctx: s.ctx})
},
)
err = s.testInstallation.Actions.Kubectl().ApplyFile(s.ctx, ossvalidation.ExampleVS, "-n", s.testInstallation.Metadata.InstallNamespace)
s.Assert().NoError(err, "can apply valid virtual service")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,9 @@ import (
"github.com/solo-io/gloo/projects/gloo/pkg/plugins/kubernetes"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

"github.com/solo-io/gloo/projects/gloo/pkg/defaults"
"github.com/solo-io/gloo/test/kubernetes/e2e"
"github.com/solo-io/solo-kit/pkg/api/v1/clients"
"github.com/solo-io/solo-kit/pkg/api/v1/resources"
"github.com/solo-io/solo-kit/pkg/api/v1/resources/core"
"github.com/stretchr/testify/suite"
)

Expand Down Expand Up @@ -64,13 +62,13 @@ func (s *discoveryWatchlabelsSuite) TestDiscoverUpstreamMatchingWatchLabels() {
s.Assert().NoError(err, "can apply service")

// eventually an Upstream should be created for the Service with matching labels
// Upstreams no longer report status if they have not been translated at all to avoid conflicting with
// other syncers that have translated them, so we can only detect that the objects exist here
labeledUsName := kubernetes.UpstreamName(s.testInstallation.Metadata.InstallNamespace, "example-svc", 8000)
s.testInstallation.Assertions.EventuallyResourceStatusMatchesState(
func() (resources.InputResource, error) {
s.testInstallation.Assertions.EventuallyResourceExists(
func() (resources.Resource, error) {
return s.testInstallation.ResourceClients.UpstreamClient().Read(s.testInstallation.Metadata.InstallNamespace, labeledUsName, clients.ReadOpts{Ctx: s.ctx})
},
core.Status_Accepted,
defaults.GlooReporter,
)

// the Upstream should have DiscoveryMetadata labels matching the parent Service
Expand Down Expand Up @@ -129,13 +127,13 @@ func (s *discoveryWatchlabelsSuite) TestDiscoverySpecPreserved() {
s.Assert().NoError(err, "can apply service")

// eventually an Upstream should be created for the Service with matching labels
// Upstreams no longer report status if they have not been translated at all to avoid conflicting with
// other syncers that have translated them, so we can only detect that the objects exist here
labeledUsName := kubernetes.UpstreamName(s.testInstallation.Metadata.InstallNamespace, "example-svc", 8000)
s.testInstallation.Assertions.EventuallyResourceStatusMatchesState(
func() (resources.InputResource, error) {
s.testInstallation.Assertions.EventuallyResourceExists(
func() (resources.Resource, error) {
return s.testInstallation.ResourceClients.UpstreamClient().Read(s.testInstallation.Metadata.InstallNamespace, labeledUsName, clients.ReadOpts{Ctx: s.ctx})
},
core.Status_Accepted,
defaults.GlooReporter,
)

// the Upstream should have DiscoveryMetadata labels matching the parent Service
Expand Down
8 changes: 4 additions & 4 deletions test/kubernetes/e2e/features/tracing/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,13 +101,13 @@ func (s *testingSuite) BeforeTest(string, string) {
err = s.testInstallation.Actions.Kubectl().ApplyFile(s.ctx, tracingConfigManifest)
s.NoError(err, "can apply gloo tracing resources")
// accept the upstream
s.testInstallation.Assertions.EventuallyResourceStatusMatchesState(
func() (resources.InputResource, error) {
// Upstreams no longer report status if they have not been translated at all to avoid conflicting with
// other syncers that have translated them, so we can only detect that the objects exist here
s.testInstallation.Assertions.EventuallyResourceExists(
func() (resources.Resource, error) {
return s.testInstallation.ResourceClients.UpstreamClient().Read(
otelcolUpstream.Namespace, otelcolUpstream.Name, clients.ReadOpts{Ctx: s.ctx})
},
core.Status_Accepted,
gloo_defaults.GlooReporter,
)
// accept the virtual service
s.testInstallation.Assertions.EventuallyResourceStatusMatchesState(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,11 @@ package full_envoy_validation
import (
"context"

gloo_defaults "github.com/solo-io/gloo/projects/gloo/pkg/defaults"
"github.com/solo-io/gloo/test/kubernetes/e2e"
testdefaults "github.com/solo-io/gloo/test/kubernetes/e2e/defaults"
"github.com/solo-io/gloo/test/kubernetes/e2e/features/validation"
"github.com/solo-io/solo-kit/pkg/api/v1/clients"
"github.com/solo-io/solo-kit/pkg/api/v1/resources"
"github.com/solo-io/solo-kit/pkg/api/v1/resources/core"
"github.com/stretchr/testify/suite"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -54,14 +52,21 @@ func (s *testingSuite) TestRejectInvalidTransformation() {

err = s.testInstallation.Actions.Kubectl().ApplyFile(s.ctx, validation.ExampleUpstream, "-n", s.testInstallation.Metadata.InstallNamespace)
s.Assert().NoError(err)
s.testInstallation.Assertions.EventuallyResourceStatusMatchesState(
func() (resources.InputResource, error) {
// Upstreams no longer report status if they have not been translated at all to avoid conflicting with
// other syncers that have translated them, so we can only detect that the objects exist here
s.testInstallation.Assertions.EventuallyResourceExists(
func() (resources.Resource, error) {
return s.testInstallation.ResourceClients.UpstreamClient().Read(s.testInstallation.Metadata.InstallNamespace, "nginx-upstream", clients.ReadOpts{Ctx: s.ctx})
},
core.Status_Accepted,
gloo_defaults.GlooReporter,
)

// we need to make sure Gloo has had a chance to process it
s.testInstallation.Assertions.ConsistentlyResourceExists(
s.ctx,
func() (resources.Resource, error) {
return s.testInstallation.ResourceClients.UpstreamClient().Read(s.testInstallation.Metadata.InstallNamespace, "nginx-upstream", clients.ReadOpts{Ctx: s.ctx})
},
)
s.T().Cleanup(func() {
err := s.testInstallation.Actions.Kubectl().DeleteFileSafe(s.ctx, validation.VSTransformationHeaderText, "-n", s.testInstallation.Metadata.InstallNamespace)
s.Assert().NoError(err)
Expand Down
21 changes: 14 additions & 7 deletions test/kubernetes/e2e/features/validation/split_webhook/suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,11 @@ import (
"time"

"github.com/onsi/gomega"
gloo_defaults "github.com/solo-io/gloo/projects/gloo/pkg/defaults"
"github.com/solo-io/gloo/test/kubernetes/e2e"
"github.com/solo-io/gloo/test/kubernetes/e2e/features/validation"
"github.com/solo-io/gloo/test/kubernetes/testutils/helper"
"github.com/solo-io/solo-kit/pkg/api/v1/clients"
"github.com/solo-io/solo-kit/pkg/api/v1/resources"
"github.com/solo-io/solo-kit/pkg/api/v1/resources/core"
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -196,12 +194,21 @@ var (
}

validateUpstreamCreated = func(s *testingSuite) {
s.testInstallation.Assertions.EventuallyResourceStatusMatchesState(
func() (resources.InputResource, error) {
return s.testInstallation.ResourceClients.UpstreamClient().Read(s.testInstallation.Metadata.InstallNamespace, "json-upstream", clients.ReadOpts{Ctx: s.ctx})
// Upstreams no longer report status if they have not been translated at all to avoid conflicting with
// other syncers that have translated them, so we can only detect that the objects exist here
s.testInstallation.Assertions.EventuallyResourceExists(
func() (resources.Resource, error) {
uc := s.testInstallation.ResourceClients.UpstreamClient()
return uc.Read(s.testInstallation.Metadata.InstallNamespace, validation.SplitWebhookBasicUpstreamName, clients.ReadOpts{Ctx: s.ctx})
},
)
// we need to make sure Gloo has had a chance to process it
s.testInstallation.Assertions.ConsistentlyResourceExists(
s.ctx,
func() (resources.Resource, error) {
uc := s.testInstallation.ResourceClients.UpstreamClient()
return uc.Read(s.testInstallation.Metadata.InstallNamespace, validation.SplitWebhookBasicUpstreamName, clients.ReadOpts{Ctx: s.ctx})
},
core.Status_Accepted,
gloo_defaults.GlooReporter,
)
}

Expand Down
5 changes: 3 additions & 2 deletions test/kubernetes/e2e/features/validation/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import (
)

const (
ExampleVsName = "example-vs"
ExampleUpstreamName = "nginx-upstream"
ExampleVsName = "example-vs"
ExampleUpstreamName = "nginx-upstream"
SplitWebhookBasicUpstreamName = "json-upstream"

ValidVsName = "i-am-valid"
InvalidVsName = "i-am-invalid"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,12 +115,10 @@ func (s *testingSuite) TestInvalidUpstreamMissingPort() {
// Upstream is only rejected when the upstream plugin is run when a valid cluster is present
err = s.testInstallation.Actions.Kubectl().ApplyFile(s.ctx, validation.ExampleUpstream, "-n", s.testInstallation.Metadata.InstallNamespace)
s.Assert().NoError(err, "can apply valid upstream")
s.testInstallation.Assertions.EventuallyResourceStatusMatchesState(
func() (resources.InputResource, error) {
s.testInstallation.Assertions.EventuallyResourceExists(
func() (resources.Resource, error) {
return s.testInstallation.ResourceClients.UpstreamClient().Read(s.testInstallation.Metadata.InstallNamespace, validation.ExampleUpstreamName, clients.ReadOpts{Ctx: s.ctx})
},
core.Status_Accepted,
gloo_defaults.GlooReporter,
)
err = s.testInstallation.Actions.Kubectl().ApplyFile(s.ctx, validation.ExampleVS, "-n", s.testInstallation.Metadata.InstallNamespace)
s.Assert().NoError(err, "can apply valid virtual service")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,15 +161,14 @@ func (s *testingSuite) TestVirtualServiceWithSecretDeletion() {
err = s.testInstallation.Actions.Kubectl().ApplyFile(s.ctx, validation.UnusedSecret, "-n", s.testInstallation.Metadata.InstallNamespace)
s.Assert().NoError(err)

// Upstream should be accepted
// Upstreams no longer report status if they have not been translated at all to avoid conflicting with
// other syncers that have translated them, so we can only detect that the objects exist here
err = s.testInstallation.Actions.Kubectl().ApplyFile(s.ctx, validation.ExampleUpstream, "-n", s.testInstallation.Metadata.InstallNamespace)
s.Assert().NoError(err)
s.testInstallation.Assertions.EventuallyResourceStatusMatchesState(
func() (resources.InputResource, error) {
s.testInstallation.Assertions.EventuallyResourceExists(
func() (resources.Resource, error) {
return s.testInstallation.ResourceClients.UpstreamClient().Read(s.testInstallation.Metadata.InstallNamespace, validation.ExampleUpstreamName, clients.ReadOpts{Ctx: s.ctx})
},
core.Status_Accepted,
gloo_defaults.GlooReporter,
)
// Apply VS with secret after Upstream and Secret exist
err = s.testInstallation.Actions.Kubectl().Apply(s.ctx, []byte(substitutedSecretVS))
Expand Down Expand Up @@ -246,12 +245,10 @@ func (s *testingSuite) TestPersistInvalidVirtualService() {
// First apply Upstream
err = s.testInstallation.Actions.Kubectl().ApplyFile(s.ctx, validation.ExampleUpstream, "-n", s.testInstallation.Metadata.InstallNamespace)
s.Assert().NoError(err, "can apply "+validation.ExampleUpstream)
s.testInstallation.Assertions.EventuallyResourceStatusMatchesState(
func() (resources.InputResource, error) {
s.testInstallation.Assertions.EventuallyResourceExists(
func() (resources.Resource, error) {
return s.testInstallation.ResourceClients.UpstreamClient().Read(s.testInstallation.Metadata.InstallNamespace, validation.ExampleUpstreamName, clients.ReadOpts{Ctx: s.ctx})
},
core.Status_Accepted,
gloo_defaults.GlooReporter,
)

// Then apply VirtualService
Expand Down
Loading

0 comments on commit 7af653c

Please sign in to comment.