Skip to content

Commit

Permalink
backupccl: don't restore sql_instances, sqlliveness or lease rows
Browse files Browse the repository at this point in the history
Restoring these rows from the old cluster can cause the restored cluster to experience
errors when trying to plan and run queries that run on 'all' nodes, because the old rows
in these tables, which track the set of nodes, still appear to be valid until they
expire.

Release note: none.
Epic: none.
  • Loading branch information
dt committed Apr 27, 2023
1 parent 9fb03ee commit 2881c34
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 29 deletions.
19 changes: 19 additions & 0 deletions pkg/ccl/backupccl/key_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,19 @@ func (kr *KeyRewriter) RewriteKey(key []byte, wallTime int64) ([]byte, bool, err
// in which case we are restoring as a system tenant.
if kr.fromSystemTenant && bytes.HasPrefix(key, keys.TenantPrefix) {
k, ok := kr.tenants.rewriteKey(key)
if ok {
// Skip keys from ephemeral cluster status tables so that the restored
// cluster does not observe stale lease/liveness rows until they expire.
noTenantPrefix, _, err := keys.DecodeTenantPrefix(key)
if err != nil {
return nil, false, err
}
_, tableID, _ := keys.SystemSQLCodec.DecodeTablePrefix(noTenantPrefix)

if tableID == keys.SQLInstancesTableID || tableID == keys.SqllivenessID || tableID == keys.LeaseTableID {
return k, false, nil
}
}
return k, ok, nil
}

Expand Down Expand Up @@ -282,6 +295,12 @@ func (kr *KeyRewriter) checkAndRewriteTableKey(key []byte, wallTime int64) ([]by
// perform a rewrite.
_, tableID, _ := keys.SystemSQLCodec.DecodeTablePrefix(key)

// Skip keys from ephemeral cluster status tables so that the restored cluster
// does not observe stale lease/liveness rows until they expire.
if tableID == keys.SQLInstancesTableID || tableID == keys.SqllivenessID || tableID == keys.LeaseTableID {
return key, false, nil
}

desc := kr.descs[descpb.ID(tableID)]
if desc == nil {
return nil, false, errors.Errorf("missing descriptor for table %d", tableID)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ func ingestWithRetries(
if jobs.IsPermanentJobError(err) || errors.Is(err, context.Canceled) {
break
}
const msgFmt = "stream ingestion waits for retrying after error %s"
const msgFmt = "stream ingestion waits for retrying after error: %q"
log.Warningf(ctx, msgFmt, err)
updateRunningStatus(ctx, execCtx, ingestionJob, jobspb.ReplicationError,
fmt.Sprintf(msgFmt, err))
Expand Down
28 changes: 16 additions & 12 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,15 +654,8 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpans, err
return nil, nil
}

func (sip *streamIngestionProcessor) rekey(key roachpb.Key) ([]byte, error) {
rekey, ok, err := sip.rekeyer.RewriteKey(key, 0 /*wallTime*/)
if !ok {
return nil, errors.New("every key is expected to match tenant prefix")
}
if err != nil {
return nil, err
}
return rekey, nil
// rekey passes key through the processor's rekeyer with a zero wall time and
// hands back the rekeyer's results unchanged: the rewritten key, the ok flag,
// and any error. Callers are expected to check the error first and to drop the
// key when ok is false.
func (sip *streamIngestionProcessor) rekey(key roachpb.Key) ([]byte, bool, error) {
	rewritten, ok, err := sip.rekeyer.RewriteKey(key, 0 /*wallTime*/)
	return rewritten, ok, err
}

func (sip *streamIngestionProcessor) bufferSST(sst *kvpb.RangeFeedSSTable) error {
Expand Down Expand Up @@ -714,14 +707,21 @@ func (sip *streamIngestionProcessor) bufferRangeKeyVal(
defer sp.Finish()

var err error
rangeKeyVal.RangeKey.StartKey, err = sip.rekey(rangeKeyVal.RangeKey.StartKey)
var ok bool
rangeKeyVal.RangeKey.StartKey, ok, err = sip.rekey(rangeKeyVal.RangeKey.StartKey)
if err != nil {
return err
}
rangeKeyVal.RangeKey.EndKey, err = sip.rekey(rangeKeyVal.RangeKey.EndKey)
if !ok {
return nil
}
rangeKeyVal.RangeKey.EndKey, ok, err = sip.rekey(rangeKeyVal.RangeKey.EndKey)
if err != nil {
return err
}
if !ok {
return nil
}
sip.rangeBatcher.buffer(rangeKeyVal)
return nil
}
Expand Down Expand Up @@ -749,10 +749,14 @@ func (sip *streamIngestionProcessor) bufferKV(kv *roachpb.KeyValue) error {
}

var err error
kv.Key, err = sip.rekey(kv.Key)
var ok bool
kv.Key, ok, err = sip.rekey(kv.Key)
if err != nil {
return err
}
if !ok {
return nil
}

if sip.rewriteToDiffKey {
kv.Value.ClearChecksum()
Expand Down
54 changes: 38 additions & 16 deletions pkg/storage/fingerprint_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,13 @@ func (f *fingerprintWriter) PutRawMVCCRangeKey(key MVCCRangeKey, bytes []byte) e
func (f *fingerprintWriter) PutRawMVCC(key MVCCKey, value []byte) error {
defer f.hasher.Reset()
// Hash the key/timestamp and value of the RawMVCC.
if err := f.hashKey(key.Key); err != nil {
err, skip := f.hashKey(key.Key)
if err != nil {
return err
}
if skip {
return nil
}
if err := f.hashTimestamp(key.Timestamp); err != nil {
return err
}
Expand All @@ -134,9 +138,14 @@ func (f *fingerprintWriter) PutUnversioned(key roachpb.Key, value []byte) error
defer f.hasher.Reset()

// Hash the key and value in the absence of a timestamp.
if err := f.hashKey(key); err != nil {
err, skip := f.hashKey(key)
if err != nil {
return err
}
if skip {
return nil
}

if err := f.hashValue(value); err != nil {
return err
}
Expand All @@ -145,14 +154,27 @@ func (f *fingerprintWriter) PutUnversioned(key roachpb.Key, value []byte) error
return nil
}

func (f *fingerprintWriter) hashKey(key []byte) error {
func (f *fingerprintWriter) hashKey(key []byte) (error, bool) {
noTenantPrefix, err := keys.StripTenantPrefix(key)
if err != nil {
return err, false
}
// Fingerprinting ignores rows from a few special-cased key ranges, namely
// the tables that contain ephemeral cluster-topology/state information, which
// is expected to differ between two clusters that otherwise contain the same
// data.
_, tID, _, _ := keys.DecodeTableIDIndexID(noTenantPrefix)
if tID == keys.SqllivenessID || tID == keys.LeaseTableID || tID == keys.SQLInstancesTableID {
return nil, true
}

if f.options.StripIndexPrefixAndTimestamp {
return f.hash(f.stripIndexPrefix(key))
return f.hash(f.stripIndexPrefix(key)), false
}
if f.options.StripTenantPrefix {
return f.hash(f.stripTenantPrefix(key))
return f.hash(noTenantPrefix), false
}
return f.hash(key)
return f.hash(key), false
}

func (f *fingerprintWriter) hashTimestamp(timestamp hlc.Timestamp) error {
Expand Down Expand Up @@ -189,14 +211,6 @@ func (f *fingerprintWriter) stripValueChecksum(value []byte) []byte {
return value[mvccChecksumSize:]
}

// stripTenantPrefix returns key with its tenant prefix removed by
// keys.StripTenantPrefix. If stripping fails, the error is swallowed and the
// key is returned unmodified.
func (f *fingerprintWriter) stripTenantPrefix(key []byte) []byte {
	if stripped, err := keys.StripTenantPrefix(key); err == nil {
		return stripped
	}
	return key
}

func (f *fingerprintWriter) stripIndexPrefix(key []byte) []byte {
remainder, err := keys.StripIndexPrefix(key)
if err != nil {
Expand Down Expand Up @@ -261,12 +275,20 @@ func FingerprintRangekeys(
defer fw.Close()
fingerprintRangeKey := func(stack MVCCRangeKeyStack) (uint64, error) {
defer fw.hasher.Reset()
if err := fw.hashKey(stack.Bounds.Key); err != nil {
err, skip := fw.hashKey(stack.Bounds.Key)
if err != nil {
return 0, err
}
if err := fw.hashKey(stack.Bounds.EndKey); err != nil {
if skip {
return 0, nil
}
err, skip = fw.hashKey(stack.Bounds.EndKey)
if err != nil {
return 0, err
}
if skip {
return 0, nil
}
for _, v := range stack.Versions {
if err := fw.hashTimestamp(v.Timestamp); err != nil {
return 0, err
Expand Down

0 comments on commit 2881c34

Please sign in to comment.