sql: repartition regional by row tables on region add
This patch adds functionality to repartition REGIONAL BY ROW tables when
a new region is added to the database. An index can only be partitioned
on an enum value once that value is PUBLIC, so partition descriptors are
only modified after all transitioning enum members have been promoted
successfully. The repartitioning happens in the same transaction that
promotes the enum members, ensuring sane rollbacks in the face of a
non-retryable partitioning failure. Once partitioning is complete, zone
configurations are applied to the new partitions as well.
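
For illustration, a minimal end-to-end sketch of the new behavior. It assumes
an insecure local multi-region cluster reachable at localhost:26257 whose nodes
carry the region localities "us-east1" through "us-east3"; the driver, the
connection string, and the SHOW PARTITIONS introspection query are illustrative
and not part of this patch.

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/lib/pq" // CockroachDB speaks the PostgreSQL wire protocol.
)

func main() {
	db, err := sql.Open("postgres", "postgresql://root@localhost:26257/?sslmode=disable")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	for _, stmt := range []string{
		`CREATE DATABASE db WITH PRIMARY REGION "us-east1" REGIONS "us-east2"`,
		`CREATE TABLE db.t (k INT PRIMARY KEY) LOCALITY REGIONAL BY ROW`,
		// With this patch, ADD REGION also repartitions db.t and applies a
		// zone config to the newly created partition.
		`ALTER DATABASE db ADD REGION "us-east3"`,
	} {
		if _, err := db.Exec(stmt); err != nil {
			log.Fatal(err)
		}
	}

	// List the partitions backing the REGIONAL BY ROW table; one partition
	// per database region, including the newly added "us-east3", is expected.
	rows, err := db.Query(`SELECT partition_name FROM [SHOW PARTITIONS FROM TABLE db.t]`)
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()
	for rows.Next() {
		var name string
		if err := rows.Scan(&name); err != nil {
			log.Fatal(err)
		}
		fmt.Println(name)
	}
	if err := rows.Err(); err != nil {
		log.Fatal(err)
	}
}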

This patch changes how uncommitted type descriptors are stored, to enable
partitioning in the same transaction in which the enum members were
promoted. A column's types.T is hydrated using enum metadata fields that
are cached on the type descriptor. Previously, this cache was
constructed from the cluster version of the type descriptor and not
updated when the type descriptor was modified. After this patch,
whenever a type descriptor is added back as an uncommitted descriptor to
the desc collection, we reconstruct the cached fields stored on it.
For us, this means that tables being partitioned are hydrated with the
correct visibility for enum members, which allows us to partition on
values promoted in the same transaction.
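
As a rough illustration of that mechanism, here is a self-contained sketch
using hypothetical types (not CockroachDB's actual descriptor or collection
code): the derived enum metadata is rebuilt whenever the mutable descriptor is
handed back, so hydration later in the same transaction observes the updated
member visibility.

package main

import "fmt"

// member is a hypothetical enum member; "public" stands in for the PUBLIC
// capability that allows a value to be used for partitioning.
type member struct {
	name   string
	public bool
}

// typeDesc is a hypothetical type descriptor with a cached field derived from
// its members (analogous to logicalReps/physicalReps/readOnly).
type typeDesc struct {
	members  []member
	readOnly map[string]bool // cached; must be rebuilt after modifications
}

// rebuildCachedFields reconstructs the cache from the current member list.
func rebuildCachedFields(d *typeDesc) {
	d.readOnly = make(map[string]bool, len(d.members))
	for _, m := range d.members {
		d.readOnly[m.name] = !m.public
	}
}

// addUncommitted mimics the collection hook: refresh the cache before storing
// the descriptor so later hydration in the same txn sees the new state.
func addUncommitted(uncommitted map[int]*typeDesc, id int, d *typeDesc) {
	rebuildCachedFields(d)
	uncommitted[id] = d
}

func main() {
	uncommitted := map[int]*typeDesc{}
	d := &typeDesc{members: []member{{name: "us-east3", public: false}}}
	addUncommitted(uncommitted, 1, d)
	fmt.Println(uncommitted[1].readOnly["us-east3"]) // true: member still read-only

	// Promote the member and re-add the descriptor; the cache now reflects it.
	d.members[0].public = true
	addUncommitted(uncommitted, 1, d)
	fmt.Println(uncommitted[1].readOnly["us-east3"]) // false: usable for partitioning
}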

Release note (sql change): ALTER DATABASE ... ADD REGION now
repartitions REGIONAL BY ROW tables and updates the zone configs on
the newly created partitions as well.
arulajmani committed Feb 22, 2021
1 parent 3825fd1 commit 99ee0be
Showing 13 changed files with 886 additions and 23 deletions.
@@ -2110,3 +2110,4 @@ TABLE rbt_table_gc_ttl ALTER TABLE rbt_table_gc_ttl CONFIGURE ZONE USING
constraints = '{+region=ap-southeast-2: 1, +region=ca-central-1: 1, +region=us-east-1: 1}',
voter_constraints = '[+region=ca-central-1]',
lease_preferences = '[[+region=ca-central-1]]'

506 changes: 506 additions & 0 deletions pkg/ccl/logictestccl/testdata/logic_test/regional_by_row

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/ccl/multiregionccl/BUILD.bazel
@@ -36,6 +36,7 @@ go_test(
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_stretchr_testify//require",
],
95 changes: 95 additions & 0 deletions pkg/ccl/multiregionccl/regional_by_row_test.go
@@ -28,13 +28,50 @@ import (
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/errors"
"github.com/stretchr/testify/require"
)

// REGIONAL BY ROW tests are defined in multiregionccl as REGIONAL BY ROW
// requires CCL to operate.

// createTestMultiRegionCluster creates a test cluster with numServers nodes,
// applying the provided testing knobs to each node. Every node is placed in
// its own locality, named "us-east1", "us-east2", and so on.
func createTestMultiRegionCluster(
t *testing.T, numServers int, knobs base.TestingKnobs,
) (serverutils.TestClusterInterface, *gosql.DB, func()) {
serverArgs := make(map[int]base.TestServerArgs)
regionNames := make([]string, numServers)
for i := 0; i < numServers; i++ {
// "us-east1", "us-east2"...
regionNames[i] = fmt.Sprintf("us-east%d", i+1)
}

for i := 0; i < numServers; i++ {
serverArgs[i] = base.TestServerArgs{
Knobs: knobs,
Locality: roachpb.Locality{
Tiers: []roachpb.Tier{{Key: "region", Value: regionNames[i]}},
},
}
}

tc := serverutils.StartNewTestCluster(t, numServers, base.TestClusterArgs{
ServerArgsPerNode: serverArgs,
})

ctx := context.Background()
cleanup := func() {
tc.Stopper().Stop(ctx)
}

sqlDB := tc.ServerConn(0)

return tc, sqlDB, cleanup
}

// TestAlterTableLocalityRegionalByRowError tests an alteration involving
// REGIONAL BY ROW which gets its async job interrupted by some sort of
// error or cancellation. After this, we expect the table to retain
@@ -350,3 +387,61 @@ USE t;
})
}
}

// TestRepartitionFailureRollback adds and removes a region from a multi-region
// database, but injects a non-retryable error before regional by row tables
// can be repartitioned. The expectation is that we should roll back changes to
// the multi-region enum, reverting to the state before the region add/remove
// transaction was executed.
func TestRepartitionFailureRollback(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

// Decrease the adopt loop interval so that retries happen quickly.
defer sqltestutils.SetTestJobsAdoptInterval()()

numServers := 3

var mu syncutil.Mutex
errorReturned := false
knobs := base.TestingKnobs{
SQLTypeSchemaChanger: &sql.TypeSchemaChangerTestingKnobs{
RunBeforeMultiRegionUpdates: func() error {
mu.Lock()
defer mu.Unlock()
if !errorReturned {
errorReturned = true
return errors.New("boom")
}
return nil
},
},
}
_, sqlDB, cleanup := createTestMultiRegionCluster(t, numServers, knobs)
defer cleanup()

_, err := sqlDB.Exec(
`CREATE DATABASE db WITH PRIMARY REGION "us-east1" REGIONS "us-east2";
CREATE TABLE db.t(k INT PRIMARY KEY) LOCALITY REGIONAL BY ROW`)
require.NoError(t, err)

_, err = sqlDB.Exec(`BEGIN;
ALTER DATABASE db ADD REGION "us-east3";
ALTER DATABASE db DROP REGION "us-east2";
COMMIT;`)
require.Error(t, err, "boom")

// The cleanup job should kick in and revert the changes that happened to the
// type descriptor in the user txn. We should eventually be able to add
// "us-east3" and remove "us-east2".
testutils.SucceedsSoon(t, func() error {
_, err = sqlDB.Exec(`BEGIN;
ALTER DATABASE db ADD REGION "us-east3";
ALTER DATABASE db DROP REGION "us-east2";
COMMIT;`)
return err
})
}
9 changes: 8 additions & 1 deletion pkg/server/server_sql.go
@@ -873,7 +873,14 @@ func (s *SQLServer) preStart(
PlanHookMaker: func(opName string, txn *kv.Txn, user security.SQLUsername) (interface{}, func()) {
// This is a hack to get around a Go package dependency cycle. See comment
// in sql/jobs/registry.go on planHookMaker.
return sql.NewInternalPlanner(opName, txn, user, &sql.MemoryMetrics{}, s.execCfg, sessiondatapb.SessionData{})
return sql.NewInternalPlanner(
opName,
txn,
user,
&sql.MemoryMetrics{},
s.execCfg,
sessiondatapb.SessionData{},
)
},
},
scheduledjobs.ProdJobSchedulerEnv,
22 changes: 21 additions & 1 deletion pkg/sql/catalog/descs/collection.go
@@ -1313,6 +1313,21 @@ func (tc *Collection) AddUncommittedDescriptor(desc catalog.MutableDescriptor) e
return err
}

// maybeRefreshCachedFieldsOnTypeDescriptor refreshes the cached fields on a
// Mutable if the given descriptor is a type descriptor, and acts as a
// pass-through for all other descriptors. Mutable type descriptors are
// refreshed to reconstruct enumMetadata. This ensures that table hydration
// following a type descriptor update (in the same txn) uses the modified
// fields.
func maybeRefreshCachedFieldsOnTypeDescriptor(
desc catalog.MutableDescriptor,
) (catalog.MutableDescriptor, error) {
typeDesc, ok := desc.(catalog.TypeDescriptor)
if ok {
return typedesc.UpdateCachedFieldsOnModifiedMutable(typeDesc)
}
return desc, nil
}

func (tc *Collection) addUncommittedDescriptor(
desc catalog.MutableDescriptor,
) (*uncommittedDescriptor, error) {
@@ -1324,8 +1339,13 @@ func (tc *Collection) addUncommittedDescriptor(
desc.GetID(), version, origVersion)
}

mutable, err := maybeRefreshCachedFieldsOnTypeDescriptor(desc)
if err != nil {
return nil, err
}

ud := &uncommittedDescriptor{
mutable: desc,
mutable: mutable,
immutable: desc.ImmutableCopy(),
}

2 changes: 1 addition & 1 deletion pkg/sql/catalog/tabledesc/structured.go
@@ -4048,7 +4048,7 @@ func (desc *wrapper) GetRegionalByTableRegion() (descpb.RegionName, error) {
// REGIONAL BY ROW table.
func (desc *wrapper) GetRegionalByRowTableRegionColumnName() (tree.Name, error) {
if !desc.IsLocalityRegionalByRow() {
return "", errors.AssertionFailedf("%s is not REGIONAL BY ROW", desc.Name)
return "", errors.AssertionFailedf("%q is not a REGIONAL BY ROW table", desc.Name)
}
colName := desc.LocalityConfig.GetRegionalByRow().As
if colName == nil {
17 changes: 17 additions & 0 deletions pkg/sql/catalog/typedesc/type_desc.go
@@ -113,6 +113,23 @@ func NewExistingMutable(desc descpb.TypeDescriptor) *Mutable {
}
}

// UpdateCachedFieldsOnModifiedMutable refreshes the cached Immutable field by
// reconstructing it, so that the fields backing enumMetadata (readOnly,
// logicalReps, physicalReps) reflect the modified Mutable's state. This allows
// us to hydrate tables correctly even when the type descriptor was modified
// earlier in the same transaction.
func UpdateCachedFieldsOnModifiedMutable(desc catalog.TypeDescriptor) (*Mutable, error) {
imm := makeImmutable(*protoutil.Clone(desc.TypeDesc()).(*descpb.TypeDescriptor))
imm.isUncommittedVersion = desc.IsUncommittedVersion()

mutable, ok := desc.(*Mutable)
if !ok {
return nil, errors.AssertionFailedf("type descriptor was not mutable")
}
mutable.Immutable = imm
return mutable, nil
}

// NewImmutable returns an Immutable from the given TypeDescriptor.
func NewImmutable(desc descpb.TypeDescriptor) *Immutable {
m := makeImmutable(desc)
10 changes: 9 additions & 1 deletion pkg/sql/job_exec_context.go
@@ -31,7 +31,15 @@ type plannerJobExecContext struct {
func MakeJobExecContext(
opName string, user security.SQLUsername, memMetrics *MemoryMetrics, execCfg *ExecutorConfig,
) (JobExecContext, func()) {
p, close := newInternalPlanner(opName, nil /*txn*/, user, memMetrics, execCfg, sessiondatapb.SessionData{})
plannerInterface, close := NewInternalPlanner(
opName,
nil, /*txn*/
user,
memMetrics,
execCfg,
sessiondatapb.SessionData{},
)
p := plannerInterface.(*planner)
return &plannerJobExecContext{p: p}, close
}

45 changes: 36 additions & 9 deletions pkg/sql/planner.go
@@ -234,6 +234,24 @@ func (evalCtx *extendedEvalContext) setSessionID(sessionID ClusterWideID) {
// growth in the log.
var noteworthyInternalMemoryUsageBytes = envutil.EnvOrDefaultInt64("COCKROACH_NOTEWORTHY_INTERNAL_MEMORY_USAGE", 1<<20 /* 1 MB */)

// internalPlannerParams encapsulates configurable planner fields. The defaults
// are set in newInternalPlanner.
type internalPlannerParams struct {
collection *descs.Collection
}

// InternalPlannerParamsOption is an option that can be passed to
// NewInternalPlanner.
type InternalPlannerParamsOption func(*internalPlannerParams)

// WithDescCollection configures the planner with the provided collection
// instead of the default (creating a new one from scratch).
func WithDescCollection(collection *descs.Collection) InternalPlannerParamsOption {
return func(params *internalPlannerParams) {
params.collection = collection
}
}
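
// Example (illustrative only, not part of this change): a caller that already
// has a descs.Collection bound to its transaction can pass it through instead
// of letting the planner construct a fresh one:
//
//	p, cleanup := NewInternalPlanner(
//		"repartition-regions", txn, security.RootUserName(), &MemoryMetrics{},
//		execCfg, sessiondatapb.SessionData{}, WithDescCollection(descsCol),
//	)
//	defer cleanup()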

// NewInternalPlanner is an exported version of newInternalPlanner. It
// returns an interface{} so it can be used outside of the sql package.
func NewInternalPlanner(
@@ -243,8 +261,9 @@ func NewInternalPlanner(
memMetrics *MemoryMetrics,
execCfg *ExecutorConfig,
sessionData sessiondatapb.SessionData,
opts ...InternalPlannerParamsOption,
) (interface{}, func()) {
return newInternalPlanner(opName, txn, user, memMetrics, execCfg, sessionData)
return newInternalPlanner(opName, txn, user, memMetrics, execCfg, sessionData, opts...)
}

// newInternalPlanner creates a new planner instance for internal usage. This
@@ -262,7 +281,21 @@ func newInternalPlanner(
memMetrics *MemoryMetrics,
execCfg *ExecutorConfig,
sessionData sessiondatapb.SessionData,
opts ...InternalPlannerParamsOption,
) (*planner, func()) {
// Default parameters, which may be overridden by the supplied options.
params := &internalPlannerParams{
// The table collection used by the internal planner does not rely on the
// deprecatedDatabaseCache and there are no subscribers to the
// deprecatedDatabaseCache, so we can leave it uninitialized.
// Furthermore, we're not concerned about the efficiency of querying tables
// with user-defined types, hence the nil hydratedTables.
collection: descs.NewCollection(execCfg.Settings, execCfg.LeaseManager, nil /* hydratedTables */),
}
for _, opt := range opts {
opt(params)
}

// We need a context that outlives all the uses of the planner (since the
// planner captures it in the EvalCtx, and so does the cleanup function that
// we're going to return). We just create one here instead of asking the caller
@@ -280,12 +313,6 @@ }
}
sd.SessionData.Database = "system"
sd.SessionData.UserProto = user.EncodeProto()
// The table collection used by the internal planner does not rely on the
// deprecatedDatabaseCache and there are no subscribers to the
// deprecatedDatabaseCache, so we can leave it uninitialized.
// Furthermore, we're not concerned about the efficiency of querying tables
// with user-defined types, hence the nil hydratedTables.
tables := descs.NewCollection(execCfg.Settings, execCfg.LeaseManager, nil /* hydratedTables */)
dataMutator := &sessionDataMutator{
data: sd,
defaults: SessionDefaults(map[string]string{
@@ -324,7 +351,7 @@
noteworthyInternalMemoryUsageBytes, execCfg.Settings)

p.extendedEvalCtx = internalExtendedEvalCtx(
ctx, sd, dataMutator, tables, txn, ts, ts, execCfg, plannerMon,
ctx, sd, dataMutator, params.collection, txn, ts, ts, execCfg, plannerMon,
)
p.extendedEvalCtx.Planner = p
p.extendedEvalCtx.PrivilegedAccessor = p
Expand All @@ -344,7 +371,7 @@ func newInternalPlanner(
p.extendedEvalCtx.ExecCfg = execCfg
p.extendedEvalCtx.Placeholders = &p.semaCtx.Placeholders
p.extendedEvalCtx.Annotations = &p.semaCtx.Annotations
p.extendedEvalCtx.Descs = tables
p.extendedEvalCtx.Descs = params.collection

p.queryCacheSession.Init()
p.optPlanningCtx.init(p)
10 changes: 9 additions & 1 deletion pkg/sql/schema_changer.go
@@ -248,7 +248,15 @@ func (sc *SchemaChanger) backfillQueryIntoTable(

// Create an internal planner as the planner used to serve the user query
// would have committed by this point.
p, cleanup := NewInternalPlanner(desc, txn, security.RootUserName(), &MemoryMetrics{}, sc.execCfg, sessiondatapb.SessionData{})
p, cleanup := NewInternalPlanner(
desc,
txn,
security.RootUserName(),
&MemoryMetrics{},
sc.execCfg,
sessiondatapb.SessionData{},
)

defer cleanup()
localPlanner := p.(*planner)
stmt, err := parser.ParseOne(query)