Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DNM] storage/concurrency: introduce concurrency control package, prototype SFU #43775

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion c-deps/libroach/include/libroach.h
Original file line number Diff line number Diff line change
Expand Up @@ -359,14 +359,15 @@ typedef struct {
DBChunkedBuffer data;
DBSlice intents;
DBTimestamp uncertainty_timestamp;
DBTimestamp write_too_old;
DBSlice resume_key;
} DBScanResults;

DBScanResults MVCCGet(DBIterator* iter, DBSlice key, DBTimestamp timestamp, DBTxn txn,
bool inconsistent, bool tombstones);
DBScanResults MVCCScan(DBIterator* iter, DBSlice start, DBSlice end, DBTimestamp timestamp,
int64_t max_keys, DBTxn txn, bool inconsistent, bool reverse,
bool tombstones);
bool tombstones, bool write_too_old);

// DBStatsResult contains various runtime stats for RocksDB.
typedef struct {
Expand Down
8 changes: 4 additions & 4 deletions c-deps/libroach/mvcc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -276,19 +276,19 @@ DBScanResults MVCCGet(DBIterator* iter, DBSlice key, DBTimestamp timestamp, DBTx
const DBSlice end = {0, 0};
ScopedStats scoped_iter(iter);
mvccForwardScanner scanner(iter, key, end, timestamp, 1 /* max_keys */, txn, inconsistent,
tombstones);
tombstones, false /* write_too_old */);
return scanner.get();
}

DBScanResults MVCCScan(DBIterator* iter, DBSlice start, DBSlice end, DBTimestamp timestamp,
int64_t max_keys, DBTxn txn, bool inconsistent, bool reverse,
bool tombstones) {
bool tombstones, bool write_too_old) {
ScopedStats scoped_iter(iter);
if (reverse) {
mvccReverseScanner scanner(iter, end, start, timestamp, max_keys, txn, inconsistent, tombstones);
mvccReverseScanner scanner(iter, end, start, timestamp, max_keys, txn, inconsistent, tombstones, write_too_old);
return scanner.scan();
} else {
mvccForwardScanner scanner(iter, start, end, timestamp, max_keys, txn, inconsistent, tombstones);
mvccForwardScanner scanner(iter, start, end, timestamp, max_keys, txn, inconsistent, tombstones, write_too_old);
return scanner.scan();
}
}
17 changes: 15 additions & 2 deletions c-deps/libroach/mvcc.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ static const int kMaxItersBeforeSeek = 10;
template <bool reverse> class mvccScanner {
public:
mvccScanner(DBIterator* iter, DBSlice start, DBSlice end, DBTimestamp timestamp, int64_t max_keys,
DBTxn txn, bool inconsistent, bool tombstones)
DBTxn txn, bool inconsistent, bool tombstones, bool write_too_old)
: iter_(iter),
iter_rep_(iter->rep.get()),
start_key_(ToSlice(start)),
Expand All @@ -62,6 +62,7 @@ template <bool reverse> class mvccScanner {
txn_ignored_seqnums_(txn.ignored_seqnums),
inconsistent_(inconsistent),
tombstones_(tombstones),
write_too_old_(write_too_old),
check_uncertainty_(timestamp < txn.max_timestamp),
kvs_(new chunkedBuffer),
intents_(new rocksdb::WriteBatch),
Expand Down Expand Up @@ -261,6 +262,13 @@ template <bool reverse> class mvccScanner {
return false;
}

bool writeTooOld(DBTimestamp ts) {
results_.write_too_old = ts;
kvs_->Clear();
intents_->Clear();
return false;
}

bool setStatus(const DBStatus& status) {
results_.status = status;
return false;
Expand All @@ -276,6 +284,10 @@ template <bool reverse> class mvccScanner {
return addAndAdvance(cur_value_);
}

if (write_too_old_) {
return writeTooOld(cur_timestamp_);
}

if (check_uncertainty_) {
// 2. Our txn's read timestamp is less than the max timestamp
// seen by the txn. We need to check for clock uncertainty
Expand Down Expand Up @@ -326,7 +338,7 @@ template <bool reverse> class mvccScanner {
// Intents for other transactions are visible at or below:
// max(txn.max_timestamp, read_timestamp)
const DBTimestamp max_visible_timestamp = check_uncertainty_ ? txn_max_timestamp_ : timestamp_;
if (max_visible_timestamp < meta_timestamp && !own_intent) {
if (max_visible_timestamp < meta_timestamp && !own_intent && !write_too_old_) {
// 5. The key contains an intent, but we're reading before the
// intent. Seek to the desired version. Note that if we own the
// intent (i.e. we're reading transactionally) we want to read
Expand Down Expand Up @@ -729,6 +741,7 @@ template <bool reverse> class mvccScanner {
const DBIgnoredSeqNums txn_ignored_seqnums_;
const bool inconsistent_;
const bool tombstones_;
const bool write_too_old_;
const bool check_uncertainty_;
DBScanResults results_;
std::unique_ptr<chunkedBuffer> kvs_;
Expand Down
41 changes: 37 additions & 4 deletions c-deps/libroach/protos/roachpb/api.pb.cc

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions c-deps/libroach/protos/roachpb/api.pb.h

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion pkg/ccl/changefeedccl/rowfetcher_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,11 @@ func (c *rowFetcherCache) RowFetcherForTableDesc(

var rf row.Fetcher
if err := rf.Init(
false /* reverse */, false /* returnRangeInfo */, false /* isCheck */, &c.a,
false, /* reverse */
sqlbase.ScanLockingStrength_FOR_NONE,
false, /* returnRangeInfo */
false, /* isCheck */
&c.a,
row.FetcherTableArgs{
Spans: tableDesc.AllIndexSpans(),
Desc: tableDesc,
Expand Down
3 changes: 3 additions & 0 deletions pkg/kv/txn_interceptor_heartbeater.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,9 @@ func firstWriteIndex(ba *roachpb.BatchRequest) (int, *roachpb.Error) {
if roachpb.IsTransactionWrite(args) {
return i, nil
}
if t, ok := args.(*roachpb.ScanRequest); ok && t.SelectForUpdate {
return i, nil
}
}
return -1, nil
}
4 changes: 4 additions & 0 deletions pkg/kv/txn_interceptor_pipeliner.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,10 @@ func (tp *txnPipeliner) updateWriteTracking(
tp.footprint.insert(sp)
}
}
} else if scan, ok := req.(*roachpb.ScanRequest); ok && scan.SelectForUpdate {
if sp, ok := roachpb.ActualSpan(req, resp); ok {
tp.footprint.insert(sp)
}
}
}
}
Expand Down
Loading