Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport of [NET-4958] Fix issue where envoy endpoints would fail to populate after snapshot restore into release/1.16.x #18645

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/18636.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
connect: Fix issue where Envoy endpoints would not populate correctly after a snapshot restore.
```
2 changes: 1 addition & 1 deletion agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4568,7 +4568,7 @@ func (a *Agent) proxyDataSources() proxycfg.DataSources {
// interact with ACLs and the streaming backend. See comments in `proxycfgglue.ServerHealthBlocking`
// for more details.
// sources.Health = proxycfgglue.ServerHealth(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
sources.Health = proxycfgglue.ServerHealthBlocking(deps, proxycfgglue.ClientHealth(a.rpcClientHealth), server.FSM().State())
sources.Health = proxycfgglue.ServerHealthBlocking(deps, proxycfgglue.ClientHealth(a.rpcClientHealth))
sources.HTTPChecks = proxycfgglue.ServerHTTPChecks(deps, a.config.NodeName, proxycfgglue.CacheHTTPChecks(a.cache), a.State)
sources.Intentions = proxycfgglue.ServerIntentions(deps)
sources.IntentionUpstreams = proxycfgglue.ServerIntentionUpstreams(deps)
Expand Down
3 changes: 3 additions & 0 deletions agent/proxycfg-glue/glue.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ type Store interface {
PeeringTrustBundleList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
TrustBundleListByService(ws memdb.WatchSet, service, dc string, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.PeeringTrustBundle, error)
VirtualIPsForAllImportedServices(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []state.ServiceVirtualIP, error)
CheckConnectServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
CheckIngressServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta) (uint64, structs.CheckServiceNodes, error)
CheckServiceNodes(ws memdb.WatchSet, serviceName string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, structs.CheckServiceNodes, error)
}

// CacheCARoots satisfies the proxycfg.CARoots interface by sourcing data from
Expand Down
16 changes: 7 additions & 9 deletions agent/proxycfg-glue/health_blocking.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/hashicorp/go-memdb"

"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/watch"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
Expand All @@ -39,14 +38,13 @@ import (
// because proxycfg has a `req.Source.Node != ""` which prevents the `streamingEnabled` check from passing.
// This means that while agents should technically have this same issue, they don't experience it with mesh health
// watches.
func ServerHealthBlocking(deps ServerDataSourceDeps, remoteSource proxycfg.Health, state *state.Store) *serverHealthBlocking {
return &serverHealthBlocking{deps, remoteSource, state, 5 * time.Minute}
func ServerHealthBlocking(deps ServerDataSourceDeps, remoteSource proxycfg.Health) *serverHealthBlocking {
return &serverHealthBlocking{deps, remoteSource, 5 * time.Minute}
}

type serverHealthBlocking struct {
deps ServerDataSourceDeps
remoteSource proxycfg.Health
state *state.Store
watchTimeout time.Duration
}

Expand All @@ -66,7 +64,7 @@ func (h *serverHealthBlocking) Notify(ctx context.Context, args *structs.Service
}

// Determine the function we'll call
var f func(memdb.WatchSet, *state.Store, *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error)
var f func(memdb.WatchSet, Store, *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error)
switch {
case args.Connect:
f = serviceNodesConnect
Expand Down Expand Up @@ -115,7 +113,7 @@ func (h *serverHealthBlocking) Notify(ctx context.Context, args *structs.Service
}

var thisReply structs.IndexedCheckServiceNodes
thisReply.Index, thisReply.Nodes, err = f(ws, h.state, args)
thisReply.Index, thisReply.Nodes, err = f(ws, store, args)
if err != nil {
return 0, nil, err
}
Expand Down Expand Up @@ -151,14 +149,14 @@ func (h *serverHealthBlocking) filterACL(authz *acl.AuthorizerContext, token str
return nil
}

func serviceNodesConnect(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
func serviceNodesConnect(ws memdb.WatchSet, s Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
return s.CheckConnectServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName)
}

func serviceNodesIngress(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
func serviceNodesIngress(ws memdb.WatchSet, s Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
return s.CheckIngressServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta)
}

func serviceNodesDefault(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
func serviceNodesDefault(ws memdb.WatchSet, s Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) {
return s.CheckServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName)
}
5 changes: 2 additions & 3 deletions agent/proxycfg-glue/health_blocking_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ func TestServerHealthBlocking(t *testing.T) {
remoteSource := newMockHealth(t)
remoteSource.On("Notify", ctx, req, correlationID, ch).Return(result)

store := state.NewStateStore(nil)
dataSource := ServerHealthBlocking(ServerDataSourceDeps{Datacenter: "dc1"}, remoteSource, store)
dataSource := ServerHealthBlocking(ServerDataSourceDeps{Datacenter: "dc1"}, remoteSource)
err := dataSource.Notify(ctx, req, correlationID, ch)
require.Equal(t, result, err)
})
Expand All @@ -49,7 +48,7 @@ func TestServerHealthBlocking(t *testing.T) {
Datacenter: datacenter,
ACLResolver: aclResolver,
Logger: testutil.Logger(t),
}, nil, store)
}, nil)
dataSource.watchTimeout = 1 * time.Second

// Watch for all events
Expand Down