migration: plumb in a dialer, executor, kv.DB, and liveness
The migration manager will need all of the above in order to execute
migrations. It'll need:
- A `Dialer`, to send out migration RPCs to individual nodes in the
  cluster.
- A handle on `Liveness`, to figure out which nodes are part of the
  cluster.
- An `Executor`, for migrations to inspect/set internal SQL state, and
  to log progress into a dedicated system table.
- A `kv.DB`, for migrations to inspect/set internal KV state, and to
  send out Migrate requests to ranges to execute below-Raft migrations.

For more details, see the RFC (#48843). The fully fleshed-out version
of this manager was originally prototyped in #56107. This PR simply
pulls out the boring bits from there to move things along.
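
To make the wiring concrete, here's a minimal, self-contained sketch of
the manager's shape. Every type below is a hypothetical stand-in for the
real dependency (nodedialer.Dialer, the node liveness component,
sql.InternalExecutor, kv.DB), not CRDB code:

```go
package main

import (
	"context"
	"fmt"
)

// Hypothetical stand-ins for the real dependencies; they exist only to
// make the sketch self-contained.
type dialer struct{}
type nodeLiveness struct{}
type internalExecutor struct{}
type kvDB struct{}

// manager mirrors the shape of migration.Manager in this commit: it does
// nothing yet beyond holding handles to its four dependencies.
type manager struct {
	dialer   *dialer
	nl       *nodeLiveness
	executor *internalExecutor
	db       *kvDB
}

func newManager(d *dialer, nl *nodeLiveness, ex *internalExecutor, db *kvDB) *manager {
	return &manager{dialer: d, nl: nl, executor: ex, db: db}
}

// migrateTo stands in for Manager.MigrateTo, which is still a stub in
// this commit.
func (m *manager) migrateTo(ctx context.Context) error {
	fmt.Println("running migrations...")
	return nil
}

func main() {
	m := newManager(&dialer{}, &nodeLiveness{}, &internalExecutor{}, &kvDB{})
	_ = m.migrateTo(context.Background())
}
```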

Release note: None
irfansharif committed Nov 11, 2020
1 parent 10601db commit 53d8746
Showing 3 changed files with 57 additions and 19 deletions.
4 changes: 4 additions & 0 deletions pkg/migration/BUILD.bazel
@@ -9,7 +9,11 @@ go_library(
importpath = "github.com/cockroachdb/cockroach/pkg/migration",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb",
"//pkg/rpc/nodedialer",
"//pkg/sql",
"//vendor/github.com/cockroachdb/logtags",
],
)
45 changes: 33 additions & 12 deletions pkg/migration/manager.go
@@ -24,7 +24,11 @@ package migration
import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/logtags"
)

@@ -52,21 +56,37 @@ type Migration func(context.Context, *Helper) error

// Manager is the instance responsible for executing migrations across the
// cluster.
type Manager struct{}
type Manager struct {
dialer *nodedialer.Dialer
nl nodeLiveness
executor *sql.InternalExecutor
db *kv.DB
}

// Helper captures all the primitives required to fully specify a migration.
type Helper struct {
// TODO(irfansharif): We'll want to hold on to the Manager here, to
// have access to all of its constituent components.
*Manager
}

// nodeLiveness is the subset of the interface satisfied by CRDB's node liveness
// component that the migration manager relies upon.
type nodeLiveness interface {
GetLivenessesFromKV(context.Context) ([]livenesspb.Liveness, error)
IsLive(roachpb.NodeID) (bool, error)
}

// NewManager constructs a new Manager.
//
// TODO(irfansharif): We'll need to eventually plumb in a few things here. We'll
// need a handle on node liveness, a node dialer, a lease manager, an internal
// executor, and a kv.DB.
func NewManager() *Manager {
return &Manager{}
// TODO(irfansharif): We'll need to eventually plumb in a lease manager here.
func NewManager(
dialer *nodedialer.Dialer, nl nodeLiveness, executor *sql.InternalExecutor, db *kv.DB,
) *Manager {
return &Manager{
dialer: dialer,
executor: executor,
db: db,
nl: nl,
}
}

// MigrateTo runs the set of migrations required to upgrade the cluster version
@@ -75,7 +95,7 @@ func NewManager() *Manager {
// TODO(irfansharif): Do something real here.
func (m *Manager) MigrateTo(ctx context.Context, targetV roachpb.Version) error {
// TODO(irfansharif): Should we inject every ctx here with specific labels
// for each migration, so they log distinctly? Do we need an AmbientContext?
// for each migration, so they log distinctly?
_ = logtags.AddTag(ctx, "migration-mgr", nil)

// TODO(irfansharif): We'll need to acquire a lease here and refresh it
@@ -97,7 +117,7 @@ func (m *Manager) MigrateTo(ctx context.Context, targetV roachpb.Version) error
var vs []roachpb.Version

for _, version := range vs {
_ = &Helper{}
_ = &Helper{Manager: m}
// TODO(irfansharif): We'll want to push out the version gate to every node
// in the cluster. Each node will want to persist the version, bump the
// local version gates, and then return. The migration associated with
@@ -106,8 +126,9 @@ func (m *Manager) MigrateTo(ctx context.Context, targetV roachpb.Version) error

// TODO(irfansharif): We'll want to retrieve the right migration off of
// our registry of migrations, and execute it.
// TODO(irfansharif): We'll also want a testing override here to be able
// to stub out migrations as needed.
// TODO(irfansharif): We'll want to be able to override which migration
// is retrieved here within tests. We could make the registry a part
// of the manager, and allow tests to provide their own.
_ = Registry[version]
}

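One payoff of the narrowed `nodeLiveness` interface above is testability:
anything with those two methods can stand in for the real `*NodeLiveness`.
A minimal sketch of such a test double (hypothetical, not part of this
commit):

```go
package migration_test

import (
	"context"

	"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
	"github.com/cockroachdb/cockroach/pkg/roachpb"
)

// fakeLiveness is a hypothetical test double. It structurally satisfies
// the nodeLiveness interface: it serves a fixed set of liveness records
// and reports every node as live.
type fakeLiveness struct {
	records []livenesspb.Liveness
}

func (f *fakeLiveness) GetLivenessesFromKV(context.Context) ([]livenesspb.Liveness, error) {
	return f.records, nil
}

func (f *fakeLiveness) IsLive(roachpb.NodeID) (bool, error) {
	return true, nil
}
```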
27 changes: 20 additions & 7 deletions pkg/server/server_sql.go
@@ -112,7 +112,8 @@ type sqlServer struct {
type sqlServerOptionalKVArgs struct {
// nodesStatusServer gives access to the NodesStatus service.
nodesStatusServer serverpb.OptionalNodesStatusServer
// Narrowed down version of *NodeLiveness. Used by jobs and DistSQLPlanner
// Narrowed down version of *NodeLiveness. Used by jobs, DistSQLPlanner, and
// the migration manager.
nodeLiveness optionalnodeliveness.Container
// Gossip is relied upon by distSQLCfg (execinfra.ServerConfig), the executor
// config, the DistSQL planner, the table statistics cache, the statements
@@ -430,8 +431,9 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) {
}

var isLive func(roachpb.NodeID) (bool, error)
if nl, ok := cfg.nodeLiveness.Optional(47900); ok {
isLive = nl.IsLive
nodeLiveness, ok := cfg.nodeLiveness.Optional(47900)
if ok {
isLive = nodeLiveness.IsLive
} else {
// We're on a SQL tenant, so this is the only node DistSQL will ever
// schedule on - always returning true is fine.
@@ -440,7 +442,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) {
}
}

migrationMgr := migration.NewManager()
*execCfg = sql.ExecutorConfig{
Settings: cfg.Settings,
NodeInfo: nodeInfo,
@@ -467,9 +468,6 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) {
RangeDescriptorCache: cfg.distSender.RangeDescriptorCache(),
RoleMemberCache: &sql.MembershipCache{},
TestingKnobs: sqlExecutorTestingKnobs,
VersionUpgradeHook: func(ctx context.Context, targetV roachpb.Version) error {
return migrationMgr.MigrateTo(ctx, targetV)
},

DistSQLPlanner: sql.NewDistSQLPlanner(
ctx,
@@ -638,6 +636,21 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) {
)
execCfg.StmtDiagnosticsRecorder = stmtDiagnosticsRegistry

if cfg.TenantID == roachpb.SystemTenantID {
// We only need to attach a version upgrade hook if we're the system
// tenant. Regular tenants are disallowed from changing cluster
// versions.
migrationMgr := migration.NewManager(
cfg.nodeDialer,
nodeLiveness,
cfg.circularInternalExecutor,
cfg.db,
)
execCfg.VersionUpgradeHook = func(ctx context.Context, targetV roachpb.Version) error {
return migrationMgr.MigrateTo(ctx, targetV)
}
}

temporaryObjectCleaner := sql.NewTemporaryObjectCleaner(
cfg.Settings,
cfg.db,
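The hunk above only installs `VersionUpgradeHook` for the system tenant,
so downstream callers have to treat the hook as optional. A self-contained
sketch of that pattern, using hypothetical stand-in types rather than CRDB
APIs:

```go
package main

import (
	"context"
	"fmt"
)

// version stands in for roachpb.Version.
type version struct{ Major, Minor int32 }

// execConfig mirrors the relevant slice of sql.ExecutorConfig: the hook is
// left nil for secondary tenants, which are disallowed from changing
// cluster versions.
type execConfig struct {
	versionUpgradeHook func(context.Context, version) error
}

func main() {
	// Stands in for cfg.TenantID == roachpb.SystemTenantID.
	const isSystemTenant = true

	var cfg execConfig
	if isSystemTenant {
		cfg.versionUpgradeHook = func(ctx context.Context, v version) error {
			fmt.Printf("migrating cluster to v%d.%d\n", v.Major, v.Minor)
			return nil
		}
	}

	// Callers must tolerate a nil hook.
	if cfg.versionUpgradeHook != nil {
		_ = cfg.versionUpgradeHook(context.Background(), version{Major: 20, Minor: 2})
	}
}
```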
