Skip to content

Commit

Permalink
Merge branch 'main' into xw/NET-10571-de-register-service-command
Browse files Browse the repository at this point in the history
  • Loading branch information
wangxinyi7 committed Aug 30, 2024
2 parents f69c0f3 + 2ed5fd8 commit f7a8cf6
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 118 deletions.
3 changes: 3 additions & 0 deletions .changelog/4266.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
sync-catalog: fix infinite retry loop when the catalog fails to connect to consul-server during the sync process
```
3 changes: 3 additions & 0 deletions .changelog/4277.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
control-plane: add missing `$HOST_IP` environment variable to consul-dataplane sidecar containers
```
251 changes: 134 additions & 117 deletions control-plane/catalog/to-consul/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ import (
"github.com/armon/go-metrics/prometheus"
"github.com/cenkalti/backoff"
mapset "github.com/deckarep/golang-set"
"github.com/hashicorp/consul-k8s/control-plane/consul"
"github.com/hashicorp/consul-k8s/control-plane/namespaces"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"

"github.com/hashicorp/consul-k8s/control-plane/consul"
"github.com/hashicorp/consul-k8s/control-plane/namespaces"
)

var (
Expand Down Expand Up @@ -212,91 +213,91 @@ func (s *ConsulSyncer) watchReapableServices(ctx context.Context) {
// because we have no tracked services in our maps yet.
<-s.initialSync

// Run immediately the first time, then wait for the retry period
waitCh := time.After(0)
waitBeforeRetry := s.SyncPeriod / 4

for {
select {
case <-waitCh:
s.deregisterRemovedServices(ctx)
waitCh = time.After(waitBeforeRetry)
case <-ctx.Done():
return
}
}
}

// deregisterRemovedServices queries the Consul catalog for all services and
// schedules for deregistration any that no longer have a corresponding k8s
// service.
//
// This function is very similar to [deregisterRemovedService] but handles the case
// where the ServiceWatcher has been terminated but the service hasn't been deregistered
// yet.
func (s *ConsulSyncer) deregisterRemovedServices(ctx context.Context) {
opts := &api.QueryOptions{
AllowStale: true,
WaitIndex: 1,
WaitTime: 1 * time.Minute,
Filter: fmt.Sprintf("\"%s\" in Tags", s.ConsulK8STag),
}

if s.EnableNamespaces {
opts.Namespace = "*"
}

// minWait is the minimum time to wait between scheduling service deletes.
// This prevents a lot of churn in services causing high CPU usage.
minWait := s.SyncPeriod / 4
minWaitCh := time.After(0)
for {
// Create a new consul client.
consulClient, err := consul.NewClientFromConnMgr(s.ConsulClientConfig, s.ConsulServerConnMgr)
if err != nil {
s.Log.Error("failed to create Consul API client", "err", err)
return
}
consulClient, err := consul.NewClientFromConnMgr(s.ConsulClientConfig, s.ConsulServerConnMgr)
if err != nil {
s.Log.Error("failed to create Consul API client", "error", err)
return
}

var services *api.CatalogNodeServiceList
var meta *api.QueryMeta
err = backoff.Retry(func() error {
services, meta, err = consulClient.Catalog().NodeServiceList(s.ConsulNodeName, opts)
return err
}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
// Limit our backoff so that we don't try forever with a bad client
b := backoff.WithContext(
backoff.WithMaxRetries(
backoff.NewExponentialBackOff(), 5), ctx)

var services *api.CatalogNodeServiceList
err = backoff.Retry(func() error {
services, _, err = consulClient.Catalog().NodeServiceList(s.ConsulNodeName, opts)
if err != nil {
s.Log.Warn("error querying services, will retry", "err", err)
} else {
s.Log.Debug("[watchReapableServices] services returned from catalog",
"services", services)
}

// Wait our minimum time before continuing or retrying
select {
case <-minWaitCh:
if err != nil {
continue
}

minWaitCh = time.After(minWait)
case <-ctx.Done():
return
s.Log.Warn("error querying services, will retry", "error", err)
return err
}

// Update our blocking index
opts.WaitIndex = meta.LastIndex

// Lock so we can modify the stored state
s.lock.Lock()
return nil
}, b)
if err != nil {
return
}

// Go through the service array and find services that should be reaped
for _, service := range services.Services {
// Check that the namespace exists in the valid service names map
// before checking whether it contains the service
svcNs := service.Namespace
if !s.EnableNamespaces {
// Set namespace to empty when namespaces are not enabled.
svcNs = ""
}
if _, ok := s.serviceNames[svcNs]; ok {
// We only care if we don't know about this service at all.
if s.serviceNames[svcNs].Contains(service.Service) {
s.Log.Debug("[watchReapableServices] serviceNames contains service",
"namespace", svcNs,
"service-name", service.Service)
continue
}
}
// Lock so we can modify the stored state
s.lock.Lock()
defer s.lock.Unlock()

s.Log.Info("invalid service found, scheduling for delete",
"service-name", service.Service, "service-id", service.ID, "service-consul-namespace", svcNs)
if err = s.scheduleReapServiceLocked(service.Service, svcNs); err != nil {
s.Log.Info("error querying service for delete",
"service-name", service.Service,
"service-consul-namespace", svcNs,
"err", err)
// Go through the service array and find services that should be reaped
for _, service := range services.Services {
// Check that the namespace exists in the valid service names map
// before checking whether it contains the service
namespace := service.Namespace
if !s.EnableNamespaces {
// Set namespace to empty when namespaces are not enabled.
namespace = ""
}
if _, ok := s.serviceNames[namespace]; ok {
// We only care if we don't know about this service at all.
if s.serviceNames[namespace].Contains(service.Service) {
continue
}
}

s.lock.Unlock()
s.Log.Info("invalid service found, scheduling for delete",
"service-name", service.Service, "service-id", service.ID, "service-consul-namespace", namespace)
if err = s.scheduleReapServiceLocked(service.Service, namespace); err != nil {
s.Log.Info("error querying service for delete",
"service-name", service.Service,
"service-consul-namespace", namespace,
"err", err)
}
}
}

Expand All @@ -306,72 +307,88 @@ func (s *ConsulSyncer) watchService(ctx context.Context, name, namespace string)
s.Log.Info("starting service watcher", "service-name", name, "service-consul-namespace", namespace)
defer s.Log.Info("stopping service watcher", "service-name", name, "service-consul-namespace", namespace)

// Run immediately the first time, then wait for the retry period
waitCh := time.After(0)
waitBeforeRetry := s.SyncPeriod / 4

for {
select {
// Wait for our poll period
case <-waitCh:
s.deregisterRemovedService(ctx, name, namespace)
waitCh = time.After(waitBeforeRetry)
// Quit if our context is over
case <-ctx.Done():
return

// Wait for our poll period
case <-time.After(s.SyncPeriod):
}

// Set up query options
queryOpts := &api.QueryOptions{
AllowStale: true,
}
if s.EnableNamespaces {
// Sets the Consul namespace to query the catalog
queryOpts.Namespace = namespace
}
}
}

// deregisterRemovedService checks to see if a given service in the catalog
// has been removed from k8s. If it has, then the service is deregistered from
// the Consul catalog.
//
// This function is very similar to [deregisterRemovedServices] but is scoped to a single
// service that is currently being watched.
func (s *ConsulSyncer) deregisterRemovedService(ctx context.Context, name, namespace string) {
opts := &api.QueryOptions{
AllowStale: true,
}
if s.EnableNamespaces {
opts.Namespace = namespace
}

consulClient, err := consul.NewClientFromConnMgr(s.ConsulClientConfig, s.ConsulServerConnMgr)
if err != nil {
s.Log.Error("failed to create Consul API client; will retry", "err", err)
return
}

// Limit our backoff so that we don't try forever with a bad client
b := backoff.WithContext(
backoff.WithMaxRetries(
backoff.NewExponentialBackOff(), 5), ctx)

// Create a new consul client.
consulClient, err := consul.NewClientFromConnMgr(s.ConsulClientConfig, s.ConsulServerConnMgr)
var services []*api.CatalogService
err = backoff.Retry(func() error {
services, _, err = consulClient.Catalog().Service(name, s.ConsulK8STag, opts)
if err != nil {
s.Log.Error("failed to create Consul API client; will retry", "err", err)
continue
}
// Wait for service changes
var services []*api.CatalogService
err = backoff.Retry(func() error {
services, _, err = consulClient.Catalog().Service(name, s.ConsulK8STag, queryOpts)
s.Log.Warn("error querying service, will retry", "error", err)
return err
}, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
if err != nil {
s.Log.Warn("error querying service, will retry",
"service-name", name,
"service-namespace", namespace, // will be "" if namespaces aren't enabled
"err", err)
continue
}

// Lock so we can modify the set of actions to take
s.lock.Lock()
return nil
}, b)
if err != nil {
return
}

for _, svc := range services {
// Make sure the namespace exists before we run checks against it
if _, ok := s.serviceNames[namespace]; ok {
// If the service is valid and its info isn't nil, we don't deregister it
if s.serviceNames[namespace].Contains(svc.ServiceName) && s.namespaces[namespace][svc.ServiceID] != nil {
continue
}
}
// Lock so we can modify the set of actions to take
s.lock.Lock()
defer s.lock.Unlock()

s.deregs[svc.ServiceID] = &api.CatalogDeregistration{
Node: svc.Node,
ServiceID: svc.ServiceID,
}
if s.EnableNamespaces {
s.deregs[svc.ServiceID].Namespace = namespace
for _, service := range services {
// Make sure the namespace exists before we run checks against it
if _, ok := s.serviceNames[namespace]; ok {
// If the service is valid and its info isn't nil, we don't deregister it
if s.serviceNames[namespace].Contains(service.ServiceName) && s.namespaces[namespace][service.ServiceID] != nil {
continue
}
s.Log.Debug("[watchService] service being scheduled for deregistration",
"namespace", namespace,
"service name", svc.ServiceName,
"service id", svc.ServiceID,
"service dereg", s.deregs[svc.ServiceID])
}

s.lock.Unlock()
s.deregs[service.ServiceID] = &api.CatalogDeregistration{
Node: service.Node,
ServiceID: service.ServiceID,
}
if s.EnableNamespaces {
s.deregs[service.ServiceID].Namespace = namespace
}
s.Log.Debug("[watchService] service being scheduled for deregistration",
"namespace", namespace,
"service name", service.ServiceName,
"service id", service.ServiceID,
"service dereg", s.deregs[service.ServiceID])
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ func (w *MeshWebhook) consulDataplaneSidecar(namespace corev1.Namespace, pod cor
Name: "DP_CREDENTIAL_LOGIN_META2",
Value: "pod-uid=$(POD_UID)",
},
{
Name: "HOST_IP",
ValueFrom: &corev1.EnvVarSource{
FieldRef: &corev1.ObjectFieldSelector{FieldPath: "status.hostIP"},
},
},
},
VolumeMounts: []corev1.VolumeMount{
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ func TestHandlerConsulDataplaneSidecar(t *testing.T) {
}
require.Equal(t, expectedProbe, container.ReadinessProbe)
require.Nil(t, container.StartupProbe)
require.Len(t, container.Env, 9)
require.Len(t, container.Env, 10)
require.Equal(t, container.Env[0].Name, "TMPDIR")
require.Equal(t, container.Env[0].Value, "/consul/connect-inject")
require.Equal(t, container.Env[2].Name, "DP_SERVICE_NODE_NAME")
Expand All @@ -235,6 +235,7 @@ func TestHandlerConsulDataplaneSidecar(t *testing.T) {
require.Equal(t, container.Env[7].Value, "pod=$(POD_NAMESPACE)/$(POD_NAME)")
require.Equal(t, container.Env[8].Name, "DP_CREDENTIAL_LOGIN_META2")
require.Equal(t, container.Env[8].Value, "pod-uid=$(POD_UID)")
require.Equal(t, container.Env[9].Name, "HOST_IP")
})
}
}
Expand Down

0 comments on commit f7a8cf6

Please sign in to comment.