diff --git a/pkg/migration/BUILD.bazel b/pkg/migration/BUILD.bazel new file mode 100644 index 000000000000..3474ea2907e2 --- /dev/null +++ b/pkg/migration/BUILD.bazel @@ -0,0 +1,19 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "migration", + srcs = [ + "manager.go", + "migrations.go", + ], + importpath = "github.com/cockroachdb/cockroach/pkg/migration", + visibility = ["//visibility:public"], + deps = [ + "//pkg/kv", + "//pkg/kv/kvserver/liveness/livenesspb", + "//pkg/roachpb", + "//pkg/rpc/nodedialer", + "//pkg/sql", + "//vendor/github.com/cockroachdb/logtags", + ], +) diff --git a/pkg/migration/manager.go b/pkg/migration/manager.go new file mode 100644 index 000000000000..d5115398eec5 --- /dev/null +++ b/pkg/migration/manager.go @@ -0,0 +1,136 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +// Package migration captures the facilities needed to define and execute +// migrations for a crdb cluster. These migrations can be arbitrarily long +// running, are free to send out arbitrary requests cluster wide, change +// internal DB state, and much more. They're typically reserved for crdb +// internal operations and state. Each migration is idempotent in nature, is +// associated with a specific cluster version, and executed when the cluster +// version is made activate on every node in the cluster. +// +// Examples of migrations that apply would be migrations to move all raft state +// from one storage engine to another, or purging all usage of the replicated +// truncated state in KV. A "sister" package of interest is pkg/sqlmigrations. +package migration + +import ( + "context" + + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" + "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/logtags" +) + +// Migration defines a program to be executed once every node in the cluster is +// (a) running a specific binary version, and (b) has completed all prior +// migrations. +// +// Each migration is associated with a specific internal cluster version and is +// idempotent in nature. When setting the cluster version (via `SET CLUSTER +// SETTING version`), a manager process determines the set of migrations needed +// to bridge the gap between the current active cluster version, and the target +// one. +// +// To introduce a migration, introduce a version key in pkg/clusterversion, and +// introduce a corresponding internal cluster version for it. See [1] for +// details. Following that, define a Migration in this package and add it to the +// Registry. Be sure to key it in with the new cluster version we just added. +// During cluster upgrades, once the operator is able to set a cluster version +// setting that's past the version that was introduced (typically the major +// release version the migration was introduced in), the manager will execute +// the defined migration before letting the upgrade finalize. +// +// [1]: pkg/clusterversion/cockroach_versions.go +type Migration func(context.Context, *Helper) error + +// Manager is the instance responsible for executing migrations across the +// cluster. +type Manager struct { + dialer *nodedialer.Dialer + nl nodeLiveness + executor *sql.InternalExecutor + db *kv.DB +} + +// Helper captures all the primitives required to fully specify a migration. +type Helper struct { + *Manager +} + +// nodeLiveness is the subset of the interface satisfied by CRDB's node liveness +// component that the migration manager relies upon. +type nodeLiveness interface { + GetLivenessesFromKV(context.Context) ([]livenesspb.Liveness, error) + IsLive(roachpb.NodeID) (bool, error) +} + +// NewManager constructs a new Manager. +// +// TODO(irfansharif): We'll need to eventually plumb in on a lease manager here. +func NewManager( + dialer *nodedialer.Dialer, nl nodeLiveness, executor *sql.InternalExecutor, db *kv.DB, +) *Manager { + return &Manager{ + dialer: dialer, + executor: executor, + db: db, + nl: nl, + } +} + +// MigrateTo runs the set of migrations required to upgrade the cluster version +// to the provided target version. +// +// TODO(irfansharif): Do something real here. +func (m *Manager) MigrateTo(ctx context.Context, targetV roachpb.Version) error { + // TODO(irfansharif): Should we inject every ctx here with specific labels + // for each migration, so they log distinctly? + _ = logtags.AddTag(ctx, "migration-mgr", nil) + + // TODO(irfansharif): We'll need to acquire a lease here and refresh it + // throughout during the migration to ensure mutual exclusion. + + // TODO(irfansharif): We'll need to create a system table to store + // in-progress state of long running migrations, for introspection. + + // TODO(irfansharif): We'll want to either write to a KV key to record the + // version up until which we've already migrated to, or consult the system + // table mentioned above. Perhaps it makes sense to consult any given + // `StoreClusterVersionKey`, since the manager here will want to push out + // cluster version bumps for vX before attempting to migrate into vX+1. + + // TODO(irfansharif): After determining the last completed migration, if + // any, we'll be want to assemble the list of remaining migrations to step + // through to get to targetV. + _ = targetV + var vs []roachpb.Version + + for _, version := range vs { + _ = &Helper{Manager: m} + // TODO(irfansharif): We'll want to out the version gate to every node + // in the cluster. Each node will want to persist the version, bump the + // local version gates, and then return. The migration associated with + // the specific version can then assume that every node in the cluster + // has the corresponding version activated. + + // TODO(irfansharif): We'll want to retrieve the right migration off of + // our registry of migrations, and execute it. + // TODO(irfansharif): We'll want to be able to override which migration + // is retrieved here within tests. We could make the registry be a part + // of the manager, and all tests to provide their own. + _ = Registry[version] + } + + return nil +} diff --git a/pkg/migration/migrations.go b/pkg/migration/migrations.go new file mode 100644 index 000000000000..cf3941ea0d53 --- /dev/null +++ b/pkg/migration/migrations.go @@ -0,0 +1,25 @@ +// Copyright 2020 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package migration + +import "github.com/cockroachdb/cockroach/pkg/roachpb" + +// Registry defines the global mapping between a version, and the associated +// migration. The migration is only executed after a cluster-wide bump of the +// version gate. +var Registry = make(map[roachpb.Version]Migration) + +func init() { + // TODO(irfansharif): We'll want to register individual migrations with + // specific internal cluster versions here. + // + // Registry[clusterversion.VersionByKey(clusterversion.VersionWhatever)] = WhateverMigration +} diff --git a/pkg/server/BUILD.bazel b/pkg/server/BUILD.bazel index 78d647ec362e..26f4d7feacad 100644 --- a/pkg/server/BUILD.bazel +++ b/pkg/server/BUILD.bazel @@ -68,6 +68,7 @@ go_library( "//pkg/kv/kvserver/protectedts/ptprovider", "//pkg/kv/kvserver/protectedts/ptreconcile", "//pkg/kv/kvserver/reports", + "//pkg/migration", "//pkg/roachpb", "//pkg/rpc", "//pkg/rpc/nodedialer", diff --git a/pkg/server/server_sql.go b/pkg/server/server_sql.go index b8fffadb1a84..b746a9885146 100644 --- a/pkg/server/server_sql.go +++ b/pkg/server/server_sql.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/kv/kvclient/kvtenant" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/protectedts" + "github.com/cockroachdb/cockroach/pkg/migration" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/rpc" "github.com/cockroachdb/cockroach/pkg/rpc/nodedialer" @@ -111,7 +112,8 @@ type sqlServer struct { type sqlServerOptionalKVArgs struct { // nodesStatusServer gives access to the NodesStatus service. nodesStatusServer serverpb.OptionalNodesStatusServer - // Narrowed down version of *NodeLiveness. Used by jobs and DistSQLPlanner + // Narrowed down version of *NodeLiveness. Used by jobs, DistSQLPlanner, and + // migration manager. nodeLiveness optionalnodeliveness.Container // Gossip is relied upon by distSQLCfg (execinfra.ServerConfig), the executor // config, the DistSQL planner, the table statistics cache, the statements @@ -429,8 +431,9 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) { } var isLive func(roachpb.NodeID) (bool, error) - if nl, ok := cfg.nodeLiveness.Optional(47900); ok { - isLive = nl.IsLive + nodeLiveness, ok := cfg.nodeLiveness.Optional(47900) + if ok { + isLive = nodeLiveness.IsLive } else { // We're on a SQL tenant, so this is the only node DistSQL will ever // schedule on - always returning true is fine. @@ -633,6 +636,21 @@ func newSQLServer(ctx context.Context, cfg sqlServerArgs) (*sqlServer, error) { ) execCfg.StmtDiagnosticsRecorder = stmtDiagnosticsRegistry + if cfg.TenantID == roachpb.SystemTenantID { + // We only need to attach a version upgrade hook if we're the system + // tenant. Regular tenants are disallowed from changing cluster + // versions. + migrationMgr := migration.NewManager( + cfg.nodeDialer, + nodeLiveness, + cfg.circularInternalExecutor, + cfg.db, + ) + execCfg.VersionUpgradeHook = func(ctx context.Context, targetV roachpb.Version) error { + return migrationMgr.MigrateTo(ctx, targetV) + } + } + temporaryObjectCleaner := sql.NewTemporaryObjectCleaner( cfg.Settings, cfg.db, diff --git a/pkg/sql/exec_util.go b/pkg/sql/exec_util.go index 237afba419a6..219ccb4357bb 100644 --- a/pkg/sql/exec_util.go +++ b/pkg/sql/exec_util.go @@ -736,6 +736,11 @@ type ExecutorConfig struct { HydratedTables *hydratedtables.Cache GCJobNotifier *gcjobnotifier.Notifier + + // VersionUpgradeHook is called after validating a `SET CLUSTER SETTING + // version` but before executing it. It can carry out arbitrary migrations + // that allow us to eventually remove legacy code. + VersionUpgradeHook func(ctx context.Context, to roachpb.Version) error } // Organization returns the value of cluster.organization. diff --git a/pkg/sql/set_cluster_setting.go b/pkg/sql/set_cluster_setting.go index b0f068d73599..d2029d28a248 100644 --- a/pkg/sql/set_cluster_setting.go +++ b/pkg/sql/set_cluster_setting.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/clusterversion" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security" "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/settings" @@ -47,6 +48,10 @@ type setClusterSettingNode struct { setting settings.WritableSetting // If value is nil, the setting should be reset. value tree.TypedExpr + // versionUpgradeHook is called after validating a `SET CLUSTER SETTING + // version` but before executing it. It can carry out arbitrary migrations + // that allow us to eventually remove legacy code. + versionUpgradeHook func(ctx context.Context, to roachpb.Version) error } func checkPrivilegesForSetting(ctx context.Context, p *planner, name string, action string) error { @@ -156,7 +161,11 @@ func (p *planner) SetClusterSetting( } } - return &setClusterSettingNode{name: name, st: st, setting: setting, value: value}, nil + csNode := setClusterSettingNode{ + name: name, st: st, setting: setting, value: value, + versionUpgradeHook: p.execCfg.VersionUpgradeHook, + } + return &csNode, nil } func (n *setClusterSettingNode) startExec(params runParams) error { @@ -209,7 +218,8 @@ func (n *setClusterSettingNode) startExec(params runParams) error { } reportedValue = tree.AsStringWithFlags(value, tree.FmtBareStrings) var prev tree.Datum - if _, ok := n.setting.(*settings.VersionSetting); ok { + _, isSetVersion := n.setting.(*settings.VersionSetting) + if isSetVersion { datums, err := execCfg.InternalExecutor.QueryRowEx( ctx, "retrieve-prev-setting", txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()}, @@ -232,6 +242,18 @@ func (n *setClusterSettingNode) startExec(params runParams) error { if err != nil { return err } + + if isSetVersion { + // toSettingString already validated the input, and checked to + // see that we are allowed to transition. Let's call into our + // upgrade hook to run migrations, if any. + versionStr := string(*value.(*tree.DString)) + targetVersion := roachpb.MustParseVersion(versionStr) + if err := n.versionUpgradeHook(ctx, targetVersion); err != nil { + return err + } + } + if _, err = execCfg.InternalExecutor.ExecEx( ctx, "update-setting", txn, sessiondata.InternalExecutorOverride{User: security.RootUserName()},