Skip to content

Reduce some panics #248

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,6 @@ pub struct Region {
}

impl Region {
#[allow(dead_code)]
pub fn switch_peer(&mut self, _to: StoreId) -> Result<()> {
unimplemented!()
}

pub fn contains(&self, key: &Key) -> bool {
let key: &[u8] = key.into();
let start_key = self.region.get_start_key();
Expand Down
11 changes: 9 additions & 2 deletions src/request/plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,18 @@ impl<Req: KvRequest> Plan for Dispatch<Req> {
let result = self
.kv_client
.as_ref()
.expect("Unreachable: kv_client has not been initialised in Dispatch")
.ok_or_else(|| {
Error::StringError(
"Unreachable: kv_client has not been initialised in Dispatch".to_owned(),
)
})?
.dispatch(&self.request)
.await;
let result = stats.done(result);
result.map(|r| *r.downcast().expect("Downcast failed"))
result.map(|r| {
*r.downcast()
.expect("Downcast failed: request and response type mismatch")
})
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/transaction/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -628,7 +628,7 @@ impl Transaction {
let first_key = keys[0].clone();
let primary_lock = self.buffer.get_primary_key_or(&first_key).await;
let lock_ttl = DEFAULT_LOCK_TTL;
let for_update_ts = self.rpc.clone().get_timestamp().await.unwrap();
let for_update_ts = self.rpc.clone().get_timestamp().await?;
self.options.push_for_update_ts(for_update_ts.clone());
let request = new_pessimistic_lock_request(
keys.clone().into_iter(),
Expand Down Expand Up @@ -843,6 +843,7 @@ impl Committer {
}

let commit_ts = if self.options.async_commit {
// FIXME: min_commit_ts == 0 => fallback to normal 2PC
min_commit_ts.unwrap()
} else {
match self.commit_primary().await {
Expand Down
13 changes: 5 additions & 8 deletions tikv-client-pd/src/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ impl PdMessage for pdpb::GetRegionRequest {
type Response = pdpb::GetRegionResponse;

async fn rpc(&self, client: &pdpb::PdClient, opt: CallOption) -> GrpcResult<Self::Response> {
client.get_region_async_opt(self, opt).unwrap().await
client.get_region_async_opt(self, opt)?.await
}
}

Expand All @@ -299,7 +299,7 @@ impl PdMessage for pdpb::GetRegionByIdRequest {
type Response = pdpb::GetRegionResponse;

async fn rpc(&self, client: &pdpb::PdClient, opt: CallOption) -> GrpcResult<Self::Response> {
client.get_region_by_id_async_opt(self, opt).unwrap().await
client.get_region_by_id_async_opt(self, opt)?.await
}
}

Expand All @@ -308,7 +308,7 @@ impl PdMessage for pdpb::GetStoreRequest {
type Response = pdpb::GetStoreResponse;

async fn rpc(&self, client: &pdpb::PdClient, opt: CallOption) -> GrpcResult<Self::Response> {
client.get_store_async_opt(self, opt).unwrap().await
client.get_store_async_opt(self, opt)?.await
}
}

Expand All @@ -317,7 +317,7 @@ impl PdMessage for pdpb::GetAllStoresRequest {
type Response = pdpb::GetAllStoresResponse;

async fn rpc(&self, client: &pdpb::PdClient, opt: CallOption) -> GrpcResult<Self::Response> {
client.get_all_stores_async_opt(self, opt).unwrap().await
client.get_all_stores_async_opt(self, opt)?.await
}
}

Expand All @@ -326,10 +326,7 @@ impl PdMessage for pdpb::UpdateGcSafePointRequest {
type Response = pdpb::UpdateGcSafePointResponse;

async fn rpc(&self, client: &pdpb::PdClient, opt: CallOption) -> GrpcResult<Self::Response> {
client
.update_gc_safe_point_async_opt(self, opt)
.unwrap()
.await
client.update_gc_safe_point_async_opt(self, opt)?.await
}
}

Expand Down