Skip to content

Commit 2401711

Browse files
authored
Merge branch 'tikv:master' into wip-add-rpc-error-handler-for-scan
2 parents cee5486 + 691df4d commit 2401711

File tree

5 files changed

+155
-7
lines changed

5 files changed

+155
-7
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ Raw mode:
3333
```rust
3434
use tikv_client::RawClient;
3535

36-
let client = RawClient::new(vec!["127.0.0.1:2379"], None).await?;
36+
let client = RawClient::new(vec!["127.0.0.1:2379"]).await?;
3737
client.put("key".to_owned(), "value".to_owned()).await?;
3838
let value = client.get("key".to_owned()).await?;
3939
```
@@ -43,7 +43,7 @@ Transactional mode:
4343
```rust
4444
use tikv_client::TransactionClient;
4545

46-
let txn_client = TransactionClient::new(vec!["127.0.0.1:2379"], None).await?;
46+
let txn_client = TransactionClient::new(vec!["127.0.0.1:2379"]).await?;
4747
let mut txn = txn_client.begin_optimistic().await?;
4848
txn.put("key".to_owned(), "value".to_owned()).await?;
4949
let value = txn.get("key".to_owned()).await?;

src/raw/requests.rs

Lines changed: 84 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ use crate::proto::tikvpb::tikv_client::TikvClient;
1010
use crate::range_request;
1111
use crate::region::RegionWithLeader;
1212
use crate::request::plan::ResponseWithShard;
13-
use crate::request::Collect;
1413
use crate::request::CollectSingle;
1514
use crate::request::DefaultProcessor;
1615
use crate::request::KvRequest;
@@ -19,6 +18,7 @@ use crate::request::Process;
1918
use crate::request::RangeRequest;
2019
use crate::request::Shardable;
2120
use crate::request::SingleKey;
21+
use crate::request::{Batchable, Collect};
2222
use crate::shardable_key;
2323
use crate::shardable_keys;
2424
use crate::shardable_range;
@@ -35,12 +35,15 @@ use crate::Result;
3535
use crate::Value;
3636
use async_trait::async_trait;
3737
use futures::stream::BoxStream;
38+
use futures::{stream, StreamExt};
3839
use std::any::Any;
3940
use std::ops::Range;
4041
use std::sync::Arc;
4142
use std::time::Duration;
4243
use tonic::transport::Channel;
4344

45+
const RAW_KV_REQUEST_BATCH_SIZE: u64 = 16 * 1024; // 16 KB
46+
4447
pub fn new_raw_get_request(key: Vec<u8>, cf: Option<ColumnFamily>) -> kvrpcpb::RawGetRequest {
4548
let mut req = kvrpcpb::RawGetRequest::default();
4649
req.key = key;
@@ -188,6 +191,14 @@ impl KvRequest for kvrpcpb::RawBatchPutRequest {
188191
type Response = kvrpcpb::RawBatchPutResponse;
189192
}
190193

194+
impl Batchable for kvrpcpb::RawBatchPutRequest {
195+
type Item = (kvrpcpb::KvPair, u64);
196+
197+
fn item_size(item: &Self::Item) -> u64 {
198+
(item.0.key.len() + item.0.value.len()) as u64
199+
}
200+
}
201+
191202
impl Shardable for kvrpcpb::RawBatchPutRequest {
192203
type Shard = Vec<(kvrpcpb::KvPair, u64)>;
193204

@@ -204,6 +215,16 @@ impl Shardable for kvrpcpb::RawBatchPutRequest {
204215
.collect();
205216
kv_ttl.sort_by(|a, b| a.0.key.cmp(&b.0.key));
206217
region_stream_for_keys(kv_ttl.into_iter(), pd_client.clone())
218+
.flat_map(|result| match result {
219+
Ok((keys, region)) => stream::iter(kvrpcpb::RawBatchPutRequest::batches(
220+
keys,
221+
RAW_KV_REQUEST_BATCH_SIZE,
222+
))
223+
.map(move |batch| Ok((batch, region.clone())))
224+
.boxed(),
225+
Err(e) => stream::iter(Err(e)).boxed(),
226+
})
227+
.boxed()
207228
}
208229

209230
fn apply_shard(&mut self, shard: Self::Shard) {
@@ -212,6 +233,18 @@ impl Shardable for kvrpcpb::RawBatchPutRequest {
212233
self.ttls = ttls;
213234
}
214235

236+
fn clone_then_apply_shard(&self, shard: Self::Shard) -> Self
237+
where
238+
Self: Sized + Clone,
239+
{
240+
let mut cloned = Self::default();
241+
cloned.context = self.context.clone();
242+
cloned.cf = self.cf.clone();
243+
cloned.for_cas = self.for_cas;
244+
cloned.apply_shard(shard);
245+
cloned
246+
}
247+
215248
fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
216249
self.set_leader(&store.region_with_leader)
217250
}
@@ -257,7 +290,56 @@ impl KvRequest for kvrpcpb::RawBatchDeleteRequest {
257290
type Response = kvrpcpb::RawBatchDeleteResponse;
258291
}
259292

260-
shardable_keys!(kvrpcpb::RawBatchDeleteRequest);
293+
impl Batchable for kvrpcpb::RawBatchDeleteRequest {
294+
type Item = Vec<u8>;
295+
296+
fn item_size(item: &Self::Item) -> u64 {
297+
item.len() as u64
298+
}
299+
}
300+
301+
impl Shardable for kvrpcpb::RawBatchDeleteRequest {
302+
type Shard = Vec<Vec<u8>>;
303+
304+
fn shards(
305+
&self,
306+
pd_client: &Arc<impl PdClient>,
307+
) -> BoxStream<'static, Result<(Self::Shard, RegionWithLeader)>> {
308+
let mut keys = self.keys.clone();
309+
keys.sort();
310+
region_stream_for_keys(keys.into_iter(), pd_client.clone())
311+
.flat_map(|result| match result {
312+
Ok((keys, region)) => stream::iter(kvrpcpb::RawBatchDeleteRequest::batches(
313+
keys,
314+
RAW_KV_REQUEST_BATCH_SIZE,
315+
))
316+
.map(move |batch| Ok((batch, region.clone())))
317+
.boxed(),
318+
Err(e) => stream::iter(Err(e)).boxed(),
319+
})
320+
.boxed()
321+
}
322+
323+
fn apply_shard(&mut self, shard: Self::Shard) {
324+
self.keys = shard;
325+
}
326+
327+
fn clone_then_apply_shard(&self, shard: Self::Shard) -> Self
328+
where
329+
Self: Sized + Clone,
330+
{
331+
let mut cloned = Self::default();
332+
cloned.context = self.context.clone();
333+
cloned.cf = self.cf.clone();
334+
cloned.for_cas = self.for_cas;
335+
cloned.apply_shard(shard);
336+
cloned
337+
}
338+
339+
fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
340+
self.set_leader(&store.region_with_leader)
341+
}
342+
}
261343

262344
pub fn new_raw_delete_range_request(
263345
start_key: Vec<u8>,

src/request/plan.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -117,11 +117,10 @@ where
117117
) -> Result<<Self as Plan>::Result> {
118118
let shards = current_plan.shards(&pd_client).collect::<Vec<_>>().await;
119119
debug!("single_plan_handler, shards: {}", shards.len());
120-
let mut handles = Vec::new();
120+
let mut handles = Vec::with_capacity(shards.len());
121121
for shard in shards {
122122
let (shard, region) = shard?;
123-
let mut clone = current_plan.clone();
124-
clone.apply_shard(shard);
123+
let clone = current_plan.clone_then_apply_shard(shard);
125124
let handle = tokio::spawn(Self::single_shard_handler(
126125
pd_client.clone(),
127126
clone,

src/request/shard.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,16 @@ pub trait Shardable {
4848

4949
fn apply_shard(&mut self, shard: Self::Shard);
5050

51+
/// Implementation can skip unnecessary fields clone if fields will be overwritten by `apply_shard`.
52+
fn clone_then_apply_shard(&self, shard: Self::Shard) -> Self
53+
where
54+
Self: Sized + Clone,
55+
{
56+
let mut cloned = self.clone();
57+
cloned.apply_shard(shard);
58+
cloned
59+
}
60+
5161
fn apply_store(&mut self, store: &RegionStore) -> Result<()>;
5262
}
5363

@@ -103,6 +113,16 @@ impl<Req: KvRequest + Shardable> Shardable for Dispatch<Req> {
103113
self.request.apply_shard(shard);
104114
}
105115

116+
fn clone_then_apply_shard(&self, shard: Self::Shard) -> Self
117+
where
118+
Self: Sized + Clone,
119+
{
120+
Dispatch {
121+
request: self.request.clone_then_apply_shard(shard),
122+
kv_client: self.kv_client.clone(),
123+
}
124+
}
125+
106126
fn apply_store(&mut self, store: &RegionStore) -> Result<()> {
107127
self.kv_client = Some(store.client.clone());
108128
self.request.apply_store(store)

tests/integration_tests.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -871,6 +871,53 @@ async fn raw_write_million() -> Result<()> {
871871
Ok(())
872872
}
873873

874+
/// Tests raw batch put has a large payload.
875+
#[tokio::test]
876+
#[serial]
877+
async fn raw_large_batch_put() -> Result<()> {
878+
const TARGET_SIZE_MB: usize = 100;
879+
const KEY_SIZE: usize = 32;
880+
const VALUE_SIZE: usize = 1024;
881+
882+
let pair_size = KEY_SIZE + VALUE_SIZE;
883+
let target_size_bytes = TARGET_SIZE_MB * 1024 * 1024;
884+
let num_pairs = target_size_bytes / pair_size;
885+
let mut pairs = Vec::with_capacity(num_pairs);
886+
for i in 0..num_pairs {
887+
// Generate key: "bench_key_" + zero-padded number
888+
let key = format!("bench_key_{:010}", i);
889+
890+
// Generate value: repeat pattern to reach VALUE_SIZE
891+
let pattern = format!("value_{}", i % 1000);
892+
let repeat_count = VALUE_SIZE.div_ceil(pattern.len());
893+
let value = pattern.repeat(repeat_count);
894+
895+
pairs.push(KvPair::from((key, value)));
896+
}
897+
898+
init().await?;
899+
let client =
900+
RawClient::new_with_config(pd_addrs(), Config::default().with_default_keyspace()).await?;
901+
902+
client.batch_put(pairs.clone()).await?;
903+
904+
let keys = pairs.iter().map(|pair| pair.0.clone()).collect::<Vec<_>>();
905+
// split into multiple batch_get to avoid response too large error
906+
const BATCH_SIZE: usize = 1000;
907+
let mut got = Vec::with_capacity(num_pairs);
908+
for chunk in keys.chunks(BATCH_SIZE) {
909+
let mut partial = client.batch_get(chunk.to_vec()).await?;
910+
got.append(&mut partial);
911+
}
912+
assert_eq!(got, pairs);
913+
914+
client.batch_delete(keys.clone()).await?;
915+
let res = client.batch_get(keys).await?;
916+
assert!(res.is_empty());
917+
918+
Ok(())
919+
}
920+
874921
/// Tests raw ttl API.
875922
#[tokio::test]
876923
#[serial]

0 commit comments

Comments
 (0)