diff --git a/pkg/migration/BUILD.bazel b/pkg/migration/BUILD.bazel index 1efd725e9f86..e674c65e55a4 100644 --- a/pkg/migration/BUILD.bazel +++ b/pkg/migration/BUILD.bazel @@ -20,7 +20,9 @@ go_library( "//pkg/server/serverpb", "//pkg/sql", "//pkg/sql/sqlutil", + "//pkg/util/ctxgroup", "//pkg/util/log", + "//pkg/util/quotapool", "//vendor/github.com/cockroachdb/errors", "//vendor/github.com/cockroachdb/logtags", "//vendor/github.com/cockroachdb/redact", diff --git a/pkg/migration/helper.go b/pkg/migration/helper.go index 6ceeee1632d5..b7884b6b24e6 100644 --- a/pkg/migration/helper.go +++ b/pkg/migration/helper.go @@ -21,7 +21,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/serverpb" "github.com/cockroachdb/cockroach/pkg/sql" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/quotapool" "github.com/cockroachdb/errors" "github.com/cockroachdb/redact" "google.golang.org/grpc" @@ -110,19 +112,32 @@ func (h *Helper) EveryNode( return err } + // We'll want to rate limit outgoing RPCs (limit pulled out of thin air). + qp := quotapool.NewIntPool("every-node", 25) for { - // TODO(irfansharif): We can/should send out these RPCs in parallel. log.Infof(ctx, "executing %s on nodes %s", redact.Safe(op), ns) + grp := ctxgroup.WithContext(ctx) for _, node := range ns { - conn, err := h.c.dial(ctx, node.id) + id := node.id // copy out of the loop variable + alloc, err := qp.Acquire(ctx, 1) if err != nil { return err } - client := serverpb.NewMigrationClient(conn) - if err := fn(ctx, client); err != nil { - return err - } + + grp.GoCtx(func(ctx context.Context) error { + defer alloc.Release() + + conn, err := h.c.dial(ctx, id) + if err != nil { + return err + } + client := serverpb.NewMigrationClient(conn) + return fn(ctx, client) + }) + } + if err := grp.Wait(); err != nil { + return err } curNodes, err := h.c.nodes(ctx)