diff --git a/Cargo.lock b/Cargo.lock index b067e3337e5a..52ad79122037 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2627,7 +2627,7 @@ dependencies = [ [[package]] name = "kvproto" version = "0.0.2" -source = "git+https://github.com/pingcap/kvproto.git#affce57868b9f8befac389559d372369b2cb616f" +source = "git+https://github.com/pingcap/kvproto.git#a0f02b6efcee6112bdc313988bf6c0ae3f83c07d" dependencies = [ "futures 0.3.15", "grpcio", diff --git a/components/cdc/src/old_value.rs b/components/cdc/src/old_value.rs index 89f78f694c3c..9d60474b9525 100644 --- a/components/cdc/src/old_value.rs +++ b/components/cdc/src/old_value.rs @@ -293,6 +293,7 @@ mod tests { use engine_rocks::{ReadPerfInstant, RocksEngine}; use engine_traits::{KvEngine, MiscExt}; + use kvproto::kvrpcpb::PrewriteRequestPessimisticAction::*; use tikv::{ config::DbConfig, storage::{kv::TestEngineBuilder, txn::tests::*}, @@ -415,7 +416,7 @@ mod tests { must_commit(&engine, k, 7, 9); must_acquire_pessimistic_lock(&engine, k, k, 8, 10); - must_pessimistic_prewrite_put(&engine, k, b"v5", k, 8, 10, true); + must_pessimistic_prewrite_put(&engine, k, b"v5", k, 8, 10, DoPessimisticCheck); must_get_eq(&kv_engine, &key, 10, Some(b"v4".to_vec())); must_commit(&engine, k, 8, 11); } diff --git a/components/cdc/tests/mod.rs b/components/cdc/tests/mod.rs index 63c06551a80b..89eebcceec79 100644 --- a/components/cdc/tests/mod.rs +++ b/components/cdc/tests/mod.rs @@ -11,7 +11,7 @@ use grpcio::{ }; use kvproto::{ cdcpb::{create_change_data, ChangeDataClient, ChangeDataEvent, ChangeDataRequest}, - kvrpcpb::*, + kvrpcpb::{PrewriteRequestPessimisticAction::*, *}, tikvpb::TikvClient, }; use online_config::OnlineConfig; @@ -418,7 +418,9 @@ impl TestSuite { prewrite_req.start_version = ts.into_inner(); prewrite_req.lock_ttl = prewrite_req.start_version + 1; prewrite_req.for_update_ts = for_update_ts.into_inner(); - prewrite_req.mut_is_pessimistic_lock().push(true); + prewrite_req + .mut_pessimistic_actions() + .push(DoPessimisticCheck); let prewrite_resp = self .get_tikv_client(region_id) .kv_prewrite(&prewrite_req) diff --git a/components/resolved_ts/src/cmd.rs b/components/resolved_ts/src/cmd.rs index 277a31e20016..0bb22e0a21ea 100644 --- a/components/resolved_ts/src/cmd.rs +++ b/components/resolved_ts/src/cmd.rs @@ -286,7 +286,7 @@ pub fn lock_only_filter(mut cmd_batch: CmdBatch) -> Option { #[cfg(test)] mod tests { use concurrency_manager::ConcurrencyManager; - use kvproto::kvrpcpb::AssertionLevel; + use kvproto::kvrpcpb::{AssertionLevel, PrewriteRequestPessimisticAction::*}; use tikv::storage::{ kv::{MockEngineBuilder, TestEngineBuilder}, lock_manager::DummyLockManager, @@ -405,7 +405,7 @@ mod tests { }, Mutation::make_put(k1.clone(), b"v4".to_vec()), &None, - false, + SkipPessimisticCheck, ) .unwrap(); one_pc_commit_ts(true, &mut txn, 10.into(), &DummyLockManager); diff --git a/components/resolved_ts/tests/mod.rs b/components/resolved_ts/tests/mod.rs index 3d7fdb875692..0e6d8bbc9f88 100644 --- a/components/resolved_ts/tests/mod.rs +++ b/components/resolved_ts/tests/mod.rs @@ -6,7 +6,10 @@ use collections::HashMap; use concurrency_manager::ConcurrencyManager; use engine_rocks::{RocksEngine, RocksSnapshot}; use grpcio::{ChannelBuilder, ClientUnaryReceiver, Environment}; -use kvproto::{kvrpcpb::*, tikvpb::TikvClient}; +use kvproto::{ + kvrpcpb::{PrewriteRequestPessimisticAction::*, *}, + tikvpb::TikvClient, +}; use online_config::ConfigValue; use raftstore::coprocessor::CoprocessorHost; use resolved_ts::{Observer, Task}; @@ -261,7 +264,9 @@ impl TestSuite { prewrite_req.start_version = ts.into_inner(); prewrite_req.lock_ttl = prewrite_req.start_version + 1; prewrite_req.for_update_ts = for_update_ts.into_inner(); - prewrite_req.mut_is_pessimistic_lock().push(true); + prewrite_req + .mut_pessimistic_actions() + .push(DoPessimisticCheck); let prewrite_resp = self .get_tikv_client(region_id) .kv_prewrite(&prewrite_req) diff --git a/components/test_raftstore/src/util.rs b/components/test_raftstore/src/util.rs index eaeaf6a4e0fb..8cac947dc574 100644 --- a/components/test_raftstore/src/util.rs +++ b/components/test_raftstore/src/util.rs @@ -24,7 +24,7 @@ use futures::executor::block_on; use grpcio::{ChannelBuilder, Environment}; use kvproto::{ encryptionpb::EncryptionMethod, - kvrpcpb::*, + kvrpcpb::{PrewriteRequestPessimisticAction::*, *}, metapb::{self, RegionEpoch}, pdpb::{ ChangePeer, ChangePeerV2, CheckPolicy, Merge, RegionHeartbeatResponse, SplitRegion, @@ -894,7 +894,7 @@ pub fn must_kv_prewrite_with( let mut prewrite_req = PrewriteRequest::default(); prewrite_req.set_context(ctx); if for_update_ts != 0 { - prewrite_req.is_pessimistic_lock = vec![true; muts.len()]; + prewrite_req.pessimistic_actions = vec![DoPessimisticCheck; muts.len()]; } prewrite_req.set_mutations(muts.into_iter().collect()); prewrite_req.primary_lock = pk; @@ -931,7 +931,7 @@ pub fn try_kv_prewrite_with( let mut prewrite_req = PrewriteRequest::default(); prewrite_req.set_context(ctx); if for_update_ts != 0 { - prewrite_req.is_pessimistic_lock = vec![true; muts.len()]; + prewrite_req.pessimistic_actions = vec![DoPessimisticCheck; muts.len()]; } prewrite_req.set_mutations(muts.into_iter().collect()); prewrite_req.primary_lock = pk; diff --git a/src/storage/mod.rs b/src/storage/mod.rs index d974c731db06..3024a05381f9 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -3227,7 +3227,7 @@ mod tests { use error_code::ErrorCodeExt; use errors::extract_key_error; use futures::executor::block_on; - use kvproto::kvrpcpb::{AssertionLevel, CommandPri, Op}; + use kvproto::kvrpcpb::{AssertionLevel, CommandPri, Op, PrewriteRequestPessimisticAction::*}; use tikv_util::config::ReadableSize; use tracker::INVALID_TRACKER_TOKEN; use txn_types::{Mutation, PessimisticLock, WriteType}; @@ -7199,8 +7199,14 @@ mod tests { .sched_txn_command( commands::PrewritePessimistic::new( vec![ - (Mutation::make_put(key.clone(), val.clone()), true), - (Mutation::make_put(key2.clone(), val2.clone()), false), + ( + Mutation::make_put(key.clone(), val.clone()), + DoPessimisticCheck, + ), + ( + Mutation::make_put(key2.clone(), val2.clone()), + SkipPessimisticCheck, + ), ], key.to_raw().unwrap(), 10.into(), @@ -8059,8 +8065,14 @@ mod tests { .sched_txn_command( commands::PrewritePessimistic::new( vec![ - (Mutation::make_put(Key::from_raw(b"d"), b"v".to_vec()), true), - (Mutation::make_put(Key::from_raw(b"e"), b"v".to_vec()), true), + ( + Mutation::make_put(Key::from_raw(b"d"), b"v".to_vec()), + DoPessimisticCheck, + ), + ( + Mutation::make_put(Key::from_raw(b"e"), b"v".to_vec()), + DoPessimisticCheck, + ), ], b"d".to_vec(), 200.into(), @@ -8152,7 +8164,10 @@ mod tests { storage .sched_txn_command( commands::PrewritePessimistic::new( - vec![(Mutation::make_put(key2.clone(), value2.clone()), true)], + vec![( + Mutation::make_put(key2.clone(), value2.clone()), + DoPessimisticCheck, + )], k2.to_vec(), 10.into(), 0, @@ -8197,8 +8212,11 @@ mod tests { .sched_txn_command( commands::PrewritePessimistic::new( vec![ - (Mutation::make_put(key1.clone(), value1), true), - (Mutation::make_put(key2.clone(), value2), false), + (Mutation::make_put(key1.clone(), value1), DoPessimisticCheck), + ( + Mutation::make_put(key2.clone(), value2), + SkipPessimisticCheck, + ), ], k1.to_vec(), 1.into(), @@ -8435,23 +8453,23 @@ mod tests { vec![ ( Mutation::make_put(Key::from_raw(b"k1"), b"v1".to_vec()), - true, + DoPessimisticCheck, ), ( Mutation::make_put(Key::from_raw(b"k3"), b"v2".to_vec()), - true, + DoPessimisticCheck, ), ( Mutation::make_put(Key::from_raw(b"k4"), b"v4".to_vec()), - true, + DoPessimisticCheck, ), ( Mutation::make_put(Key::from_raw(b"k5"), b"v5".to_vec()), - true, + DoPessimisticCheck, ), ( Mutation::make_put(Key::from_raw(b"k6"), b"v6".to_vec()), - true, + DoPessimisticCheck, ), ], b"k1".to_vec(), @@ -9023,7 +9041,10 @@ mod tests { storage .sched_txn_command( commands::PrewritePessimistic::new( - vec![(Mutation::make_put(k1.clone(), b"v".to_vec()), true)], + vec![( + Mutation::make_put(k1.clone(), b"v".to_vec()), + DoPessimisticCheck, + )], b"k1".to_vec(), 10.into(), 3000, @@ -9081,7 +9102,7 @@ mod tests { storage .sched_txn_command( commands::PrewritePessimistic::new( - vec![(Mutation::make_put(k1, b"v".to_vec()), true)], + vec![(Mutation::make_put(k1, b"v".to_vec()), DoPessimisticCheck)], b"k1".to_vec(), 10.into(), 3000, diff --git a/src/storage/mvcc/reader/point_getter.rs b/src/storage/mvcc/reader/point_getter.rs index 7c521bb59522..2a231b428233 100644 --- a/src/storage/mvcc/reader/point_getter.rs +++ b/src/storage/mvcc/reader/point_getter.rs @@ -389,7 +389,7 @@ impl PointGetter { #[cfg(test)] mod tests { use engine_rocks::ReadPerfInstant; - use kvproto::kvrpcpb::{Assertion, AssertionLevel}; + use kvproto::kvrpcpb::{Assertion, AssertionLevel, PrewriteRequestPessimisticAction::*}; use txn_types::SHORT_VALUE_MAX_LEN; use super::*; @@ -929,7 +929,7 @@ mod tests { // // write.start_ts(10) < primary_lock.start_ts(15) < write.commit_ts(20) must_acquire_pessimistic_lock(&engine, key, key, 15, 50); - must_pessimistic_prewrite_delete(&engine, key, key, 15, 50, true); + must_pessimistic_prewrite_delete(&engine, key, key, 15, 50, DoPessimisticCheck); let mut getter = new_point_getter(&engine, TimeStamp::max()); must_get_value(&mut getter, key, val); } @@ -1017,7 +1017,7 @@ mod tests { key, &None, 80.into(), - false, + SkipPessimisticCheck, 100, 80.into(), 1, diff --git a/src/storage/mvcc/reader/reader.rs b/src/storage/mvcc/reader/reader.rs index c45fabe2540c..f1ed7748a15c 100644 --- a/src/storage/mvcc/reader/reader.rs +++ b/src/storage/mvcc/reader/reader.rs @@ -640,7 +640,7 @@ pub mod tests { CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE, }; use kvproto::{ - kvrpcpb::{AssertionLevel, Context}, + kvrpcpb::{AssertionLevel, Context, PrewriteRequestPessimisticAction::*}, metapb::{Peer, Region}, }; use raftstore::store::RegionSnapshot; @@ -749,7 +749,7 @@ pub mod tests { &Self::txn_props(start_ts, pk, false), m, &None, - false, + SkipPessimisticCheck, ) .unwrap(); self.write(txn.into_modifies()); @@ -773,7 +773,7 @@ pub mod tests { &Self::txn_props(start_ts, pk, true), m, &None, - true, + DoPessimisticCheck, ) .unwrap(); self.write(txn.into_modifies()); diff --git a/src/storage/mvcc/reader/scanner/forward.rs b/src/storage/mvcc/reader/scanner/forward.rs index a7a839cf2e78..6bed02890534 100644 --- a/src/storage/mvcc/reader/scanner/forward.rs +++ b/src/storage/mvcc/reader/scanner/forward.rs @@ -2029,7 +2029,7 @@ mod latest_entry_tests { #[cfg(test)] mod delta_entry_tests { use engine_traits::{CF_LOCK, CF_WRITE}; - use kvproto::kvrpcpb::Context; + use kvproto::kvrpcpb::{Context, PrewriteRequestPessimisticAction::*}; use txn_types::{is_short_value, SHORT_VALUE_MAX_LEN}; use super::{super::ScannerBuilder, test_util::*, *}; @@ -2486,7 +2486,7 @@ mod delta_entry_tests { key, start_ts, commit_ts - 1, - true, + DoPessimisticCheck, ), WriteType::Delete => must_pessimistic_prewrite_delete( &engine, @@ -2494,7 +2494,7 @@ mod delta_entry_tests { key, start_ts, commit_ts - 1, - true, + DoPessimisticCheck, ), WriteType::Lock => must_pessimistic_prewrite_lock( &engine, @@ -2502,7 +2502,7 @@ mod delta_entry_tests { key, start_ts, commit_ts - 1, - true, + DoPessimisticCheck, ), WriteType::Rollback => must_rollback(&engine, key, start_ts, false), } @@ -2528,14 +2528,24 @@ mod delta_entry_tests { key, ts, for_update_ts, - true, + DoPessimisticCheck, + ), + LockType::Delete => must_pessimistic_prewrite_delete( + &engine, + key, + key, + ts, + for_update_ts, + DoPessimisticCheck, + ), + LockType::Lock => must_pessimistic_prewrite_lock( + &engine, + key, + key, + ts, + for_update_ts, + DoPessimisticCheck, ), - LockType::Delete => { - must_pessimistic_prewrite_delete(&engine, key, key, ts, for_update_ts, true) - } - LockType::Lock => { - must_pessimistic_prewrite_lock(&engine, key, key, ts, for_update_ts, true) - } LockType::Pessimistic => {} } } @@ -2631,12 +2641,12 @@ mod delta_entry_tests { // Generate put for [b] at 15. must_acquire_pessimistic_lock(&engine, b"b", b"b", 9, 15); - must_pessimistic_prewrite_put(&engine, b"b", b"b_15", b"b", 9, 15, true); + must_pessimistic_prewrite_put(&engine, b"b", b"b_15", b"b", 9, 15, DoPessimisticCheck); must_prewrite_put(&engine, b"c", b"c_4", b"c", 4); must_commit(&engine, b"c", 4, 6); must_acquire_pessimistic_lock(&engine, b"c", b"c", 5, 15); - must_pessimistic_prewrite_put(&engine, b"c", b"c_5", b"c", 5, 15, true); + must_pessimistic_prewrite_put(&engine, b"c", b"c_5", b"c", 5, 15, DoPessimisticCheck); must_cleanup(&engine, b"c", 20, 0); let entry_a_1 = EntryBuilder::default() diff --git a/src/storage/mvcc/txn.rs b/src/storage/mvcc/txn.rs index b0a64d83f223..a9032d1b4639 100644 --- a/src/storage/mvcc/txn.rs +++ b/src/storage/mvcc/txn.rs @@ -274,7 +274,7 @@ pub(crate) fn make_txn_error( #[cfg(test)] pub(crate) mod tests { - use kvproto::kvrpcpb::{AssertionLevel, Context}; + use kvproto::kvrpcpb::{AssertionLevel, Context, PrewriteRequestPessimisticAction::*}; use txn_types::{TimeStamp, WriteType, SHORT_VALUE_MAX_LEN}; use super::*; @@ -341,7 +341,7 @@ pub(crate) mod tests { must_commit(&engine, k1, 25, 27); must_acquire_pessimistic_lock(&engine, k1, k1, 23, 29); must_get(&engine, k1, 30, v); - must_pessimistic_prewrite_delete(&engine, k1, k1, 23, 29, true); + must_pessimistic_prewrite_delete(&engine, k1, k1, 23, 29, DoPessimisticCheck); must_get_err(&engine, k1, 30); // should read the latest record when `ts == u64::max_value()` // even if lock.start_ts(23) < latest write.commit_ts(27) @@ -521,8 +521,8 @@ pub(crate) mod tests { must_acquire_pessimistic_lock(&engine, k1, k1, 15, 15); must_acquire_pessimistic_lock(&engine, k2, k1, 15, 17); - must_pessimistic_prewrite_put(&engine, k1, v, k1, 15, 17, true); - must_pessimistic_prewrite_put(&engine, k2, v, k1, 15, 17, true); + must_pessimistic_prewrite_put(&engine, k1, v, k1, 15, 17, DoPessimisticCheck); + must_pessimistic_prewrite_put(&engine, k2, v, k1, 15, 17, DoPessimisticCheck); must_rollback(&engine, k1, 15, false); must_rollback(&engine, k2, 15, false); // The rollback of the primary key should be protected @@ -758,7 +758,7 @@ pub(crate) mod tests { &txn_props(10.into(), pk, CommitKind::TwoPc, None, 0, false), Mutation::make_put(key.clone(), v.to_vec()), &None, - false, + SkipPessimisticCheck, ) .unwrap(); assert!(txn.write_size() > 0); @@ -802,7 +802,7 @@ pub(crate) mod tests { &txn_props(5.into(), key, CommitKind::TwoPc, None, 0, false), Mutation::make_put(Key::from_raw(key), value.to_vec()), &None, - false, + SkipPessimisticCheck, ) .unwrap_err(); @@ -815,7 +815,7 @@ pub(crate) mod tests { &txn_props(5.into(), key, CommitKind::TwoPc, None, 0, true), Mutation::make_put(Key::from_raw(key), value.to_vec()), &None, - false, + SkipPessimisticCheck, ) .unwrap(); } @@ -961,7 +961,7 @@ pub(crate) mod tests { // original pessimisitic lock. must_acquire_pessimistic_lock_with_ttl(&engine, k, k, 10, 10, 100); must_pessimistic_locked(&engine, k, 10, 10); - must_pessimistic_prewrite_put_with_ttl(&engine, k, v, k, 10, 10, true, 110); + must_pessimistic_prewrite_put_with_ttl(&engine, k, v, k, 10, 10, DoPessimisticCheck, 110); must_locked_with_ttl(&engine, k, 10, 110); must_rollback(&engine, k, 10, false); @@ -970,7 +970,7 @@ pub(crate) mod tests { // the prewrite request. must_acquire_pessimistic_lock_with_ttl(&engine, k, k, 20, 20, 100); must_pessimistic_locked(&engine, k, 20, 20); - must_pessimistic_prewrite_put_with_ttl(&engine, k, v, k, 20, 20, true, 90); + must_pessimistic_prewrite_put_with_ttl(&engine, k, v, k, 20, 20, DoPessimisticCheck, 90); must_locked_with_ttl(&engine, k, 20, 100); } @@ -984,7 +984,7 @@ pub(crate) mod tests { must_prewrite_put(&engine, k, v, k, 10); must_commit(&engine, k, 10, 11); must_acquire_pessimistic_lock(&engine, k, k, 5, 12); - must_pessimistic_prewrite_lock(&engine, k, k, 5, 12, true); + must_pessimistic_prewrite_lock(&engine, k, k, 5, 12, DoPessimisticCheck); must_commit(&engine, k, 5, 15); // Now in write cf: @@ -1025,7 +1025,7 @@ pub(crate) mod tests { expected_lock_info.get_primary_lock(), &None, expected_lock_info.get_lock_version().into(), - false, + SkipPessimisticCheck, expected_lock_info.get_lock_ttl(), TimeStamp::zero(), expected_lock_info.get_txn_size(), @@ -1068,7 +1068,7 @@ pub(crate) mod tests { expected_lock_info.set_lock_ttl(0); assert_lock_info_eq( - must_pessimistic_prewrite_put_err(&engine, k, v, k, 40, 40, false), + must_pessimistic_prewrite_put_err(&engine, k, v, k, 40, 40, SkipPessimisticCheck), &expected_lock_info, ); @@ -1095,8 +1095,8 @@ pub(crate) mod tests { must_prewrite_put(&engine, k, v, k, 2); must_locked(&engine, k, 2); - must_pessimistic_prewrite_put_err(&engine, k, v, k, 1, 1, false); - must_pessimistic_prewrite_put_err(&engine, k, v, k, 3, 3, false); + must_pessimistic_prewrite_put_err(&engine, k, v, k, 1, 1, SkipPessimisticCheck); + must_pessimistic_prewrite_put_err(&engine, k, v, k, 3, 3, SkipPessimisticCheck); } #[test] @@ -1117,19 +1117,19 @@ pub(crate) mod tests { must_acquire_pessimistic_lock_err(&engine, k3, k1, 10, 10); // Update for_update_ts to 20 due to write conflict must_acquire_pessimistic_lock(&engine, k3, k1, 10, 20); - must_pessimistic_prewrite_put(&engine, k1, v1, k1, 10, 20, true); - must_pessimistic_prewrite_put(&engine, k3, v3, k1, 10, 20, true); + must_pessimistic_prewrite_put(&engine, k1, v1, k1, 10, 20, DoPessimisticCheck); + must_pessimistic_prewrite_put(&engine, k3, v3, k1, 10, 20, DoPessimisticCheck); // Write a non-pessimistic lock with for_update_ts 20. - must_pessimistic_prewrite_put(&engine, k2, v2, k1, 10, 20, false); + must_pessimistic_prewrite_put(&engine, k2, v2, k1, 10, 20, SkipPessimisticCheck); // Roll back the primary key due to timeout, but the non-pessimistic lock is not // rolled back. must_rollback(&engine, k1, 10, false); // Txn-15 acquires pessimistic locks on k1. must_acquire_pessimistic_lock(&engine, k1, k1, 15, 15); - must_pessimistic_prewrite_put(&engine, k1, v1, k1, 15, 15, true); + must_pessimistic_prewrite_put(&engine, k1, v1, k1, 15, 15, DoPessimisticCheck); // There is a non-pessimistic lock conflict here. - match must_pessimistic_prewrite_put_err(&engine, k2, v2, k1, 15, 15, false) { + match must_pessimistic_prewrite_put_err(&engine, k2, v2, k1, 15, 15, SkipPessimisticCheck) { Error(box ErrorInner::KeyIsLocked(info)) => assert_eq!(info.get_lock_ttl(), 0), e => panic!("unexpected error: {}", e), }; @@ -1166,30 +1166,30 @@ pub(crate) mod tests { // Key not exist; should succeed. fail_to_write_pessimistic_lock(&engine, k, 10, 10); - must_pessimistic_prewrite_put(&engine, k, &v, k, 10, 10, true); + must_pessimistic_prewrite_put(&engine, k, &v, k, 10, 10, DoPessimisticCheck); must_commit(&engine, k, 10, 20); must_get(&engine, k, 20, &v); // for_update_ts(30) >= start_ts(30) > commit_ts(20); should succeed. v.push(0); fail_to_write_pessimistic_lock(&engine, k, 30, 30); - must_pessimistic_prewrite_put(&engine, k, &v, k, 30, 30, true); + must_pessimistic_prewrite_put(&engine, k, &v, k, 30, 30, DoPessimisticCheck); must_commit(&engine, k, 30, 40); must_get(&engine, k, 40, &v); // for_update_ts(40) >= commit_ts(40) > start_ts(35); should fail. fail_to_write_pessimistic_lock(&engine, k, 35, 40); - must_pessimistic_prewrite_put_err(&engine, k, &v, k, 35, 40, true); + must_pessimistic_prewrite_put_err(&engine, k, &v, k, 35, 40, DoPessimisticCheck); // KeyIsLocked; should fail. must_acquire_pessimistic_lock(&engine, k, k, 50, 50); - must_pessimistic_prewrite_put_err(&engine, k, &v, k, 60, 60, true); + must_pessimistic_prewrite_put_err(&engine, k, &v, k, 60, 60, DoPessimisticCheck); pessimistic_rollback::tests::must_success(&engine, k, 50, 50); // The txn has been rolled back; should fail. must_acquire_pessimistic_lock(&engine, k, k, 80, 80); must_cleanup(&engine, k, 80, TimeStamp::max()); - must_pessimistic_prewrite_put_err(&engine, k, &v, k, 80, 80, true); + must_pessimistic_prewrite_put_err(&engine, k, &v, k, 80, 80, DoPessimisticCheck); } #[test] @@ -1219,7 +1219,7 @@ pub(crate) mod tests { ), mutation, &Some(vec![b"key1".to_vec(), b"key2".to_vec(), b"key3".to_vec()]), - false, + SkipPessimisticCheck, ) .unwrap(); let modifies = txn.into_modifies(); @@ -1277,7 +1277,7 @@ pub(crate) mod tests { ), mutation, &Some(vec![b"key1".to_vec(), b"key2".to_vec(), b"key3".to_vec()]), - true, + DoPessimisticCheck, ) .unwrap(); let modifies = txn.into_modifies(); @@ -1336,7 +1336,7 @@ pub(crate) mod tests { ), mutation, &Some(vec![b"key1".to_vec(), b"key2".to_vec(), b"key3".to_vec()]), - true, + DoPessimisticCheck, ) .unwrap(); assert_eq!(min_commit_ts.into_inner(), 100); @@ -1379,7 +1379,7 @@ pub(crate) mod tests { // Pessimistic transaction also works in the same case. must_acquire_pessimistic_lock(&engine, k, k, 50, 50); must_pessimistic_locked(&engine, k, 50, 50); - must_pessimistic_prewrite_put(&engine, k, v, k, 50, 50, true); + must_pessimistic_prewrite_put(&engine, k, v, k, 50, 50, DoPessimisticCheck); must_commit(&engine, k, 50, 60); must_unlocked(&engine, k); must_written(&engine, k, 50, 60, WriteType::Put); @@ -1562,7 +1562,7 @@ pub(crate) mod tests { // T2, start_ts = 20 must_acquire_pessimistic_lock(&engine, k2, k2, 20, 25); - must_pessimistic_prewrite_put(&engine, k2, v2, k2, 20, 25, true); + must_pessimistic_prewrite_put(&engine, k2, v2, k2, 20, 25, DoPessimisticCheck); must_cleanup(&engine, k2, 20, 0); diff --git a/src/storage/txn/actions/acquire_pessimistic_lock.rs b/src/storage/txn/actions/acquire_pessimistic_lock.rs index 792ed8fcb9a8..9df4d9ebce9e 100644 --- a/src/storage/txn/actions/acquire_pessimistic_lock.rs +++ b/src/storage/txn/actions/acquire_pessimistic_lock.rs @@ -252,6 +252,8 @@ pub fn acquire_pessimistic_lock( pub mod tests { use concurrency_manager::ConcurrencyManager; use kvproto::kvrpcpb::Context; + #[cfg(test)] + use kvproto::kvrpcpb::PrewriteRequestPessimisticAction::*; use txn_types::TimeStamp; use super::*; @@ -493,7 +495,7 @@ pub mod tests { // Normal must_succeed(&engine, k, k, 1, 1); must_pessimistic_locked(&engine, k, 1, 1); - must_pessimistic_prewrite_put(&engine, k, v, k, 1, 1, true); + must_pessimistic_prewrite_put(&engine, k, v, k, 1, 1, DoPessimisticCheck); must_locked(&engine, k, 1); must_commit(&engine, k, 1, 2); must_unlocked(&engine, k); @@ -516,7 +518,7 @@ pub mod tests { must_prewrite_lock_err(&engine, k, k, 8); must_err(&engine, k, k, 8, 8); must_succeed(&engine, k, k, 8, 9); - must_pessimistic_prewrite_put(&engine, k, v, k, 8, 8, true); + must_pessimistic_prewrite_put(&engine, k, v, k, 8, 8, DoPessimisticCheck); must_commit(&engine, k, 8, 10); must_unlocked(&engine, k); @@ -525,16 +527,16 @@ pub mod tests { must_pessimistic_locked(&engine, k, 11, 11); must_cleanup(&engine, k, 11, 0); must_err(&engine, k, k, 11, 11); - must_pessimistic_prewrite_put_err(&engine, k, v, k, 11, 11, true); + must_pessimistic_prewrite_put_err(&engine, k, v, k, 11, 11, DoPessimisticCheck); must_prewrite_lock_err(&engine, k, k, 11); must_unlocked(&engine, k); must_succeed(&engine, k, k, 12, 12); - must_pessimistic_prewrite_put(&engine, k, v, k, 12, 12, true); + must_pessimistic_prewrite_put(&engine, k, v, k, 12, 12, DoPessimisticCheck); must_locked(&engine, k, 12); must_cleanup(&engine, k, 12, 0); must_err(&engine, k, k, 12, 12); - must_pessimistic_prewrite_put_err(&engine, k, v, k, 12, 12, true); + must_pessimistic_prewrite_put_err(&engine, k, v, k, 12, 12, DoPessimisticCheck); must_prewrite_lock_err(&engine, k, k, 12); must_unlocked(&engine, k); @@ -543,9 +545,9 @@ pub mod tests { must_pessimistic_locked(&engine, k, 13, 13); must_succeed(&engine, k, k, 13, 13); must_pessimistic_locked(&engine, k, 13, 13); - must_pessimistic_prewrite_put(&engine, k, v, k, 13, 13, true); + must_pessimistic_prewrite_put(&engine, k, v, k, 13, 13, DoPessimisticCheck); must_locked(&engine, k, 13); - must_pessimistic_prewrite_put(&engine, k, v, k, 13, 13, true); + must_pessimistic_prewrite_put(&engine, k, v, k, 13, 13, DoPessimisticCheck); must_locked(&engine, k, 13); must_commit(&engine, k, 13, 14); must_unlocked(&engine, k); @@ -556,7 +558,7 @@ pub mod tests { must_succeed(&engine, k, k, 15, 15); must_pessimistic_locked(&engine, k, 15, 15); must_get(&engine, k, 16, v); - must_pessimistic_prewrite_delete(&engine, k, k, 15, 15, true); + must_pessimistic_prewrite_delete(&engine, k, k, 15, 15, DoPessimisticCheck); must_get_err(&engine, k, 16); must_commit(&engine, k, 15, 17); @@ -582,7 +584,7 @@ pub mod tests { // Acquire lock on a prewritten key should fail. must_succeed(&engine, k, k, 26, 26); must_pessimistic_locked(&engine, k, 26, 26); - must_pessimistic_prewrite_delete(&engine, k, k, 26, 26, true); + must_pessimistic_prewrite_delete(&engine, k, k, 26, 26, DoPessimisticCheck); must_locked(&engine, k, 26); must_err(&engine, k, k, 26, 26); must_locked(&engine, k, 26); @@ -595,7 +597,7 @@ pub mod tests { must_unlocked(&engine, k); must_get_none(&engine, k, 28); // Pessimistic prewrite on a committed key should fail. - must_pessimistic_prewrite_put_err(&engine, k, v, k, 26, 26, true); + must_pessimistic_prewrite_put_err(&engine, k, v, k, 26, 26, DoPessimisticCheck); must_unlocked(&engine, k); must_get_none(&engine, k, 28); // Currently we cannot avoid this. @@ -604,7 +606,7 @@ pub mod tests { must_unlocked(&engine, k); // Non pessimistic key in pessimistic transaction. - must_pessimistic_prewrite_put(&engine, k, v, k, 30, 30, false); + must_pessimistic_prewrite_put(&engine, k, v, k, 30, 30, SkipPessimisticCheck); must_locked(&engine, k, 30); must_commit(&engine, k, 30, 31); must_unlocked(&engine, k); @@ -628,13 +630,13 @@ pub mod tests { must_pessimistic_locked(&engine, k, 35, 37); // Cannot prewrite when there is another transaction's pessimistic lock. - must_pessimistic_prewrite_put_err(&engine, k, v, k, 36, 36, true); - must_pessimistic_prewrite_put_err(&engine, k, v, k, 36, 38, true); + must_pessimistic_prewrite_put_err(&engine, k, v, k, 36, 36, DoPessimisticCheck); + must_pessimistic_prewrite_put_err(&engine, k, v, k, 36, 38, DoPessimisticCheck); must_pessimistic_locked(&engine, k, 35, 37); // Cannot prewrite when there is another transaction's non-pessimistic lock. - must_pessimistic_prewrite_put(&engine, k, v, k, 35, 37, true); + must_pessimistic_prewrite_put(&engine, k, v, k, 35, 37, DoPessimisticCheck); must_locked(&engine, k, 35); - must_pessimistic_prewrite_put_err(&engine, k, v, k, 36, 38, true); + must_pessimistic_prewrite_put_err(&engine, k, v, k, 36, 38, DoPessimisticCheck); must_locked(&engine, k, 35); // Commit pessimistic transaction's key but with smaller commit_ts than @@ -648,7 +650,7 @@ pub mod tests { // Currently not checked, so prewrite will success. must_succeed(&engine, k, k, 40, 40); must_pessimistic_locked(&engine, k, 40, 40); - must_pessimistic_prewrite_put(&engine, k, v, k, 40, 40, false); + must_pessimistic_prewrite_put(&engine, k, v, k, 40, 40, SkipPessimisticCheck); must_locked(&engine, k, 40); must_commit(&engine, k, 40, 41); must_unlocked(&engine, k); @@ -657,14 +659,14 @@ pub mod tests { // Currently not checked. must_succeed(&engine, k, k, 42, 45); must_pessimistic_locked(&engine, k, 42, 45); - must_pessimistic_prewrite_put(&engine, k, v, k, 42, 43, true); + must_pessimistic_prewrite_put(&engine, k, v, k, 42, 43, DoPessimisticCheck); must_locked(&engine, k, 42); must_commit(&engine, k, 42, 45); must_unlocked(&engine, k); must_succeed(&engine, k, k, 46, 47); must_pessimistic_locked(&engine, k, 46, 47); - must_pessimistic_prewrite_put(&engine, k, v, k, 46, 48, true); + must_pessimistic_prewrite_put(&engine, k, v, k, 46, 48, DoPessimisticCheck); must_locked(&engine, k, 46); must_commit(&engine, k, 46, 50); must_unlocked(&engine, k); @@ -674,7 +676,7 @@ pub mod tests { // Normally non-pessimistic keys in pessimistic transactions are used when we // are sure that there won't be conflicts. So this case is also not checked, and // prewrite will succeeed. - must_pessimistic_prewrite_put(&engine, k, v, k, 47, 48, false); + must_pessimistic_prewrite_put(&engine, k, v, k, 47, 48, SkipPessimisticCheck); must_locked(&engine, k, 47); must_cleanup(&engine, k, 47, 0); must_unlocked(&engine, k); @@ -682,7 +684,7 @@ pub mod tests { // The rollback of the primary key in a pessimistic transaction should be // protected from being collapsed. must_succeed(&engine, k, k, 49, 60); - must_pessimistic_prewrite_put(&engine, k, v, k, 49, 60, true); + must_pessimistic_prewrite_put(&engine, k, v, k, 49, 60, DoPessimisticCheck); must_locked(&engine, k, 49); must_cleanup(&engine, k, 49, 0); must_get_rollback_protected(&engine, k, 49, true); @@ -694,7 +696,7 @@ pub mod tests { // to another write records' commit ts. Now there is a commit record with // commit_ts = 50. must_succeed(&engine, k, k, 50, 61); - must_pessimistic_prewrite_put(&engine, k, v, k, 50, 61, true); + must_pessimistic_prewrite_put(&engine, k, v, k, 50, 61, DoPessimisticCheck); must_locked(&engine, k, 50); must_cleanup(&engine, k, 50, 0); must_get_overlapped_rollback(&engine, k, 50, 46, WriteType::Put, Some(0)); @@ -704,7 +706,15 @@ pub mod tests { let for_update_ts = start_ts + 48; let commit_ts = start_ts + 50; must_succeed(&engine, k, k, *start_ts, for_update_ts); - must_pessimistic_prewrite_put(&engine, k, v, k, *start_ts, for_update_ts, true); + must_pessimistic_prewrite_put( + &engine, + k, + v, + k, + *start_ts, + for_update_ts, + DoPessimisticCheck, + ); must_commit(&engine, k, *start_ts, commit_ts); must_get(&engine, k, commit_ts + 1, v); } @@ -946,13 +956,13 @@ pub mod tests { // Put v1 @ start ts 1, commit ts 2 must_succeed(&engine, k, k, 1, 1); - must_pessimistic_prewrite_put(&engine, k, v1, k, 1, 1, true); + must_pessimistic_prewrite_put(&engine, k, v1, k, 1, 1, DoPessimisticCheck); must_commit(&engine, k, 1, 2); let v2 = b"v2"; // Put v2 @ start ts 10, commit ts 11 must_succeed(&engine, k, k, 10, 10); - must_pessimistic_prewrite_put(&engine, k, v2, k, 10, 10, true); + must_pessimistic_prewrite_put(&engine, k, v2, k, 10, 10, DoPessimisticCheck); must_commit(&engine, k, 10, 11); // Lock @ start ts 9, for update ts 12, commit ts 13 @@ -1079,7 +1089,7 @@ pub mod tests { // T1: start_ts = 3, commit_ts = 5, put key:value must_succeed(&engine, key, key, 3, 3); - must_pessimistic_prewrite_put(&engine, key, value, key, 3, 3, true); + must_pessimistic_prewrite_put(&engine, key, value, key, 3, 3, DoPessimisticCheck); must_commit(&engine, key, 3, 5); // T2: start_ts = 15, acquire pessimistic lock on k, with should_not_exist flag @@ -1114,7 +1124,7 @@ pub mod tests { // T3: start_ts = 8, commit_ts = max_ts + 1 = 16, prewrite a DELETE operation on // k must_succeed(&engine, key, key, 8, 8); - must_pessimistic_prewrite_delete(&engine, key, key, 8, 8, true); + must_pessimistic_prewrite_delete(&engine, key, key, 8, 8, DoPessimisticCheck); must_commit(&engine, key, 8, cm.max_ts().into_inner() + 1); // T1: start_ts = 10, repeatedly acquire pessimistic lock on k, with diff --git a/src/storage/txn/actions/cleanup.rs b/src/storage/txn/actions/cleanup.rs index 461b8e2d4321..19cb90f0a220 100644 --- a/src/storage/txn/actions/cleanup.rs +++ b/src/storage/txn/actions/cleanup.rs @@ -82,6 +82,8 @@ pub mod tests { use concurrency_manager::ConcurrencyManager; use engine_traits::CF_WRITE; use kvproto::kvrpcpb::Context; + #[cfg(test)] + use kvproto::kvrpcpb::PrewriteRequestPessimisticAction::*; use txn_types::TimeStamp; use super::*; @@ -233,7 +235,7 @@ pub mod tests { must_get_rollback_protected(&engine, k, ts(11, 1), true); must_acquire_pessimistic_lock(&engine, k, k, ts(13, 1), ts(14, 1)); - must_pessimistic_prewrite_put(&engine, k, v, k, ts(13, 1), ts(14, 1), true); + must_pessimistic_prewrite_put(&engine, k, v, k, ts(13, 1), ts(14, 1), DoPessimisticCheck); must_succeed(&engine, k, ts(13, 1), ts(120, 0)); must_get_rollback_protected(&engine, k, ts(13, 1), true); } diff --git a/src/storage/txn/actions/commit.rs b/src/storage/txn/actions/commit.rs index 456757285e0c..2351e0c32829 100644 --- a/src/storage/txn/actions/commit.rs +++ b/src/storage/txn/actions/commit.rs @@ -107,6 +107,8 @@ pub fn commit( pub mod tests { use concurrency_manager::ConcurrencyManager; use kvproto::kvrpcpb::Context; + #[cfg(test)] + use kvproto::kvrpcpb::PrewriteRequestPessimisticAction::*; use txn_types::TimeStamp; use super::*; @@ -275,7 +277,7 @@ pub mod tests { k, &None, ts(60, 0), - true, + DoPessimisticCheck, 50, ts(60, 0), 1, diff --git a/src/storage/txn/actions/prewrite.rs b/src/storage/txn/actions/prewrite.rs index e7ca85c81372..7b562af8b43f 100644 --- a/src/storage/txn/actions/prewrite.rs +++ b/src/storage/txn/actions/prewrite.rs @@ -4,7 +4,10 @@ use std::cmp; use fail::fail_point; -use kvproto::kvrpcpb::{Assertion, AssertionLevel}; +use kvproto::kvrpcpb::{ + Assertion, AssertionLevel, + PrewriteRequestPessimisticAction::{self, *}, +}; use txn_types::{ is_short_value, Key, Mutation, MutationType, OldValue, TimeStamp, Value, Write, WriteType, }; @@ -28,10 +31,10 @@ pub fn prewrite( txn_props: &TransactionProperties<'_>, mutation: Mutation, secondary_keys: &Option>>, - is_pessimistic_lock: bool, + pessimistic_action: PrewriteRequestPessimisticAction, ) -> Result<(TimeStamp, OldValue)> { let mut mutation = - PrewriteMutation::from_mutation(mutation, secondary_keys, is_pessimistic_lock, txn_props)?; + PrewriteMutation::from_mutation(mutation, secondary_keys, pessimistic_action, txn_props)?; // Update max_ts for Insert operation to guarantee linearizability and snapshot // isolation @@ -56,8 +59,8 @@ pub fn prewrite( let mut lock_amended = false; let lock_status = match reader.load_lock(&mutation.key)? { - Some(lock) => mutation.check_lock(lock, is_pessimistic_lock)?, - None if is_pessimistic_lock => { + Some(lock) => mutation.check_lock(lock, pessimistic_action)?, + None if matches!(pessimistic_action, DoPessimisticCheck) => { amend_pessimistic_lock(&mutation, reader)?; lock_amended = true; LockStatus::None @@ -228,7 +231,7 @@ struct PrewriteMutation<'a> { mutation_type: MutationType, secondary_keys: &'a Option>>, min_commit_ts: TimeStamp, - is_pessimistic_lock: bool, + pessimistic_action: PrewriteRequestPessimisticAction, lock_type: Option, lock_ttl: u64, @@ -243,7 +246,7 @@ impl<'a> PrewriteMutation<'a> { fn from_mutation( mutation: Mutation, secondary_keys: &'a Option>>, - is_pessimistic_lock: bool, + pessimistic_action: PrewriteRequestPessimisticAction, txn_props: &'a TransactionProperties<'a>, ) -> Result> { let should_not_write = mutation.should_not_write(); @@ -265,7 +268,7 @@ impl<'a> PrewriteMutation<'a> { mutation_type, secondary_keys, min_commit_ts: txn_props.min_commit_ts, - is_pessimistic_lock, + pessimistic_action, lock_type, lock_ttl: txn_props.lock_ttl, @@ -291,11 +294,15 @@ impl<'a> PrewriteMutation<'a> { } /// Check whether the current key is locked at any timestamp. - fn check_lock(&mut self, lock: Lock, is_pessimistic_lock: bool) -> Result { + fn check_lock( + &mut self, + lock: Lock, + pessimistic_action: PrewriteRequestPessimisticAction, + ) -> Result { if lock.ts != self.txn_props.start_ts { // Abort on lock belonging to other transaction if // prewrites a pessimistic lock. - if is_pessimistic_lock { + if matches!(pessimistic_action, DoPessimisticCheck) { warn!( "prewrite failed (pessimistic lock not found)"; "start_ts" => self.txn_props.start_ts, @@ -360,7 +367,12 @@ impl<'a> PrewriteMutation<'a> { // Note: PessimisticLockNotFound can happen on a non-pessimistically locked key, // if it is a retrying prewrite request. TransactionKind::Pessimistic(for_update_ts) => { - if commit_ts > for_update_ts { + if let DoConstraintCheck = self.pessimistic_action { + if commit_ts > self.txn_props.start_ts { + MVCC_CONFLICT_COUNTER.prewrite_write_conflict.inc(); + self.write_conflict_error(&write, commit_ts)?; + } + } else if commit_ts > for_update_ts { warn!("conflicting write was found, pessimistic lock must be lost for the corresponding row key"; "key" => %self.key, "start_ts" => self.txn_props.start_ts, @@ -570,10 +582,16 @@ impl<'a> PrewriteMutation<'a> { match &self.txn_props.kind { TransactionKind::Optimistic(s) => *s, TransactionKind::Pessimistic(_) => { - // For non-pessimistic-locked keys, do not skip constraint check when retrying. - // This intents to protect idempotency. - // Ref: https://github.com/tikv/tikv/issues/11187 - self.is_pessimistic_lock || !self.txn_props.is_retry_request + match self.pessimistic_action { + DoPessimisticCheck => true, + // For non-pessimistic-locked keys, do not skip constraint check when retrying. + // This intents to protect idempotency. + // Ref: https://github.com/tikv/tikv/issues/11187 + SkipPessimisticCheck => !self.txn_props.is_retry_request, + // For keys that postpones constraint check to prewrite, do not skip constraint + // check. + PrewriteRequestPessimisticAction::DoConstraintCheck => false, + } } } } @@ -782,7 +800,7 @@ pub mod tests { &props, Mutation::make_insert(Key::from_raw(key), value.to_vec()), &None, - false, + SkipPessimisticCheck, )?; // Insert must be None if the key is not lock, or be Unspecified if the // key is already locked. @@ -813,7 +831,7 @@ pub mod tests { &optimistic_txn_props(pk, ts), Mutation::make_check_not_exists(Key::from_raw(key)), &None, - true, + DoPessimisticCheck, )?; assert_eq!(old_value, OldValue::Unspecified); Ok(()) @@ -835,7 +853,7 @@ pub mod tests { &optimistic_async_props(b"k1", 10.into(), 50.into(), 2, false), Mutation::make_put(Key::from_raw(b"k1"), b"v1".to_vec()), &Some(vec![b"k2".to_vec()]), - false, + SkipPessimisticCheck, ) .unwrap(); assert_eq!(old_value, OldValue::None); @@ -848,7 +866,7 @@ pub mod tests { &optimistic_async_props(b"k1", 10.into(), 50.into(), 1, false), Mutation::make_put(Key::from_raw(b"k2"), b"v2".to_vec()), &Some(vec![]), - false, + SkipPessimisticCheck, ) .unwrap_err(); assert!(matches!( @@ -883,7 +901,7 @@ pub mod tests { &props, Mutation::make_check_not_exists(Key::from_raw(b"k0")), &Some(vec![]), - false, + SkipPessimisticCheck, ) .unwrap(); assert!(min_ts > props.start_ts); @@ -903,7 +921,7 @@ pub mod tests { &props, Mutation::make_check_not_exists(Key::from_raw(b"k0")), &Some(vec![]), - false, + SkipPessimisticCheck, ) .unwrap(); assert_eq!(cm.max_ts(), props.start_ts); @@ -918,7 +936,7 @@ pub mod tests { &optimistic_async_props(b"k1", 10.into(), 50.into(), 2, false), Mutation::make_put(Key::from_raw(b"k1"), b"v1".to_vec()), &Some(vec![b"k2".to_vec()]), - false, + SkipPessimisticCheck, ) .unwrap(); assert!(min_ts > 42.into()); @@ -941,7 +959,7 @@ pub mod tests { &optimistic_async_props(b"k3", 44.into(), 50.into(), 2, false), mutation.clone(), &Some(vec![b"k4".to_vec()]), - false, + SkipPessimisticCheck, ) .unwrap(); assert!(min_ts > 44.into()); @@ -963,7 +981,7 @@ pub mod tests { &props, mutation.clone(), &Some(vec![b"k6".to_vec()]), - false, + SkipPessimisticCheck, ) .unwrap(); assert!(min_ts > 45.into()); @@ -982,7 +1000,7 @@ pub mod tests { &props, mutation.clone(), &Some(vec![b"k8".to_vec()]), - false, + SkipPessimisticCheck, ) .unwrap(); assert!(min_ts >= 46.into()); @@ -1012,7 +1030,7 @@ pub mod tests { &optimistic_async_props(b"k1", 10.into(), 50.into(), 2, true), Mutation::make_put(Key::from_raw(b"k1"), b"v1".to_vec()), &None, - false, + SkipPessimisticCheck, ) .unwrap(); assert_eq!(old_value, OldValue::None); @@ -1025,7 +1043,7 @@ pub mod tests { &optimistic_async_props(b"k1", 10.into(), 50.into(), 1, true), Mutation::make_put(Key::from_raw(b"k2"), b"v2".to_vec()), &None, - false, + SkipPessimisticCheck, ) .unwrap_err(); assert!(matches!( @@ -1071,7 +1089,7 @@ pub mod tests { }, Mutation::make_check_not_exists(Key::from_raw(key)), &None, - false, + SkipPessimisticCheck, )?; assert_eq!(old_value, OldValue::Unspecified); Ok(()) @@ -1108,7 +1126,7 @@ pub mod tests { &txn_props, Mutation::make_put(Key::from_raw(b"k1"), b"v1".to_vec()), &Some(vec![b"k2".to_vec()]), - true, + DoPessimisticCheck, ) .unwrap(); // Pessimistic txn skips constraint check, does not read previous write. @@ -1122,7 +1140,7 @@ pub mod tests { &txn_props, Mutation::make_put(Key::from_raw(b"k2"), b"v2".to_vec()), &Some(vec![]), - true, + DoPessimisticCheck, ) .unwrap_err(); } @@ -1158,7 +1176,7 @@ pub mod tests { &txn_props, Mutation::make_put(Key::from_raw(b"k1"), b"v1".to_vec()), &None, - true, + DoPessimisticCheck, ) .unwrap(); // Pessimistic txn skips constraint check, does not read previous write. @@ -1172,7 +1190,7 @@ pub mod tests { &txn_props, Mutation::make_put(Key::from_raw(b"k2"), b"v2".to_vec()), &None, - true, + DoPessimisticCheck, ) .unwrap_err(); } @@ -1278,7 +1296,7 @@ pub mod tests { &txn_props, Mutation::make_check_not_exists(Key::from_raw(key)), &None, - false, + SkipPessimisticCheck, ); if success { let res = res.unwrap(); @@ -1293,7 +1311,7 @@ pub mod tests { &txn_props, Mutation::make_insert(Key::from_raw(key), b"value".to_vec()), &None, - false, + SkipPessimisticCheck, ); if success { let res = res.unwrap(); @@ -1348,7 +1366,7 @@ pub mod tests { &txn_props, Mutation::make_put(key.clone(), b"value".to_vec()), &None, - false, + SkipPessimisticCheck, ) .unwrap(); assert_eq!(&old_value, expected_value, "key: {}", key); @@ -1368,7 +1386,7 @@ pub mod tests { &Some(vec![b"k2".to_vec()]), 10, 10, - true, + DoPessimisticCheck, 15, ); must_pessimistic_prewrite_put_async_commit( @@ -1379,7 +1397,7 @@ pub mod tests { &Some(vec![]), 10, 10, - false, + SkipPessimisticCheck, 15, ); @@ -1398,7 +1416,7 @@ pub mod tests { &Some(vec![]), 10, 10, - false, + SkipPessimisticCheck, 0, ); assert!(matches!( @@ -1429,7 +1447,7 @@ pub mod tests { &Some(vec![]), 10, 10, - false, + SkipPessimisticCheck, 0, ); assert!(matches!( @@ -1439,7 +1457,15 @@ pub mod tests { must_unlocked(&engine, b"k2"); let err = must_retry_pessimistic_prewrite_put_err( - &engine, b"k2", b"v2", b"k1", &None, 10, 10, false, 0, + &engine, + b"k2", + b"v2", + b"k1", + &None, + 10, + 10, + SkipPessimisticCheck, + 0, ); assert!(matches!( err, @@ -1451,7 +1477,15 @@ pub mod tests { // Try a different txn start ts (which haven't been successfully committed // before). let err = must_retry_pessimistic_prewrite_put_err( - &engine, b"k2", b"v2", b"k1", &None, 11, 11, false, 0, + &engine, + b"k2", + b"v2", + b"k1", + &None, + 11, + 11, + SkipPessimisticCheck, + 0, ); assert!(matches!( err, @@ -1467,7 +1501,7 @@ pub mod tests { b"k1", &None, 12.into(), - false, + SkipPessimisticCheck, 100, 12.into(), 1, @@ -1490,7 +1524,7 @@ pub mod tests { b"k1", &None, 13.into(), - false, + SkipPessimisticCheck, 100, 55.into(), 1, @@ -1545,7 +1579,7 @@ pub mod tests { &txn_props, Mutation::make_put(Key::from_raw(b"k1"), b"value".to_vec()), &None, - false, + SkipPessimisticCheck, ) .unwrap(); assert_eq!( @@ -1599,7 +1633,7 @@ pub mod tests { &txn_props, Mutation::make_insert(Key::from_raw(b"k1"), b"v2".to_vec()), &None, - false, + SkipPessimisticCheck, ) .unwrap(); assert_eq!(old_value, OldValue::None); @@ -1736,7 +1770,7 @@ pub mod tests { &txn_props, Mutation::make_put(Key::from_raw(key), b"v2".to_vec()), &None, - false, + SkipPessimisticCheck, )?; Ok(old_value) })], @@ -1772,7 +1806,7 @@ pub mod tests { &txn_props, Mutation::make_insert(Key::from_raw(key), b"v2".to_vec()), &None, - false, + SkipPessimisticCheck, )?; Ok(old_value) })], @@ -1786,7 +1820,7 @@ pub mod tests { let prewrite_put = |key: &'_ _, value, ts: u64, - is_pessimistic_lock, + pessimistic_action, for_update_ts: u64, assertion, assertion_level, @@ -1799,7 +1833,7 @@ pub mod tests { key, &None, ts.into(), - is_pessimistic_lock, + pessimistic_action, 100, for_update_ts.into(), 1, @@ -1818,7 +1852,7 @@ pub mod tests { &None, ts, for_update_ts, - is_pessimistic_lock, + pessimistic_action, 0, false, assertion, @@ -1843,7 +1877,7 @@ pub mod tests { &k1, b"v1", 10, - false, + SkipPessimisticCheck, 0, Assertion::NotExist, assertion_level, @@ -1855,7 +1889,7 @@ pub mod tests { &k1, b"v1", 20, - false, + SkipPessimisticCheck, 0, Assertion::Exist, assertion_level, @@ -1868,7 +1902,7 @@ pub mod tests { &k2, b"v2", 10, - true, + DoPessimisticCheck, 11, Assertion::NotExist, assertion_level, @@ -1880,7 +1914,7 @@ pub mod tests { &k2, b"v2", 20, - true, + DoPessimisticCheck, 21, Assertion::Exist, assertion_level, @@ -1894,7 +1928,7 @@ pub mod tests { &k1, b"v1", 30, - false, + SkipPessimisticCheck, 0, Assertion::NotExist, assertion_level, @@ -1904,7 +1938,7 @@ pub mod tests { &k3, b"v3", 30, - false, + SkipPessimisticCheck, 0, Assertion::Exist, assertion_level, @@ -1920,7 +1954,7 @@ pub mod tests { &k2, b"v2", 30, - true, + DoPessimisticCheck, 31, Assertion::NotExist, assertion_level, @@ -1930,7 +1964,7 @@ pub mod tests { &k4, b"v4", 30, - true, + DoPessimisticCheck, 31, Assertion::Exist, assertion_level, @@ -1939,14 +1973,14 @@ pub mod tests { must_rollback(&engine, &k2, 30, true); must_rollback(&engine, &k4, 30, true); - // Pessimistic transaction fail on strict level no matter whether - // `is_pessimistic_lock`. + // Pessimistic transaction fail on strict level no matter what + // `pessimistic_action` is. let pass = assertion_level != AssertionLevel::Strict; prewrite_put( &k1, b"v1", 40, - false, + SkipPessimisticCheck, 41, Assertion::NotExist, assertion_level, @@ -1956,7 +1990,7 @@ pub mod tests { &k3, b"v3", 40, - false, + SkipPessimisticCheck, 41, Assertion::Exist, assertion_level, @@ -1971,7 +2005,7 @@ pub mod tests { &k2, b"v2", 40, - true, + DoPessimisticCheck, 41, Assertion::NotExist, assertion_level, @@ -1981,7 +2015,7 @@ pub mod tests { &k4, b"v4", 40, - true, + DoPessimisticCheck, 41, Assertion::Exist, assertion_level, @@ -2027,4 +2061,39 @@ pub mod tests { test_all_levels(&prepare_delete); test_all_levels(&prepare_gc_fence); } + + #[test] + fn test_deferred_constraint_check() { + let engine = crate::storage::TestEngineBuilder::new().build().unwrap(); + let key = b"key"; + let key2 = b"key2"; + let value = b"value"; + + // 1. write conflict + must_prewrite_put(&engine, key, value, key, 1); + must_commit(&engine, key, 1, 5); + must_pessimistic_prewrite_insert(&engine, key2, value, key, 3, 3, SkipPessimisticCheck); + let err = + must_pessimistic_prewrite_insert_err(&engine, key, value, key, 3, 3, DoConstraintCheck); + assert!(matches!(err, Error(box ErrorInner::WriteConflict { .. }))); + + // 2. unique constraint fail + must_prewrite_put(&engine, key, value, key, 11); + must_commit(&engine, key, 11, 12); + let err = must_pessimistic_prewrite_insert_err( + &engine, + key, + value, + key, + 13, + 13, + DoConstraintCheck, + ); + assert!(matches!(err, Error(box ErrorInner::AlreadyExist { .. }))); + + // 3. success + must_prewrite_delete(&engine, key, key, 21); + must_commit(&engine, key, 21, 22); + must_pessimistic_prewrite_insert(&engine, key, value, key, 23, 23, DoConstraintCheck); + } } diff --git a/src/storage/txn/actions/tests.rs b/src/storage/txn/actions/tests.rs index e5e4b57054c4..523d4b9e8ac6 100644 --- a/src/storage/txn/actions/tests.rs +++ b/src/storage/txn/actions/tests.rs @@ -3,7 +3,10 @@ //! This file contains tests and testing tools which affects multiple actions use concurrency_manager::ConcurrencyManager; -use kvproto::kvrpcpb::{Assertion, AssertionLevel, Context}; +use kvproto::kvrpcpb::{ + Assertion, AssertionLevel, Context, + PrewriteRequestPessimisticAction::{self, *}, +}; use prewrite::{prewrite, CommitKind, TransactionKind, TransactionProperties}; use super::*; @@ -20,7 +23,7 @@ pub fn must_prewrite_put_impl( pk: &[u8], secondary_keys: &Option>>, ts: TimeStamp, - is_pessimistic_lock: bool, + pessimistic_action: PrewriteRequestPessimisticAction, lock_ttl: u64, for_update_ts: TimeStamp, txn_size: u64, @@ -29,6 +32,81 @@ pub fn must_prewrite_put_impl( is_retry_request: bool, assertion: Assertion, assertion_level: AssertionLevel, +) { + must_prewrite_put_impl_with_should_not_exist( + engine, + key, + value, + pk, + secondary_keys, + ts, + pessimistic_action, + lock_ttl, + for_update_ts, + txn_size, + min_commit_ts, + max_commit_ts, + is_retry_request, + assertion, + assertion_level, + false, + ); +} + +pub fn must_prewrite_insert_impl( + engine: &E, + key: &[u8], + value: &[u8], + pk: &[u8], + secondary_keys: &Option>>, + ts: TimeStamp, + pessimistic_action: PrewriteRequestPessimisticAction, + lock_ttl: u64, + for_update_ts: TimeStamp, + txn_size: u64, + min_commit_ts: TimeStamp, + max_commit_ts: TimeStamp, + is_retry_request: bool, + assertion: Assertion, + assertion_level: AssertionLevel, +) { + must_prewrite_put_impl_with_should_not_exist( + engine, + key, + value, + pk, + secondary_keys, + ts, + pessimistic_action, + lock_ttl, + for_update_ts, + txn_size, + min_commit_ts, + max_commit_ts, + is_retry_request, + assertion, + assertion_level, + true, + ); +} + +pub fn must_prewrite_put_impl_with_should_not_exist( + engine: &E, + key: &[u8], + value: &[u8], + pk: &[u8], + secondary_keys: &Option>>, + ts: TimeStamp, + pessimistic_action: PrewriteRequestPessimisticAction, + lock_ttl: u64, + for_update_ts: TimeStamp, + txn_size: u64, + min_commit_ts: TimeStamp, + max_commit_ts: TimeStamp, + is_retry_request: bool, + assertion: Assertion, + assertion_level: AssertionLevel, + should_not_exist: bool, ) { let ctx = Context::default(); let snapshot = engine.snapshot(Default::default()).unwrap(); @@ -36,7 +114,11 @@ pub fn must_prewrite_put_impl( let mut txn = MvccTxn::new(ts, cm); let mut reader = SnapshotReader::new(ts, snapshot, true); - let mutation = Mutation::Put((Key::from_raw(key), value.to_vec()), assertion); + let mutation = if should_not_exist { + Mutation::Insert((Key::from_raw(key), value.to_vec()), assertion) + } else { + Mutation::Put((Key::from_raw(key), value.to_vec()), assertion) + }; let txn_kind = if for_update_ts.is_zero() { TransactionKind::Optimistic(false) } else { @@ -64,7 +146,7 @@ pub fn must_prewrite_put_impl( }, mutation, secondary_keys, - is_pessimistic_lock, + pessimistic_action, ) .unwrap(); write(engine, &ctx, txn.into_modifies()); @@ -84,7 +166,7 @@ pub fn must_prewrite_put( pk, &None, ts.into(), - false, + SkipPessimisticCheck, 0, TimeStamp::default(), 0, @@ -103,7 +185,7 @@ pub fn must_pessimistic_prewrite_put( pk: &[u8], ts: impl Into, for_update_ts: impl Into, - is_pessimistic_lock: bool, + pessimistic_action: PrewriteRequestPessimisticAction, ) { must_prewrite_put_impl( engine, @@ -112,7 +194,35 @@ pub fn must_pessimistic_prewrite_put( pk, &None, ts.into(), - is_pessimistic_lock, + pessimistic_action, + 0, + for_update_ts.into(), + 0, + TimeStamp::default(), + TimeStamp::default(), + false, + Assertion::None, + AssertionLevel::Off, + ); +} + +pub fn must_pessimistic_prewrite_insert( + engine: &E, + key: &[u8], + value: &[u8], + pk: &[u8], + ts: impl Into, + for_update_ts: impl Into, + pessimistic_action: PrewriteRequestPessimisticAction, +) { + must_prewrite_insert_impl( + engine, + key, + value, + pk, + &None, + ts.into(), + pessimistic_action, 0, for_update_ts.into(), 0, @@ -131,7 +241,7 @@ pub fn must_pessimistic_prewrite_put_with_ttl( pk: &[u8], ts: impl Into, for_update_ts: impl Into, - is_pessimistic_lock: bool, + pessimistic_action: PrewriteRequestPessimisticAction, lock_ttl: u64, ) { must_prewrite_put_impl( @@ -141,7 +251,7 @@ pub fn must_pessimistic_prewrite_put_with_ttl( pk, &None, ts.into(), - is_pessimistic_lock, + pessimistic_action, lock_ttl, for_update_ts.into(), 0, @@ -166,6 +276,11 @@ pub fn must_prewrite_put_for_large_txn( let ts = ts.into(); let min_commit_ts = (ts.into_inner() + 1).into(); let for_update_ts = for_update_ts.into(); + let pessimistic_action = if !for_update_ts.is_zero() { + DoPessimisticCheck + } else { + SkipPessimisticCheck + }; must_prewrite_put_impl( engine, key, @@ -173,7 +288,7 @@ pub fn must_prewrite_put_for_large_txn( pk, &None, ts, - !for_update_ts.is_zero(), + pessimistic_action, lock_ttl, for_update_ts, 0, @@ -202,7 +317,7 @@ pub fn must_prewrite_put_async_commit( pk, secondary_keys, ts.into(), - false, + SkipPessimisticCheck, 100, TimeStamp::default(), 0, @@ -222,7 +337,7 @@ pub fn must_pessimistic_prewrite_put_async_commit( secondary_keys: &Option>>, ts: impl Into, for_update_ts: impl Into, - is_pessimistic_lock: bool, + pessimistic_action: PrewriteRequestPessimisticAction, min_commit_ts: impl Into, ) { assert!(secondary_keys.is_some()); @@ -233,7 +348,7 @@ pub fn must_pessimistic_prewrite_put_async_commit( pk, secondary_keys, ts.into(), - is_pessimistic_lock, + pessimistic_action, 100, for_update_ts.into(), 0, @@ -269,6 +384,7 @@ fn default_txn_props( assertion_level: AssertionLevel::Off, } } + pub fn must_prewrite_put_err_impl( engine: &E, key: &[u8], @@ -277,11 +393,74 @@ pub fn must_prewrite_put_err_impl( secondary_keys: &Option>>, ts: impl Into, for_update_ts: impl Into, - is_pessimistic_lock: bool, + pessimistic_action: PrewriteRequestPessimisticAction, + max_commit_ts: impl Into, + is_retry_request: bool, + assertion: Assertion, + assertion_level: AssertionLevel, +) -> Error { + must_prewrite_put_err_impl_with_should_not_exist( + engine, + key, + value, + pk, + secondary_keys, + ts.into(), + for_update_ts.into(), + pessimistic_action, + max_commit_ts.into(), + is_retry_request, + assertion, + assertion_level, + false, + ) +} + +pub fn must_prewrite_insert_err_impl( + engine: &E, + key: &[u8], + value: &[u8], + pk: &[u8], + secondary_keys: &Option>>, + ts: impl Into, + for_update_ts: impl Into, + pessimistic_action: PrewriteRequestPessimisticAction, + max_commit_ts: impl Into, + is_retry_request: bool, + assertion: Assertion, + assertion_level: AssertionLevel, +) -> Error { + must_prewrite_put_err_impl_with_should_not_exist( + engine, + key, + value, + pk, + secondary_keys, + ts.into(), + for_update_ts.into(), + pessimistic_action, + max_commit_ts.into(), + is_retry_request, + assertion, + assertion_level, + true, + ) +} + +pub fn must_prewrite_put_err_impl_with_should_not_exist( + engine: &E, + key: &[u8], + value: &[u8], + pk: &[u8], + secondary_keys: &Option>>, + ts: impl Into, + for_update_ts: impl Into, + pessimistic_action: PrewriteRequestPessimisticAction, max_commit_ts: impl Into, is_retry_request: bool, assertion: Assertion, assertion_level: AssertionLevel, + should_not_exist: bool, ) -> Error { let snapshot = engine.snapshot(Default::default()).unwrap(); let for_update_ts = for_update_ts.into(); @@ -289,7 +468,11 @@ pub fn must_prewrite_put_err_impl( let ts = ts.into(); let mut txn = MvccTxn::new(ts, cm); let mut reader = SnapshotReader::new(ts, snapshot, true); - let mutation = Mutation::Put((Key::from_raw(key), value.to_vec()), assertion); + let mutation = if should_not_exist { + Mutation::Insert((Key::from_raw(key), value.to_vec()), assertion) + } else { + Mutation::Put((Key::from_raw(key), value.to_vec()), assertion) + }; let commit_kind = if secondary_keys.is_some() { CommitKind::Async(max_commit_ts.into()) } else { @@ -306,7 +489,7 @@ pub fn must_prewrite_put_err_impl( &props, mutation, &None, - is_pessimistic_lock, + pessimistic_action, ) .unwrap_err() } @@ -326,7 +509,7 @@ pub fn must_prewrite_put_err( &None, ts, TimeStamp::zero(), - false, + SkipPessimisticCheck, 0, false, Assertion::None, @@ -341,7 +524,7 @@ pub fn must_pessimistic_prewrite_put_err( pk: &[u8], ts: impl Into, for_update_ts: impl Into, - is_pessimistic_lock: bool, + pessimistic_action: PrewriteRequestPessimisticAction, ) -> Error { must_prewrite_put_err_impl( engine, @@ -351,7 +534,32 @@ pub fn must_pessimistic_prewrite_put_err( &None, ts, for_update_ts, - is_pessimistic_lock, + pessimistic_action, + 0, + false, + Assertion::None, + AssertionLevel::Off, + ) +} + +pub fn must_pessimistic_prewrite_insert_err( + engine: &E, + key: &[u8], + value: &[u8], + pk: &[u8], + ts: impl Into, + for_update_ts: impl Into, + pessimistic_action: PrewriteRequestPessimisticAction, +) -> Error { + must_prewrite_insert_err_impl( + engine, + key, + value, + pk, + &None, + ts, + for_update_ts, + pessimistic_action, 0, false, Assertion::None, @@ -367,7 +575,7 @@ pub fn must_retry_pessimistic_prewrite_put_err( secondary_keys: &Option>>, ts: impl Into, for_update_ts: impl Into, - is_pessimistic_lock: bool, + pessimistic_action: PrewriteRequestPessimisticAction, max_commit_ts: impl Into, ) -> Error { must_prewrite_put_err_impl( @@ -378,7 +586,7 @@ pub fn must_retry_pessimistic_prewrite_put_err( secondary_keys, ts, for_update_ts, - is_pessimistic_lock, + pessimistic_action, max_commit_ts, true, Assertion::None, @@ -392,7 +600,7 @@ fn must_prewrite_delete_impl( pk: &[u8], ts: impl Into, for_update_ts: impl Into, - is_pessimistic_lock: bool, + pessimistic_action: PrewriteRequestPessimisticAction, ) { let ctx = Context::default(); let snapshot = engine.snapshot(Default::default()).unwrap(); @@ -409,7 +617,7 @@ fn must_prewrite_delete_impl( &default_txn_props(ts, pk, for_update_ts), mutation, &None, - is_pessimistic_lock, + pessimistic_action, ) .unwrap(); @@ -424,7 +632,7 @@ pub fn must_prewrite_delete( pk: &[u8], ts: impl Into, ) { - must_prewrite_delete_impl(engine, key, pk, ts, TimeStamp::zero(), false); + must_prewrite_delete_impl(engine, key, pk, ts, TimeStamp::zero(), SkipPessimisticCheck); } pub fn must_pessimistic_prewrite_delete( @@ -433,9 +641,9 @@ pub fn must_pessimistic_prewrite_delete( pk: &[u8], ts: impl Into, for_update_ts: impl Into, - is_pessimistic_lock: bool, + pessimistic_action: PrewriteRequestPessimisticAction, ) { - must_prewrite_delete_impl(engine, key, pk, ts, for_update_ts, is_pessimistic_lock); + must_prewrite_delete_impl(engine, key, pk, ts, for_update_ts, pessimistic_action); } fn must_prewrite_lock_impl( @@ -444,7 +652,7 @@ fn must_prewrite_lock_impl( pk: &[u8], ts: impl Into, for_update_ts: impl Into, - is_pessimistic_lock: bool, + pessimistic_action: PrewriteRequestPessimisticAction, ) { let ctx = Context::default(); let snapshot = engine.snapshot(Default::default()).unwrap(); @@ -461,7 +669,7 @@ fn must_prewrite_lock_impl( &default_txn_props(ts, pk, for_update_ts), mutation, &None, - is_pessimistic_lock, + pessimistic_action, ) .unwrap(); @@ -471,7 +679,7 @@ fn must_prewrite_lock_impl( } pub fn must_prewrite_lock(engine: &E, key: &[u8], pk: &[u8], ts: impl Into) { - must_prewrite_lock_impl(engine, key, pk, ts, TimeStamp::zero(), false); + must_prewrite_lock_impl(engine, key, pk, ts, TimeStamp::zero(), SkipPessimisticCheck); } pub fn must_prewrite_lock_err( @@ -492,7 +700,7 @@ pub fn must_prewrite_lock_err( &default_txn_props(ts, pk, TimeStamp::zero()), Mutation::make_lock(Key::from_raw(key)), &None, - false, + SkipPessimisticCheck, ) .unwrap_err(); } @@ -503,9 +711,9 @@ pub fn must_pessimistic_prewrite_lock( pk: &[u8], ts: impl Into, for_update_ts: impl Into, - is_pessimistic_lock: bool, + pessimistic_action: PrewriteRequestPessimisticAction, ) { - must_prewrite_lock_impl(engine, key, pk, ts, for_update_ts, is_pessimistic_lock); + must_prewrite_lock_impl(engine, key, pk, ts, for_update_ts, pessimistic_action); } pub fn must_rollback( diff --git a/src/storage/txn/commands/check_txn_status.rs b/src/storage/txn/commands/check_txn_status.rs index 7fd4a45ff8ab..24f69e9a2373 100644 --- a/src/storage/txn/commands/check_txn_status.rs +++ b/src/storage/txn/commands/check_txn_status.rs @@ -147,7 +147,7 @@ impl WriteCommand for CheckTxnStatus { #[cfg(test)] pub mod tests { use concurrency_manager::ConcurrencyManager; - use kvproto::kvrpcpb::Context; + use kvproto::kvrpcpb::{Context, PrewriteRequestPessimisticAction::*}; use tikv_util::deadline::Deadline; use txn_types::{Key, WriteType}; @@ -388,7 +388,7 @@ pub mod tests { &Some(vec![]), 15, 16, - true, + DoPessimisticCheck, 17, ); // All following check_txn_status should return the unchanged lock information @@ -491,7 +491,7 @@ pub mod tests { &Some(vec![]), 20, 25, - true, + DoPessimisticCheck, 28, ); // the client must call check_txn_status with caller_start_ts == current_ts == @@ -520,7 +520,7 @@ pub mod tests { &Some(vec![]), 30, 35, - true, + DoPessimisticCheck, 36, ); // the client must call check_txn_status with caller_start_ts == current_ts == @@ -791,7 +791,7 @@ pub mod tests { must_large_txn_locked(&engine, k, ts(4, 0), 200, ts(135, 1), true); // Commit the key. - must_pessimistic_prewrite_put(&engine, k, v, k, ts(4, 0), ts(130, 0), true); + must_pessimistic_prewrite_put(&engine, k, v, k, ts(4, 0), ts(130, 0), DoPessimisticCheck); must_commit(&engine, k, ts(4, 0), ts(140, 0)); must_unlocked(&engine, k); must_get_commit_ts(&engine, k, ts(4, 0), ts(140, 0)); @@ -940,7 +940,7 @@ pub mod tests { k, &None, ts(300, 0), - false, + SkipPessimisticCheck, 100, TimeStamp::zero(), 1, @@ -1069,7 +1069,7 @@ pub mod tests { k, &None, ts(30, 0), - false, + SkipPessimisticCheck, 10, TimeStamp::zero(), 1, diff --git a/src/storage/txn/commands/mod.rs b/src/storage/txn/commands/mod.rs index 7f748c352f7b..3dc1a37697e9 100644 --- a/src/storage/txn/commands/mod.rs +++ b/src/storage/txn/commands/mod.rs @@ -162,12 +162,12 @@ impl From for TypedCommand { req.take_context(), ) } else { - let is_pessimistic_lock = req.take_is_pessimistic_lock(); + let pessimistic_actions = req.take_pessimistic_actions(); let mutations = req .take_mutations() .into_iter() .map(Into::into) - .zip(is_pessimistic_lock.into_iter()) + .zip(pessimistic_actions) .collect(); PrewritePessimistic::new( mutations, @@ -803,7 +803,7 @@ pub mod test_util { pub fn pessimistic_prewrite( engine: &E, statistics: &mut Statistics, - mutations: Vec<(Mutation, bool)>, + mutations: Vec<(Mutation, PrewriteRequestPessimisticAction)>, primary: Vec, start_ts: u64, for_update_ts: u64, @@ -826,7 +826,7 @@ pub mod test_util { engine: &E, cm: ConcurrencyManager, statistics: &mut Statistics, - mutations: Vec<(Mutation, bool)>, + mutations: Vec<(Mutation, PrewriteRequestPessimisticAction)>, primary: Vec, start_ts: u64, for_update_ts: u64, diff --git a/src/storage/txn/commands/prewrite.rs b/src/storage/txn/commands/prewrite.rs index a6aa8af6f873..deca5733eb0f 100644 --- a/src/storage/txn/commands/prewrite.rs +++ b/src/storage/txn/commands/prewrite.rs @@ -9,7 +9,10 @@ use std::mem; use engine_traits::CF_WRITE; -use kvproto::kvrpcpb::{AssertionLevel, ExtraOp}; +use kvproto::kvrpcpb::{ + AssertionLevel, ExtraOp, + PrewriteRequestPessimisticAction::{self, *}, +}; use tikv_kv::SnapshotExt; use txn_types::{Key, Mutation, OldValue, OldValues, TimeStamp, TxnExtra, Write, WriteType}; @@ -254,7 +257,7 @@ command! { cmd_ty => PrewriteResult, content => { /// The set of mutations to apply; the bool = is pessimistic lock. - mutations: Vec<(Mutation, bool)>, + mutations: Vec<(Mutation, PrewriteRequestPessimisticAction)>, /// The primary lock. Secondary locks (from `mutations`) will refer to the primary lock. primary: Vec, /// The transaction timestamp. @@ -308,7 +311,7 @@ impl std::fmt::Debug for PrewritePessimistic { impl PrewritePessimistic { #[cfg(test)] pub fn with_defaults( - mutations: Vec<(Mutation, bool)>, + mutations: Vec<(Mutation, PrewriteRequestPessimisticAction)>, primary: Vec, start_ts: TimeStamp, for_update_ts: TimeStamp, @@ -331,7 +334,7 @@ impl PrewritePessimistic { #[cfg(test)] pub fn with_1pc( - mutations: Vec<(Mutation, bool)>, + mutations: Vec<(Mutation, PrewriteRequestPessimisticAction)>, primary: Vec, start_ts: TimeStamp, for_update_ts: TimeStamp, @@ -549,7 +552,7 @@ impl Prewriter { let mut assertion_failure = None; for m in mem::take(&mut self.mutations) { - let is_pessimistic_lock = m.is_pessimistic_lock(); + let pessimistic_action = m.pessimistic_action(); let m = m.into_mutation(); let key = m.key().clone(); let mutation_type = m.mutation_type(); @@ -560,8 +563,7 @@ impl Prewriter { } let need_min_commit_ts = secondaries.is_some() || self.try_one_pc; - let prewrite_result = - prewrite(txn, reader, &props, m, secondaries, is_pessimistic_lock); + let prewrite_result = prewrite(txn, reader, &props, m, secondaries, pessimistic_action); match prewrite_result { Ok((ts, old_value)) if !(need_min_commit_ts && ts.is_zero()) => { if need_min_commit_ts && final_min_commit_ts < ts { @@ -781,7 +783,7 @@ struct Pessimistic { } impl PrewriteKind for Pessimistic { - type Mutation = (Mutation, bool); + type Mutation = (Mutation, PrewriteRequestPessimisticAction); fn txn_kind(&self) -> TransactionKind { TransactionKind::Pessimistic(self.for_update_ts) @@ -791,16 +793,17 @@ impl PrewriteKind for Pessimistic { /// The type of mutation and, optionally, its extra information, differing for /// the optimistic and pessimistic transaction. /// For optimistic txns, this is `Mutation`. -/// For pessimistic txns, this is `(Mutation, bool)`, where the bool indicates -/// whether the mutation takes a pessimistic lock or not. +/// For pessimistic txns, this is `(Mutation, PessimisticAction)`, where the +/// action indicates what kind of operations(checks) need to be performed. +/// The action also implies the type of the lock status. trait MutationLock { - fn is_pessimistic_lock(&self) -> bool; + fn pessimistic_action(&self) -> PrewriteRequestPessimisticAction; fn into_mutation(self) -> Mutation; } impl MutationLock for Mutation { - fn is_pessimistic_lock(&self) -> bool { - false + fn pessimistic_action(&self) -> PrewriteRequestPessimisticAction { + SkipPessimisticCheck } fn into_mutation(self) -> Mutation { @@ -808,8 +811,8 @@ impl MutationLock for Mutation { } } -impl MutationLock for (Mutation, bool) { - fn is_pessimistic_lock(&self) -> bool { +impl MutationLock for (Mutation, PrewriteRequestPessimisticAction) { + fn pessimistic_action(&self) -> PrewriteRequestPessimisticAction { self.1 } @@ -1185,7 +1188,10 @@ mod tests { must_acquire_pessimistic_lock(&engine, key, key, 10, 10); - let mutations = vec![(Mutation::make_put(Key::from_raw(key), value.to_vec()), true)]; + let mutations = vec![( + Mutation::make_put(Key::from_raw(key), value.to_vec()), + DoPessimisticCheck, + )]; let mut statistics = Statistics::default(); pessimistic_prewrite_with_cm( &engine, @@ -1209,8 +1215,14 @@ mod tests { must_acquire_pessimistic_lock(&engine, k1, k1, 8, 12); let mutations = vec![ - (Mutation::make_put(Key::from_raw(k1), v1.to_vec()), true), - (Mutation::make_put(Key::from_raw(k2), v2.to_vec()), false), + ( + Mutation::make_put(Key::from_raw(k1), v1.to_vec()), + DoPessimisticCheck, + ), + ( + Mutation::make_put(Key::from_raw(k2), v2.to_vec()), + SkipPessimisticCheck, + ), ]; statistics = Statistics::default(); pessimistic_prewrite_with_cm( @@ -1235,7 +1247,10 @@ mod tests { cm.update_max_ts(50.into()); must_acquire_pessimistic_lock(&engine, k1, k1, 20, 20); - let mutations = vec![(Mutation::make_put(Key::from_raw(k1), v1.to_vec()), true)]; + let mutations = vec![( + Mutation::make_put(Key::from_raw(k1), v1.to_vec()), + DoPessimisticCheck, + )]; statistics = Statistics::default(); let res = pessimistic_prewrite_with_cm( &engine, @@ -1272,8 +1287,14 @@ mod tests { .unwrap(); // Try 1PC on the two keys and it will fail on the second one. let mutations = vec![ - (Mutation::make_put(Key::from_raw(k1), v1.to_vec()), true), - (Mutation::make_put(Key::from_raw(k2), v2.to_vec()), false), + ( + Mutation::make_put(Key::from_raw(k1), v1.to_vec()), + DoPessimisticCheck, + ), + ( + Mutation::make_put(Key::from_raw(k2), v2.to_vec()), + SkipPessimisticCheck, + ), ]; must_acquire_pessimistic_lock(&engine, k1, k1, 60, 60); pessimistic_prewrite_with_cm( @@ -1369,7 +1390,10 @@ mod tests { must_acquire_pessimistic_lock(&engine, key, key, 10, 10); - let mutations = vec![(Mutation::make_put(Key::from_raw(key), value.to_vec()), true)]; + let mutations = vec![( + Mutation::make_put(Key::from_raw(key), value.to_vec()), + DoPessimisticCheck, + )]; let mut statistics = Statistics::default(); let cmd = super::PrewritePessimistic::new( mutations, @@ -1400,8 +1424,14 @@ mod tests { must_acquire_pessimistic_lock(&engine, k2, k1, 20, 20); let mutations = vec![ - (Mutation::make_put(Key::from_raw(k1), v1.to_vec()), true), - (Mutation::make_put(Key::from_raw(k2), v2.to_vec()), true), + ( + Mutation::make_put(Key::from_raw(k1), v1.to_vec()), + DoPessimisticCheck, + ), + ( + Mutation::make_put(Key::from_raw(k2), v2.to_vec()), + DoPessimisticCheck, + ), ]; let mut statistics = Statistics::default(); // calculated_ts > max_commit_ts @@ -1605,7 +1635,10 @@ mod tests { }; let cmd = if case.pessimistic { PrewritePessimistic::new( - mutations.iter().map(|it| (it.clone(), false)).collect(), + mutations + .iter() + .map(|it| (it.clone(), SkipPessimisticCheck)) + .collect(), keys[0].to_vec(), start_ts, 0, @@ -1813,7 +1846,7 @@ mod tests { &Some(vec![]), 5, 5, - true, + DoPessimisticCheck, 10, ); must_commit(&engine, key, 5, 10); @@ -1821,7 +1854,10 @@ mod tests { // T2: start_ts = 15, commit_ts = 16, 1PC must_acquire_pessimistic_lock(&engine, key, key, 15, 15); let cmd = PrewritePessimistic::with_1pc( - vec![(Mutation::make_put(Key::from_raw(key), b"v2".to_vec()), true)], + vec![( + Mutation::make_put(Key::from_raw(key), b"v2".to_vec()), + DoPessimisticCheck, + )], key.to_vec(), 15.into(), 15.into(), @@ -1836,7 +1872,10 @@ mod tests { // Repeating the T1 prewrite request let cmd = PrewritePessimistic::new( - vec![(Mutation::make_put(Key::from_raw(key), b"v1".to_vec()), true)], + vec![( + Mutation::make_put(Key::from_raw(key), b"v1".to_vec()), + DoPessimisticCheck, + )], key.to_vec(), 5.into(), 200, @@ -1871,7 +1910,10 @@ mod tests { // Repeating the T2 prewrite request let cmd = PrewritePessimistic::with_1pc( - vec![(Mutation::make_put(Key::from_raw(key), b"v2".to_vec()), true)], + vec![( + Mutation::make_put(Key::from_raw(key), b"v2".to_vec()), + DoPessimisticCheck, + )], key.to_vec(), 15.into(), 15.into(), @@ -1909,11 +1951,11 @@ mod tests { let mutations = vec![ ( Mutation::make_put(Key::from_raw(b"k1"), b"v1".to_vec()), - false, + SkipPessimisticCheck, ), ( Mutation::make_put(Key::from_raw(b"k2"), b"v2".to_vec()), - true, + DoPessimisticCheck, ), ]; let res = pessimistic_prewrite_with_cm( @@ -1960,13 +2002,13 @@ mod tests { pk: &[u8], secondary_keys, ts: u64, - is_pessimistic_lock, + pessimistic_action, is_retry_request| { let mutation = Mutation::make_put(Key::from_raw(key), value.to_vec()); let mut ctx = Context::default(); ctx.set_is_retry_request(is_retry_request); let cmd = PrewritePessimistic::new( - vec![(mutation, is_pessimistic_lock)], + vec![(mutation, pessimistic_action)], pk.to_vec(), ts.into(), 100, @@ -1991,7 +2033,7 @@ mod tests { &Some(vec![b"k2".to_vec()]), 10, 10, - true, + DoPessimisticCheck, 15, ); must_pessimistic_prewrite_put_async_commit( @@ -2002,7 +2044,7 @@ mod tests { &Some(vec![]), 10, 10, - false, + SkipPessimisticCheck, 15, ); @@ -2011,7 +2053,16 @@ mod tests { must_commit(&engine, b"k2", 10, 20); // This is a re-sent prewrite. - prewrite_with_retry_flag(b"k2", b"v2", b"k1", Some(vec![]), 10, false, true).unwrap(); + prewrite_with_retry_flag( + b"k2", + b"v2", + b"k1", + Some(vec![]), + 10, + SkipPessimisticCheck, + true, + ) + .unwrap(); // Commit repeatedly, these operations should have no effect. must_commit(&engine, b"k1", 10, 25); must_commit(&engine, b"k2", 10, 25); @@ -2029,16 +2080,28 @@ mod tests { // A retrying non-pessimistic-lock prewrite request should not skip constraint // checks. Here it should take no effect, even there's already a newer version // after it. (No matter if it's async commit). - prewrite_with_retry_flag(b"k2", b"v2", b"k1", Some(vec![]), 10, false, true).unwrap(); + prewrite_with_retry_flag( + b"k2", + b"v2", + b"k1", + Some(vec![]), + 10, + SkipPessimisticCheck, + true, + ) + .unwrap(); must_unlocked(&engine, b"k2"); - prewrite_with_retry_flag(b"k2", b"v2", b"k1", None, 10, false, true).unwrap(); + prewrite_with_retry_flag(b"k2", b"v2", b"k1", None, 10, SkipPessimisticCheck, true) + .unwrap(); must_unlocked(&engine, b"k2"); // Committing still does nothing. must_commit(&engine, b"k2", 10, 25); // Try a different txn start ts (which haven't been successfully committed // before). It should report a PessimisticLockNotFound. - let err = prewrite_with_retry_flag(b"k2", b"v2", b"k1", None, 11, false, true).unwrap_err(); + let err = + prewrite_with_retry_flag(b"k2", b"v2", b"k1", None, 11, SkipPessimisticCheck, true) + .unwrap_err(); assert!(matches!( err, Error(box ErrorInner::Mvcc(MvccError( @@ -2048,7 +2111,8 @@ mod tests { must_unlocked(&engine, b"k2"); // However conflict still won't be checked if there's a non-retry request // arriving. - prewrite_with_retry_flag(b"k2", b"v2", b"k1", None, 10, false, false).unwrap(); + prewrite_with_retry_flag(b"k2", b"v2", b"k1", None, 10, SkipPessimisticCheck, false) + .unwrap(); must_locked(&engine, b"k2", 10); } @@ -2096,7 +2160,10 @@ mod tests { must_rollback(&engine, k1, 10, true); must_acquire_pessimistic_lock(&engine, k1, v1, 15, 15); let prewrite_cmd = PrewritePessimistic::with_defaults( - vec![(Mutation::make_put(Key::from_raw(k1), v1.to_vec()), true)], + vec![( + Mutation::make_put(Key::from_raw(k1), v1.to_vec()), + DoPessimisticCheck, + )], k1.to_vec(), 10.into(), 10.into(), @@ -2149,7 +2216,7 @@ mod tests { b"row", &None, t2_start_ts, - true, + DoPessimisticCheck, 1000, t2_start_ts, 1, @@ -2166,7 +2233,7 @@ mod tests { b"row", &None, t2_start_ts, - false, + SkipPessimisticCheck, 1000, t2_start_ts, 1, @@ -2188,11 +2255,11 @@ mod tests { vec![ ( Mutation::make_put(Key::from_raw(b"row"), b"value".to_vec()), - true, + DoPessimisticCheck, ), ( Mutation::make_put(Key::from_raw(b"index"), b"value".to_vec()), - false, + SkipPessimisticCheck, ), ], b"row".to_vec(), @@ -2211,11 +2278,11 @@ mod tests { vec![ ( Mutation::make_put(Key::from_raw(b"index"), b"value".to_vec()), - false, + SkipPessimisticCheck, ), ( Mutation::make_put(Key::from_raw(b"row"), b"value".to_vec()), - true, + DoPessimisticCheck, ), ], b"row".to_vec(), @@ -2240,7 +2307,7 @@ mod tests { &None, t1_start_ts, t1_start_ts, - true, + DoPessimisticCheck, 0, false, Assertion::NotExist, @@ -2258,7 +2325,7 @@ mod tests { &None, t1_start_ts, t1_start_ts, - false, + SkipPessimisticCheck, 0, false, Assertion::NotExist, @@ -2335,7 +2402,7 @@ mod tests { &Some(vec![b"k2".to_vec()]), 5, 10, - true, + DoPessimisticCheck, 15, ); must_prewrite_put_impl( @@ -2345,7 +2412,7 @@ mod tests { b"k1", &Some(vec![]), 5.into(), - false, + SkipPessimisticCheck, 100, 10.into(), 1, @@ -2365,7 +2432,7 @@ mod tests { // (is_retry_request flag is not set, here we don't rely on it.) let mutation = Mutation::make_put(Key::from_raw(b"k2"), b"v2".to_vec()); let cmd = PrewritePessimistic::new( - vec![(mutation, false)], + vec![(mutation, SkipPessimisticCheck)], b"k1".to_vec(), 5.into(), 100, diff --git a/src/storage/txn/commands/rollback.rs b/src/storage/txn/commands/rollback.rs index ad22e966590c..7e93e77dee6c 100644 --- a/src/storage/txn/commands/rollback.rs +++ b/src/storage/txn/commands/rollback.rs @@ -75,6 +75,8 @@ impl WriteCommand for Rollback { #[cfg(test)] mod tests { + use kvproto::kvrpcpb::PrewriteRequestPessimisticAction::*; + use crate::storage::{txn::tests::*, TestEngineBuilder}; #[test] @@ -87,7 +89,7 @@ mod tests { must_rollback(&engine, k1, 10, false); must_rollback(&engine, k2, 10, false); - must_pessimistic_prewrite_put(&engine, k2, v, k1, 10, 10, false); + must_pessimistic_prewrite_put(&engine, k2, v, k1, 10, 10, SkipPessimisticCheck); must_rollback(&engine, k2, 10, false); } } diff --git a/src/storage/txn/store.rs b/src/storage/txn/store.rs index 0cd6c5b173b1..2af968c21be3 100644 --- a/src/storage/txn/store.rs +++ b/src/storage/txn/store.rs @@ -636,7 +636,7 @@ mod tests { use concurrency_manager::ConcurrencyManager; use engine_traits::{CfName, IterOptions, ReadOptions}; - use kvproto::kvrpcpb::{AssertionLevel, Context}; + use kvproto::kvrpcpb::{AssertionLevel, Context, PrewriteRequestPessimisticAction::*}; use tikv_kv::DummySnapshotExt; use super::*; @@ -708,7 +708,7 @@ mod tests { }, Mutation::make_put(Key::from_raw(key), key.to_vec()), &None, - false, + SkipPessimisticCheck, ) .unwrap(); } diff --git a/tests/benches/hierarchy/mvcc/mod.rs b/tests/benches/hierarchy/mvcc/mod.rs index e982465c621f..f88533171c3b 100644 --- a/tests/benches/hierarchy/mvcc/mod.rs +++ b/tests/benches/hierarchy/mvcc/mod.rs @@ -2,7 +2,7 @@ use concurrency_manager::ConcurrencyManager; use criterion::{black_box, BatchSize, Bencher, Criterion}; -use kvproto::kvrpcpb::{AssertionLevel, Context}; +use kvproto::kvrpcpb::{AssertionLevel, Context, PrewriteRequestPessimisticAction::*}; use test_util::KvGenerator; use tikv::storage::{ kv::{Engine, WriteData}, @@ -54,7 +54,7 @@ where &txn_props, Mutation::make_put(Key::from_raw(k), v.clone()), &None, - false, + SkipPessimisticCheck, ) .unwrap(); } @@ -98,7 +98,15 @@ fn mvcc_prewrite>(b: &mut Bencher<'_>, config: &B is_retry_request: false, assertion_level: AssertionLevel::Off, }; - prewrite(&mut txn, &mut reader, &txn_props, mutation, &None, false).unwrap(); + prewrite( + &mut txn, + &mut reader, + &txn_props, + mutation, + &None, + SkipPessimisticCheck, + ) + .unwrap(); } }, BatchSize::SmallInput, diff --git a/tests/benches/hierarchy/txn/mod.rs b/tests/benches/hierarchy/txn/mod.rs index 723d0eb3745d..840d4ac81fa2 100644 --- a/tests/benches/hierarchy/txn/mod.rs +++ b/tests/benches/hierarchy/txn/mod.rs @@ -2,7 +2,7 @@ use concurrency_manager::ConcurrencyManager; use criterion::{black_box, BatchSize, Bencher, Criterion}; -use kvproto::kvrpcpb::{AssertionLevel, Context}; +use kvproto::kvrpcpb::{AssertionLevel, Context, PrewriteRequestPessimisticAction::*}; use test_util::KvGenerator; use tikv::storage::{ kv::{Engine, WriteData}, @@ -50,7 +50,7 @@ where &txn_props, Mutation::make_put(Key::from_raw(k), v.clone()), &None, - false, + SkipPessimisticCheck, ) .unwrap(); } @@ -91,7 +91,15 @@ fn txn_prewrite>(b: &mut Bencher<'_>, config: &Be is_retry_request: false, assertion_level: AssertionLevel::Off, }; - prewrite(&mut txn, &mut reader, &txn_props, mutation, &None, false).unwrap(); + prewrite( + &mut txn, + &mut reader, + &txn_props, + mutation, + &None, + SkipPessimisticCheck, + ) + .unwrap(); let write_data = WriteData::from_modifies(txn.into_modifies()); black_box(engine.write(&ctx, write_data)).unwrap(); } diff --git a/tests/failpoints/cases/test_merge.rs b/tests/failpoints/cases/test_merge.rs index 32bd2f05228d..c602fc6e4f75 100644 --- a/tests/failpoints/cases/test_merge.rs +++ b/tests/failpoints/cases/test_merge.rs @@ -12,7 +12,7 @@ use std::{ use engine_traits::{Peekable, CF_RAFT}; use grpcio::{ChannelBuilder, Environment}; use kvproto::{ - kvrpcpb::*, + kvrpcpb::{PrewriteRequestPessimisticAction::*, *}, raft_serverpb::{PeerState, RaftMessage, RegionLocalState}, tikvpb::TikvClient, }; @@ -1450,7 +1450,7 @@ fn test_merge_pessimistic_locks_with_concurrent_prewrite() { let mut req = PrewriteRequest::default(); req.set_context(cluster.get_ctx(b"k0")); req.set_mutations(vec![mutation].into()); - req.set_is_pessimistic_lock(vec![true]); + req.set_pessimistic_actions(vec![DoPessimisticCheck]); req.set_start_version(10); req.set_for_update_ts(40); req.set_primary_lock(b"k0".to_vec()); diff --git a/tests/failpoints/cases/test_split_region.rs b/tests/failpoints/cases/test_split_region.rs index bf23267a06ae..9ed57b940918 100644 --- a/tests/failpoints/cases/test_split_region.rs +++ b/tests/failpoints/cases/test_split_region.rs @@ -13,7 +13,9 @@ use collections::HashMap; use engine_traits::CF_WRITE; use grpcio::{ChannelBuilder, Environment}; use kvproto::{ - kvrpcpb::{Mutation, Op, PessimisticLockRequest, PrewriteRequest}, + kvrpcpb::{ + Mutation, Op, PessimisticLockRequest, PrewriteRequest, PrewriteRequestPessimisticAction::*, + }, metapb::Region, raft_serverpb::RaftMessage, tikvpb::TikvClient, @@ -966,7 +968,7 @@ fn test_split_pessimistic_locks_with_concurrent_prewrite() { let mut req = PrewriteRequest::default(); req.set_context(cluster.get_ctx(b"a")); req.set_mutations(vec![mutation].into()); - req.set_is_pessimistic_lock(vec![true]); + req.set_pessimistic_actions(vec![DoPessimisticCheck]); req.set_start_version(10); req.set_for_update_ts(commit_ts + 20); req.set_primary_lock(b"a".to_vec()); diff --git a/tests/failpoints/cases/test_storage.rs b/tests/failpoints/cases/test_storage.rs index 40ba7297b7cd..7b92cc7065ee 100644 --- a/tests/failpoints/cases/test_storage.rs +++ b/tests/failpoints/cases/test_storage.rs @@ -19,7 +19,7 @@ use grpcio::*; use kvproto::{ kvrpcpb::{ self, AssertionLevel, BatchRollbackRequest, CommandPri, CommitRequest, Context, GetRequest, - Op, PrewriteRequest, RawPutRequest, + Op, PrewriteRequest, PrewriteRequestPessimisticAction::*, RawPutRequest, }, tikvpb::TikvClient, }; @@ -398,7 +398,10 @@ fn test_pipelined_pessimistic_lock() { storage .sched_txn_command( commands::PrewritePessimistic::new( - vec![(Mutation::make_put(key.clone(), val.clone()), true)], + vec![( + Mutation::make_put(key.clone(), val.clone()), + DoPessimisticCheck, + )], key.to_raw().unwrap(), 10.into(), 3000, @@ -571,7 +574,7 @@ fn test_async_commit_prewrite_with_stale_max_ts() { commands::PrewritePessimistic::new( vec![( Mutation::make_put(Key::from_raw(b"k1"), b"v".to_vec()), - true, + DoPessimisticCheck, )], b"k1".to_vec(), 10.into(), @@ -705,7 +708,11 @@ fn test_async_apply_prewrite_impl( commands::PrewritePessimistic::new( vec![( Mutation::make_put(Key::from_raw(key), value.to_vec()), - need_lock, + if need_lock { + DoPessimisticCheck + } else { + SkipPessimisticCheck + }, )], key.to_vec(), start_ts, @@ -1036,7 +1043,10 @@ fn test_async_apply_prewrite_1pc_impl( storage .sched_txn_command( commands::PrewritePessimistic::new( - vec![(Mutation::make_put(Key::from_raw(key), value.to_vec()), true)], + vec![( + Mutation::make_put(Key::from_raw(key), value.to_vec()), + DoPessimisticCheck, + )], key.to_vec(), start_ts, 0, diff --git a/tests/failpoints/cases/test_transaction.rs b/tests/failpoints/cases/test_transaction.rs index de19d1a790c3..cd5bec990c88 100644 --- a/tests/failpoints/cases/test_transaction.rs +++ b/tests/failpoints/cases/test_transaction.rs @@ -12,7 +12,10 @@ use std::{ use futures::executor::block_on; use grpcio::{ChannelBuilder, Environment}; use kvproto::{ - kvrpcpb::{self as pb, AssertionLevel, Context, Op, PessimisticLockRequest, PrewriteRequest}, + kvrpcpb::{ + self as pb, AssertionLevel, Context, Op, PessimisticLockRequest, PrewriteRequest, + PrewriteRequestPessimisticAction::*, + }, tikvpb::TikvClient, }; use raftstore::store::{util::new_peer, LocksStatus}; @@ -53,10 +56,10 @@ fn test_txn_failpoints() { let (k2, v2) = (b"k2", b"v2"); must_acquire_pessimistic_lock(&engine, k, k, 30, 30); fail::cfg("pessimistic_prewrite", "return()").unwrap(); - must_pessimistic_prewrite_put_err(&engine, k, v1, k, 30, 30, true); + must_pessimistic_prewrite_put_err(&engine, k, v1, k, 30, 30, DoPessimisticCheck); must_prewrite_put(&engine, k2, v2, k2, 31); fail::remove("pessimistic_prewrite"); - must_pessimistic_prewrite_put(&engine, k, v1, k, 30, 30, true); + must_pessimistic_prewrite_put(&engine, k, v1, k, 30, 30, DoPessimisticCheck); must_commit(&engine, k, 30, 40); must_commit(&engine, k2, 31, 41); must_get(&engine, k, 50, v1); diff --git a/tests/integrations/server/kv_service.rs b/tests/integrations/server/kv_service.rs index 8095ebdf2cac..9a946a806bc6 100644 --- a/tests/integrations/server/kv_service.rs +++ b/tests/integrations/server/kv_service.rs @@ -19,7 +19,7 @@ use grpcio_health::{proto::HealthCheckRequest, *}; use kvproto::{ coprocessor::*, debugpb, - kvrpcpb::{self, *}, + kvrpcpb::{self, PrewriteRequestPessimisticAction::*, *}, metapb, raft_serverpb, raft_serverpb::*, tikvpb::*, @@ -2073,7 +2073,7 @@ fn test_commands_write_detail() { mutation.set_op(Op::Put); mutation.set_value(v); prewrite_req.set_mutations(vec![mutation].into()); - prewrite_req.set_is_pessimistic_lock(vec![true]); + prewrite_req.set_pessimistic_actions(vec![DoPessimisticCheck]); prewrite_req.set_context(ctx.clone()); prewrite_req.set_primary_lock(k.clone()); prewrite_req.set_start_version(20);