Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: auto create table #895

Merged
merged 4 commits into from
May 18, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,3 @@ DROP TABLE IF EXISTS `partition_table_t`;

affected_rows: 0

SHOW CREATE TABLE partition_table_t;

Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to create plan, query: SHOW CREATE TABLE partition_table_t;. Caused by: Failed to create plan, err:Table not found, table:partition_table_t" })

Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,4 @@ SELECT * from partition_table_t where name in ("ceresdb5", "ceresdb6", "ceresdb7

DROP TABLE IF EXISTS `partition_table_t`;

SHOW CREATE TABLE partition_table_t;
-- SHOW CREATE TABLE partition_table_t;
chunshao90 marked this conversation as resolved.
Show resolved Hide resolved
6 changes: 6 additions & 0 deletions meta_client/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,12 @@ pub struct TableInfo {
pub partition_info: Option<PartitionInfo>,
}

impl TableInfo {
    /// Reports whether this table is a partition table.
    ///
    /// A table counts as partitioned exactly when its `partition_info`
    /// field holds a value; no other field is consulted.
    pub fn is_partition_table(&self) -> bool {
        let Self { partition_info, .. } = self;
        partition_info.is_some()
    }
}

impl TryFrom<meta_service_pb::TableInfo> for TableInfo {
type Error = Error;

Expand Down
7 changes: 4 additions & 3 deletions proxy/src/forward.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ mod tests {
use catalog::consts::DEFAULT_SCHEMA;
use ceresdbproto::storage::{Route, SqlQueryRequest, SqlQueryResponse};
use futures::FutureExt;
use router::{PartitionTableInfo, Router};
use meta_client::types::TableInfo;
use router::Router;
use tonic::IntoRequest;

use super::*;
Expand Down Expand Up @@ -392,11 +393,11 @@ mod tests {
}
}

async fn fetch_partition_table_info(
async fn fetch_table_info(
&self,
_schema: &str,
_table: &str,
) -> router::Result<Option<PartitionTableInfo>> {
) -> router::Result<Option<TableInfo>> {
return Ok(None);
}
}
Expand Down
19 changes: 13 additions & 6 deletions proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -240,12 +240,19 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
msg: format!("Failed to find table, table_name:{table_name}"),
})?;

let partition_table_info_in_meta = self
let table_info_in_meta = self
.router
.fetch_partition_table_info(schema_name, table_name)
.fetch_table_info(schema_name, table_name)
.await?;

match (table, &partition_table_info_in_meta) {
if let Some(table_info_in_meta) = &table_info_in_meta {
// No need to handle non-partition table.
if !table_info_in_meta.is_partition_table() {
return Ok(());
}
}

match (table, &table_info_in_meta) {
(Some(table), Some(partition_table_info)) => {
// No need to create partition table when table_id match.
if table.id().as_u64() == partition_table_info.id {
Expand Down Expand Up @@ -285,13 +292,13 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
(None, Some(_)) => (),
}

let partition_table_info = partition_table_info_in_meta.unwrap();
let partition_table_info = table_info_in_meta.unwrap();

// If table not exists, open it.
// Get table_schema from first sub partition table.
let first_sub_partition_table_name = util::get_sub_partition_name(
&partition_table_info.name,
&partition_table_info.partition_info,
partition_table_info.partition_info.as_ref().unwrap(),
0usize,
);
let table = self
Expand Down Expand Up @@ -324,7 +331,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
options: table.options,
state: TableState::Stable,
shard_id: DEFAULT_SHARD_ID,
partition_info: Some(partition_table_info.partition_info),
partition_info: partition_table_info.partition_info,
};
let create_opts = CreateOptions {
table_engine: self.instance.partition_table_engine.clone(),
Expand Down
140 changes: 86 additions & 54 deletions proxy/src/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,16 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
ctx: Context,
req: WriteRequest,
) -> Result<WriteResponse> {
let request_id = RequestId::next_id();

let write_context = req.context.clone().context(ErrNoCause {
msg: "Missing context",
code: StatusCode::BAD_REQUEST,
})?;

self.handle_auto_create_table(request_id, &write_context.database, &req)
.await?;

let (write_request_to_local, write_requests_to_forward) =
self.split_write_request(req).await?;

Expand All @@ -86,8 +91,11 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {

// Write to local.
if !write_request_to_local.table_requests.is_empty() {
let local_handle =
async move { Ok(self.write_to_local(ctx, write_request_to_local).await) };
let local_handle = async move {
Ok(self
.write_to_local(ctx, request_id, write_request_to_local)
.await)
};
futures.push(local_handle.boxed());
}

Expand Down Expand Up @@ -232,8 +240,12 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
}
}

async fn write_to_local(&self, ctx: Context, req: WriteRequest) -> Result<WriteResponse> {
let request_id = RequestId::next_id();
async fn write_to_local(
&self,
ctx: Context,
request_id: RequestId,
req: WriteRequest,
) -> Result<WriteResponse> {
let begin_instant = Instant::now();
let deadline = ctx.timeout.map(|t| begin_instant + t);
let catalog = self.instance.catalog_manager.default_catalog_name();
Expand Down Expand Up @@ -304,59 +316,38 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
let table_name = &write_table_req.table;
self.maybe_open_partition_table_if_not_exist(&catalog, &schema, table_name)
.await?;
let mut table = self.try_get_table(&catalog, &schema, table_name)?;

match table.clone() {
None => {
if auto_create_table {
self.create_table(
request_id,
&catalog,
&schema,
&write_table_req,
&schema_config,
deadline,
)
.await?;
// try to get table again
table = self.try_get_table(&catalog, &schema, table_name)?;
}
}
Some(t) => {
if auto_create_table {
// The reasons for making the decision to add columns before writing are as
// follows:
// * If judged based on the error message returned, it may cause data that
// has already been successfully written to be written again and affect
// the accuracy of the data.
// * Currently, the decision to add columns is made at the request level,
// not at the row level, so the cost is relatively small.
let table_schema = t.schema();
let columns =
find_new_columns(&table_schema, &schema_config, &write_table_req)?;
if !columns.is_empty() {
self.execute_add_columns_plan(
request_id, &catalog, &schema, t, columns, deadline,
)
.await?;
}
}
let table = self
.try_get_table(&catalog, &schema, table_name)?
.with_context(|| ErrNoCause {
code: StatusCode::BAD_REQUEST,
msg: format!("Table not found, schema:{schema}, table:{table_name}"),
})?;

if auto_create_table {
// The reasons for making the decision to add columns before writing are as
// follows:
// * If judged based on the error message returned, it may cause data that has
// already been successfully written to be written again and affect the
// accuracy of the data.
// * Currently, the decision to add columns is made at the request level, not at
// the row level, so the cost is relatively small.
let table_schema = table.schema();
let columns = find_new_columns(&table_schema, &schema_config, &write_table_req)?;
if !columns.is_empty() {
self.execute_add_columns_plan(
request_id,
&catalog,
&schema,
table.clone(),
columns,
deadline,
)
.await?;
}
}

match table {
Some(table) => {
let plan = write_table_request_to_insert_plan(table, write_table_req)?;
plan_vec.push(plan);
}
None => {
return ErrNoCause {
code: StatusCode::BAD_REQUEST,
msg: format!("Table not found, schema:{schema}, table:{table_name}"),
}
.fail();
}
}
let plan = write_table_request_to_insert_plan(table, write_table_req)?;
plan_vec.push(plan);
}

Ok(plan_vec)
Expand Down Expand Up @@ -425,6 +416,47 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
})
}

async fn handle_auto_create_table(
&self,
request_id: RequestId,
database: &str,
chunshao90 marked this conversation as resolved.
Show resolved Hide resolved
req: &WriteRequest,
) -> Result<()> {
if !self.auto_create_table {
return Ok(());
}

let schema_config = self
.schema_config_provider
.schema_config(database)
.box_err()
.with_context(|| ErrWithCause {
code: StatusCode::INTERNAL_SERVER_ERROR,
msg: format!("Fail to fetch schema config, schema_name:{database}"),
})?
.cloned()
.unwrap_or_default();
for write_table_req in &req.table_requests {
let table_info = self
.router
.fetch_table_info(database, &write_table_req.table)
.await?;
if table_info.is_some() {
chunshao90 marked this conversation as resolved.
Show resolved Hide resolved
continue;
}
self.create_table(
chunshao90 marked this conversation as resolved.
Show resolved Hide resolved
request_id,
self.instance.catalog_manager.default_catalog_name(),
database,
write_table_req,
&schema_config,
None,
)
.await?;
}
Ok(())
}

async fn create_table(
&self,
request_id: RequestId,
Expand Down
Loading