Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

migration: add scaffolding for the migrations manager #56368

Merged
merged 3 commits into from
Nov 11, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions pkg/migration/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "migration",
srcs = [
"manager.go",
"migrations.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/migration",
visibility = ["//visibility:public"],
deps = [
"//pkg/kv",
"//pkg/kv/kvserver/liveness/livenesspb",
"//pkg/roachpb",
"//pkg/rpc/nodedialer",
"//pkg/sql",
"//vendor/github.com/cockroachdb/logtags",
],
)
136 changes: 136 additions & 0 deletions pkg/migration/manager.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

// Package migration captures the facilities needed to define and execute
// migrations for a crdb cluster. These migrations can be arbitrarily long
// running, are free to send out arbitrary requests cluster wide, change
// internal DB state, and much more. They're typically reserved for crdb
// internal operations and state. Each migration is idempotent in nature, is
// associated with a specific cluster version, and executed when the cluster
// version is made activate on every node in the cluster.
//
// Examples of migrations that apply would be migrations to move all raft state
// from one storage engine to another, or purging all usage of the replicated
// truncated state in KV. A "sister" package of interest is pkg/sqlmigrations.
package migration

import (
"context"

"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/logtags"
)

// Migration defines a program to be executed once every node in the cluster is
// (a) running a specific binary version, and (b) has completed all prior
// migrations.
//
// Each migration is associated with a specific internal cluster version and is
// idempotent in nature. When setting the cluster version (via `SET CLUSTER
// SETTING version`), a manager process determines the set of migrations needed
// to bridge the gap between the current active cluster version, and the target
// one.
//
// To introduce a migration, introduce a version key in pkg/clusterversion, and
// introduce a corresponding internal cluster version for it. See [1] for
// details. Following that, define a Migration in this package and add it to the
// Registry. Be sure to key it in with the new cluster version we just added.
// During cluster upgrades, once the operator is able to set a cluster version
// setting that's past the version that was introduced (typically the major
// release version the migration was introduced in), the manager will execute
// the defined migration before letting the upgrade finalize.
//
// [1]: pkg/clusterversion/cockroach_versions.go
type Migration func(context.Context, *Helper) error

// Manager is the instance responsible for executing migrations across the
// cluster.
type Manager struct {
dialer *nodedialer.Dialer
nl nodeLiveness
executor *sql.InternalExecutor
db *kv.DB
}

// Helper captures all the primitives required to fully specify a migration.
type Helper struct {
*Manager
}

// nodeLiveness is the subset of the interface satisfied by CRDB's node liveness
// component that the migration manager relies upon.
type nodeLiveness interface {
GetLivenessesFromKV(context.Context) ([]livenesspb.Liveness, error)
IsLive(roachpb.NodeID) (bool, error)
}

// NewManager constructs a new Manager.
//
// TODO(irfansharif): We'll need to eventually plumb in on a lease manager here.
func NewManager(
dialer *nodedialer.Dialer, nl nodeLiveness, executor *sql.InternalExecutor, db *kv.DB,
) *Manager {
return &Manager{
dialer: dialer,
executor: executor,
db: db,
nl: nl,
}
}

// MigrateTo runs the set of migrations required to upgrade the cluster version
// to the provided target version.
//
// TODO(irfansharif): Do something real here.
func (m *Manager) MigrateTo(ctx context.Context, targetV roachpb.Version) error {
// TODO(irfansharif): Should we inject every ctx here with specific labels
// for each migration, so they log distinctly?
_ = logtags.AddTag(ctx, "migration-mgr", nil)

// TODO(irfansharif): We'll need to acquire a lease here and refresh it
// throughout during the migration to ensure mutual exclusion.

// TODO(irfansharif): We'll need to create a system table to store
// in-progress state of long running migrations, for introspection.

// TODO(irfansharif): We'll want to either write to a KV key to record the
// version up until which we've already migrated to, or consult the system
// table mentioned above. Perhaps it makes sense to consult any given
// `StoreClusterVersionKey`, since the manager here will want to push out
// cluster version bumps for vX before attempting to migrate into vX+1.

// TODO(irfansharif): After determining the last completed migration, if
// any, we'll be want to assemble the list of remaining migrations to step
// through to get to targetV.
_ = targetV
var vs []roachpb.Version

for _, version := range vs {
_ = &Helper{Manager: m}
// TODO(irfansharif): We'll want to out the version gate to every node
// in the cluster. Each node will want to persist the version, bump the
// local version gates, and then return. The migration associated with
// the specific version can then assume that every node in the cluster
// has the corresponding version activated.

// TODO(irfansharif): We'll want to retrieve the right migration off of
// our registry of migrations, and execute it.
// TODO(irfansharif): We'll want to be able to override which migration
// is retrieved here within tests. We could make the registry be a part
// of the manager, and all tests to provide their own.
_ = Registry[version]
}

return nil
}
25 changes: 25 additions & 0 deletions pkg/migration/migrations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright 2020 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package migration

import "github.com/cockroachdb/cockroach/pkg/roachpb"

// Registry defines the global mapping between a version, and the associated
// migration. The migration is only executed after a cluster-wide bump of the
// version gate.
var Registry = make(map[roachpb.Version]Migration)

func init() {
// TODO(irfansharif): We'll want to register individual migrations with
// specific internal cluster versions here.
//
// Registry[clusterversion.VersionByKey(clusterversion.VersionWhatever)] = WhateverMigration
}
1 change: 1 addition & 0 deletions pkg/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ go_library(
"//pkg/kv/kvserver/protectedts/ptprovider",
"//pkg/kv/kvserver/protectedts/ptreconcile",
"//pkg/kv/kvserver/reports",
"//pkg/migration",
"//pkg/roachpb",
"//pkg/rpc",
"//pkg/rpc/nodedialer",
Expand Down
24 changes: 21 additions & 3 deletions pkg/server/server_sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts"
"github.com/cockroachdb/cockroach/pkg/migration"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/rpc/nodedialer"
Expand Down Expand Up @@ -111,7 +112,8 @@ type sqlServer struct {
type sqlServerOptionalKVArgs struct {
// nodesStatusServer gives access to the NodesStatus service.
nodesStatusServer serverpb.OptionalNodesStatusServer
// Narrowed down version of *NodeLiveness. Used by jobs and DistSQLPlanner
// Narrowed down version of *NodeLiveness. Used by jobs, DistSQLPlanner, and
// migration manager.
nodeLiveness optionalnodeliveness.Container
// Gossip is relied upon by distSQLCfg (execinfra.ServerConfig), the executor
// config, the DistSQL planner, the table statistics cache, the statements
Expand Down Expand Up @@ -429,8 +431,9 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) {
}

var isLive func(roachpb.NodeID) (bool, error)
if nl, ok := cfg.nodeLiveness.Optional(47900); ok {
isLive = nl.IsLive
nodeLiveness, ok := cfg.nodeLiveness.Optional(47900)
if ok {
isLive = nodeLiveness.IsLive
} else {
// We're on a SQL tenant, so this is the only node DistSQL will ever
// schedule on - always returning true is fine.
Expand Down Expand Up @@ -633,6 +636,21 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) {
)
execCfg.StmtDiagnosticsRecorder = stmtDiagnosticsRegistry

if cfg.TenantID == roachpb.SystemTenantID {
// We only need to attach a version upgrade hook if we're the system
// tenant. Regular tenants are disallowed from changing cluster
// versions.
migrationMgr := migration.NewManager(
cfg.nodeDialer,
nodeLiveness,
cfg.circularInternalExecutor,
cfg.db,
)
execCfg.VersionUpgradeHook = func(ctx context.Context, targetV roachpb.Version) error {
return migrationMgr.MigrateTo(ctx, targetV)
}
}

temporaryObjectCleaner := sql.NewTemporaryObjectCleaner(
cfg.Settings,
cfg.db,
Expand Down
5 changes: 5 additions & 0 deletions pkg/sql/exec_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,11 @@ type ExecutorConfig struct {
HydratedTables *hydratedtables.Cache

GCJobNotifier *gcjobnotifier.Notifier

// VersionUpgradeHook is called after validating a `SET CLUSTER SETTING
// version` but before executing it. It can carry out arbitrary migrations
// that allow us to eventually remove legacy code.
VersionUpgradeHook func(ctx context.Context, to roachpb.Version) error
}

// Organization returns the value of cluster.organization.
Expand Down
26 changes: 24 additions & 2 deletions pkg/sql/set_cluster_setting.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/clusterversion"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server/telemetry"
"github.com/cockroachdb/cockroach/pkg/settings"
Expand Down Expand Up @@ -47,6 +48,10 @@ type setClusterSettingNode struct {
setting settings.WritableSetting
// If value is nil, the setting should be reset.
value tree.TypedExpr
// versionUpgradeHook is called after validating a `SET CLUSTER SETTING
// version` but before executing it. It can carry out arbitrary migrations
// that allow us to eventually remove legacy code.
versionUpgradeHook func(ctx context.Context, to roachpb.Version) error
}

func checkPrivilegesForSetting(ctx context.Context, p *planner, name string, action string) error {
Expand Down Expand Up @@ -156,7 +161,11 @@ func (p *planner) SetClusterSetting(
}
}

return &setClusterSettingNode{name: name, st: st, setting: setting, value: value}, nil
csNode := setClusterSettingNode{
name: name, st: st, setting: setting, value: value,
versionUpgradeHook: p.execCfg.VersionUpgradeHook,
}
return &csNode, nil
}

func (n *setClusterSettingNode) startExec(params runParams) error {
Expand Down Expand Up @@ -209,7 +218,8 @@ func (n *setClusterSettingNode) startExec(params runParams) error {
}
reportedValue = tree.AsStringWithFlags(value, tree.FmtBareStrings)
var prev tree.Datum
if _, ok := n.setting.(*settings.VersionSetting); ok {
_, isSetVersion := n.setting.(*settings.VersionSetting)
if isSetVersion {
datums, err := execCfg.InternalExecutor.QueryRowEx(
ctx, "retrieve-prev-setting", txn,
sessiondata.InternalExecutorOverride{User: security.RootUserName()},
Expand All @@ -232,6 +242,18 @@ func (n *setClusterSettingNode) startExec(params runParams) error {
if err != nil {
return err
}

if isSetVersion {
// toSettingString already validated the input, and checked to
// see that we are allowed to transition. Let's call into our
// upgrade hook to run migrations, if any.
versionStr := string(*value.(*tree.DString))
targetVersion := roachpb.MustParseVersion(versionStr)
if err := n.versionUpgradeHook(ctx, targetVersion); err != nil {
return err
}
}

if _, err = execCfg.InternalExecutor.ExecEx(
ctx, "update-setting", txn,
sessiondata.InternalExecutorOverride{User: security.RootUserName()},
Expand Down