Skip to content

Commit

Permalink
fix: loop all sub tables to get table info (#1224)
Browse files Browse the repository at this point in the history
## Rationale
Followup PR of #1220

## Detailed Changes


## Test Plan
Manually.
  • Loading branch information
jiacai2050 authored Sep 20, 2023
1 parent 20ee56e commit ca44455
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 41 deletions.
93 changes: 52 additions & 41 deletions proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
use table_engine::{
engine::{CreateTableParams, EngineRuntimes, TableState},
remote::model::{GetTableInfoRequest, TableIdentifier},
partition::PartitionInfo,
remote::model::{GetTableInfoRequest, TableIdentifier, TableInfo},
table::{TableId, TableRef},
PARTITION_TABLE_ENGINE_TYPE,
};
Expand Down Expand Up @@ -314,27 +315,52 @@ impl Proxy {
Ok(table)
}

async fn maybe_open_partition_table_if_not_exist(
async fn get_partition_table_info(
&self,
catalog_name: &str,
schema_name: &str,
table_name: &str,
) -> Result<()> {
if let Err(e) = self
.open_partition_table_inner(catalog_name, schema_name, table_name)
.await
{
warn!("Open partition table failed, err:{e:?}");
base_name: &str,
part_info: &PartitionInfo,
) -> Result<TableInfo> {
let get_inner = |i| async move {
// TODO: the remote engine should provide a method to get all sub table names.
let sub_partition_table_name = util::get_sub_partition_name(base_name, part_info, i);
let table = self
.instance
.remote_engine_ref
.get_table_info(GetTableInfoRequest {
table: TableIdentifier {
catalog: catalog_name.to_string(),
schema: schema_name.to_string(),
table: sub_partition_table_name,
},
})
.await
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "Failed to get table",
})?;

Ok(table)
};

let part_num = part_info.get_partition_num();
// Loop all sub tables to get table info in case of some of them has problems.
for i in 0..part_num - 1 {
let ret = get_inner(i).await;
if let Err(e) = ret {
warn!("Failed to get table info, err:{e:?}");
} else {
return ret;
}
}

// When open remote table failed, we currently don't return error outside.
// This is because when sub_table[0] is unhealthy, we can not drop the partition
// table.
// TODO: maybe we can find a more elegant way to deal with this issue.
Ok(())
// return the last sub table's get result to outside
get_inner(part_num - 1).await
}

async fn open_partition_table_inner(
async fn maybe_open_partition_table_if_not_exist(
&self,
catalog_name: &str,
schema_name: &str,
Expand Down Expand Up @@ -403,39 +429,24 @@ impl Proxy {
let partition_table_info = table_info_in_meta.unwrap();

// If table not exists, open it.
// Get table_schema from first sub partition table.
let first_sub_partition_table_name = util::get_sub_partition_name(
&partition_table_info.name,
partition_table_info.partition_info.as_ref().unwrap(),
0usize,
);
let table = self
.instance
.remote_engine_ref
.get_table_info(GetTableInfoRequest {
table: TableIdentifier {
catalog: catalog_name.to_string(),
schema: schema_name.to_string(),
table: first_sub_partition_table_name,
},
})
.await
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: "Failed to get table",
})?;

let table_info = self
.get_partition_table_info(
catalog_name,
schema_name,
&partition_table_info.name,
partition_table_info.partition_info.as_ref().unwrap(),
)
.await?;
// Partition table is a virtual table, so we need to create it manually.
// Partition info is stored in ceresmeta, so we need to use create_table_request
// to create it.
let params = CreateTableParams {
catalog_name: catalog_name.to_string(),
schema_name: schema_name.to_string(),
table_name: partition_table_info.name,
table_schema: table.table_schema,
engine: table.engine,
table_options: table.options,
table_schema: table_info.table_schema,
engine: table_info.engine,
table_options: table_info.options,
partition_info: partition_table_info.partition_info,
};
let create_table_request = CreateTableRequest {
Expand Down
10 changes: 10 additions & 0 deletions table_engine/src/partition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,23 @@ pub enum PartitionInfo {
}

impl PartitionInfo {
#[inline]
pub fn get_definitions(&self) -> Vec<PartitionDefinition> {
match self {
Self::Random(v) => v.definitions.clone(),
Self::Hash(v) => v.definitions.clone(),
Self::Key(v) => v.definitions.clone(),
}
}

#[inline]
pub fn get_partition_num(&self) -> usize {
match self {
Self::Random(v) => v.definitions.len(),
Self::Hash(v) => v.definitions.len(),
Self::Key(v) => v.definitions.len(),
}
}
}

#[derive(Clone, Debug, PartialEq, Eq, Default)]
Expand Down

0 comments on commit ca44455

Please sign in to comment.