plan: Handle no leader and invalidate store region #484


Merged · 9 commits · May 22, 2025
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
@@ -61,6 +61,7 @@ jobs:
CARGO_INCREMENTAL: 0
NEXTEST_PROFILE: ci
TIKV_VERSION: v8.5.1
RUST_LOG: info
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
1 change: 0 additions & 1 deletion Cargo.toml
@@ -57,7 +57,6 @@
reqwest = { version = "0.11", features = ["json", "native-tls-vendored"] }
rstest = "0.18.2"
serde_json = "1"
serial_test = "0.5.0"
simple_logger = "1"
tempfile = "3.6"
tokio = { version = "1", features = ["sync", "rt-multi-thread", "macros"] }

5 changes: 3 additions & 2 deletions src/common/errors.rs
@@ -5,6 +5,7 @@
use std::result;
use thiserror::Error;

use crate::proto::kvrpcpb;
use crate::region::RegionVerId;
use crate::BoundRange;

/// An error originating from the TiKV client or dependencies.
@@ -90,8 +91,8 @@ pub enum Error {
#[error("Region {} is not found in the response", region_id)]
RegionNotFoundInResponse { region_id: u64 },
/// No leader is found for the given id.
#[error("Leader of region {} is not found", region_id)]
LeaderNotFound { region_id: u64 },
#[error("Leader of region {} is not found", region.id)]
LeaderNotFound { region: RegionVerId },
/// Scan limit exceeds the maximum
#[error("Limit {} exceeds max scan limit {}", limit, max_limit)]
MaxScanLimitExceeded { limit: u32, max_limit: u32 },
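The variant change above is the heart of this PR: `LeaderNotFound` now carries the full `RegionVerId` instead of a bare `region_id`, so a handler can evict exactly the cached region version that went stale. A minimal sketch of a retry hook built on the new shape (the helper itself is hypothetical, not part of this diff):

```rust
use std::sync::Arc;

use crate::pd::PdClient;
use crate::Error;
use crate::Result;

// Hypothetical helper: on LeaderNotFound, invalidate the exact cached
// region version and tell the caller it is safe to retry; every other
// error is propagated unchanged.
async fn handle_no_leader(pd: Arc<impl PdClient>, err: Error) -> Result<bool> {
    match err {
        Error::LeaderNotFound { region } => {
            // `region` is a RegionVerId (id plus epoch), so there is no
            // guessing about which cache entry to drop.
            pd.invalidate_region_cache(region).await;
            Ok(true) // retry after a fresh region lookup
        }
        other => Err(other),
    }
}
```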
2 changes: 2 additions & 0 deletions src/mock.rs
@@ -216,6 +216,8 @@ impl PdClient for MockPdClient {

async fn invalidate_region_cache(&self, _ver_id: crate::region::RegionVerId) {}

async fn invalidate_store_cache(&self, _store_id: crate::region::StoreId) {}

async fn load_keyspace(&self, _keyspace: &str) -> Result<keyspacepb::KeyspaceMeta> {
unimplemented!()
}
72 changes: 39 additions & 33 deletions src/pd/client.rs
@@ -20,6 +20,7 @@
use crate::proto::metapb;
use crate::region::RegionId;
use crate::region::RegionVerId;
use crate::region::RegionWithLeader;
use crate::region::StoreId;
use crate::region_cache::RegionCache;
use crate::store::KvConnect;
use crate::store::RegionStore;
@@ -84,7 +85,7 @@
fn group_keys_by_region<K, K2>(
self: Arc<Self>,
keys: impl Iterator<Item = K> + Send + Sync + 'static,
) -> BoxStream<'static, Result<(RegionWithLeader, Vec<K2>)>>
) -> BoxStream<'static, Result<(Vec<K2>, RegionWithLeader)>>
where
K: AsRef<Key> + Into<K2> + Send + Sync + 'static,
K2: Send + Sync + 'static,
@@ -102,7 +103,7 @@
}
grouped.push(keys.next().unwrap().into());
}
Ok(Some((keys, (region, grouped))))
Ok(Some((keys, (grouped, region))))
} else {
Ok(None)
}
@@ -112,10 +113,10 @@
}

/// Returns a Stream which iterates over the contexts for each region covered by range.
fn stores_for_range(
fn regions_for_range(
self: Arc<Self>,
range: BoundRange,
) -> BoxStream<'static, Result<RegionStore>> {
) -> BoxStream<'static, Result<RegionWithLeader>> {
let (start_key, end_key) = range.into_keys();
stream_fn(Some(start_key), move |start_key| {
let end_key = end_key.clone();
@@ -128,15 +129,14 @@

let region = this.region_for_key(&start_key).await?;
let region_end = region.end_key();
let store = this.map_region_to_store(region).await?;
if end_key
.map(|x| x <= region_end && !x.is_empty())
.unwrap_or(false)
|| region_end.is_empty()
{
return Ok(Some((None, store)));
return Ok(Some((None, region)));
}
Ok(Some((Some(region_end), store)))
Ok(Some((Some(region_end), region)))
}
})
.boxed()
@@ -146,7 +146,7 @@
fn group_ranges_by_region(
self: Arc<Self>,
mut ranges: Vec<kvrpcpb::KeyRange>,
) -> BoxStream<'static, Result<(RegionWithLeader, Vec<kvrpcpb::KeyRange>)>> {
) -> BoxStream<'static, Result<(Vec<kvrpcpb::KeyRange>, RegionWithLeader)>> {
ranges.reverse();
stream_fn(Some(ranges), move |ranges| {
let this = self.clone();
@@ -166,7 +166,7 @@
if !region_end.is_empty() && (end_key > region_end || end_key.is_empty()) {
grouped.push(make_key_range(start_key.into(), region_end.clone().into()));
ranges.push(make_key_range(region_end.into(), end_key.into()));
return Ok(Some((Some(ranges), (region, grouped))));
return Ok(Some((Some(ranges), (grouped, region))));
}
grouped.push(range);

@@ -181,11 +181,11 @@
grouped
.push(make_key_range(start_key.into(), region_end.clone().into()));
ranges.push(make_key_range(region_end.into(), end_key.into()));
return Ok(Some((Some(ranges), (region, grouped))));
return Ok(Some((Some(ranges), (grouped, region))));
}
grouped.push(range);
}
Ok(Some((Some(ranges), (region, grouped))))
Ok(Some((Some(ranges), (grouped, region))))
} else {
Ok(None)
}
@@ -205,6 +205,8 @@
async fn update_leader(&self, ver_id: RegionVerId, leader: metapb::Peer) -> Result<()>;

async fn invalidate_region_cache(&self, ver_id: RegionVerId);

async fn invalidate_store_cache(&self, store_id: StoreId);
}

/// This client converts requests for the logical TiKV cluster into requests
@@ -271,6 +273,10 @@ impl<KvC: KvConnect + Send + Sync + 'static> PdClient for PdRpcClient<KvC> {
self.region_cache.invalidate_region_cache(ver_id).await
}

async fn invalidate_store_cache(&self, store_id: StoreId) {
self.region_cache.invalidate_store_cache(store_id).await
}

async fn load_keyspace(&self, keyspace: &str) -> Result<keyspacepb::KeyspaceMeta> {
self.pd.load_keyspace(keyspace).await
}
@@ -390,7 +396,7 @@ pub mod test {
let stream = Arc::new(client).group_keys_by_region(tasks.into_iter());
let mut stream = executor::block_on_stream(stream);

let result: Vec<Key> = stream.next().unwrap().unwrap().1;
let result: Vec<Key> = stream.next().unwrap().unwrap().0;
assert_eq!(
result,
vec![
@@ -401,27 +407,27 @@
]
);
assert_eq!(
stream.next().unwrap().unwrap().1,
stream.next().unwrap().unwrap().0,
vec![vec![12].into(), vec![11, 4].into()]
);
assert!(stream.next().is_none());
}

#[test]
fn test_stores_for_range() {
fn test_regions_for_range() {
let client = Arc::new(MockPdClient::default());
let k1: Key = vec![1].into();
let k2: Key = vec![5, 2].into();
let k3: Key = vec![11, 4].into();
let range1 = (k1, k2.clone()).into();
let mut stream = executor::block_on_stream(client.clone().stores_for_range(range1));
assert_eq!(stream.next().unwrap().unwrap().region_with_leader.id(), 1);
let mut stream = executor::block_on_stream(client.clone().regions_for_range(range1));
assert_eq!(stream.next().unwrap().unwrap().id(), 1);
assert!(stream.next().is_none());

let range2 = (k2, k3).into();
let mut stream = executor::block_on_stream(client.stores_for_range(range2));
assert_eq!(stream.next().unwrap().unwrap().region_with_leader.id(), 1);
assert_eq!(stream.next().unwrap().unwrap().region_with_leader.id(), 2);
let mut stream = executor::block_on_stream(client.regions_for_range(range2));
assert_eq!(stream.next().unwrap().unwrap().id(), 1);
assert_eq!(stream.next().unwrap().unwrap().id(), 2);
assert!(stream.next().is_none());
}

@@ -446,20 +452,20 @@
let ranges3 = stream.next().unwrap().unwrap();
let ranges4 = stream.next().unwrap().unwrap();

assert_eq!(ranges1.0.id(), 1);
assert_eq!(ranges1.1.id(), 1);
assert_eq!(
ranges1.1,
ranges1.0,
vec![
make_key_range(k1.clone(), k2.clone()),
make_key_range(k1.clone(), k_split.clone()),
]
);
assert_eq!(ranges2.0.id(), 2);
assert_eq!(ranges2.1, vec![make_key_range(k_split.clone(), k3.clone())]);
assert_eq!(ranges3.0.id(), 1);
assert_eq!(ranges3.1, vec![make_key_range(k2.clone(), k_split.clone())]);
assert_eq!(ranges4.0.id(), 2);
assert_eq!(ranges4.1, vec![make_key_range(k_split, k4.clone())]);
assert_eq!(ranges2.1.id(), 2);
assert_eq!(ranges2.0, vec![make_key_range(k_split.clone(), k3.clone())]);
assert_eq!(ranges3.1.id(), 1);
assert_eq!(ranges3.0, vec![make_key_range(k2.clone(), k_split.clone())]);
assert_eq!(ranges4.1.id(), 2);
assert_eq!(ranges4.0, vec![make_key_range(k_split, k4.clone())]);
assert!(stream.next().is_none());

let range1 = make_key_range(k1.clone(), k2.clone());
@@ -470,11 +476,11 @@
let ranges1 = stream.next().unwrap().unwrap();
let ranges2 = stream.next().unwrap().unwrap();
let ranges3 = stream.next().unwrap().unwrap();
assert_eq!(ranges1.0.id(), 1);
assert_eq!(ranges1.1, vec![make_key_range(k1, k2)]);
assert_eq!(ranges2.0.id(), 2);
assert_eq!(ranges2.1, vec![make_key_range(k3, k4)]);
assert_eq!(ranges3.0.id(), 3);
assert_eq!(ranges3.1, vec![make_key_range(k5, k6)]);
assert_eq!(ranges1.1.id(), 1);
assert_eq!(ranges1.0, vec![make_key_range(k1, k2)]);
assert_eq!(ranges2.1.id(), 2);
assert_eq!(ranges2.0, vec![make_key_range(k3, k4)]);
assert_eq!(ranges3.1.id(), 3);
assert_eq!(ranges3.0, vec![make_key_range(k5, k6)]);
}
}
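Two trait-level changes surface in the tests above: `stores_for_range` is renamed to `regions_for_range` and yields `RegionWithLeader` rather than a resolved `RegionStore`, and the grouping streams now yield `(shard, region)` tuples instead of `(region, shard)`. A usage sketch against the new signatures (the consumer loop is illustrative, not code from this PR):

```rust
use std::sync::Arc;

use futures::TryStreamExt;

use crate::pd::PdClient;
use crate::BoundRange;
use crate::Result;

// Illustrative consumer: walk the regions covering a range and resolve
// each one to its leader store only at the point of use.
async fn visit_range(pd: Arc<impl PdClient>, range: BoundRange) -> Result<()> {
    let mut regions = pd.clone().regions_for_range(range);
    while let Some(region) = regions.try_next().await? {
        // Resolving the store late means a missing leader now surfaces
        // here as Error::LeaderNotFound, where the caller can invalidate
        // caches and retry, instead of failing deep inside the stream.
        let store = pd.clone().map_region_to_store(region).await?;
        let _ = store; // ... dispatch a request to `store` ...
    }
    Ok(())
}
```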
44 changes: 27 additions & 17 deletions src/raw/requests.rs
@@ -22,8 +22,8 @@
use crate::request::SingleKey;
use crate::shardable_key;
use crate::shardable_keys;
use crate::shardable_range;
use crate::store::store_stream_for_keys;
use crate::store::store_stream_for_ranges;
use crate::store::region_stream_for_keys;
use crate::store::region_stream_for_ranges;
use crate::store::RegionStore;
use crate::store::Request;
use crate::transaction::HasLocks;
@@ -194,7 +194,7 @@ impl Shardable for kvrpcpb::RawBatchPutRequest {
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
let kvs = self.pairs.clone();
let ttls = self.ttls.clone();
let mut kv_ttl: Vec<KvPairTTL> = kvs
Expand All @@ -203,15 +203,17 @@ impl Shardable for kvrpcpb::RawBatchPutRequest {
.map(|(kv, ttl)| KvPairTTL(kv, ttl))
.collect();
kv_ttl.sort_by(|a, b| a.0.key.cmp(&b.0.key));
store_stream_for_keys(kv_ttl.into_iter(), pd_client.clone())
region_stream_for_keys(kv_ttl.into_iter(), pd_client.clone())
}

fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
fn apply_shard(&mut self, shard: Self::Shard) {
let (pairs, ttls) = shard.into_iter().unzip();
self.set_leader(&store.region_with_leader)?;
self.pairs = pairs;
self.ttls = ttls;
Ok(())
}

fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
self.set_leader(&store.region_with_leader)
}
}

@@ -344,14 +346,16 @@ impl Shardable for kvrpcpb::RawBatchScanRequest {
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
store_stream_for_ranges(self.ranges.clone(), pd_client.clone())
) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
region_stream_for_ranges(self.ranges.clone(), pd_client.clone())
}

fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
self.set_leader(&store.region_with_leader)?;
fn apply_shard(&mut self, shard: Self::Shard) {
self.ranges = shard;
Ok(())
}

fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
self.set_leader(&store.region_with_leader)
}
}

@@ -470,14 +474,20 @@ impl Shardable for RawCoprocessorRequest {
fn shards(
&self,
pd_client: &Arc<impl PdClient>,
) -> BoxStream<'static, Result<(Self::Shard, RegionStore)>> {
store_stream_for_ranges(self.inner.ranges.clone(), pd_client.clone())
) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
region_stream_for_ranges(self.inner.ranges.clone(), pd_client.clone())
}

fn apply_shard(&mut self, shard: Self::Shard) {
self.inner.ranges = shard;
}

fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()> {
fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
self.set_leader(&store.region_with_leader)?;
self.inner.ranges.clone_from(&shard);
self.inner.data = (self.data_builder)(store.region_with_leader.region.clone(), shard);
self.inner.data = (self.data_builder)(
store.region_with_leader.region.clone(),
self.inner.ranges.clone(),
);
Ok(())
}
}
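All three impls above follow the same split: `apply_shard` now only installs the shard's data and cannot fail, while leader wiring moves into the separate, fallible `apply_store`. A sketch of how a dispatch loop might drive the two phases (assumed driver shape; the crate's real plan executor is more involved):

```rust
use std::sync::Arc;

use futures::TryStreamExt;

use crate::pd::PdClient;
use crate::request::Shardable;
use crate::Result;

// Assumed shape: shard against regions first, resolve each region to a
// leader store second, and only then finish preparing the request.
async fn dispatch_all<R: Shardable + Clone>(req: R, pd: Arc<impl PdClient>) -> Result<()> {
    let mut shards = req.shards(&pd);
    while let Some((shard, region)) = shards.try_next().await? {
        let mut sub = req.clone();
        sub.apply_shard(shard); // infallible: data only
        let store = pd.clone().map_region_to_store(region).await?;
        sub.apply_store(&store)?; // fallible: the region may have no leader
        // ... send `sub` to `store` and collect the response ...
    }
    Ok(())
}
```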
2 changes: 1 addition & 1 deletion src/region.rs
@@ -73,7 +73,7 @@ impl RegionWithLeader {
.as_ref()
.cloned()
.ok_or_else(|| Error::LeaderNotFound {
region_id: self.id(),
region: self.ver_id(),
})
.map(|s| s.store_id)
}
5 changes: 5 additions & 0 deletions src/region_cache.rs
@@ -233,6 +233,11 @@ impl<C: RetryClientTrait> RegionCache<C> {
}
}

pub async fn invalidate_store_cache(&self, store_id: StoreId) {
let mut cache = self.store_cache.write().await;
cache.remove(&store_id);
}

pub async fn read_through_all_stores(&self) -> Result<Vec<Store>> {
let stores = self
.inner_client
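`invalidate_store_cache` is deliberately minimal: it only removes the entry, and the existing read-through lookups repopulate the cache on the next access. A sketch of the intended call site, written as if it lived in this same file so the module's imports are in scope (the helper is hypothetical):

```rust
// Hypothetical recovery path: a transport-level failure talking to a
// store suggests its cached metadata (e.g. the address) is stale, so
// evict it before the caller retries.
async fn evict_store_on_error<C: RetryClientTrait>(cache: &RegionCache<C>, store_id: StoreId) {
    // The next lookup of this store reads through to PD.
    cache.invalidate_store_cache(store_id).await;
}
```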
14 changes: 6 additions & 8 deletions src/request/mod.rs
@@ -103,7 +103,7 @@ mod test {
use crate::proto::pdpb::Timestamp;
use crate::proto::tikvpb::tikv_client::TikvClient;
use crate::region::RegionWithLeader;
use crate::store::store_stream_for_keys;
use crate::store::region_stream_for_keys;
use crate::store::HasRegionError;
use crate::transaction::lowering::new_commit_request;
use crate::Error;
@@ -168,22 +168,20 @@ mod test {
pd_client: &std::sync::Arc<impl crate::pd::PdClient>,
) -> futures::stream::BoxStream<
'static,
crate::Result<(Self::Shard, crate::store::RegionStore)>,
crate::Result<(Self::Shard, crate::region::RegionWithLeader)>,
> {
// Increases by 1 for each call.
self.test_invoking_count
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
store_stream_for_keys(
region_stream_for_keys(
Some(Key::from("mock_key".to_owned())).into_iter(),
pd_client.clone(),
)
}

fn apply_shard(
&mut self,
_shard: Self::Shard,
_store: &crate::store::RegionStore,
) -> crate::Result<()> {
fn apply_shard(&mut self, _shard: Self::Shard) {}

fn apply_store(&mut self, _store: &crate::store::RegionStore) -> crate::Result<()> {
Ok(())
}
}
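For code outside this repo with its own `Shardable` impls, the migration mirrors this mock: leader wiring moves out of `apply_shard` into `apply_store`. A sketch for a hypothetical request type (`MyRequest`, its `keys` field, and its `set_leader` helper are invented for illustration; import paths are assumed):

```rust
use std::sync::Arc;

use futures::stream::BoxStream;

use crate::pd::PdClient;
use crate::region::RegionWithLeader;
use crate::request::Shardable;
use crate::store::region_stream_for_keys;
use crate::store::RegionStore;
use crate::{Key, Result};

impl Shardable for MyRequest {
    type Shard = Vec<Key>;

    fn shards(
        &self,
        pd_client: &Arc<impl PdClient>,
    ) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
        // Renamed from store_stream_for_keys: keys grouped per region.
        region_stream_for_keys(self.keys.clone().into_iter(), pd_client.clone())
    }

    // Before: fn apply_shard(&mut self, shard: Self::Shard, store: &RegionStore) -> Result<()>
    fn apply_shard(&mut self, shard: Self::Shard) {
        self.keys = shard; // pure data; nothing here can fail
    }

    fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
        // Leader resolution is the only step left that can fail.
        self.set_leader(&store.region_with_leader)
    }
}
```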