bulk: use expiration time to disable automatic merging in import, backfill, restore #38079

Merged: 2 commits, Jun 19, 2019
30 changes: 24 additions & 6 deletions pkg/ccl/backupccl/restore.go
@@ -15,6 +15,7 @@ import (
"runtime"
"sort"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
@@ -752,6 +753,7 @@ rangeLoop:
// there's some way to test it without running an O(hour) long benchmark.
func splitAndScatter(
restoreCtx context.Context,
settings *cluster.Settings,
db *client.DB,
kr *storageccl.KeyRewriter,
numClusterNodes int,
@@ -780,6 +782,16 @@ func splitAndScatter(
}

importSpanChunksCh := make(chan []importEntry)
// TODO(jeffreyxiao): Remove this check in 20.1.
// If the cluster supports sticky bits, then we should use the sticky bit to
// ensure that the splits are not automatically merged away by the merge
// queue. If the cluster does not support sticky bits, we disable the merge
// queue via gossip, so we can just set the split to expire immediately.
stickyBitEnabled := settings.Version.IsActive(cluster.VersionStickyBit)
expirationTime := hlc.Timestamp{}
if stickyBitEnabled {
expirationTime = db.Clock().Now().Add(time.Hour.Nanoseconds(), 0)
}
g.GoCtx(func(ctx context.Context) error {
defer close(importSpanChunksCh)
for idx, importSpanChunk := range importSpanChunks {
@@ -793,7 +805,7 @@
// TODO(dan): Really, this should be splitting the Key of the first
// entry in the _next_ chunk.
log.VEventf(restoreCtx, 1, "presplitting chunk %d of %d", idx, len(importSpanChunks))
if err := db.AdminSplit(ctx, chunkKey, chunkKey, hlc.Timestamp{} /* expirationTime */); err != nil {
if err := db.AdminSplit(ctx, chunkKey, chunkKey, expirationTime); err != nil {
return err
}

@@ -849,7 +861,7 @@ func splitAndScatter(
// TODO(dan): Really, this should be splitting the Key of
// the _next_ entry.
log.VEventf(restoreCtx, 1, "presplitting %d of %d", idx, len(importSpans))
if err := db.AdminSplit(ctx, newSpanKey, newSpanKey, hlc.Timestamp{} /* expirationTime */); err != nil {
if err := db.AdminSplit(ctx, newSpanKey, newSpanKey, expirationTime); err != nil {
return err
}

@@ -1019,6 +1031,7 @@ func restore(
restoreCtx context.Context,
db *client.DB,
gossip *gossip.Gossip,
settings *cluster.Settings,
backupDescs []BackupDescriptor,
endTime hlc.Timestamp,
sqlDescs []sqlbase.Descriptor,
@@ -1068,9 +1081,13 @@
return mu.res, nil, nil, err
}

{
// Disable merging for the table IDs being restored into. We don't want the
// merge queue undoing the splits performed during RESTORE.
// TODO(jeffreyxiao): Remove this check in 20.1.
// If the cluster supports sticky bits, then we don't have to worry about the
// merge queue automatically merging the splits performed during RESTORE.
// Otherwise, we have to rely on the gossip mechanism to disable the merge
// queue for the table IDs being restored into.
stickyBitEnabled := settings.Version.IsActive(cluster.VersionStickyBit)
if !stickyBitEnabled {
tableIDs := make([]uint32, 0, len(tables))
for _, t := range tables {
tableIDs = append(tableIDs, uint32(t.ID))
@@ -1159,7 +1176,7 @@ func restore(
readyForImportCh := make(chan importEntry, presplitLeadLimit)
g.GoCtx(func(ctx context.Context) error {
defer close(readyForImportCh)
return splitAndScatter(ctx, db, kr, numClusterNodes, importSpans, readyForImportCh)
return splitAndScatter(ctx, settings, db, kr, numClusterNodes, importSpans, readyForImportCh)
})

requestFinishedCh := make(chan struct{}, len(importSpans)) // enough buffer to never block
@@ -1453,6 +1470,7 @@ func (r *restoreResumer) Resume(
ctx,
p.ExecCfg().DB,
p.ExecCfg().Gossip,
p.ExecCfg().Settings,
backupDescs,
details.EndTime,
sqlDescs,
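The same version-gated expiration logic recurs at every pre-split call site in this change (restore above, and the import and backfill files below). A minimal sketch of that shared pattern, pulled out into a hypothetical helper (splitExpiration is not part of the PR, which inlines this logic at each call site):

package example

import (
	"time"

	"github.com/cockroachdb/cockroach/pkg/internal/client"
	"github.com/cockroachdb/cockroach/pkg/settings/cluster"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// splitExpiration returns the expirationTime to hand to AdminSplit for a bulk
// ingestion pre-split. On clusters that understand sticky bits, the split is
// protected from the merge queue for an hour and then expires on its own; on
// older clusters an empty timestamp is returned and the caller must keep the
// merge queue disabled via gossip instead.
func splitExpiration(st *cluster.Settings, db *client.DB) hlc.Timestamp {
	if st.Version.IsActive(cluster.VersionStickyBit) {
		return db.Clock().Now().Add(time.Hour.Nanoseconds(), 0)
	}
	return hlc.Timestamp{}
}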
10 changes: 7 additions & 3 deletions pkg/ccl/importccl/import_stmt.go
@@ -769,9 +769,13 @@ func (r *importResumer) Resume(
}
}

{
// Disable merging for the table IDs being imported into. We don't want the
// merge queue undoing the splits performed during IMPORT.
// TODO(jeffreyxiao): Remove this check in 20.1.
// If the cluster supports sticky bits, then we don't have to worry about the
// merge queue automatically merging the splits performed during IMPORT.
// Otherwise, we have to rely on the gossip mechanism to disable the merge
// queue for the table IDs being imported into.
stickyBitEnabled := r.settings.Version.IsActive(cluster.VersionStickyBit)
if !stickyBitEnabled {
tableIDs := make([]uint32, 0, len(tables))
for _, t := range tables {
tableIDs = append(tableIDs, uint32(t.ID))
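The diff above is truncated before the gossip call on the pre-sticky-bit path. Judging from the equivalent backfill code later in this PR, the fallback inside (*importResumer).Resume looks roughly like the sketch below; the exact gossip handle (written here as r.gossip) and the lifetime of disableCtx are assumptions, not part of the shown diff:

stickyBitEnabled := r.settings.Version.IsActive(cluster.VersionStickyBit)
if !stickyBitEnabled {
	// Collect every table ID being imported into and ask all nodes, via
	// gossip, to keep the merge queue away from those tables while the
	// IMPORT is running.
	tableIDs := make([]uint32, 0, len(tables))
	for _, t := range tables {
		tableIDs = append(tableIDs, uint32(t.ID))
	}
	disableCtx, cancel := context.WithCancel(ctx)
	defer cancel() // merging becomes possible again once this scope exits
	r.gossip.DisableMerges(disableCtx, tableIDs) // r.gossip is an assumed handle
}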
14 changes: 13 additions & 1 deletion pkg/ccl/importccl/read_import_proc.go
@@ -16,6 +16,7 @@ import (
"io/ioutil"
"math/rand"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/internal/client"
@@ -484,10 +485,21 @@ func (cp *readImportDataProcessor) doRun(ctx context.Context) error {
return conv.readFiles(ctx, cp.spec.Uri, cp.spec.Format, progFn, cp.flowCtx.Settings)
})

// TODO(jeffreyxiao): Remove this check in 20.1.
// If the cluster supports sticky bits, then we should use the sticky bit to
// ensure that the splits are not automatically merged away by the merge
// queue. If the cluster does not support sticky bits, we disable the merge
// queue via gossip, so we can just set the split to expire immediately.
stickyBitEnabled := cp.flowCtx.Settings.Version.IsActive(cluster.VersionStickyBit)
expirationTime := hlc.Timestamp{}
if stickyBitEnabled {
expirationTime = cp.flowCtx.ClientDB.Clock().Now().Add(time.Hour.Nanoseconds(), 0)
}

if cp.spec.IngestDirectly {
for _, tbl := range cp.spec.Tables {
for _, span := range tbl.AllIndexSpans() {
if err := cp.flowCtx.ClientDB.AdminSplit(ctx, span.Key, span.Key, hlc.Timestamp{} /* expirationTime */); err != nil {
if err := cp.flowCtx.ClientDB.AdminSplit(ctx, span.Key, span.Key, expirationTime); err != nil {
return err
}

14 changes: 13 additions & 1 deletion pkg/ccl/importccl/sst_writer_proc.go
@@ -12,6 +12,7 @@ import (
"context"
"fmt"
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/internal/client"
@@ -147,6 +148,17 @@ func (sp *sstWriter) Run(ctx context.Context) {
return err
}

// TODO(jeffreyxiao): Remove this check in 20.1.
// If the cluster supports sticky bits, then we should use the sticky bit
// to ensure that the splits are not automatically merged away by the
// merge queue. If the cluster does not support sticky bits, we disable the
// merge queue via gossip, so we can just set the split to expire immediately.
stickyBitEnabled := sp.flowCtx.Settings.Version.IsActive(cluster.VersionStickyBit)
expirationTime := hlc.Timestamp{}
if stickyBitEnabled {
expirationTime = sp.db.Clock().Now().Add(time.Hour.Nanoseconds(), 0)
}

// Fetch all the keys in each span and write them to storage.
iter := store.NewIterator()
defer iter.Close()
@@ -177,7 +189,7 @@ func (sp *sstWriter) Run(ctx context.Context) {
end = sst.span.EndKey
}

if err := sp.db.AdminSplit(ctx, end, end, hlc.Timestamp{} /* expirationTime */); err != nil {
if err := sp.db.AdminSplit(ctx, end, end, expirationTime); err != nil {
return err
}

20 changes: 16 additions & 4 deletions pkg/sql/backfill.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/backfill"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
@@ -1071,12 +1072,23 @@ func (sc *SchemaChanger) backfillIndexes(
fn()
}

disableCtx, cancel := context.WithCancel(ctx)
defer cancel()
sc.execCfg.Gossip.DisableMerges(disableCtx, []uint32{uint32(sc.tableID)})
// TODO(jeffreyxiao): Remove this check in 20.1.
// If the cluster supports sticky bits, then we should use the sticky bit to
// ensure that the splits are not automatically merged away by the merge
// queue. If the cluster does not support sticky bits, we disable the merge
// queue via gossip, so we can just set the split to expire immediately.
stickyBitEnabled := sc.execCfg.Settings.Version.IsActive(cluster.VersionStickyBit)
expirationTime := hlc.Timestamp{}
if !stickyBitEnabled {
disableCtx, cancel := context.WithCancel(ctx)
defer cancel()
sc.execCfg.Gossip.DisableMerges(disableCtx, []uint32{uint32(sc.tableID)})
} else {
expirationTime = sc.db.Clock().Now().Add(time.Hour.Nanoseconds(), 0)
}

for _, span := range addingSpans {
if err := sc.db.AdminSplit(ctx, span.Key, span.Key, hlc.Timestamp{} /* expirationTime */); err != nil {
if err := sc.db.AdminSplit(ctx, span.Key, span.Key, expirationTime); err != nil {
return err
}
}
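Why an hour is enough: the expiration time written by AdminSplit becomes the range descriptor's sticky bit, and the merge queue leaves a range alone until that timestamp has passed. The check below only illustrates that rule, under the assumption that the queue compares the sticky bit against the current HLC time; the actual merge-queue code is not part of this diff:

package example

import (
	"github.com/cockroachdb/cockroach/pkg/roachpb"
	"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

// mergeQueueMaySubsume sketches the protection the sticky bit provides. A
// pre-split created with expirationTime = now + 1h stays unmergeable until
// the hour is up; an unset sticky bit (hlc.Timestamp{}) is always in the
// past, so ordinary splits remain eligible for merging.
func mergeQueueMaySubsume(desc *roachpb.RangeDescriptor, now hlc.Timestamp) bool {
	return desc.StickyBit.Less(now)
}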
128 changes: 80 additions & 48 deletions pkg/storage/replica_command.go
@@ -49,46 +49,12 @@ func (r *Replica) AdminSplit(
return roachpb.AdminSplitResponse{}, roachpb.NewErrorf("cannot split range with no key provided")
}

var lastErr error
retryOpts := base.DefaultRetryOptions()
retryOpts.MaxRetries = 10
for retryable := retry.StartWithCtx(ctx, retryOpts); retryable.Next(); {
// The replica may have been destroyed since the start of the retry loop. We
// need to explicitly check this condition. Having a valid lease, as we
// verify below, does not imply that the range still exists: even after a
// range has been merged into its left-hand neighbor, its final lease (i.e.,
// the lease we have in r.mu.state.Lease) can remain valid indefinitely.
if _, err := r.IsDestroyed(); err != nil {
return reply, roachpb.NewError(err)
}

// Admin commands always require the range lease to begin (see
// executeAdminBatch), but we may have lost it while in this retry loop.
// Without the lease, a replica's local descriptor can be arbitrarily
// stale, which will result in a ConditionFailedError. To avoid this,
// we make sure that we still have the lease before each attempt.
if _, pErr := r.redirectOnOrAcquireLease(ctx); pErr != nil {
return roachpb.AdminSplitResponse{}, pErr
}

reply, lastErr = r.adminSplitWithDescriptor(ctx, args, r.Desc(), true /* delayable */, reason)
// On seeing a ConditionFailedError or an AmbiguousResultError, retry
// the command with the updated descriptor.
if retry := causer.Visit(lastErr, func(err error) bool {
switch err.(type) {
case *roachpb.ConditionFailedError:
return true
case *roachpb.AmbiguousResultError:
return true
default:
return false
}
}); !retry {
return reply, roachpb.NewError(lastErr)
}
}
// If we broke out of the loop after MaxRetries, return the last error.
return roachpb.AdminSplitResponse{}, roachpb.NewError(lastErr)
err := r.executeAdminCommandWithDescriptor(ctx, func(desc *roachpb.RangeDescriptor) error {
var err error
reply, err = r.adminSplitWithDescriptor(ctx, args, desc, true /* delayable */, reason)
return err
})
return reply, err
}

func maybeDescriptorChangedError(desc *roachpb.RangeDescriptor, err error) (string, bool) {
@@ -380,13 +346,24 @@ func (r *Replica) adminSplitWithDescriptor(
func (r *Replica) AdminUnsplit(
ctx context.Context, args roachpb.AdminUnsplitRequest, reason string,
) (roachpb.AdminUnsplitResponse, *roachpb.Error) {
// TODO(jeffreyxiao): Have a retry loop for ConditionalFailed errors similar
// to AdminSplit
var reply roachpb.AdminUnsplitResponse
err := r.executeAdminCommandWithDescriptor(ctx, func(desc *roachpb.RangeDescriptor) error {
var err error
reply, err = r.adminUnsplitWithDescriptor(ctx, args, desc, reason)
return err
})
return reply, err
}

desc := *r.Desc()
func (r *Replica) adminUnsplitWithDescriptor(
ctx context.Context,
args roachpb.AdminUnsplitRequest,
desc *roachpb.RangeDescriptor,
reason string,
) (roachpb.AdminUnsplitResponse, error) {
var reply roachpb.AdminUnsplitResponse
if !bytes.Equal(desc.StartKey.AsRawKey(), args.Header().Key) {
return reply, roachpb.NewErrorf("key %s is not the start of a range", args.Header().Key)
return reply, errors.Errorf("key %s is not the start of a range", args.Header().Key)
}

// If the range's sticky bit is already hlc.Timestamp{}, we treat the
@@ -398,11 +375,11 @@

if err := r.store.DB().Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
b := txn.NewBatch()
newDesc := desc
newDesc := *desc
newDesc.StickyBit = hlc.Timestamp{}
descKey := keys.RangeDescriptorKey(newDesc.StartKey)

if err := updateRangeDescriptor(b, descKey, &desc, &newDesc); err != nil {
if err := updateRangeDescriptor(b, descKey, desc, &newDesc); err != nil {
return err
}
if err := updateRangeAddressing(b, &newDesc); err != nil {
@@ -421,12 +398,67 @@
})
return txn.Run(ctx, b)
}); err != nil {
return reply, roachpb.NewErrorf("unsplit at key %s failed: %s", args.Header().Key, err)
// The ConditionFailedError can occur because the descriptors acting as
// expected values in the CPuts used to update the range descriptor are
// picked outside the transaction. Return ConditionFailedError in the error
// detail so that the command can be retried.
if msg, ok := maybeDescriptorChangedError(desc, err); ok {
// NB: we have to wrap the existing error here as consumers of this code
// look at the root cause to sniff out the changed descriptor.
err = &benignError{errors.Wrap(err, msg)}
}
return reply, err
}

return reply, nil
}

// executeAdminCommandWithDescriptor wraps a read-modify-write operation for
// RangeDescriptors in a retry loop.
func (r *Replica) executeAdminCommandWithDescriptor(
ctx context.Context, updateDesc func(*roachpb.RangeDescriptor) error,
) *roachpb.Error {
retryOpts := base.DefaultRetryOptions()
retryOpts.MaxRetries = 10
var lastErr error
for retryable := retry.StartWithCtx(ctx, retryOpts); retryable.Next(); {
// The replica may have been destroyed since the start of the retry loop.
// We need to explicitly check this condition. Having a valid lease, as we
// verify below, does not imply that the range still exists: even after a
// range has been merged into its left-hand neighbor, its final lease
// (i.e., the lease we have in r.mu.state.Lease) can remain valid
// indefinitely.
if _, err := r.IsDestroyed(); err != nil {
return roachpb.NewError(err)
}

// Admin commands always require the range lease to begin (see
// executeAdminBatch), but we may have lost it while in this retry loop.
// Without the lease, a replica's local descriptor can be arbitrarily
// stale, which will result in a ConditionFailedError. To avoid this, we
// make sure that we still have the lease before each attempt.
if _, pErr := r.redirectOnOrAcquireLease(ctx); pErr != nil {
return pErr
}

lastErr = updateDesc(r.Desc())
// On seeing a ConditionFailedError or an AmbiguousResultError, retry the
// command with the updated descriptor.
if retry := causer.Visit(lastErr, func(err error) bool {
switch err.(type) {
case *roachpb.ConditionFailedError:
return true
case *roachpb.AmbiguousResultError:
return true
default:
return false
}
}); !retry {
return roachpb.NewError(lastErr)
}
}
// If we broke out of the loop after MaxRetries, return the last error.
return roachpb.NewError(lastErr)
}

// AdminMerge extends this range to subsume the range that comes next
// in the key space. The merge is performed inside of a distributed
// transaction which writes the left hand side range descriptor (the
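A side benefit of the refactor at the bottom of replica_command.go: any future descriptor-driven admin command can reuse executeAdminCommandWithDescriptor and inherit the destroyed-replica check, the lease check, and the ConditionFailedError/AmbiguousResultError retry classification. A hypothetical third caller would mirror AdminSplit and AdminUnsplit above (AdminFrob and its request, response, and helper types are invented purely for illustration):

// AdminFrob illustrates the calling convention established by the refactor:
// the closure is handed the freshest descriptor on every attempt, and
// returning a ConditionFailedError or AmbiguousResultError from it causes
// another pass through the retry loop.
func (r *Replica) AdminFrob(
	ctx context.Context, args roachpb.AdminFrobRequest, reason string,
) (roachpb.AdminFrobResponse, *roachpb.Error) {
	var reply roachpb.AdminFrobResponse
	err := r.executeAdminCommandWithDescriptor(ctx, func(desc *roachpb.RangeDescriptor) error {
		var err error
		reply, err = r.adminFrobWithDescriptor(ctx, args, desc, reason)
		return err
	})
	return reply, err
}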