migration: shimmy in a cluster interface for testability
To facilitate testing `Helper` in isolation, we introduce a `cluster`
interface that we'll mock out in tests. It's through this interface that
the migration infrastructure will be able to dial out to a specific node,
grab hold of a kv.DB instance, and retrieve the current cluster
membership.

Part of this diff also downgrades `RequiredNodeIDs` from being a first-class
primitive, instead tucking it away for internal usage only. Since retrieving
the cluster membership makes no guarantees about new nodes being added to the
cluster, it's entirely possible for that to happen concurrently with the
retrieval. Appropriate usage therefore entails wrapping it in a stabilizing
loop, like we do in `EveryNode`. That tells us there's no need to expose it
directly to migration authors.

Release note: None
irfansharif committed Dec 18, 2020
1 parent 932a815 commit 9f8500c
Showing 7 changed files with 602 additions and 49 deletions.
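For a rough idea of what the commit message's "mock out in tests" angle looks like in practice, a test-only implementation of the new `cluster` interface might be sketched as below. This is a minimal sketch, not the committed test code: the `mockCluster` name and its canned membership are hypothetical, and the real test doubles live in the new helper_test.go, which isn't reproduced on this page.

// A minimal sketch, assuming the unexported cluster/nodes types introduced in
// this commit. mockCluster is hypothetical; see helper_test.go for the real
// test doubles.
package migration

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
	"github.com/cockroachdb/errors"
	"google.golang.org/grpc"
)

// mockCluster hands back a fixed membership list and stubs out everything
// else, which is enough to drive Helper.EveryNode without a running cluster.
type mockCluster struct {
	ns nodes // canned membership, e.g. nodes{{id: 1, epoch: 1}, {id: 2, epoch: 1}}
}

var _ cluster = &mockCluster{}

func (m *mockCluster) nodes(ctx context.Context) (nodes, error) { return m.ns, nil }

func (m *mockCluster) dial(ctx context.Context, id roachpb.NodeID) (*grpc.ClientConn, error) {
	return nil, errors.Newf("dialing n%d is not wired up in this sketch", id)
}

func (m *mockCluster) db() *kv.DB { return nil }

func (m *mockCluster) executor() sqlutil.InternalExecutor { return nil }
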
25 changes: 24 additions & 1 deletion pkg/migration/BUILD.bazel
@@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "migration",
@@ -19,8 +19,31 @@ go_library(
"//pkg/rpc/nodedialer",
"//pkg/server/serverpb",
"//pkg/sql",
"//pkg/sql/sqlutil",
"//pkg/util/log",
"//vendor/github.com/cockroachdb/errors",
"//vendor/github.com/cockroachdb/logtags",
"//vendor/github.com/cockroachdb/redact",
"//vendor/google.golang.org/grpc",
],
)

go_test(
name = "migration_test",
srcs = [
"helper_test.go",
"util_test.go",
],
embed = [":migration"],
deps = [
"//pkg/clusterversion",
"//pkg/kv",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb",
"//pkg/server/serverpb",
"//pkg/testutils",
"//pkg/util/leaktest",
"//pkg/util/syncutil",
"//vendor/google.golang.org/grpc",
],
)
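
Assuming a stock Bazel setup, the new go_test target can presumably be exercised directly; the target label comes from the BUILD file above, though local wrappers or flags may differ:

bazel test //pkg/migration:migration_test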
141 changes: 108 additions & 33 deletions pkg/migration/helper.go
@@ -13,43 +13,58 @@ package migration
import (
"context"

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
"google.golang.org/grpc"
)

// Helper captures all the primitives required to fully specify a migration.
type Helper struct {
*Manager
c cluster
cv clusterversion.ClusterVersion
}

// RequiredNodeIDs returns the node IDs for all nodes that are currently part of
// the cluster (i.e. they haven't been decommissioned away). Migrations have the
// pre-requisite that all required nodes are up and running so that we're able
// to execute all relevant node-level operations on them. If any of the nodes
// are found to be unavailable, an error is returned.
func (h *Helper) RequiredNodeIDs(ctx context.Context) ([]roachpb.NodeID, error) {
var nodeIDs []roachpb.NodeID
ls, err := h.nl.GetLivenessesFromKV(ctx)
if err != nil {
return nil, err
}
for _, l := range ls {
if l.Membership.Decommissioned() {
continue
}
live, err := h.nl.IsLive(l.NodeID)
if err != nil {
return nil, err
}
if !live {
return nil, errors.Newf("n%d required, but unavailable", l.NodeID)
}
nodeIDs = append(nodeIDs, l.NodeID)
}
return nodeIDs, nil
// cluster mediates access to the crdb cluster.
type cluster interface {
// nodes returns the IDs and epochs for all nodes that are currently part of
// the cluster (i.e. they haven't been decommissioned away). Migrations have
// the pre-requisite that all nodes are up and running so that we're able to
// execute all relevant node-level operations on them. If any of the nodes
// are found to be unavailable, an error is returned.
//
// It's important to note that this makes no guarantees about new nodes
// being added to the cluster. It's entirely possible for that to happen
// concurrently with the retrieval of the current set of nodes. Appropriate
// usage of this entails wrapping it under a stabilizing loop, like we do in
// EveryNode.
nodes(ctx context.Context) (nodes, error)

// dial returns a grpc connection to the given node.
dial(context.Context, roachpb.NodeID) (*grpc.ClientConn, error)

// db provides access to the kv.DB instance backing the cluster.
//
// TODO(irfansharif): We could hide the kv.DB instance behind an interface
// to expose only relevant, vetted bits of kv.DB. It'll make our tests less
// "integration-ey".
db() *kv.DB

// executor provides access to an internal executor instance to run
// arbitrary SQL statements.
executor() sqlutil.InternalExecutor
}

func newHelper(c cluster, cv clusterversion.ClusterVersion) *Helper {
return &Helper{c: c, cv: cv}
}

// EveryNode invokes the given closure (named by the informational parameter op)
@@ -90,16 +105,17 @@ func (h *Helper) RequiredNodeIDs(ctx context.Context) ([]roachpb.NodeID, error)
func (h *Helper) EveryNode(
ctx context.Context, op string, fn func(context.Context, serverpb.MigrationClient) error,
) error {
nodeIDs, err := h.RequiredNodeIDs(ctx)
ns, err := h.c.nodes(ctx)
if err != nil {
return err
}

for {
// TODO(irfansharif): We can/should send out these RPCs in parallel.
log.Infof(ctx, "executing op=%s on nodes=%s", op, nodeIDs)
for _, nodeID := range nodeIDs {
conn, err := h.dialer.Dial(ctx, nodeID, rpc.DefaultClass)
log.Infof(ctx, "executing %s on nodes %s", redact.Safe(op), ns)

for _, node := range ns {
conn, err := h.c.dial(ctx, node.id)
if err != nil {
return err
}
@@ -109,13 +125,14 @@ func (h *Helper) EveryNode(
}
}

curNodeIDs, err := h.RequiredNodeIDs(ctx)
curNodes, err := h.c.nodes(ctx)
if err != nil {
return err
}

if !identical(nodeIDs, curNodeIDs) {
nodeIDs = curNodeIDs
if ok, diff := ns.identical(curNodes); !ok {
log.Infof(ctx, "[%s], retrying", redact.Safe(diff))
ns = curNodes
continue
}

@@ -124,3 +141,61 @@ func (h *Helper) EveryNode(

return nil
}

// DB exposes the underlying *kv.DB instance.
func (h *Helper) DB() *kv.DB {
return h.c.db()
}

type clusterImpl struct {
nl nodeLiveness
exec sqlutil.InternalExecutor
dialer *nodedialer.Dialer
kvDB *kv.DB
}

var _ cluster = &clusterImpl{}

func newCluster(
nl nodeLiveness, dialer *nodedialer.Dialer, executor *sql.InternalExecutor, db *kv.DB,
) *clusterImpl {
return &clusterImpl{nl: nl, dialer: dialer, exec: executor, kvDB: db}
}

// nodes implements the cluster interface.
func (c *clusterImpl) nodes(ctx context.Context) (nodes, error) {
var ns []node
ls, err := c.nl.GetLivenessesFromKV(ctx)
if err != nil {
return nil, err
}
for _, l := range ls {
if l.Membership.Decommissioned() {
continue
}
live, err := c.nl.IsLive(l.NodeID)
if err != nil {
return nil, err
}
if !live {
return nil, errors.Newf("n%d required, but unavailable", l.NodeID)
}
ns = append(ns, node{id: l.NodeID, epoch: l.Epoch})
}
return ns, nil
}

// dial implements the cluster interface.
func (c *clusterImpl) dial(ctx context.Context, id roachpb.NodeID) (*grpc.ClientConn, error) {
return c.dialer.Dial(ctx, id, rpc.DefaultClass)
}

// db implements the cluster interface.
func (c *clusterImpl) db() *kv.DB {
return c.kvDB
}

// executor implements the cluster interface.
func (c *clusterImpl) executor() sqlutil.InternalExecutor {
return c.exec
}
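
The retry in EveryNode above hinges on the nodes.identical comparison, which lives in util.go (one of the seven changed files, not expanded on this page). The following is a rough sketch of the idea only: the node/nodes types mirror what clusterImpl.nodes constructs in the diff, but the identical signature and its diff wording are assumptions, not the committed implementation. Comparing epochs as well as IDs means a node restart, not just an added or removed node, forces another round of RPCs.

// Standalone sketch; the committed helpers live in pkg/migration/util.go.
package migration

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// node and nodes mirror the unexported types used by the diff above.
type node struct {
	id    roachpb.NodeID
	epoch int64
}

type nodes []node

// identical returns whether both lists contain the same node IDs at the same
// liveness epochs, plus a short description of the first difference found.
// Added nodes, removed nodes, and restarted nodes (epoch bumps) all count as
// membership changes, prompting EveryNode to re-run the closure.
func (ns nodes) identical(other nodes) (ok bool, diff string) {
	seen := make(map[roachpb.NodeID]int64, len(ns)) // node ID -> epoch
	for _, n := range ns {
		seen[n.id] = n.epoch
	}
	for _, n := range other {
		epoch, found := seen[n.id]
		if !found {
			return false, fmt.Sprintf("n%d was added", n.id)
		}
		if epoch != n.epoch {
			return false, fmt.Sprintf("n%d restarted (epoch %d -> %d)", n.id, epoch, n.epoch)
		}
		delete(seen, n.id)
	}
	for id := range seen {
		return false, fmt.Sprintf("n%d was removed", id)
	}
	return true, ""
}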