Skip to content

Commit

Permalink
migration: parallelize execution of the EveryNode primitive
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
irfansharif committed Dec 18, 2020
1 parent fe1ad64 commit 3c118de
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 6 deletions.
2 changes: 2 additions & 0 deletions pkg/migration/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ go_library(
"//pkg/server/serverpb",
"//pkg/sql",
"//pkg/sql/sqlutil",
"//pkg/util/ctxgroup",
"//pkg/util/log",
"//pkg/util/quotapool",
"//vendor/github.com/cockroachdb/errors",
"//vendor/github.com/cockroachdb/logtags",
"//vendor/github.com/cockroachdb/redact",
Expand Down
27 changes: 21 additions & 6 deletions pkg/migration/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"google.golang.org/grpc"
Expand Down Expand Up @@ -110,19 +112,32 @@ func (h *Helper) EveryNode(
return err
}

// We'll want to rate limit outgoing RPCs (limit pulled out of thin air).
qp := quotapool.NewIntPool("every-node", 25)
for {
// TODO(irfansharif): We can/should send out these RPCs in parallel.
log.Infof(ctx, "executing %s on nodes %s", redact.Safe(op), ns)

grp := ctxgroup.WithContext(ctx)
for _, node := range ns {
conn, err := h.c.dial(ctx, node.id)
id := node.id // copy out of the loop variable
alloc, err := qp.Acquire(ctx, 1)
if err != nil {
return err
}
client := serverpb.NewMigrationClient(conn)
if err := fn(ctx, client); err != nil {
return err
}

grp.GoCtx(func(ctx context.Context) error {
defer alloc.Release()

conn, err := h.c.dial(ctx, id)
if err != nil {
return err
}
client := serverpb.NewMigrationClient(conn)
return fn(ctx, client)
})
}
if err := grp.Wait(); err != nil {
return err
}

curNodes, err := h.c.nodes(ctx)
Expand Down

0 comments on commit 3c118de

Please sign in to comment.