From f69c0f3cc125c8f23ea0c80990801baba2aa6bb2 Mon Sep 17 00:00:00 2001 From: Xinyi Wang Date: Fri, 30 Aug 2024 11:31:38 -0700 Subject: [PATCH] use error group to concurrently execute the tasks --- .../subcommand/sync-catalog/command.go | 38 +++++++++---------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/control-plane/subcommand/sync-catalog/command.go b/control-plane/subcommand/sync-catalog/command.go index 614327470a..d062da43e3 100644 --- a/control-plane/subcommand/sync-catalog/command.go +++ b/control-plane/subcommand/sync-catalog/command.go @@ -25,6 +25,7 @@ import ( "github.com/hashicorp/go-hclog" "github.com/mitchellh/cli" "github.com/prometheus/client_golang/prometheus/promhttp" + "golang.org/x/sync/errgroup" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" @@ -466,9 +467,8 @@ func (c *Command) removeAllK8SServicesFromConsulNode(consulClient *api.Client) e return err } - var wg sync.WaitGroup + var firstErr error services := node.Services - errChan := make(chan error, 1) batchSize := 300 maxRetries := 2 retryDelay := 200 * time.Millisecond @@ -493,34 +493,32 @@ func (c *Command) removeAllK8SServicesFromConsulNode(consulClient *api.Client) e end = len(services) } - wg.Add(1) - go func(batch []*api.AgentService) { - defer wg.Done() - - for _, service := range batch { + var eg errgroup.Group + for _, service := range services[i:end] { + s := service + eg.Go(func() error { var b backoff.BackOff = backoff.NewConstantBackOff(retryDelay) b = backoff.WithMaxRetries(b, uint64(maxRetries)) - err := backoff.Retry(func() error { + return backoff.Retry(func() error { _, err := consulClient.Catalog().Deregister(&api.CatalogDeregistration{ Node: c.flagPurgeK8SServicesFromNode, - ServiceID: service.ID, + ServiceID: s.ID, }, nil) return err }, b) - if err != nil { - if len(errChan) == 0 { - errChan <- err - } - } + }) + } + if err := eg.Wait(); err != nil { + if firstErr == nil { + c.UI.Info("Some K8S services were not deregistered from Consul") + firstErr = err } - c.UI.Info(fmt.Sprintf("Processed %v K8S services from %v", len(batch), c.flagPurgeK8SServicesFromNode)) - }(services[i:end]) - wg.Wait() + } + c.UI.Info(fmt.Sprintf("Processed %v K8S services from %v", end-i, c.flagPurgeK8SServicesFromNode)) } - close(errChan) - if err = <-errChan; err != nil { - return err + if firstErr != nil { + return firstErr } c.UI.Info("All K8S services were deregistered from Consul") return nil