Skip to content

Commit

Permalink
txn: deferred constraint check (tikv#13121)
Browse files Browse the repository at this point in the history
close tikv#13128, ref pingcap/tidb#36579

Signed-off-by: ekexium <eke@fastmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
Signed-off-by: fengou1 <feng.ou@pingcap.com>
  • Loading branch information
2 people authored and fengou1 committed Aug 30, 2022
1 parent b307d32 commit 8de4058
Show file tree
Hide file tree
Showing 28 changed files with 714 additions and 284 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion components/cdc/src/old_value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ mod tests {

use engine_rocks::{ReadPerfInstant, RocksEngine};
use engine_traits::{KvEngine, MiscExt};
use kvproto::kvrpcpb::PrewriteRequestPessimisticAction::*;
use tikv::{
config::DbConfig,
storage::{kv::TestEngineBuilder, txn::tests::*},
Expand Down Expand Up @@ -415,7 +416,7 @@ mod tests {
must_commit(&engine, k, 7, 9);

must_acquire_pessimistic_lock(&engine, k, k, 8, 10);
must_pessimistic_prewrite_put(&engine, k, b"v5", k, 8, 10, true);
must_pessimistic_prewrite_put(&engine, k, b"v5", k, 8, 10, DoPessimisticCheck);
must_get_eq(&kv_engine, &key, 10, Some(b"v4".to_vec()));
must_commit(&engine, k, 8, 11);
}
Expand Down
6 changes: 4 additions & 2 deletions components/cdc/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use grpcio::{
};
use kvproto::{
cdcpb::{create_change_data, ChangeDataClient, ChangeDataEvent, ChangeDataRequest},
kvrpcpb::*,
kvrpcpb::{PrewriteRequestPessimisticAction::*, *},
tikvpb::TikvClient,
};
use online_config::OnlineConfig;
Expand Down Expand Up @@ -418,7 +418,9 @@ impl TestSuite {
prewrite_req.start_version = ts.into_inner();
prewrite_req.lock_ttl = prewrite_req.start_version + 1;
prewrite_req.for_update_ts = for_update_ts.into_inner();
prewrite_req.mut_is_pessimistic_lock().push(true);
prewrite_req
.mut_pessimistic_actions()
.push(DoPessimisticCheck);
let prewrite_resp = self
.get_tikv_client(region_id)
.kv_prewrite(&prewrite_req)
Expand Down
4 changes: 2 additions & 2 deletions components/resolved_ts/src/cmd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ pub fn lock_only_filter(mut cmd_batch: CmdBatch) -> Option<CmdBatch> {
#[cfg(test)]
mod tests {
use concurrency_manager::ConcurrencyManager;
use kvproto::kvrpcpb::AssertionLevel;
use kvproto::kvrpcpb::{AssertionLevel, PrewriteRequestPessimisticAction::*};
use tikv::storage::{
kv::{MockEngineBuilder, TestEngineBuilder},
lock_manager::DummyLockManager,
Expand Down Expand Up @@ -405,7 +405,7 @@ mod tests {
},
Mutation::make_put(k1.clone(), b"v4".to_vec()),
&None,
false,
SkipPessimisticCheck,
)
.unwrap();
one_pc_commit_ts(true, &mut txn, 10.into(), &DummyLockManager);
Expand Down
9 changes: 7 additions & 2 deletions components/resolved_ts/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@ use collections::HashMap;
use concurrency_manager::ConcurrencyManager;
use engine_rocks::{RocksEngine, RocksSnapshot};
use grpcio::{ChannelBuilder, ClientUnaryReceiver, Environment};
use kvproto::{kvrpcpb::*, tikvpb::TikvClient};
use kvproto::{
kvrpcpb::{PrewriteRequestPessimisticAction::*, *},
tikvpb::TikvClient,
};
use online_config::ConfigValue;
use raftstore::coprocessor::CoprocessorHost;
use resolved_ts::{Observer, Task};
Expand Down Expand Up @@ -261,7 +264,9 @@ impl TestSuite {
prewrite_req.start_version = ts.into_inner();
prewrite_req.lock_ttl = prewrite_req.start_version + 1;
prewrite_req.for_update_ts = for_update_ts.into_inner();
prewrite_req.mut_is_pessimistic_lock().push(true);
prewrite_req
.mut_pessimistic_actions()
.push(DoPessimisticCheck);
let prewrite_resp = self
.get_tikv_client(region_id)
.kv_prewrite(&prewrite_req)
Expand Down
6 changes: 3 additions & 3 deletions components/test_raftstore/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use futures::executor::block_on;
use grpcio::{ChannelBuilder, Environment};
use kvproto::{
encryptionpb::EncryptionMethod,
kvrpcpb::*,
kvrpcpb::{PrewriteRequestPessimisticAction::*, *},
metapb::{self, RegionEpoch},
pdpb::{
ChangePeer, ChangePeerV2, CheckPolicy, Merge, RegionHeartbeatResponse, SplitRegion,
Expand Down Expand Up @@ -894,7 +894,7 @@ pub fn must_kv_prewrite_with(
let mut prewrite_req = PrewriteRequest::default();
prewrite_req.set_context(ctx);
if for_update_ts != 0 {
prewrite_req.is_pessimistic_lock = vec![true; muts.len()];
prewrite_req.pessimistic_actions = vec![DoPessimisticCheck; muts.len()];
}
prewrite_req.set_mutations(muts.into_iter().collect());
prewrite_req.primary_lock = pk;
Expand Down Expand Up @@ -931,7 +931,7 @@ pub fn try_kv_prewrite_with(
let mut prewrite_req = PrewriteRequest::default();
prewrite_req.set_context(ctx);
if for_update_ts != 0 {
prewrite_req.is_pessimistic_lock = vec![true; muts.len()];
prewrite_req.pessimistic_actions = vec![DoPessimisticCheck; muts.len()];
}
prewrite_req.set_mutations(muts.into_iter().collect());
prewrite_req.primary_lock = pk;
Expand Down
51 changes: 36 additions & 15 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3227,7 +3227,7 @@ mod tests {
use error_code::ErrorCodeExt;
use errors::extract_key_error;
use futures::executor::block_on;
use kvproto::kvrpcpb::{AssertionLevel, CommandPri, Op};
use kvproto::kvrpcpb::{AssertionLevel, CommandPri, Op, PrewriteRequestPessimisticAction::*};
use tikv_util::config::ReadableSize;
use tracker::INVALID_TRACKER_TOKEN;
use txn_types::{Mutation, PessimisticLock, WriteType};
Expand Down Expand Up @@ -7199,8 +7199,14 @@ mod tests {
.sched_txn_command(
commands::PrewritePessimistic::new(
vec![
(Mutation::make_put(key.clone(), val.clone()), true),
(Mutation::make_put(key2.clone(), val2.clone()), false),
(
Mutation::make_put(key.clone(), val.clone()),
DoPessimisticCheck,
),
(
Mutation::make_put(key2.clone(), val2.clone()),
SkipPessimisticCheck,
),
],
key.to_raw().unwrap(),
10.into(),
Expand Down Expand Up @@ -8059,8 +8065,14 @@ mod tests {
.sched_txn_command(
commands::PrewritePessimistic::new(
vec![
(Mutation::make_put(Key::from_raw(b"d"), b"v".to_vec()), true),
(Mutation::make_put(Key::from_raw(b"e"), b"v".to_vec()), true),
(
Mutation::make_put(Key::from_raw(b"d"), b"v".to_vec()),
DoPessimisticCheck,
),
(
Mutation::make_put(Key::from_raw(b"e"), b"v".to_vec()),
DoPessimisticCheck,
),
],
b"d".to_vec(),
200.into(),
Expand Down Expand Up @@ -8152,7 +8164,10 @@ mod tests {
storage
.sched_txn_command(
commands::PrewritePessimistic::new(
vec![(Mutation::make_put(key2.clone(), value2.clone()), true)],
vec![(
Mutation::make_put(key2.clone(), value2.clone()),
DoPessimisticCheck,
)],
k2.to_vec(),
10.into(),
0,
Expand Down Expand Up @@ -8197,8 +8212,11 @@ mod tests {
.sched_txn_command(
commands::PrewritePessimistic::new(
vec![
(Mutation::make_put(key1.clone(), value1), true),
(Mutation::make_put(key2.clone(), value2), false),
(Mutation::make_put(key1.clone(), value1), DoPessimisticCheck),
(
Mutation::make_put(key2.clone(), value2),
SkipPessimisticCheck,
),
],
k1.to_vec(),
1.into(),
Expand Down Expand Up @@ -8435,23 +8453,23 @@ mod tests {
vec![
(
Mutation::make_put(Key::from_raw(b"k1"), b"v1".to_vec()),
true,
DoPessimisticCheck,
),
(
Mutation::make_put(Key::from_raw(b"k3"), b"v2".to_vec()),
true,
DoPessimisticCheck,
),
(
Mutation::make_put(Key::from_raw(b"k4"), b"v4".to_vec()),
true,
DoPessimisticCheck,
),
(
Mutation::make_put(Key::from_raw(b"k5"), b"v5".to_vec()),
true,
DoPessimisticCheck,
),
(
Mutation::make_put(Key::from_raw(b"k6"), b"v6".to_vec()),
true,
DoPessimisticCheck,
),
],
b"k1".to_vec(),
Expand Down Expand Up @@ -9023,7 +9041,10 @@ mod tests {
storage
.sched_txn_command(
commands::PrewritePessimistic::new(
vec![(Mutation::make_put(k1.clone(), b"v".to_vec()), true)],
vec![(
Mutation::make_put(k1.clone(), b"v".to_vec()),
DoPessimisticCheck,
)],
b"k1".to_vec(),
10.into(),
3000,
Expand Down Expand Up @@ -9081,7 +9102,7 @@ mod tests {
storage
.sched_txn_command(
commands::PrewritePessimistic::new(
vec![(Mutation::make_put(k1, b"v".to_vec()), true)],
vec![(Mutation::make_put(k1, b"v".to_vec()), DoPessimisticCheck)],
b"k1".to_vec(),
10.into(),
3000,
Expand Down
6 changes: 3 additions & 3 deletions src/storage/mvcc/reader/point_getter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ impl<S: Snapshot> PointGetter<S> {
#[cfg(test)]
mod tests {
use engine_rocks::ReadPerfInstant;
use kvproto::kvrpcpb::{Assertion, AssertionLevel};
use kvproto::kvrpcpb::{Assertion, AssertionLevel, PrewriteRequestPessimisticAction::*};
use txn_types::SHORT_VALUE_MAX_LEN;

use super::*;
Expand Down Expand Up @@ -929,7 +929,7 @@ mod tests {
//
// write.start_ts(10) < primary_lock.start_ts(15) < write.commit_ts(20)
must_acquire_pessimistic_lock(&engine, key, key, 15, 50);
must_pessimistic_prewrite_delete(&engine, key, key, 15, 50, true);
must_pessimistic_prewrite_delete(&engine, key, key, 15, 50, DoPessimisticCheck);
let mut getter = new_point_getter(&engine, TimeStamp::max());
must_get_value(&mut getter, key, val);
}
Expand Down Expand Up @@ -1017,7 +1017,7 @@ mod tests {
key,
&None,
80.into(),
false,
SkipPessimisticCheck,
100,
80.into(),
1,
Expand Down
6 changes: 3 additions & 3 deletions src/storage/mvcc/reader/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,7 +640,7 @@ pub mod tests {
CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE,
};
use kvproto::{
kvrpcpb::{AssertionLevel, Context},
kvrpcpb::{AssertionLevel, Context, PrewriteRequestPessimisticAction::*},
metapb::{Peer, Region},
};
use raftstore::store::RegionSnapshot;
Expand Down Expand Up @@ -749,7 +749,7 @@ pub mod tests {
&Self::txn_props(start_ts, pk, false),
m,
&None,
false,
SkipPessimisticCheck,
)
.unwrap();
self.write(txn.into_modifies());
Expand All @@ -773,7 +773,7 @@ pub mod tests {
&Self::txn_props(start_ts, pk, true),
m,
&None,
true,
DoPessimisticCheck,
)
.unwrap();
self.write(txn.into_modifies());
Expand Down
36 changes: 23 additions & 13 deletions src/storage/mvcc/reader/scanner/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2029,7 +2029,7 @@ mod latest_entry_tests {
#[cfg(test)]
mod delta_entry_tests {
use engine_traits::{CF_LOCK, CF_WRITE};
use kvproto::kvrpcpb::Context;
use kvproto::kvrpcpb::{Context, PrewriteRequestPessimisticAction::*};
use txn_types::{is_short_value, SHORT_VALUE_MAX_LEN};

use super::{super::ScannerBuilder, test_util::*, *};
Expand Down Expand Up @@ -2486,23 +2486,23 @@ mod delta_entry_tests {
key,
start_ts,
commit_ts - 1,
true,
DoPessimisticCheck,
),
WriteType::Delete => must_pessimistic_prewrite_delete(
&engine,
key,
key,
start_ts,
commit_ts - 1,
true,
DoPessimisticCheck,
),
WriteType::Lock => must_pessimistic_prewrite_lock(
&engine,
key,
key,
start_ts,
commit_ts - 1,
true,
DoPessimisticCheck,
),
WriteType::Rollback => must_rollback(&engine, key, start_ts, false),
}
Expand All @@ -2528,14 +2528,24 @@ mod delta_entry_tests {
key,
ts,
for_update_ts,
true,
DoPessimisticCheck,
),
LockType::Delete => must_pessimistic_prewrite_delete(
&engine,
key,
key,
ts,
for_update_ts,
DoPessimisticCheck,
),
LockType::Lock => must_pessimistic_prewrite_lock(
&engine,
key,
key,
ts,
for_update_ts,
DoPessimisticCheck,
),
LockType::Delete => {
must_pessimistic_prewrite_delete(&engine, key, key, ts, for_update_ts, true)
}
LockType::Lock => {
must_pessimistic_prewrite_lock(&engine, key, key, ts, for_update_ts, true)
}
LockType::Pessimistic => {}
}
}
Expand Down Expand Up @@ -2631,12 +2641,12 @@ mod delta_entry_tests {

// Generate put for [b] at 15.
must_acquire_pessimistic_lock(&engine, b"b", b"b", 9, 15);
must_pessimistic_prewrite_put(&engine, b"b", b"b_15", b"b", 9, 15, true);
must_pessimistic_prewrite_put(&engine, b"b", b"b_15", b"b", 9, 15, DoPessimisticCheck);

must_prewrite_put(&engine, b"c", b"c_4", b"c", 4);
must_commit(&engine, b"c", 4, 6);
must_acquire_pessimistic_lock(&engine, b"c", b"c", 5, 15);
must_pessimistic_prewrite_put(&engine, b"c", b"c_5", b"c", 5, 15, true);
must_pessimistic_prewrite_put(&engine, b"c", b"c_5", b"c", 5, 15, DoPessimisticCheck);
must_cleanup(&engine, b"c", 20, 0);

let entry_a_1 = EntryBuilder::default()
Expand Down
Loading

0 comments on commit 8de4058

Please sign in to comment.