diff --git a/pkg/migration/BUILD.bazel b/pkg/migration/BUILD.bazel
index 24976336f400..1efd725e9f86 100644
--- a/pkg/migration/BUILD.bazel
+++ b/pkg/migration/BUILD.bazel
@@ -1,4 +1,4 @@
-load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 
 go_library(
     name = "migration",
@@ -19,8 +19,31 @@ go_library(
         "//pkg/rpc/nodedialer",
         "//pkg/server/serverpb",
         "//pkg/sql",
+        "//pkg/sql/sqlutil",
         "//pkg/util/log",
         "//vendor/github.com/cockroachdb/errors",
         "//vendor/github.com/cockroachdb/logtags",
+        "//vendor/github.com/cockroachdb/redact",
+        "//vendor/google.golang.org/grpc",
     ],
 )
+
+go_test(
+    name = "migration_test",
+    srcs = [
+        "helper_test.go",
+        "util_test.go",
+    ],
+    embed = [":migration"],
+    deps = [
+        "//pkg/clusterversion",
+        "//pkg/kv",
+        "//pkg/kv/kvserver/liveness/livenesspb",
+        "//pkg/roachpb",
+        "//pkg/server/serverpb",
+        "//pkg/testutils",
+        "//pkg/util/leaktest",
+        "//pkg/util/syncutil",
+        "//vendor/google.golang.org/grpc",
+    ],
+)
diff --git a/pkg/migration/helper.go b/pkg/migration/helper.go
index 5632d40e9b23..6ceeee1632d5 100644
--- a/pkg/migration/helper.go
+++ b/pkg/migration/helper.go
@@ -13,43 +13,58 @@ package migration
 
 import (
 	"context"
 
+	"github.com/cockroachdb/cockroach/pkg/clusterversion"
+	"github.com/cockroachdb/cockroach/pkg/kv"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/rpc"
+	"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
 	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
+	"github.com/cockroachdb/cockroach/pkg/sql"
+	"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
 	"github.com/cockroachdb/errors"
+	"github.com/cockroachdb/redact"
+	"google.golang.org/grpc"
 )
 
 // Helper captures all the primitives required to fully specify a migration.
 type Helper struct {
-	*Manager
+	c  cluster
+	cv clusterversion.ClusterVersion
 }
 
-// RequiredNodeIDs returns the node IDs for all nodes that are currently part of
-// the cluster (i.e. they haven't been decommissioned away). Migrations have the
-// pre-requisite that all required nodes are up and running so that we're able
-// to execute all relevant node-level operations on them. If any of the nodes
-// are found to be unavailable, an error is returned.
-func (h *Helper) RequiredNodeIDs(ctx context.Context) ([]roachpb.NodeID, error) {
-	var nodeIDs []roachpb.NodeID
-	ls, err := h.nl.GetLivenessesFromKV(ctx)
-	if err != nil {
-		return nil, err
-	}
-	for _, l := range ls {
-		if l.Membership.Decommissioned() {
-			continue
-		}
-		live, err := h.nl.IsLive(l.NodeID)
-		if err != nil {
-			return nil, err
-		}
-		if !live {
-			return nil, errors.Newf("n%d required, but unavailable", l.NodeID)
-		}
-		nodeIDs = append(nodeIDs, l.NodeID)
-	}
-	return nodeIDs, nil
+// cluster mediates access to the crdb cluster.
+type cluster interface {
+	// nodes returns the IDs and epochs for all nodes that are currently part of
+	// the cluster (i.e. they haven't been decommissioned away). Migrations have
+	// the pre-requisite that all nodes are up and running so that we're able to
+	// execute all relevant node-level operations on them. If any of the nodes
+	// are found to be unavailable, an error is returned.
+	//
+	// It's important to note that this makes no guarantees about new nodes
+	// being added to the cluster. It's entirely possible for that to happen
+	// concurrently with the retrieval of the current set of nodes. Appropriate
+	// usage of this entails wrapping it under a stabilizing loop, like we do in
+	// EveryNode.
+	nodes(ctx context.Context) (nodes, error)
+
+	// dial returns a grpc connection to the given node.
+	dial(context.Context, roachpb.NodeID) (*grpc.ClientConn, error)
+
+	// db provides access to the kv.DB instance backing the cluster.
+	//
+	// TODO(irfansharif): We could hide the kv.DB instance behind an interface
+	// to expose only relevant, vetted bits of kv.DB. It'll make our tests less
+	// "integration-ey".
+	db() *kv.DB
+
+	// executor provides access to an internal executor instance to run
+	// arbitrary SQL statements.
+	executor() sqlutil.InternalExecutor
+}
+
+func newHelper(c cluster, cv clusterversion.ClusterVersion) *Helper {
+	return &Helper{c: c, cv: cv}
 }
 
 // EveryNode invokes the given closure (named by the informational parameter op)
@@ -90,16 +105,17 @@ func (h *Helper) RequiredNodeIDs(ctx context.Context) ([]roachpb.NodeID, error)
 func (h *Helper) EveryNode(
 	ctx context.Context, op string, fn func(context.Context, serverpb.MigrationClient) error,
 ) error {
-	nodeIDs, err := h.RequiredNodeIDs(ctx)
+	ns, err := h.c.nodes(ctx)
 	if err != nil {
 		return err
 	}
 
 	for {
 		// TODO(irfansharif): We can/should send out these RPCs in parallel.
-		log.Infof(ctx, "executing op=%s on nodes=%s", op, nodeIDs)
-		for _, nodeID := range nodeIDs {
-			conn, err := h.dialer.Dial(ctx, nodeID, rpc.DefaultClass)
+		log.Infof(ctx, "executing %s on nodes %s", redact.Safe(op), ns)
+
+		for _, node := range ns {
+			conn, err := h.c.dial(ctx, node.id)
 			if err != nil {
 				return err
 			}
@@ -109,13 +125,14 @@ func (h *Helper) EveryNode(
 			}
 		}
 
-		curNodeIDs, err := h.RequiredNodeIDs(ctx)
+		curNodes, err := h.c.nodes(ctx)
 		if err != nil {
 			return err
 		}
 
-		if !identical(nodeIDs, curNodeIDs) {
-			nodeIDs = curNodeIDs
+		if ok, diff := ns.identical(curNodes); !ok {
+			log.Infof(ctx, "[%s], retrying", redact.Safe(diff))
+			ns = curNodes
 			continue
 		}
@@ -124,3 +141,61 @@ func (h *Helper) EveryNode(
 
 	return nil
 }
+
+// DB exposes the underlying *kv.DB instance.
+func (h *Helper) DB() *kv.DB {
+	return h.c.db()
+}
+
+type clusterImpl struct {
+	nl     nodeLiveness
+	exec   sqlutil.InternalExecutor
+	dialer *nodedialer.Dialer
+	kvDB   *kv.DB
+}
+
+var _ cluster = &clusterImpl{}
+
+func newCluster(
+	nl nodeLiveness, dialer *nodedialer.Dialer, executor *sql.InternalExecutor, db *kv.DB,
+) *clusterImpl {
+	return &clusterImpl{nl: nl, dialer: dialer, exec: executor, kvDB: db}
+}
+
+// nodes implements the cluster interface.
+func (c *clusterImpl) nodes(ctx context.Context) (nodes, error) {
+	var ns []node
+	ls, err := c.nl.GetLivenessesFromKV(ctx)
+	if err != nil {
+		return nil, err
+	}
+	for _, l := range ls {
+		if l.Membership.Decommissioned() {
+			continue
+		}
+		live, err := c.nl.IsLive(l.NodeID)
+		if err != nil {
+			return nil, err
+		}
+		if !live {
+			return nil, errors.Newf("n%d required, but unavailable", l.NodeID)
+		}
+		ns = append(ns, node{id: l.NodeID, epoch: l.Epoch})
+	}
+	return ns, nil
+}
+
+// dial implements the cluster interface.
+func (c *clusterImpl) dial(ctx context.Context, id roachpb.NodeID) (*grpc.ClientConn, error) {
+	return c.dialer.Dial(ctx, id, rpc.DefaultClass)
+}
+
+// db implements the cluster interface.
+func (c *clusterImpl) db() *kv.DB {
+	return c.kvDB
+}
+
+// executor implements the cluster interface.
+func (c *clusterImpl) executor() sqlutil.InternalExecutor {
+	return c.exec
+}
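A side note on the shape of the change above: EveryNode is a stabilizing loop. It runs the closure against the current node set, re-fetches the set, and starts over if membership or any epoch changed in the interim. The standalone sketch below (not part of the patch; fetch stands in for cluster.nodes, and the string comparison stands in for nodes.identical) distills that control flow. With three nodes and a fourth joining after the first round, the closure runs numNodes*2 + 1 = 7 times, which is exactly what the test file that follows asserts.

package main

import "fmt"

// untilStable keeps re-running visit over the fetched node set until two
// consecutive fetches agree, guaranteeing that every node in the final,
// stable set has seen the operation.
func untilStable(fetch func() []string, visit func(id string)) {
	ns := fetch()
	for {
		for _, id := range ns {
			visit(id)
		}
		cur := fetch()
		if fmt.Sprint(ns) != fmt.Sprint(cur) { // stand-in for nodes.identical
			ns = cur // the cluster changed mid-round; retry from scratch
			continue
		}
		break
	}
}

func main() {
	calls := 0
	membership := []string{"n1", "n2", "n3"}
	untilStable(
		func() []string {
			if calls >= 3 { // a node joins after the first full round
				membership = []string{"n1", "n2", "n3", "n4"}
			}
			return membership
		},
		func(id string) { calls++ },
	)
	fmt.Println("total calls:", calls) // total calls: 7
}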
diff --git a/pkg/migration/helper_test.go b/pkg/migration/helper_test.go
new file mode 100644
index 000000000000..54d52f51f66d
--- /dev/null
+++ b/pkg/migration/helper_test.go
@@ -0,0 +1,307 @@
+// Copyright 2020 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package migration
+
+import (
+	"context"
+	"fmt"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/clusterversion"
+	"github.com/cockroachdb/cockroach/pkg/kv"
+	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/server/serverpb"
+	"github.com/cockroachdb/cockroach/pkg/testutils"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+	"github.com/cockroachdb/cockroach/pkg/util/syncutil"
+	"google.golang.org/grpc"
+)
+
+func TestHelperEveryNode(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	cv := clusterversion.ClusterVersion{}
+	ctx := context.Background()
+	var mu syncutil.Mutex
+	const numNodes = 3
+
+	t.Run("with-node-addition", func(t *testing.T) {
+		// Add a node mid-way through execution. We expect EveryNode to start
+		// over from scratch and include the newly added node.
+		tc := TestingNewCluster(numNodes)
+		h := newHelper(tc, cv)
+		opCount := 0
+		err := h.EveryNode(ctx, "dummy-op", func(context.Context, serverpb.MigrationClient) error {
+			mu.Lock()
+			defer mu.Unlock()
+
+			opCount++
+			if opCount == numNodes {
+				tc.addNode()
+			}
+
+			return nil
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		if exp := numNodes*2 + 1; exp != opCount {
+			t.Fatalf("expected closure to be invoked %d times, got %d", exp, opCount)
+		}
+	})
+
+	t.Run("with-node-restart", func(t *testing.T) {
+		// Restart a node mid-way through execution. We expect EveryNode to
+		// start over from scratch and include the restarted node.
+		tc := TestingNewCluster(numNodes)
+		h := newHelper(tc, cv)
+		opCount := 0
+		err := h.EveryNode(ctx, "dummy-op", func(context.Context, serverpb.MigrationClient) error {
+			mu.Lock()
+			defer mu.Unlock()
+
+			opCount++
+			if opCount == numNodes {
+				tc.restartNode(2)
+			}
+
+			return nil
+		})
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		if exp := numNodes * 2; exp != opCount {
+			t.Fatalf("expected closure to be invoked %d times, got %d", exp, opCount)
+		}
+	})
+
+	t.Run("with-node-downNode", func(t *testing.T) {
+		// Down a node mid-way through execution. We expect EveryNode to error
+		// out.
+		const downedNode = 2
+		tc := TestingNewCluster(numNodes)
+		expRe := fmt.Sprintf("n%d required, but unavailable", downedNode)
+		h := newHelper(tc, cv)
+		opCount := 0
+		if err := h.EveryNode(ctx, "dummy-op", func(context.Context, serverpb.MigrationClient) error {
+			mu.Lock()
+			defer mu.Unlock()
+
+			opCount++
+			if opCount == 1 {
+				tc.downNode(downedNode)
+			}
+			return nil
+		}); !testutils.IsError(err, expRe) {
+			t.Fatalf("expected error %q, got %q", expRe, err)
+		}
+
+		tc.restartNode(downedNode)
+		if err := h.EveryNode(ctx, "dummy-op", func(context.Context, serverpb.MigrationClient) error {
+			return nil
+		}); err != nil {
+			t.Fatal(err)
+		}
+	})
+}
+
+func TestClusterNodes(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	ctx := context.Background()
+	const numNodes = 3
+
+	t.Run("retrieves-all", func(t *testing.T) {
+		nl := newTestNodeLiveness(numNodes)
+		c := clusterImpl{nl: nl}
+
+		ns, err := c.nodes(ctx)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		if got := len(ns); got != numNodes {
+			t.Fatalf("expected %d nodes, got %d", numNodes, got)
+		}
+
+		for i := range ns {
+			if exp := roachpb.NodeID(i + 1); exp != ns[i].id {
+				t.Fatalf("expected to find node ID %s, got %s", exp, ns[i].id)
+			}
+			if ns[i].epoch != 1 {
+				t.Fatalf("expected to find epoch=1, got %d", ns[i].epoch)
+			}
+		}
+	})
+
+	t.Run("ignores-decommissioned", func(t *testing.T) {
+		nl := newTestNodeLiveness(numNodes)
+		c := clusterImpl{nl: nl}
+		const decommissionedNode = 3
+		nl.decommission(decommissionedNode)
+
+		ns, err := c.nodes(ctx)
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		if got := len(ns); got != numNodes-1 {
+			t.Fatalf("expected %d nodes, got %d", numNodes-1, got)
+		}
+
+		for i := range ns {
+			if exp := roachpb.NodeID(i + 1); exp != ns[i].id {
+				t.Fatalf("expected to find node ID %s, got %s", exp, ns[i].id)
+			}
+			if ns[i].epoch != 1 {
+				t.Fatalf("expected to find epoch=1, got %d", ns[i].epoch)
+			}
+		}
+	})
+
+	t.Run("errors-if-down", func(t *testing.T) {
+		nl := newTestNodeLiveness(numNodes)
+		c := clusterImpl{nl: nl}
+		const downedNode = 3
+		nl.downNode(downedNode)
+
+		_, err := c.nodes(ctx)
+		expRe := fmt.Sprintf("n%d required, but unavailable", downedNode)
+		if !testutils.IsError(err, expRe) {
+			t.Fatalf("expected error %q, got %q", expRe, err)
+		}
+	})
+}
+
+// mockClusterImpl is a testing-only implementation of the cluster interface.
+// It lets callers mock out adding, killing, and restarting nodes in the
+// cluster.
+type mockClusterImpl struct {
+	nl *mockNodeLivenessImpl
+	*clusterImpl
+}
+
+var _ cluster = &mockClusterImpl{}
+
+// TestingNewCluster is an exported constructor for a test-only implementation
+// of the cluster interface.
+func TestingNewCluster(numNodes int, options ...func(*mockClusterImpl)) *mockClusterImpl {
+	nl := newTestNodeLiveness(numNodes)
+	tc := &mockClusterImpl{
+		nl:          nl,
+		clusterImpl: newCluster(nl, nil, nil, nil),
+	}
+	for _, option := range options {
+		option(tc)
+	}
+	return tc
+}
+
+// TestingWithKV facilitates the creation of a test cluster backed by the given
+// KV instance.
+func TestingWithKV(db *kv.DB) func(*mockClusterImpl) {
+	return func(impl *mockClusterImpl) {
+		impl.clusterImpl.kvDB = db
+	}
+}
+
+// dial is part of the cluster interface. We override it here as tests don't
+// expect to make any outbound requests.
+func (t *mockClusterImpl) dial(context.Context, roachpb.NodeID) (*grpc.ClientConn, error) {
+	return nil, nil
+}
+
+func (t *mockClusterImpl) addNode() {
+	t.nl.addNode(roachpb.NodeID(len(t.nl.ls) + 1))
+}
+
+func (t *mockClusterImpl) downNode(id roachpb.NodeID) {
+	t.nl.downNode(id)
+}
+
+func (t *mockClusterImpl) restartNode(id roachpb.NodeID) {
+	t.nl.restartNode(id)
+}
+
+// mockNodeLivenessImpl is a testing-only implementation of the nodeLiveness
+// interface. It lets tests mock out restarting, killing, decommissioning and
+// adding nodes to the cluster.
+type mockNodeLivenessImpl struct {
+	ls   []livenesspb.Liveness
+	dead map[roachpb.NodeID]struct{}
+}
+
+var _ nodeLiveness = &mockNodeLivenessImpl{}
+
+func newTestNodeLiveness(numNodes int) *mockNodeLivenessImpl {
+	nl := &mockNodeLivenessImpl{
+		ls:   make([]livenesspb.Liveness, numNodes),
+		dead: make(map[roachpb.NodeID]struct{}),
+	}
+	for i := 0; i < numNodes; i++ {
+		nl.ls[i] = livenesspb.Liveness{
+			NodeID: roachpb.NodeID(i + 1), Epoch: 1,
+			Membership: livenesspb.MembershipStatus_ACTIVE,
+		}
+	}
+	return nl
+}
+
+// GetLivenessesFromKV implements the nodeLiveness interface.
+func (t *mockNodeLivenessImpl) GetLivenessesFromKV(context.Context) ([]livenesspb.Liveness, error) {
+	return t.ls, nil
+}
+
+// IsLive implements the nodeLiveness interface.
+func (t *mockNodeLivenessImpl) IsLive(id roachpb.NodeID) (bool, error) {
+	_, dead := t.dead[id]
+	return !dead, nil
+}
+
+func (t *mockNodeLivenessImpl) decommission(id roachpb.NodeID) {
+	for i := range t.ls {
+		if t.ls[i].NodeID == id {
+			t.ls[i].Membership = livenesspb.MembershipStatus_DECOMMISSIONED
+			break
+		}
+	}
+}
+
+func (t *mockNodeLivenessImpl) addNode(id roachpb.NodeID) {
+	t.ls = append(t.ls, livenesspb.Liveness{
+		NodeID:     id,
+		Epoch:      1,
+		Membership: livenesspb.MembershipStatus_ACTIVE,
+	})
+}
+
+func (t *mockNodeLivenessImpl) downNode(id roachpb.NodeID) {
+	t.dead[id] = struct{}{}
+}
+
+func (t *mockNodeLivenessImpl) restartNode(id roachpb.NodeID) {
+	for i := range t.ls {
+		if t.ls[i].NodeID == id {
+			t.ls[i].Epoch++
+			break
+		}
+	}
+
+	delete(t.dead, id)
+}
+
+// TestingNewHelper is an exported constructor for Helper, for testing
+// purposes.
+func TestingNewHelper(c cluster, cv clusterversion.ClusterVersion) *Helper {
+	return &Helper{c: c, cv: cv}
+}
diff --git a/pkg/migration/manager.go b/pkg/migration/manager.go
index cc129feb3c00..99686c0f21d4 100644
--- a/pkg/migration/manager.go
+++ b/pkg/migration/manager.go
@@ -99,7 +99,8 @@ func (m *Manager) Migrate(ctx context.Context, from, to clusterversion.ClusterVe
 	log.Infof(ctx, "migrating cluster from %s to %s (stepping through %s)", from, to, clusterVersions)
 
 	for _, clusterVersion := range clusterVersions {
-		h := &Helper{Manager: m}
+		cluster := newCluster(m.nl, m.dialer, m.executor, m.db)
+		h := newHelper(cluster, clusterVersion)
 
 		// Push out the version gate to every node in the cluster. Each node
 		// will persist the version, bump the local version gates, and then
diff --git a/pkg/migration/util.go b/pkg/migration/util.go
index b7ab5ed6c13e..a7488be750c4 100644
--- a/pkg/migration/util.go
+++ b/pkg/migration/util.go
@@ -12,31 +12,84 @@ package migration
 
 import (
 	"context"
+	"fmt"
 	"sort"
+	"strings"
 
 	"github.com/cockroachdb/cockroach/pkg/clusterversion"
 	"github.com/cockroachdb/cockroach/pkg/roachpb"
 	"github.com/cockroachdb/cockroach/pkg/util/log"
+	"github.com/cockroachdb/redact"
 )
 
-// identical returns whether or not two lists of node IDs are identical as sets.
-func identical(a, b []roachpb.NodeID) bool {
-	if len(a) != len(b) {
-		return false
+// node captures the relevant bits of each node as it pertains to the migration
+// infrastructure.
+type node struct {
+	id    roachpb.NodeID
+	epoch int64
+}
+
+// nodes is a collection of node objects.
+type nodes []node
+
+// identical returns whether or not two lists of nodes are identical as sets,
+// and if not, what changed (in terms of cluster membership operations and epoch
+// changes). The textual diff is only to be used for logging purposes.
+func (ns nodes) identical(other nodes) (ok bool, diff string) {
+	a, b := ns, other
+
+	type ent struct {
+		node         node
+		count        int
+		epochChanged bool
+	}
+	m := map[roachpb.NodeID]ent{}
+	for _, node := range a {
+		m[node.id] = ent{count: 1, node: node, epochChanged: false}
+	}
+	for _, node := range b {
+		e, ok := m[node.id]
+		e.count--
+		if ok && e.node.epoch != node.epoch {
+			e.epochChanged = true
+		}
+		m[node.id] = e
 	}
-	sort.Slice(a, func(i, j int) bool {
-		return a[i] < a[j]
-	})
-	sort.Slice(b, func(i, j int) bool {
-		return b[i] < b[j]
-	})
-	for i, v := range a {
-		if v != b[i] {
-			return false
+
+	var diffs []string
+	for id, e := range m {
+		if e.epochChanged {
+			diffs = append(diffs, fmt.Sprintf("n%d's epoch changed", id))
+		}
+		if e.count > 0 {
+			diffs = append(diffs, fmt.Sprintf("n%d was decommissioned", id))
+		}
+		if e.count < 0 {
+			diffs = append(diffs, fmt.Sprintf("n%d joined the cluster", id))
 		}
 	}
-	return true
+
+	sort.Strings(diffs)
+	return len(diffs) == 0, strings.Join(diffs, ", ")
+}
+
+func (ns nodes) String() string {
+	var b strings.Builder
+	b.WriteString("n{")
+	if len(ns) > 0 {
+		b.WriteString(fmt.Sprintf("%d", ns[0].id))
+		for _, node := range ns[1:] {
+			b.WriteString(fmt.Sprintf(",%d", node.id))
+		}
+	}
+	b.WriteString("}")
+
+	return b.String()
+}
+
+// SafeFormat implements redact.SafeFormatter.
+func (ns nodes) SafeFormat(s redact.SafePrinter, _ rune) {
+	s.SafeString(redact.SafeString(ns.String()))
 }
 
 // fenceVersionFor constructs the appropriate "fence version" for the given
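Before the table test below, a worked example of the semantics nodes.identical now provides (an in-package sketch, not part of the patch; the function name exampleIdentical is illustrative only). Note that the diff string is sorted, which is what lets the test assert on it verbatim.

package migration

import "fmt"

// exampleIdentical compares two snapshots of the cluster: n1 restarted
// (epoch bump), n2 left, and n3 joined in between.
func exampleIdentical() {
	before := nodes{{id: 1, epoch: 1}, {id: 2, epoch: 1}}
	after := nodes{{id: 1, epoch: 2}, {id: 3, epoch: 1}}

	ok, diff := before.identical(after)
	fmt.Println(ok)   // false
	fmt.Println(diff) // n1's epoch changed, n2 was decommissioned, n3 joined the cluster
}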
diff --git a/pkg/migration/util_test.go b/pkg/migration/util_test.go
new file mode 100644
index 000000000000..336542b45eb5
--- /dev/null
+++ b/pkg/migration/util_test.go
@@ -0,0 +1,93 @@
+// Copyright 2020 The Cockroach Authors.
+//
+// Use of this software is governed by the Business Source License
+// included in the file licenses/BSL.txt.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0, included in the file
+// licenses/APL.txt.
+
+package migration
+
+import (
+	"strconv"
+	"strings"
+	"testing"
+
+	"github.com/cockroachdb/cockroach/pkg/roachpb"
+	"github.com/cockroachdb/cockroach/pkg/util/leaktest"
+)
+
+func TestNodesString(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	ns := func(ids ...int) nodes {
+		var nodes []node
+		for _, id := range ids {
+			nodes = append(nodes, node{id: roachpb.NodeID(id)})
+		}
+		return nodes
+	}
+
+	var tests = []struct {
+		ns  nodes
+		exp string
+	}{
+		{ns(), "n{}"},
+		{ns(1), "n{1}"},
+		{ns(1, 2, 3), "n{1,2,3}"},
+		{ns(3, 4, 7), "n{3,4,7}"},
+	}
+
+	for _, test := range tests {
+		if got := test.ns.String(); got != test.exp {
+			t.Fatalf("expected %s, got %s", test.exp, got)
+		}
+	}
+}
+
+func TestNodesIdentical(t *testing.T) {
+	defer leaktest.AfterTest(t)()
+
+	list := func(nodes ...string) nodes { // takes in strings of the form "id@epoch"
+		var ns []node
+		for _, n := range nodes {
+			parts := strings.Split(n, "@")
+			id, err := strconv.Atoi(parts[0])
+			if err != nil {
+				t.Fatal(err)
+			}
+			epoch, err := strconv.Atoi(parts[1])
+			if err != nil {
+				t.Fatal(err)
+			}
+			ns = append(ns, node{id: roachpb.NodeID(id), epoch: int64(epoch)})
+		}
+		return ns
+	}
+
+	var tests = []struct {
+		a, b    nodes
+		expOk   bool
+		expDiff string
+	}{
+		{list(), list(), true, ""},
+		{list("1@2"), list("1@2"), true, ""},
+		{list("2@1", "1@2"), list("1@2", "2@1"), true, ""},
+		{list("1@2"), list("1@3"), false, "n1's epoch changed"},
+		{list("1@2"), list("1@2", "2@1"), false, "n2 joined the cluster"},
+		{list("1@1", "2@1"), list("1@1"), false, "n2 was decommissioned"},
+		{list("3@2", "4@6"), list("4@8", "5@2"), false, "n3 was decommissioned, n4's epoch changed, n5 joined the cluster"},
+	}
+
+	for _, test := range tests {
+		ok, diff := test.a.identical(test.b)
+		if ok != test.expOk {
+			t.Fatalf("expected identical = %t, got %t", test.expOk, ok)
+		}
+		if diff != test.expDiff {
+			t.Fatalf("expected diff %q, got %q", test.expDiff, diff)
+		}
+	}
+}
diff --git a/pkg/testutils/lint/lint_test.go b/pkg/testutils/lint/lint_test.go
index 8432e765f221..93cb19d3b709 100644
--- a/pkg/testutils/lint/lint_test.go
+++ b/pkg/testutils/lint/lint_test.go
@@ -1560,6 +1560,7 @@ func TestLint(t *testing.T) {
 		if err := stream.ForEach(
 			stream.Sequence(
 				filter,
+				stream.GrepNot("migration/.*exported func TestingNewCluster returns unexported type"),
 				stream.GrepNot("sql/.*exported func .* returns unexported type sql.planNode"),
 				stream.GrepNot("pkg/sql/types/types.go.* var Uuid should be UUID"),
 				stream.GrepNot("pkg/sql/oidext/oidext.go.*don't use underscores in Go names; const T_"),
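Finally, a hypothetical caller-side sketch of how the hooks this patch exports (TestingNewCluster, TestingWithKV, TestingNewHelper), which the lint exclusion above accommodates, might be used from another package: a Helper wired to a mock three-node cluster but backed by a real kv.DB. The test-server setup is standard crdb testing machinery and is assumed here, not introduced by the patch.

package migration_test

import (
	"context"
	"testing"

	"github.com/cockroachdb/cockroach/pkg/base"
	"github.com/cockroachdb/cockroach/pkg/clusterversion"
	"github.com/cockroachdb/cockroach/pkg/migration"
	"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
)

func TestHelperWithRealKV(t *testing.T) {
	ctx := context.Background()
	s, _, kvDB := serverutils.StartServer(t, base.TestServerArgs{})
	defer s.Stopper().Stop(ctx)

	// Mock out the cluster topology, but route KV traffic to the test server.
	c := migration.TestingNewCluster(3, migration.TestingWithKV(kvDB))
	h := migration.TestingNewHelper(c, clusterversion.ClusterVersion{})

	// Migrations can now read and write through h.DB() without any node RPCs.
	if err := h.DB().Put(ctx, "migration-test-key", "value"); err != nil {
		t.Fatal(err)
	}
}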