Skip to content

Commit

Permalink
Merge #102413
Browse files Browse the repository at this point in the history
102413: backupccl: don't restore sql_instances, sqlliveness or lease rows r=dt a=dt

Restoring these rows from the old cluster can cause the restored cluster to experience errors when trying to plan and run queries that run on 'all' nodes, because the old rows in these tables, which track the set of nodes, still appear to be valid until they expire.

Release note: none.
Epic: none.

Co-authored-by: David Taylor <tinystatemachine@gmail.com>
  • Loading branch information
craig[bot] and dt committed Apr 28, 2023
2 parents 3b1bc87 + 2881c34 commit f72c4b2
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 29 deletions.
19 changes: 19 additions & 0 deletions pkg/ccl/backupccl/key_rewriter.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,19 @@ func (kr *KeyRewriter) RewriteKey(key []byte, wallTime int64) ([]byte, bool, err
// in which case we are restoring as a system tenant.
if kr.fromSystemTenant && bytes.HasPrefix(key, keys.TenantPrefix) {
k, ok := kr.tenants.rewriteKey(key)
if ok {
// Skip keys from ephemeral cluster status tables so that the restored
// cluster does not observe stale leases/liveness rows until they expire.
noTenantPrefix, _, err := keys.DecodeTenantPrefix(key)
if err != nil {
return nil, false, err
}
_, tableID, _ := keys.SystemSQLCodec.DecodeTablePrefix(noTenantPrefix)

if tableID == keys.SQLInstancesTableID || tableID == keys.SqllivenessID || tableID == keys.LeaseTableID {
return k, false, nil
}
}
return k, ok, nil
}

Expand Down Expand Up @@ -282,6 +295,12 @@ func (kr *KeyRewriter) checkAndRewriteTableKey(key []byte, wallTime int64) ([]by
// perform a rewrite.
_, tableID, _ := keys.SystemSQLCodec.DecodeTablePrefix(key)

// Skip keys from ephemeral cluster status tables so that the restored cluster
// does not observe stale leases/liveness rows until they expire.
if tableID == keys.SQLInstancesTableID || tableID == keys.SqllivenessID || tableID == keys.LeaseTableID {
return key, false, nil
}

desc := kr.descs[descpb.ID(tableID)]
if desc == nil {
return nil, false, errors.Errorf("missing descriptor for table %d", tableID)
Expand Down
2 changes: 1 addition & 1 deletion pkg/ccl/streamingccl/streamingest/stream_ingestion_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ func ingestWithRetries(
if jobs.IsPermanentJobError(err) || errors.Is(err, context.Canceled) {
break
}
const msgFmt = "stream ingestion waits for retrying after error %s"
const msgFmt = "stream ingestion waits for retrying after error: %q"
log.Warningf(ctx, msgFmt, err)
updateRunningStatus(ctx, execCtx, ingestionJob, jobspb.ReplicationError,
fmt.Sprintf(msgFmt, err))
Expand Down
28 changes: 16 additions & 12 deletions pkg/ccl/streamingccl/streamingest/stream_ingestion_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,15 +655,8 @@ func (sip *streamIngestionProcessor) consumeEvents() (*jobspb.ResolvedSpans, err
return nil, nil
}

func (sip *streamIngestionProcessor) rekey(key roachpb.Key) ([]byte, error) {
rekey, ok, err := sip.rekeyer.RewriteKey(key, 0 /*wallTime*/)
if !ok {
return nil, errors.New("every key is expected to match tenant prefix")
}
if err != nil {
return nil, err
}
return rekey, nil
func (sip *streamIngestionProcessor) rekey(key roachpb.Key) ([]byte, bool, error) {
return sip.rekeyer.RewriteKey(key, 0 /*wallTime*/)
}

func (sip *streamIngestionProcessor) bufferSST(sst *kvpb.RangeFeedSSTable) error {
Expand Down Expand Up @@ -715,14 +708,21 @@ func (sip *streamIngestionProcessor) bufferRangeKeyVal(
defer sp.Finish()

var err error
rangeKeyVal.RangeKey.StartKey, err = sip.rekey(rangeKeyVal.RangeKey.StartKey)
var ok bool
rangeKeyVal.RangeKey.StartKey, ok, err = sip.rekey(rangeKeyVal.RangeKey.StartKey)
if err != nil {
return err
}
rangeKeyVal.RangeKey.EndKey, err = sip.rekey(rangeKeyVal.RangeKey.EndKey)
if !ok {
return nil
}
rangeKeyVal.RangeKey.EndKey, ok, err = sip.rekey(rangeKeyVal.RangeKey.EndKey)
if err != nil {
return err
}
if !ok {
return nil
}
sip.rangeBatcher.buffer(rangeKeyVal)
return nil
}
Expand Down Expand Up @@ -750,10 +750,14 @@ func (sip *streamIngestionProcessor) bufferKV(kv *roachpb.KeyValue) error {
}

var err error
kv.Key, err = sip.rekey(kv.Key)
var ok bool
kv.Key, ok, err = sip.rekey(kv.Key)
if err != nil {
return err
}
if !ok {
return nil
}

if sip.rewriteToDiffKey {
kv.Value.ClearChecksum()
Expand Down
54 changes: 38 additions & 16 deletions pkg/storage/fingerprint_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,13 @@ func (f *fingerprintWriter) PutRawMVCCRangeKey(key MVCCRangeKey, bytes []byte) e
func (f *fingerprintWriter) PutRawMVCC(key MVCCKey, value []byte) error {
defer f.hasher.Reset()
// Hash the key/timestamp and value of the RawMVCC.
if err := f.hashKey(key.Key); err != nil {
err, skip := f.hashKey(key.Key)
if err != nil {
return err
}
if skip {
return nil
}
if err := f.hashTimestamp(key.Timestamp); err != nil {
return err
}
Expand All @@ -134,9 +138,14 @@ func (f *fingerprintWriter) PutUnversioned(key roachpb.Key, value []byte) error
defer f.hasher.Reset()

// Hash the key and value in the absence of a timestamp.
if err := f.hashKey(key); err != nil {
err, skip := f.hashKey(key)
if err != nil {
return err
}
if skip {
return nil
}

if err := f.hashValue(value); err != nil {
return err
}
Expand All @@ -145,14 +154,27 @@ func (f *fingerprintWriter) PutUnversioned(key roachpb.Key, value []byte) error
return nil
}

func (f *fingerprintWriter) hashKey(key []byte) error {
func (f *fingerprintWriter) hashKey(key []byte) (error, bool) {
noTenantPrefix, err := keys.StripTenantPrefix(key)
if err != nil {
return err, false
}
// Fingerprinting ignores rows from a few special-cased key ranges, namely for
// the tables that contain ephemeral cluster-topology/state information, which
// is expected to differ between two clusters that otherwise contain the same
// data.
_, tID, _, _ := keys.DecodeTableIDIndexID(noTenantPrefix)
if tID == keys.SqllivenessID || tID == keys.LeaseTableID || tID == keys.SQLInstancesTableID {
return nil, true
}

if f.options.StripIndexPrefixAndTimestamp {
return f.hash(f.stripIndexPrefix(key))
return f.hash(f.stripIndexPrefix(key)), false
}
if f.options.StripTenantPrefix {
return f.hash(f.stripTenantPrefix(key))
return f.hash(noTenantPrefix), false
}
return f.hash(key)
return f.hash(key), false
}

func (f *fingerprintWriter) hashTimestamp(timestamp hlc.Timestamp) error {
Expand Down Expand Up @@ -189,14 +211,6 @@ func (f *fingerprintWriter) stripValueChecksum(value []byte) []byte {
return value[mvccChecksumSize:]
}

func (f *fingerprintWriter) stripTenantPrefix(key []byte) []byte {
remainder, err := keys.StripTenantPrefix(key)
if err != nil {
return key
}
return remainder
}

func (f *fingerprintWriter) stripIndexPrefix(key []byte) []byte {
remainder, err := keys.StripIndexPrefix(key)
if err != nil {
Expand Down Expand Up @@ -261,12 +275,20 @@ func FingerprintRangekeys(
defer fw.Close()
fingerprintRangeKey := func(stack MVCCRangeKeyStack) (uint64, error) {
defer fw.hasher.Reset()
if err := fw.hashKey(stack.Bounds.Key); err != nil {
err, skip := fw.hashKey(stack.Bounds.Key)
if err != nil {
return 0, err
}
if err := fw.hashKey(stack.Bounds.EndKey); err != nil {
if skip {
return 0, nil
}
err, skip = fw.hashKey(stack.Bounds.EndKey)
if err != nil {
return 0, err
}
if skip {
return 0, nil
}
for _, v := range stack.Versions {
if err := fw.hashTimestamp(v.Timestamp); err != nil {
return 0, err
Expand Down

0 comments on commit f72c4b2

Please sign in to comment.