Skip to content

Commit

Permalink
bulk: use expiration time to disable automatic merging in import, bac…
Browse files Browse the repository at this point in the history
…kfill, restore

The existing mechanism to prevent the merge queue from automatically
merging splits created in import, backfill, and restore was to gossip
the table keys that the merge queue should ignore when scanning
replicas. Now that there is support for specifying an expiration time at
a range level, we can use that instead of the gossip mechanism.

Backfill and restore keep track of the ranges so we can set an
expiration time of hlc.MaxTimestamp and manually unsplit the ranges at
the end of the operation.

Since import creates splits on the range and does not keep track of
them, a conservative expiration duration of an hour was used.

Release note: None
  • Loading branch information
jeffrey-xiao committed Jun 7, 2019
1 parent 3d557c8 commit 3dae6e2
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 12 deletions.
32 changes: 28 additions & 4 deletions pkg/ccl/backupccl/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -752,6 +752,7 @@ rangeLoop:
// there's some way to test it without running an O(hour) long benchmark.
func splitAndScatter(
restoreCtx context.Context,
settings *cluster.Settings,
db *client.DB,
kr *storageccl.KeyRewriter,
numClusterNodes int,
Expand Down Expand Up @@ -780,6 +781,12 @@ func splitAndScatter(
}

importSpanChunksCh := make(chan []importEntry)
// TODO(jeffreyxiao): Remove this check in 20.1.
stickyBitEnabled := settings.Version.IsActive(cluster.VersionStickyBit)
expirationTime := hlc.Timestamp{}
if stickyBitEnabled {
expirationTime = hlc.MaxTimestamp
}
g.GoCtx(func(ctx context.Context) error {
defer close(importSpanChunksCh)
for idx, importSpanChunk := range importSpanChunks {
Expand All @@ -793,7 +800,7 @@ func splitAndScatter(
// TODO(dan): Really, this should be splitting the Key of the first
// entry in the _next_ chunk.
log.VEventf(restoreCtx, 1, "presplitting chunk %d of %d", idx, len(importSpanChunks))
if err := db.AdminSplit(ctx, chunkKey, chunkKey, hlc.Timestamp{} /* expirationTime */); err != nil {
if err := db.AdminSplit(ctx, chunkKey, chunkKey, expirationTime); err != nil {
return err
}

Expand Down Expand Up @@ -849,7 +856,7 @@ func splitAndScatter(
// TODO(dan): Really, this should be splitting the Key of
// the _next_ entry.
log.VEventf(restoreCtx, 1, "presplitting %d of %d", idx, len(importSpans))
if err := db.AdminSplit(ctx, newSpanKey, newSpanKey, hlc.Timestamp{} /* expirationTime */); err != nil {
if err := db.AdminSplit(ctx, newSpanKey, newSpanKey, expirationTime); err != nil {
return err
}

Expand Down Expand Up @@ -1019,6 +1026,7 @@ func restore(
restoreCtx context.Context,
db *client.DB,
gossip *gossip.Gossip,
settings *cluster.Settings,
backupDescs []BackupDescriptor,
endTime hlc.Timestamp,
sqlDescs []sqlbase.Descriptor,
Expand Down Expand Up @@ -1068,7 +1076,9 @@ func restore(
return mu.res, nil, nil, err
}

{
// TODO(jeffreyxiao): Remove this check in 20.1.
stickyBitEnabled := settings.Version.IsActive(cluster.VersionStickyBit)
if !stickyBitEnabled {
// Disable merging for the table IDs being restored into. We don't want the
// merge queue undoing the splits performed during RESTORE.
tableIDs := make([]uint32, 0, len(tables))
Expand Down Expand Up @@ -1159,7 +1169,7 @@ func restore(
readyForImportCh := make(chan importEntry, presplitLeadLimit)
g.GoCtx(func(ctx context.Context) error {
defer close(readyForImportCh)
return splitAndScatter(ctx, db, kr, numClusterNodes, importSpans, readyForImportCh)
return splitAndScatter(ctx, settings, db, kr, numClusterNodes, importSpans, readyForImportCh)
})

requestFinishedCh := make(chan struct{}, len(importSpans)) // enough buffer to never block
Expand Down Expand Up @@ -1242,6 +1252,19 @@ func restore(
"importing %d ranges", len(importSpans))
}

// Unsplit ranges after restoration is complete.
if stickyBitEnabled {
for _, importSpan := range importSpans {
newSpanKey, err := rewriteBackupSpanKey(kr, importSpan.Span.Key)
if err != nil {
return mu.res, nil, nil, err
}
if err := db.AdminUnsplit(restoreCtx, newSpanKey); err != nil {
return mu.res, nil, nil, err
}
}
}

return mu.res, databases, tables, nil
}

Expand Down Expand Up @@ -1469,6 +1492,7 @@ func (r *restoreResumer) Resume(
ctx,
p.ExecCfg().DB,
p.ExecCfg().Gossip,
p.ExecCfg().Settings,
backupDescs,
details.EndTime,
sqlDescs,
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/importccl/import_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,8 @@ func (r *importResumer) Resume(
}
}

{
stickyBitEnabled := r.settings.Version.IsActive(cluster.VersionStickyBit)
if !stickyBitEnabled {
// Disable merging for the table IDs being imported into. We don't want the
// merge queue undoing the splits performed during IMPORT.
tableIDs := make([]uint32, 0, len(tables))
Expand Down
10 changes: 9 additions & 1 deletion pkg/ccl/importccl/read_import_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"io/ioutil"
"math/rand"
"strings"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/internal/client"
Expand Down Expand Up @@ -484,10 +485,17 @@ func (cp *readImportDataProcessor) doRun(ctx context.Context) error {
return conv.readFiles(ctx, cp.spec.Uri, cp.spec.Format, progFn, cp.flowCtx.Settings)
})

// TODO(jeffreyxiao): Remove this check in 20.1.
stickyBitEnabled := cp.flowCtx.Settings.Version.IsActive(cluster.VersionStickyBit)
expirationTime := hlc.Timestamp{}
if stickyBitEnabled {
expirationTime = cp.flowCtx.ClientDB.Clock().Now().Add(time.Hour.Nanoseconds(), 0)
}

if cp.spec.IngestDirectly {
for _, tbl := range cp.spec.Tables {
for _, span := range tbl.AllIndexSpans() {
if err := cp.flowCtx.ClientDB.AdminSplit(ctx, span.Key, span.Key, hlc.Timestamp{} /* expirationTime */); err != nil {
if err := cp.flowCtx.ClientDB.AdminSplit(ctx, span.Key, span.Key, expirationTime); err != nil {
return err
}

Expand Down
10 changes: 9 additions & 1 deletion pkg/ccl/importccl/sst_writer_proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"context"
"fmt"
"sort"
"time"

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl"
"github.com/cockroachdb/cockroach/pkg/internal/client"
Expand Down Expand Up @@ -147,6 +148,13 @@ func (sp *sstWriter) Run(ctx context.Context) {
return err
}

// TODO(jeffreyxiao): Remove this check in 20.1.
stickyBitEnabled := sp.flowCtx.Settings.Version.IsActive(cluster.VersionStickyBit)
expirationTime := hlc.Timestamp{}
if stickyBitEnabled {
expirationTime = sp.db.Clock().Now().Add(time.Hour.Nanoseconds(), 0)
}

// Fetch all the keys in each span and write them to storage.
iter := store.NewIterator()
defer iter.Close()
Expand Down Expand Up @@ -177,7 +185,7 @@ func (sp *sstWriter) Run(ctx context.Context) {
end = sst.span.EndKey
}

if err := sp.db.AdminSplit(ctx, end, end, hlc.Timestamp{} /* expirationTime */); err != nil {
if err := sp.db.AdminSplit(ctx, end, end, expirationTime); err != nil {
return err
}

Expand Down
29 changes: 24 additions & 5 deletions pkg/sql/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/backfill"
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
Expand Down Expand Up @@ -1071,12 +1072,19 @@ func (sc *SchemaChanger) backfillIndexes(
fn()
}

disableCtx, cancel := context.WithCancel(ctx)
defer cancel()
sc.execCfg.Gossip.DisableMerges(disableCtx, []uint32{uint32(sc.tableID)})
// TODO(jeffreyxiao): Remove check in 20.1.
stickyBitEnabled := sc.execCfg.Settings.Version.IsActive(cluster.VersionStickyBit)
expirationTime := hlc.Timestamp{}
if !stickyBitEnabled {
disableCtx, cancel := context.WithCancel(ctx)
defer cancel()
sc.execCfg.Gossip.DisableMerges(disableCtx, []uint32{uint32(sc.tableID)})
} else {
expirationTime = hlc.MaxTimestamp
}

for _, span := range addingSpans {
if err := sc.db.AdminSplit(ctx, span.Key, span.Key, hlc.Timestamp{} /* expirationTime */); err != nil {
if err := sc.db.AdminSplit(ctx, span.Key, span.Key, expirationTime); err != nil {
return err
}
}
Expand All @@ -1087,7 +1095,18 @@ func (sc *SchemaChanger) backfillIndexes(
backfill.IndexMutationFilter, addingSpans); err != nil {
return err
}
return sc.validateIndexes(ctx, evalCtx, lease)
if err := sc.validateIndexes(ctx, evalCtx, lease); err != nil {
return err
}
// Unsplit ranges after backfill and validation are complete.
if stickyBitEnabled {
for _, span := range addingSpans {
if err := sc.db.AdminUnsplit(ctx, span.Key); err != nil {
return err
}
}
}
return nil
}

func (sc *SchemaChanger) truncateAndBackfillColumns(
Expand Down

0 comments on commit 3dae6e2

Please sign in to comment.