Skip to content

Commit

Permalink
refactor by CR
Browse files Browse the repository at this point in the history
  • Loading branch information
chunshao90 committed Oct 10, 2023
1 parent c41781b commit 44ee3f1
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 29 deletions.
7 changes: 3 additions & 4 deletions analytic_engine/src/instance/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ impl<'a> Alterer<'a> {
// if the alter schema request is idempotent, we can skip the alter operation.
if self.validate_before_alter(&request)? {
info!(
"Instance alter schema, idempotent, skip alter, table:{:?}",
"Skip alter because of the altered schema is the same as the current, table:{}",
self.table_data.name
);
return Ok(());
Expand Down Expand Up @@ -164,9 +164,8 @@ impl<'a> Alterer<'a> {

// Most validation should be done by catalog module, so we don't do too much
// duplicate check here, especially the schema compatibility.
// Boolean return value indicates whether the alter operation is idempotent.
// If it is idempotent, we can skip the alter operation.
// true: idempotent, false: not idempotent.
// The returned value denotes whether the altered schema is same as the current
// one.
fn validate_before_alter(&self, request: &AlterSchemaRequest) -> Result<bool> {
ensure!(
!self.table_data.is_dropped(),
Expand Down
44 changes: 26 additions & 18 deletions partition_table_engine/src/partition.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,18 +352,20 @@ impl Table for PartitionTableImpl {
futures.push(partition);
}

let mut result = None;
while let Some(ret) = futures.next().await {
if ret.is_err() {
error!("Alter schema failed, err:{:?}", ret);
if result.is_none() {
result = Some(ret.box_err().context(AlterSchema { table: self.name() }));
}
let mut alter_err = None;
while let Some(alter_ret) = futures.next().await {
if let Err(e) = &alter_ret {
error!("Alter schema failed, table_name:{}, err:{e}", self.name());
alter_err.get_or_insert(
alter_ret
.box_err()
.context(AlterSchema { table: self.name() }),
);
}
}

// Remove the first error.
if let Some(ret) = result {
if let Some(ret) = alter_err {
ret?;
}

Expand All @@ -376,7 +378,9 @@ impl Table for PartitionTableImpl {
})
.await
.box_err()
.context(AlterSchema { table: self.name() })?;
.with_context(|| AlterSchema {
table: self.get_sub_table_ident(0).table,
})?;

Ok(0)
}
Expand All @@ -403,18 +407,20 @@ impl Table for PartitionTableImpl {
futures.push(partition);
}

let mut result = None;
while let Some(ret) = futures.next().await {
if ret.is_err() {
error!("Alter options failed, err:{:?}", ret);
if result.is_none() {
result = Some(ret.box_err().context(AlterOptions { table: self.name() }));
}
let mut alter_err = None;
while let Some(alter_ret) = futures.next().await {
if let Err(e) = &alter_ret {
error!("Alter options failed, table_name:{}, err:{e}", self.name());
alter_err.get_or_insert(
alter_ret
.box_err()
.context(AlterOptions { table: self.name() }),
);
}
}

// Remove the first error.
if let Some(ret) = result {
if let Some(ret) = alter_err {
ret?;
}

Expand All @@ -426,7 +432,9 @@ impl Table for PartitionTableImpl {
})
.await
.box_err()
.context(AlterOptions { table: self.name() })?;
.with_context(|| AlterOptions {
table: self.get_sub_table_ident(0).table,
})?;

Ok(0)
}
Expand Down
6 changes: 5 additions & 1 deletion remote_engine_client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ impl Client {

let mut result = Ok(());
// Alter schema to remote engine with retry.
// TODO: Define a macro to reuse the retry logic.
for i in 0..(self.max_retry + 1) {
let resp = rpc_client
.alter_table_schema(Request::new(request_pb.clone()))
Expand All @@ -280,7 +281,10 @@ impl Client {
});

if let Err(e) = resp {
error!("Failed to alter schema to remote engine, table:{table_ident:?}, err:{e}");
error!(
"Failed to alter schema to remote engine,
table:{table_ident:?}, err:{e}"
);

result = Err(e);

Expand Down
10 changes: 4 additions & 6 deletions remote_engine_client/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

//! Config for [Client]

use std::str::FromStr;

use arrow_ext::ipc::CompressOptions;
use serde::{Deserialize, Serialize};
use time_ext::ReadableDuration;
Expand All @@ -39,17 +37,17 @@ pub struct Config {
impl Default for Config {
fn default() -> Self {
Self {
connect_timeout: ReadableDuration::from_str("3s").unwrap(),
connect_timeout: ReadableDuration::secs(3),
channel_pool_max_size_per_partition: 16,
channel_pool_partition_num: 16,
channel_keep_alive_interval: ReadableDuration::from_str("600s").unwrap(),
channel_keep_alive_timeout: ReadableDuration::from_str("3s").unwrap(),
channel_keep_alive_interval: ReadableDuration::secs(600),
channel_keep_alive_timeout: ReadableDuration::secs(3),
channel_keep_alive_while_idle: true,
route_cache_max_size_per_partition: 16,
route_cache_partition_num: 16,
compression: CompressOptions::default(),
max_retry: 5,
retry_interval: ReadableDuration::from_str("5s").unwrap(),
retry_interval: ReadableDuration::secs(5),
}
}
}

0 comments on commit 44ee3f1

Please sign in to comment.