Merge #38079 #38295
38079: bulk: use expiration time to disable automatic merging in import, backfill, restore r=jeffrey-xiao a=jeffrey-xiao

The existing mechanism to prevent the merge queue from automatically merging splits created during import, backfill, and restore was to gossip the table keys that the merge queue should ignore when scanning replicas. Now that there is support for specifying an expiration time at the range level, we can use that instead of the gossip mechanism.

All splits created during backfill, restore, and import use an expiration time of an hour. We use an expiration time rather than unsplitting the ranges at the end of the operation because adding an O(n) cost to bulk operations is not ideal when n can be large.
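
For reference, the pattern applied at each bulk split site is roughly the following sketch (simplified from the diff below; splitKey stands in for the chunk or span key at each call site, and error handling and surrounding plumbing differ per caller):

// If the cluster is fully upgraded, set a sticky bit that expires an hour
// from now; otherwise the merge queue is disabled via gossip and the
// expiration stays empty.
stickyBitEnabled := settings.Version.IsActive(cluster.VersionStickyBit)
expirationTime := hlc.Timestamp{}
if stickyBitEnabled {
	expirationTime = db.Clock().Now().Add(time.Hour.Nanoseconds(), 0)
}
if err := db.AdminSplit(ctx, splitKey, splitKey, expirationTime); err != nil {
	return err
}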

Fixes #37697.

@dt I don't have context on how long import jobs take, so an hour might be too conservative.

38295: storage: fix flake in Test{Conditional,Init}PutUpdatesTSCacheOnError r=nvanbenschoten a=nvanbenschoten

Fixes #38256.

A request was slipping in between the manual clock update and the first
request the test sent with an unspecified timestamp. This commit fixes
the issue by explicitly specifying the timestamp.
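
The change amounts to specifying the timestamp in the request header instead of letting the server assign one; a minimal sketch, assuming t2 is the timestamp the manual clock was advanced to:

// Previously: _, pErr := tc.SendWrapped(&cpArgs1)
// Now the request carries t2 explicitly, so a request slipping in after the
// manual clock update can no longer change which timestamp this one gets.
_, pErr := tc.SendWrappedWith(roachpb.Header{Timestamp: t2}, &cpArgs1)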

Release note: None

Co-authored-by: Jeffrey Xiao <jeffrey.xiao1998@gmail.com>
Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
3 people committed Jun 19, 2019
3 parents 7042814 + b7e0c23 + e1f60cd commit a9802cd
Showing 7 changed files with 155 additions and 65 deletions.
30 changes: 24 additions & 6 deletions pkg/ccl/backupccl/restore.go
@@ -15,6 +15,7 @@ import (
"runtime"
"sort"
"sync/atomic"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
@@ -752,6 +753,7 @@ rangeLoop:
// there's some way to test it without running an O(hour) long benchmark.
func splitAndScatter(
restoreCtx context.Context,
settings *cluster.Settings,
db *client.DB,
kr *storageccl.KeyRewriter,
numClusterNodes int,
@@ -780,6 +782,16 @@ func splitAndScatter(
}

importSpanChunksCh := make(chan []importEntry)
// TODO(jeffreyxiao): Remove this check in 20.1.
// If the cluster supports sticky bits, then we should use the sticky bit to
// ensure that the splits are not automatically merged by the merge queue. If
// the cluster does not support sticky bits, we disable the merge queue via
// gossip, so we can just set the split to expire immediately.
stickyBitEnabled := settings.Version.IsActive(cluster.VersionStickyBit)
expirationTime := hlc.Timestamp{}
if stickyBitEnabled {
expirationTime = db.Clock().Now().Add(time.Hour.Nanoseconds(), 0)
}
g.GoCtx(func(ctx context.Context) error {
defer close(importSpanChunksCh)
for idx, importSpanChunk := range importSpanChunks {
@@ -793,7 +805,7 @@
// TODO(dan): Really, this should be splitting the Key of the first
// entry in the _next_ chunk.
log.VEventf(restoreCtx, 1, "presplitting chunk %d of %d", idx, len(importSpanChunks))
if err := db.AdminSplit(ctx, chunkKey, chunkKey, hlc.Timestamp{} /* expirationTime */); err != nil {
if err := db.AdminSplit(ctx, chunkKey, chunkKey, expirationTime); err != nil {
return err
}

@@ -849,7 +861,7 @@ func splitAndScatter(
// TODO(dan): Really, this should be splitting the Key of
// the _next_ entry.
log.VEventf(restoreCtx, 1, "presplitting %d of %d", idx, len(importSpans))
if err := db.AdminSplit(ctx, newSpanKey, newSpanKey, hlc.Timestamp{} /* expirationTime */); err != nil {
if err := db.AdminSplit(ctx, newSpanKey, newSpanKey, expirationTime); err != nil {
return err
}

@@ -1018,6 +1030,7 @@ func restore(
restoreCtx context.Context,
db *client.DB,
gossip *gossip.Gossip,
settings *cluster.Settings,
backupDescs []BackupDescriptor,
endTime hlc.Timestamp,
sqlDescs []sqlbase.Descriptor,
@@ -1067,9 +1080,13 @@
return mu.res, nil, nil, err
}

{
// Disable merging for the table IDs being restored into. We don't want the
// merge queue undoing the splits performed during RESTORE.
// TODO(jeffreyxiao): Remove this check in 20.1.
// If the cluster supports sticky bits, then we don't have to worry about the
// merge queue automatically merging the splits performed during RESTORE.
// Otherwise, we have to rely on the gossip mechanism to disable the merge
// queue for the table IDs being restored into.
stickyBitEnabled := settings.Version.IsActive(cluster.VersionStickyBit)
if !stickyBitEnabled {
tableIDs := make([]uint32, 0, len(tables))
for _, t := range tables {
tableIDs = append(tableIDs, uint32(t.ID))
@@ -1157,7 +1174,7 @@ func restore(
readyForImportCh := make(chan importEntry, presplitLeadLimit)
g.GoCtx(func(ctx context.Context) error {
defer close(readyForImportCh)
return splitAndScatter(ctx, db, kr, numClusterNodes, importSpans, readyForImportCh)
return splitAndScatter(ctx, settings, db, kr, numClusterNodes, importSpans, readyForImportCh)
})

requestFinishedCh := make(chan struct{}, len(importSpans)) // enough buffer to never block
@@ -1448,6 +1465,7 @@ func (r *restoreResumer) Resume(
ctx,
p.ExecCfg().DB,
p.ExecCfg().Gossip,
p.ExecCfg().Settings,
backupDescs,
details.EndTime,
sqlDescs,
10 changes: 7 additions & 3 deletions pkg/ccl/importccl/import_stmt.go
@@ -771,9 +771,13 @@ func (r *importResumer) Resume(
}
}

{
// Disable merging for the table IDs being imported into. We don't want the
// merge queue undoing the splits performed during IMPORT.
// TODO(jeffreyxiao): Remove this check in 20.1.
// If the cluster supports sticky bits, then we don't have to worry about the
// merge queue automatically merging the splits performed during IMPORT.
// Otherwise, we have to rely on the gossip mechanism to disable the merge
// queue for the table IDs being imported into.
stickyBitEnabled := r.settings.Version.IsActive(cluster.VersionStickyBit)
if !stickyBitEnabled {
tableIDs := make([]uint32, 0, len(tables))
for _, t := range tables {
tableIDs = append(tableIDs, uint32(t.ID))
14 changes: 13 additions & 1 deletion pkg/ccl/importccl/read_import_proc.go
@@ -16,6 +16,7 @@ import (
"io/ioutil"
"math/rand"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/internal/client"
@@ -484,10 +485,21 @@ func (cp *readImportDataProcessor) doRun(ctx context.Context) error {
return conv.readFiles(ctx, cp.spec.Uri, cp.spec.Format, progFn, cp.flowCtx.Settings)
})

// TODO(jeffreyxiao): Remove this check in 20.1.
// If the cluster supports sticky bits, then we should use the sticky bit to
// ensure that the splits are not automatically merged by the merge queue. If
// the cluster does not support sticky bits, we disable the merge queue via
// gossip, so we can just set the split to expire immediately.
stickyBitEnabled := cp.flowCtx.Settings.Version.IsActive(cluster.VersionStickyBit)
expirationTime := hlc.Timestamp{}
if stickyBitEnabled {
expirationTime = cp.flowCtx.ClientDB.Clock().Now().Add(time.Hour.Nanoseconds(), 0)
}

if cp.spec.IngestDirectly {
for _, tbl := range cp.spec.Tables {
for _, span := range tbl.AllIndexSpans() {
if err := cp.flowCtx.ClientDB.AdminSplit(ctx, span.Key, span.Key, hlc.Timestamp{} /* expirationTime */); err != nil {
if err := cp.flowCtx.ClientDB.AdminSplit(ctx, span.Key, span.Key, expirationTime); err != nil {
return err
}

14 changes: 13 additions & 1 deletion pkg/ccl/importccl/sst_writer_proc.go
@@ -12,6 +12,7 @@ import (
"context"
"fmt"
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/internal/client"
@@ -147,6 +148,17 @@ func (sp *sstWriter) Run(ctx context.Context) {
return err
}

// TODO(jeffreyxiao): Remove this check in 20.1.
// If the cluster supports sticky bits, then we should use the sticky bit
// to ensure that the splits are not automatically merged by the merge
// queue. If the cluster does not support sticky bits, we disable the merge
// queue via gossip, so we can just set the split to expire immediately.
stickyBitEnabled := sp.flowCtx.Settings.Version.IsActive(cluster.VersionStickyBit)
expirationTime := hlc.Timestamp{}
if stickyBitEnabled {
expirationTime = sp.db.Clock().Now().Add(time.Hour.Nanoseconds(), 0)
}

// Fetch all the keys in each span and write them to storage.
iter := store.NewIterator()
defer iter.Close()
@@ -177,7 +189,7 @@ func (sp *sstWriter) Run(ctx context.Context) {
end = sst.span.EndKey
}

if err := sp.db.AdminSplit(ctx, end, end, hlc.Timestamp{} /* expirationTime */); err != nil {
if err := sp.db.AdminSplit(ctx, end, end, expirationTime); err != nil {
return err
}

20 changes: 16 additions & 4 deletions pkg/sql/backfill.go
@@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/backfill"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
@@ -1110,12 +1111,23 @@ func (sc *SchemaChanger) backfillIndexes(
fn()
}

disableCtx, cancel := context.WithCancel(ctx)
defer cancel()
sc.execCfg.Gossip.DisableMerges(disableCtx, []uint32{uint32(sc.tableID)})
// TODO(jeffreyxiao): Remove this check in 20.1.
// If the cluster supports sticky bits, then we should use the sticky bit to
// ensure that the splits are not automatically merged by the merge queue. If
// the cluster does not support sticky bits, we disable the merge queue via
// gossip, so we can just set the split to expire immediately.
stickyBitEnabled := sc.execCfg.Settings.Version.IsActive(cluster.VersionStickyBit)
expirationTime := hlc.Timestamp{}
if !stickyBitEnabled {
disableCtx, cancel := context.WithCancel(ctx)
defer cancel()
sc.execCfg.Gossip.DisableMerges(disableCtx, []uint32{uint32(sc.tableID)})
} else {
expirationTime = sc.db.Clock().Now().Add(time.Hour.Nanoseconds(), 0)
}

for _, span := range addingSpans {
if err := sc.db.AdminSplit(ctx, span.Key, span.Key, hlc.Timestamp{} /* expirationTime */); err != nil {
if err := sc.db.AdminSplit(ctx, span.Key, span.Key, expirationTime); err != nil {
return err
}
}
128 changes: 80 additions & 48 deletions pkg/storage/replica_command.go
@@ -49,46 +49,12 @@ func (r *Replica) AdminSplit(
return roachpb.AdminSplitResponse{}, roachpb.NewErrorf("cannot split range with no key provided")
}

var lastErr error
retryOpts := base.DefaultRetryOptions()
retryOpts.MaxRetries = 10
for retryable := retry.StartWithCtx(ctx, retryOpts); retryable.Next(); {
// The replica may have been destroyed since the start of the retry loop. We
// need to explicitly check this condition. Having a valid lease, as we
// verify below, does not imply that the range still exists: even after a
// range has been merged into its left-hand neighbor, its final lease (i.e.,
// the lease we have in r.mu.state.Lease) can remain valid indefinitely.
if _, err := r.IsDestroyed(); err != nil {
return reply, roachpb.NewError(err)
}

// Admin commands always require the range lease to begin (see
// executeAdminBatch), but we may have lost it while in this retry loop.
// Without the lease, a replica's local descriptor can be arbitrarily
// stale, which will result in a ConditionFailedError. To avoid this,
// we make sure that we still have the lease before each attempt.
if _, pErr := r.redirectOnOrAcquireLease(ctx); pErr != nil {
return roachpb.AdminSplitResponse{}, pErr
}

reply, lastErr = r.adminSplitWithDescriptor(ctx, args, r.Desc(), true /* delayable */, reason)
// On seeing a ConditionFailedError or an AmbiguousResultError, retry
// the command with the updated descriptor.
if retry := causer.Visit(lastErr, func(err error) bool {
switch err.(type) {
case *roachpb.ConditionFailedError:
return true
case *roachpb.AmbiguousResultError:
return true
default:
return false
}
}); !retry {
return reply, roachpb.NewError(lastErr)
}
}
// If we broke out of the loop after MaxRetries, return the last error.
return roachpb.AdminSplitResponse{}, roachpb.NewError(lastErr)
err := r.executeAdminCommandWithDescriptor(ctx, func(desc *roachpb.RangeDescriptor) error {
var err error
reply, err = r.adminSplitWithDescriptor(ctx, args, desc, true /* delayable */, reason)
return err
})
return reply, err
}

func maybeDescriptorChangedError(desc *roachpb.RangeDescriptor, err error) (string, bool) {
@@ -380,13 +346,24 @@ func (r *Replica) adminSplitWithDescriptor(
func (r *Replica) AdminUnsplit(
ctx context.Context, args roachpb.AdminUnsplitRequest, reason string,
) (roachpb.AdminUnsplitResponse, *roachpb.Error) {
// TODO(jeffreyxiao): Have a retry loop for ConditionalFailed errors similar
// to AdminSplit
var reply roachpb.AdminUnsplitResponse
err := r.executeAdminCommandWithDescriptor(ctx, func(desc *roachpb.RangeDescriptor) error {
var err error
reply, err = r.adminUnsplitWithDescriptor(ctx, args, desc, reason)
return err
})
return reply, err
}

desc := *r.Desc()
func (r *Replica) adminUnsplitWithDescriptor(
ctx context.Context,
args roachpb.AdminUnsplitRequest,
desc *roachpb.RangeDescriptor,
reason string,
) (roachpb.AdminUnsplitResponse, error) {
var reply roachpb.AdminUnsplitResponse
if !bytes.Equal(desc.StartKey.AsRawKey(), args.Header().Key) {
return reply, roachpb.NewErrorf("key %s is not the start of a range", args.Header().Key)
return reply, errors.Errorf("key %s is not the start of a range", args.Header().Key)
}

// If the range's sticky bit is already hlc.Timestamp{}, we treat the
@@ -398,11 +375,11 @@

if err := r.store.DB().Txn(ctx, func(ctx context.Context, txn *client.Txn) error {
b := txn.NewBatch()
newDesc := desc
newDesc := *desc
newDesc.StickyBit = hlc.Timestamp{}
descKey := keys.RangeDescriptorKey(newDesc.StartKey)

if err := updateRangeDescriptor(b, descKey, &desc, &newDesc); err != nil {
if err := updateRangeDescriptor(b, descKey, desc, &newDesc); err != nil {
return err
}
if err := updateRangeAddressing(b, &newDesc); err != nil {
@@ -421,12 +398,67 @@
})
return txn.Run(ctx, b)
}); err != nil {
return reply, roachpb.NewErrorf("unsplit at key %s failed: %s", args.Header().Key, err)
// The ConditionFailedError can occur because the descriptors acting as
// expected values in the CPuts used to update the range descriptor are
// picked outside the transaction. Return ConditionFailedError in the error
// detail so that the command can be retried.
if msg, ok := maybeDescriptorChangedError(desc, err); ok {
// NB: we have to wrap the existing error here as consumers of this code
// look at the root cause to sniff out the changed descriptor.
err = &benignError{errors.Wrap(err, msg)}
}
return reply, err
}

return reply, nil
}

// executeAdminCommandWithDescriptor wraps a read-modify-write operation for RangeDescriptors in a
// retry loop.
func (r *Replica) executeAdminCommandWithDescriptor(
ctx context.Context, updateDesc func(*roachpb.RangeDescriptor) error,
) *roachpb.Error {
retryOpts := base.DefaultRetryOptions()
retryOpts.MaxRetries = 10
var lastErr error
for retryable := retry.StartWithCtx(ctx, retryOpts); retryable.Next(); {
// The replica may have been destroyed since the start of the retry loop.
// We need to explicitly check this condition. Having a valid lease, as we
// verify below, does not imply that the range still exists: even after a
// range has been merged into its left-hand neighbor, its final lease
// (i.e., the lease we have in r.mu.state.Lease) can remain valid
// indefinitely.
if _, err := r.IsDestroyed(); err != nil {
return roachpb.NewError(err)
}

// Admin commands always require the range lease to begin (see
// executeAdminBatch), but we may have lost it while in this retry loop.
// Without the lease, a replica's local descriptor can be arbitrarily
// stale, which will result in a ConditionFailedError. To avoid this, we
// make sure that we still have the lease before each attempt.
if _, pErr := r.redirectOnOrAcquireLease(ctx); pErr != nil {
return pErr
}

lastErr = updateDesc(r.Desc())
// On seeing a ConditionFailedError or an AmbiguousResultError, retry the
// command with the updated descriptor.
if retry := causer.Visit(lastErr, func(err error) bool {
switch err.(type) {
case *roachpb.ConditionFailedError:
return true
case *roachpb.AmbiguousResultError:
return true
default:
return false
}
}); !retry {
return roachpb.NewError(lastErr)
}
}
// If we broke out of the loop after MaxRetries, return the last error.
return roachpb.NewError(lastErr)
}

// AdminMerge extends this range to subsume the range that comes next
// in the key space. The merge is performed inside of a distributed
// transaction which writes the left hand side range descriptor (the
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/replica_test.go
@@ -2807,7 +2807,7 @@ func TestConditionalPutUpdatesTSCacheOnError(t *testing.T) {
// CPut args which expect value "1" to write "0".
key := []byte("a")
cpArgs1 := cPutArgs(key, []byte("1"), []byte("0"))
_, pErr := tc.SendWrapped(&cpArgs1)
_, pErr := tc.SendWrappedWith(roachpb.Header{Timestamp: t2}, &cpArgs1)
if cfErr, ok := pErr.GetDetail().(*roachpb.ConditionFailedError); !ok {
t.Errorf("expected ConditionFailedError; got %v", pErr)
} else if cfErr.ActualValue != nil {
@@ -2895,7 +2895,7 @@ func TestInitPutUpdatesTSCacheOnError(t *testing.T) {

// InitPut args to write "1" to same key. Should fail.
ipArgs2 := iPutArgs(key, []byte("1"))
_, pErr = tc.SendWrapped(&ipArgs2)
_, pErr = tc.SendWrappedWith(roachpb.Header{Timestamp: t2}, &ipArgs2)
if cfErr, ok := pErr.GetDetail().(*roachpb.ConditionFailedError); !ok {
t.Errorf("expected ConditionFailedError; got %v", pErr)
} else if valueBytes, err := cfErr.ActualValue.GetBytes(); err != nil {