From 8941df982ca41034ffc5d86407fd831466f1d87d Mon Sep 17 00:00:00 2001 From: "chunshao.rcs" Date: Tue, 16 May 2023 14:44:10 +0800 Subject: [PATCH 1/2] refactor: proxy sql read --- .../env/cluster/ddl/create_tables.result | 8 +- .../env/cluster/ddl/partition_table.result | 6 +- .../cases/env/local/ddl/create_tables.result | 6 +- proxy/src/grpc/prom_query.rs | 40 +--- proxy/src/grpc/sql_query.rs | 208 ++++-------------- proxy/src/http/sql.rs | 125 ++--------- proxy/src/lib.rs | 71 +----- proxy/src/read.rs | 201 +++++++++++++++++ server/src/grpc/storage_service/mod.rs | 30 +-- 9 files changed, 284 insertions(+), 411 deletions(-) create mode 100644 proxy/src/read.rs diff --git a/integration_tests/cases/env/cluster/ddl/create_tables.result b/integration_tests/cases/env/cluster/ddl/create_tables.result index 946b6262f0..5fb40c5746 100644 --- a/integration_tests/cases/env/cluster/ddl/create_tables.result +++ b/integration_tests/cases/env/cluster/ddl/create_tables.result @@ -44,11 +44,11 @@ affected_rows: 0 CREATE TABLE IF NOT EXISTS `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; -Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter, sql: CREATE TABLE IF NOT EXISTS `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to create table, msg:failed to create table by meta client, req:CreateTableRequest { schema_name: \"public\", name: \"05_create_tables_t\", engine: \"Analytic\", create_if_not_exist: true, options: {}, partition_table_info: None }, err:Missing table info, msg:created table is not found in the create table response." }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "sql: CREATE TABLE IF NOT EXISTS `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to create table, msg:failed to create table by meta client, req:CreateTableRequest { schema_name: \"public\", name: \"05_create_tables_t\", engine: \"Analytic\", create_if_not_exist: true, options: {}, partition_table_info: None }, err:Missing table info, msg:created table is not found in the create table response." }) CREATE TABLE `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; -Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter, sql: CREATE TABLE `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to create table, msg:failed to create table by meta client, req:CreateTableRequest { schema_name: \"public\", name: \"05_create_tables_t\", engine: \"Analytic\", create_if_not_exist: false, options: {}, partition_table_info: None }, err:Missing table info, msg:created table is not found in the create table response." }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "sql: CREATE TABLE `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. 
Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to create table, msg:failed to create table by meta client, req:CreateTableRequest { schema_name: \"public\", name: \"05_create_tables_t\", engine: \"Analytic\", create_if_not_exist: false, options: {}, partition_table_info: None }, err:Missing table info, msg:created table is not found in the create table response." }) create table `05_create_tables_t2`(a int, b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic with (enable_ttl='false'); @@ -67,11 +67,11 @@ Int32(4), create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; -Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter, sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to create table, msg:failed to create table by meta client, req:CreateTableRequest { schema_name: \"public\", name: \"05_create_tables_t2\", engine: \"Analytic\", create_if_not_exist: false, options: {}, partition_table_info: None }, err:Missing table info, msg:created table is not found in the create table response." }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to create table, msg:failed to create table by meta client, req:CreateTableRequest { schema_name: \"public\", name: \"05_create_tables_t2\", engine: \"Analytic\", create_if_not_exist: false, options: {}, partition_table_info: None }, err:Missing table info, msg:created table is not found in the create table response." }) create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; -Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter, sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to create table, msg:failed to create table by meta client, req:CreateTableRequest { schema_name: \"public\", name: \"05_create_tables_t2\", engine: \"Analytic\", create_if_not_exist: false, options: {}, partition_table_info: None }, err:Missing table info, msg:created table is not found in the create table response." }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to create table, msg:failed to create table by meta client, req:CreateTableRequest { schema_name: \"public\", name: \"05_create_tables_t2\", engine: \"Analytic\", create_if_not_exist: false, options: {}, partition_table_info: None }, err:Missing table info, msg:created table is not found in the create table response." 
}) create table `05_create_tables_t3`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.result b/integration_tests/cases/env/cluster/ddl/partition_table.result index b00fa25611..6076eed22d 100644 --- a/integration_tests/cases/env/cluster/ddl/partition_table.result +++ b/integration_tests/cases/env/cluster/ddl/partition_table.result @@ -4,7 +4,7 @@ affected_rows: 0 DROP TABLE IF EXISTS `__partition_table_t_0`; -Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to create interpreter. Caused by: Failed to check permission, msg:only can process sub tables in table partition directly when enable partition table access" }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "sql: DROP TABLE IF EXISTS `__partition_table_t_0`;. Caused by: Internal error, msg:Failed to create interpreter, err:Failed to check permission, msg:only can process sub tables in table partition directly when enable partition table access" }) CREATE TABLE `partition_table_t`( `name`string TAG, @@ -24,7 +24,7 @@ String("partition_table_t"),String("CREATE TABLE `partition_table_t` (`tsid` uin SHOW CREATE TABLE __partition_table_t_0; -Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to create interpreter. Caused by: Failed to check permission, msg:only can process sub tables in table partition directly when enable partition table access" }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "sql: SHOW CREATE TABLE __partition_table_t_0;. Caused by: Internal error, msg:Failed to create interpreter, err:Failed to check permission, msg:only can process sub tables in table partition directly when enable partition table access" }) INSERT INTO partition_table_t (t, name, value) VALUES (1651737067000, "ceresdb0", 100), @@ -43,7 +43,7 @@ affected_rows: 11 SELECT * from __partition_table_t_0 where name = "ceresdb0"; -Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to create interpreter. Caused by: Failed to check permission, msg:only can process sub tables in table partition directly when enable partition table access" }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "sql: SELECT * from __partition_table_t_0 where name = \"ceresdb0\";. Caused by: Internal error, msg:Failed to create interpreter, err:Failed to check permission, msg:only can process sub tables in table partition directly when enable partition table access" }) SELECT * from partition_table_t where name = "ceresdb0"; diff --git a/integration_tests/cases/env/local/ddl/create_tables.result b/integration_tests/cases/env/local/ddl/create_tables.result index 98439cbfc2..cd5fa6118c 100644 --- a/integration_tests/cases/env/local/ddl/create_tables.result +++ b/integration_tests/cases/env/local/ddl/create_tables.result @@ -48,7 +48,7 @@ affected_rows: 0 CREATE TABLE `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; -Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter, sql: CREATE TABLE `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. 
Caused by: Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:failed to create table on shard, request:CreateTableRequest { catalog_name: \"ceresdb\", schema_name: \"public\", table_name: \"05_create_tables_t\", table_id: None, table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \"tsid\", data_type: UInt64, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"tsid\", default_value: None }, ColumnSchema { id: 2, name: \"t\", data_type: Timestamp, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"t\", default_value: None }, ColumnSchema { id: 3, name: \"c1\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"c1\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \"Analytic\", options: {}, state: Stable, shard_id: 0, partition_info: None }, err:Failed to create table, table already exists, table:05_create_tables_t." }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "sql: CREATE TABLE `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:failed to create table on shard, request:CreateTableRequest { catalog_name: \"ceresdb\", schema_name: \"public\", table_name: \"05_create_tables_t\", table_id: None, table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \"tsid\", data_type: UInt64, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"tsid\", default_value: None }, ColumnSchema { id: 2, name: \"t\", data_type: Timestamp, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"t\", default_value: None }, ColumnSchema { id: 3, name: \"c1\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"c1\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \"Analytic\", options: {}, state: Stable, shard_id: 0, partition_info: None }, err:Failed to create table, table already exists, table:05_create_tables_t." }) create table `05_create_tables_t2`(a int, b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic with (enable_ttl='false'); @@ -67,11 +67,11 @@ Int32(4), create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; -Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter, sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. 
Caused by: Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:failed to create table on shard, request:CreateTableRequest { catalog_name: \"ceresdb\", schema_name: \"public\", table_name: \"05_create_tables_t2\", table_id: None, table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \"tsid\", data_type: UInt64, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"tsid\", default_value: None }, ColumnSchema { id: 2, name: \"t\", data_type: Timestamp, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"t\", default_value: None }, ColumnSchema { id: 3, name: \"a\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"a\", default_value: None }, ColumnSchema { id: 4, name: \"b\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"b\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \"Analytic\", options: {}, state: Stable, shard_id: 0, partition_info: None }, err:Failed to create table, table already exists, table:05_create_tables_t2." }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:failed to create table on shard, request:CreateTableRequest { catalog_name: \"ceresdb\", schema_name: \"public\", table_name: \"05_create_tables_t2\", table_id: None, table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \"tsid\", data_type: UInt64, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"tsid\", default_value: None }, ColumnSchema { id: 2, name: \"t\", data_type: Timestamp, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"t\", default_value: None }, ColumnSchema { id: 3, name: \"a\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"a\", default_value: None }, ColumnSchema { id: 4, name: \"b\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"b\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \"Analytic\", options: {}, state: Stable, shard_id: 0, partition_info: None }, err:Failed to create table, table already exists, table:05_create_tables_t2." }) create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; -Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute interpreter, sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. 
Caused by: Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:failed to create table on shard, request:CreateTableRequest { catalog_name: \"ceresdb\", schema_name: \"public\", table_name: \"05_create_tables_t2\", table_id: None, table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \"tsid\", data_type: UInt64, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"tsid\", default_value: None }, ColumnSchema { id: 2, name: \"t\", data_type: Timestamp, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"t\", default_value: None }, ColumnSchema { id: 3, name: \"a\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"a\", default_value: None }, ColumnSchema { id: 4, name: \"b\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"b\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \"Analytic\", options: {}, state: Stable, shard_id: 0, partition_info: None }, err:Failed to create table, table already exists, table:05_create_tables_t2." }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:failed to create table on shard, request:CreateTableRequest { catalog_name: \"ceresdb\", schema_name: \"public\", table_name: \"05_create_tables_t2\", table_id: None, table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \"tsid\", data_type: UInt64, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"tsid\", default_value: None }, ColumnSchema { id: 2, name: \"t\", data_type: Timestamp, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"t\", default_value: None }, ColumnSchema { id: 3, name: \"a\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"a\", default_value: None }, ColumnSchema { id: 4, name: \"b\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"b\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \"Analytic\", options: {}, state: Stable, shard_id: 0, partition_info: None }, err:Failed to create table, table already exists, table:05_create_tables_t2." 
}) create table `05_create_tables_t3`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; diff --git a/proxy/src/grpc/prom_query.rs b/proxy/src/grpc/prom_query.rs index ee6cbcc6db..4ef9ebeecf 100644 --- a/proxy/src/grpc/prom_query.rs +++ b/proxy/src/grpc/prom_query.rs @@ -19,7 +19,7 @@ use common_types::{ }; use common_util::error::BoxError; use http::StatusCode; -use interpreters::{context::Context as InterpreterContext, factory::Factory, interpreter::Output}; +use interpreters::interpreter::Output; use log::info; use query_engine::executor::{Executor as QueryExecutor, RecordBatchVec}; use query_frontend::{ @@ -116,45 +116,15 @@ impl Proxy { msg: "Query is blocked", })?; - // Execute in interpreter - let interpreter_ctx = InterpreterContext::builder(request_id, deadline) - // Use current ctx's catalog and schema as default catalog and schema - .default_catalog_and_schema(catalog.to_string(), schema) - .build(); - let interpreter_factory = Factory::new( - self.instance.query_executor.clone(), - self.instance.catalog_manager.clone(), - self.instance.table_engine.clone(), - self.instance.table_manipulator.clone(), - ); - let interpreter = interpreter_factory - .create(interpreter_ctx, plan) + let output = self + .execute_plan(request_id, catalog, &schema, plan, deadline) + .await .box_err() .with_context(|| ErrWithCause { code: StatusCode::INTERNAL_SERVER_ERROR, - msg: "Failed to create interpreter", + msg: "Failed to execute plan", })?; - let output = if let Some(deadline) = deadline { - tokio::time::timeout_at( - tokio::time::Instant::from_std(deadline), - interpreter.execute(), - ) - .await - .box_err() - .context(ErrWithCause { - code: StatusCode::REQUEST_TIMEOUT, - msg: "Query timeout", - })? - } else { - interpreter.execute().await - } - .box_err() - .context(ErrWithCause { - code: StatusCode::INTERNAL_SERVER_ERROR, - msg: "Failed to execute interpreter", - })?; - let resp = convert_output(output, column_name) .box_err() .context(ErrWithCause { diff --git a/proxy/src/grpc/sql_query.rs b/proxy/src/grpc/sql_query.rs index 1cdf336177..4b9a75210b 100644 --- a/proxy/src/grpc/sql_query.rs +++ b/proxy/src/grpc/sql_query.rs @@ -2,7 +2,7 @@ //! 
Query handler -use std::{sync::Arc, time::Instant}; +use std::sync::Arc; use arrow_ext::ipc::{CompressOptions, CompressionMethod, RecordBatchesEncoder}; use ceresdbproto::{ @@ -12,20 +12,15 @@ use ceresdbproto::{ ArrowPayload, SqlQueryRequest, SqlQueryResponse, }, }; -use common_types::{record_batch::RecordBatch, request_id::RequestId}; -use common_util::{error::BoxError, time::InstantExt}; +use common_types::record_batch::RecordBatch; +use common_util::error::BoxError; use futures::{stream, stream::BoxStream, FutureExt, StreamExt}; use http::StatusCode; -use interpreters::{context::Context as InterpreterContext, factory::Factory, interpreter::Output}; -use log::{error, info, warn}; +use interpreters::interpreter::Output; +use log::{error, warn}; use query_engine::executor::Executor as QueryExecutor; -use query_frontend::{ - frontend, - frontend::{Context as SqlContext, Frontend}, - provider::CatalogMetaProvider, -}; use router::endpoint::Endpoint; -use snafu::{ensure, ResultExt}; +use snafu::ResultExt; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tonic::{transport::Channel, IntoRequest}; @@ -33,6 +28,7 @@ use tonic::{transport::Channel, IntoRequest}; use crate::{ error::{self, ErrNoCause, ErrWithCause, Error, Result}, forward::{ForwardRequest, ForwardResult}, + read::SqlResponse, Context, Proxy, }; @@ -53,6 +49,27 @@ impl Proxy { } } + async fn handle_sql_query_internal( + &self, + ctx: Context, + req: SqlQueryRequest, + ) -> Result { + if req.context.is_none() { + return ErrNoCause { + code: StatusCode::BAD_REQUEST, + msg: "Database is not set", + } + .fail(); + } + + let req_context = req.context.as_ref().unwrap(); + let schema = &req_context.database; + match self.handle_sql(ctx, schema, &req.sql).await? { + SqlResponse::Forwarded(resp) => Ok(resp), + SqlResponse::Local(output) => convert_output(&output, self.resp_compress_min_length), + } + } + pub async fn handle_stream_sql_query( self: Arc, ctx: Context, @@ -72,28 +89,21 @@ impl Proxy { } } - async fn handle_sql_query_internal( - &self, - ctx: Context, - req: SqlQueryRequest, - ) -> Result { - let req = match self.maybe_forward_sql_query(&req).await? 
{ - Some(resp) => match resp { - ForwardResult::Forwarded(resp) => return resp, - ForwardResult::Local => req, - }, - None => req, - }; - - let output = self.fetch_sql_query_output(ctx, &req).await?; - convert_output(&output, self.resp_compress_min_length) - } - async fn handle_stream_query_internal( self: Arc, ctx: Context, req: SqlQueryRequest, ) -> Result> { + if req.context.is_none() { + return ErrNoCause { + code: StatusCode::BAD_REQUEST, + msg: "Database is not set", + } + .fail(); + } + + let req_context = req.context.as_ref().unwrap(); + let schema = req_context.database.clone(); let req = match self.clone().maybe_forward_stream_sql_query(&req).await { Some(resp) => match resp { ForwardResult::Forwarded(resp) => return resp, @@ -105,7 +115,10 @@ impl Proxy { let (tx, rx) = mpsc::channel(STREAM_QUERY_CHANNEL_LEN); let runtime = ctx.runtime.clone(); let resp_compress_min_length = self.resp_compress_min_length; - let output = self.as_ref().fetch_sql_query_output(ctx, &req).await?; + let output = self + .as_ref() + .fetch_sql_query_output(ctx, &schema, &req.sql) + .await?; runtime.spawn(async move { match output { Output::AffectedRows(rows) => { @@ -195,143 +208,6 @@ impl Proxy { } } } - - async fn fetch_sql_query_output(&self, ctx: Context, req: &SqlQueryRequest) -> Result { - let request_id = RequestId::next_id(); - let begin_instant = Instant::now(); - let deadline = ctx.timeout.map(|t| begin_instant + t); - let catalog = self.instance.catalog_manager.default_catalog_name(); - - info!("Handle sql query, request_id:{request_id}, request:{req:?}"); - - let req_ctx = req.context.as_ref().unwrap(); - let schema = &req_ctx.database; - let instance = &self.instance; - // TODO(yingwen): Privilege check, cannot access data of other tenant - // TODO(yingwen): Maybe move MetaProvider to instance - let provider = CatalogMetaProvider { - manager: instance.catalog_manager.clone(), - default_catalog: catalog, - default_schema: schema, - function_registry: &*instance.function_registry, - }; - let frontend = Frontend::new(provider); - - let mut sql_ctx = SqlContext::new(request_id, deadline); - // Parse sql, frontend error of invalid sql already contains sql - // TODO(yingwen): Maybe move sql from frontend error to outer error - let mut stmts = frontend - .parse_sql(&mut sql_ctx, &req.sql) - .box_err() - .context(ErrWithCause { - code: StatusCode::BAD_REQUEST, - msg: "Failed to parse sql", - })?; - - ensure!( - !stmts.is_empty(), - ErrNoCause { - code: StatusCode::BAD_REQUEST, - msg: format!("No valid query statement provided, sql:{}", req.sql), - } - ); - - // TODO(yingwen): For simplicity, we only support executing one statement now - // TODO(yingwen): INSERT/UPDATE/DELETE can be batched - ensure!( - stmts.len() == 1, - ErrNoCause { - code: StatusCode::BAD_REQUEST, - msg: format!( - "Only support execute one statement now, current num:{}, sql:{}", - stmts.len(), - req.sql - ), - } - ); - - // Open partition table if needed. - let table_name = frontend::parse_table_name(&stmts); - if let Some(table_name) = table_name { - self.maybe_open_partition_table_if_not_exist(catalog, schema, &table_name) - .await?; - } - - // Create logical plan - // Note: Remember to store sql in error when creating logical plan - let plan = frontend - // TODO(yingwen): Check error, some error may indicate that the sql is invalid. 
Now we - // return internal server error in those cases - .statement_to_plan(&mut sql_ctx, stmts.remove(0)) - .box_err() - .with_context(|| ErrWithCause { - code: StatusCode::INTERNAL_SERVER_ERROR, - msg: format!("Failed to create plan, query:{}", req.sql), - })?; - - instance - .limiter - .try_limit(&plan) - .box_err() - .context(ErrWithCause { - code: StatusCode::INTERNAL_SERVER_ERROR, - msg: "Query is blocked", - })?; - - if let Some(deadline) = deadline { - if deadline.check_deadline() { - return ErrNoCause { - code: StatusCode::INTERNAL_SERVER_ERROR, - msg: "Query timeout", - } - .fail(); - } - } - - // Execute in interpreter - let interpreter_ctx = InterpreterContext::builder(request_id, deadline) - // Use current ctx's catalog and schema as default catalog and schema - .default_catalog_and_schema(catalog.to_string(), schema.to_string()) - .build(); - let interpreter_factory = Factory::new( - instance.query_executor.clone(), - instance.catalog_manager.clone(), - instance.table_engine.clone(), - instance.table_manipulator.clone(), - ); - let interpreter = interpreter_factory - .create(interpreter_ctx, plan) - .box_err() - .with_context(|| ErrWithCause { - code: StatusCode::INTERNAL_SERVER_ERROR, - msg: "Failed to create interpreter", - })?; - - let output = if let Some(deadline) = deadline { - tokio::time::timeout_at( - tokio::time::Instant::from_std(deadline), - interpreter.execute(), - ) - .await - .box_err() - .context(ErrWithCause { - code: StatusCode::INTERNAL_SERVER_ERROR, - msg: "Query timeout", - })? - } else { - interpreter.execute().await - } - .box_err() - .with_context(|| ErrWithCause { - code: StatusCode::INTERNAL_SERVER_ERROR, - msg: format!("Failed to execute interpreter, sql:{}", req.sql), - })?; - - let cost = begin_instant.saturating_elapsed(); - info!("Handle sql query success, catalog:{catalog}, schema:{schema}, request_id:{request_id}, cost:{cost:?}, request:{req:?}"); - - Ok(output) - } } // TODO(chenxiang): Output can have both `rows` and `affected_rows` diff --git a/proxy/src/http/sql.rs b/proxy/src/http/sql.rs index c305a69ab2..b1c58f0042 100644 --- a/proxy/src/http/sql.rs +++ b/proxy/src/http/sql.rs @@ -1,38 +1,30 @@ // Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. 
-use std::{io::Cursor, time::Instant}; +use std::io::Cursor; use arrow::{ipc::reader::StreamReader, record_batch::RecordBatch as ArrowRecordBatch}; use ceresdbproto::storage::{ arrow_payload::Compression, sql_query_response::Output as OutputPb, ArrowPayload, - RequestContext as GrpcRequestContext, SqlQueryRequest, SqlQueryResponse, + SqlQueryResponse, }; use common_types::{ datum::{Datum, DatumKind}, record_batch::RecordBatch, - request_id::RequestId, }; -use common_util::{error::BoxError, time::InstantExt}; -use http::StatusCode; +use common_util::error::BoxError; use interpreters::interpreter::Output; -use log::info; use query_engine::executor::{Executor as QueryExecutor, RecordBatchVec}; -use query_frontend::{ - frontend, - frontend::{Context as SqlContext, Frontend}, - provider::CatalogMetaProvider, -}; use serde::{ ser::{SerializeMap, SerializeSeq}, Deserialize, Serialize, }; -use snafu::{ensure, OptionExt, ResultExt}; +use snafu::{OptionExt, ResultExt}; use crate::{ context::RequestContext, - error::{ErrNoCause, ErrWithCause, Internal, InternalNoCause, Result}, - forward::ForwardResult, - Proxy, + error::{Internal, InternalNoCause, Result}, + read::SqlResponse, + Context, Proxy, }; impl Proxy { @@ -41,106 +33,15 @@ impl Proxy { ctx: &RequestContext, req: Request, ) -> Result { - let request_id = RequestId::next_id(); - let begin_instant = Instant::now(); - let deadline = ctx.timeout.map(|t| begin_instant + t); - - info!( - "Query handler try to process request, request_id:{}, request:{:?}", - request_id, req - ); - - // TODO(yingwen): Privilege check, cannot access data of other tenant - // TODO(yingwen): Maybe move MetaProvider to instance - let provider = CatalogMetaProvider { - manager: self.instance.catalog_manager.clone(), - default_catalog: &ctx.catalog, - default_schema: &ctx.schema, - function_registry: &*self.instance.function_registry, + let context = Context { + timeout: ctx.timeout, + runtime: self.engine_runtimes.read_runtime.clone(), }; - let frontend = Frontend::new(provider); - let mut sql_ctx = SqlContext::new(request_id, deadline); - // Parse sql, frontend error of invalid sql already contains sql - // TODO(yingwen): Maybe move sql from frontend error to outer error - let mut stmts = frontend - .parse_sql(&mut sql_ctx, &req.query) - .box_err() - .with_context(|| ErrWithCause { - code: StatusCode::BAD_REQUEST, - msg: format!("Failed to parse sql, query:{}", req.query), - })?; - - if stmts.is_empty() { - return Ok(Output::AffectedRows(0)); + match self.handle_sql(context, &ctx.schema, &req.query).await? { + SqlResponse::Forwarded(resp) => convert_sql_response_to_output(resp), + SqlResponse::Local(output) => Ok(output), } - - // TODO(yingwen): For simplicity, we only support executing one statement now - // TODO(yingwen): INSERT/UPDATE/DELETE can be batched - ensure!( - stmts.len() == 1, - ErrNoCause { - code: StatusCode::BAD_REQUEST, - msg: format!( - "Only support execute one statement now, current num:{}, query:{}.", - stmts.len(), - req.query - ), - } - ); - - let sql_query_request = SqlQueryRequest { - context: Some(GrpcRequestContext { - database: ctx.schema.clone(), - }), - tables: vec![], - sql: req.query.clone(), - }; - - if let Some(resp) = self.maybe_forward_sql_query(&sql_query_request).await? { - match resp { - ForwardResult::Forwarded(resp) => return convert_sql_response_to_output(resp?), - ForwardResult::Local => (), - } - }; - - // Open partition table if needed. 
- let table_name = frontend::parse_table_name(&stmts); - if let Some(table_name) = table_name { - self.maybe_open_partition_table_if_not_exist(&ctx.catalog, &ctx.schema, &table_name) - .await?; - } - - // Create logical plan - // Note: Remember to store sql in error when creating logical plan - let plan = frontend - .statement_to_plan(&mut sql_ctx, stmts.remove(0)) - .box_err() - .with_context(|| ErrWithCause { - code: StatusCode::BAD_REQUEST, - msg: format!("Failed to build plan, query:{}", req.query), - })?; - - self.instance - .limiter - .try_limit(&plan) - .box_err() - .context(ErrWithCause { - code: StatusCode::INTERNAL_SERVER_ERROR, - msg: "Query is blocked", - })?; - let output = self - .execute_plan(request_id, &ctx.catalog, &ctx.schema, plan, deadline) - .await?; - - info!( - "Query handler finished, request_id:{}, cost:{}ms, request:{:?}", - request_id, - begin_instant.saturating_elapsed().as_millis(), - req - ); - - Ok(output) } } #[derive(Debug, Deserialize)] diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 5ec760e6b7..0026bc7282 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -18,6 +18,7 @@ pub mod http; pub mod influxdb; pub mod instance; pub mod limiter; +pub(crate) mod read; pub mod schema_config_provider; pub(crate) mod util; pub(crate) mod write; @@ -33,15 +34,15 @@ use catalog::schema::{ }; use ceresdbproto::storage::{ storage_service_client::StorageServiceClient, PrometheusRemoteQueryRequest, - PrometheusRemoteQueryResponse, Route, RouteRequest, SqlQueryRequest, SqlQueryResponse, + PrometheusRemoteQueryResponse, Route, RouteRequest, }; use common_types::{request_id::RequestId, table::DEFAULT_SHARD_ID}; use common_util::{error::BoxError, runtime::Runtime}; use futures::FutureExt; use interpreters::{context::Context as InterpreterContext, factory::Factory, interpreter::Output}; -use log::{error, info, warn}; +use log::{error, info}; use query_engine::executor::Executor as QueryExecutor; -use query_frontend::{frontend, plan::Plan}; +use query_frontend::plan::Plan; use router::{endpoint::Endpoint, Router}; use snafu::{OptionExt, ResultExt}; use table_engine::{ @@ -110,55 +111,6 @@ impl Proxy { self.instance.clone() } - async fn maybe_forward_sql_query( - &self, - req: &SqlQueryRequest, - ) -> Result>> { - let table_name = frontend::parse_table_name_with_sql(&req.sql) - .box_err() - .with_context(|| Internal { - msg: format!("Failed to parse table name with sql, sql:{}", req.sql), - })?; - if table_name.is_none() { - warn!("Unable to forward sql query without table name, req:{req:?}",); - return Ok(None); - } - - let req_ctx = req.context.as_ref().unwrap(); - let forward_req = ForwardRequest { - schema: req_ctx.database.clone(), - table: table_name.unwrap(), - req: req.clone().into_request(), - }; - let do_query = |mut client: StorageServiceClient, - request: tonic::Request, - _: &Endpoint| { - let query = async move { - client - .sql_query(request) - .await - .map(|resp| resp.into_inner()) - .box_err() - .context(ErrWithCause { - code: StatusCode::INTERNAL_SERVER_ERROR, - msg: "Forwarded sql query failed", - }) - } - .boxed(); - - Box::new(query) as _ - }; - - let forward_result = self.forwarder.forward(forward_req, do_query).await; - Ok(match forward_result { - Ok(forward_res) => Some(forward_res), - Err(e) => { - error!("Failed to forward sql req but the error is ignored, err:{e}"); - None - } - }) - } - async fn maybe_forward_prom_remote_query( &self, metric: String, @@ -393,8 +345,7 @@ impl Proxy { .limiter .try_limit(&plan) .box_err() - 
.context(ErrWithCause { - code: StatusCode::INTERNAL_SERVER_ERROR, + .context(Internal { msg: "Request is blocked", })?; @@ -411,8 +362,7 @@ impl Proxy { let interpreter = interpreter_factory .create(interpreter_ctx, plan) .box_err() - .context(ErrWithCause { - code: StatusCode::INTERNAL_SERVER_ERROR, + .context(Internal { msg: "Failed to create interpreter", })?; @@ -423,19 +373,16 @@ impl Proxy { ) .await .box_err() - .context(ErrWithCause { - code: StatusCode::INTERNAL_SERVER_ERROR, + .context(Internal { msg: "Plan execution timeout", }) .and_then(|v| { - v.box_err().context(ErrWithCause { - code: StatusCode::INTERNAL_SERVER_ERROR, + v.box_err().context(Internal { msg: "Failed to execute interpreter", }) }) } else { - interpreter.execute().await.box_err().context(ErrWithCause { - code: StatusCode::INTERNAL_SERVER_ERROR, + interpreter.execute().await.box_err().context(Internal { msg: "Failed to execute interpreter", }) } diff --git a/proxy/src/read.rs b/proxy/src/read.rs new file mode 100644 index 0000000000..a2f0150df4 --- /dev/null +++ b/proxy/src/read.rs @@ -0,0 +1,201 @@ +// Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. + +use std::time::Instant; + +use ceresdbproto::storage::{ + storage_service_client::StorageServiceClient, RequestContext, SqlQueryRequest, SqlQueryResponse, +}; +use common_types::request_id::RequestId; +use common_util::{error::BoxError, time::InstantExt}; +use futures::FutureExt; +use http::StatusCode; +use interpreters::interpreter::Output; +use log::{error, info, warn}; +use query_engine::executor::Executor as QueryExecutor; +use query_frontend::{ + frontend, + frontend::{Context as SqlContext, Frontend}, + provider::CatalogMetaProvider, +}; +use router::endpoint::Endpoint; +use snafu::{ensure, ResultExt}; +use tonic::{transport::Channel, IntoRequest}; + +use crate::{ + error::{ErrNoCause, ErrWithCause, Error, Internal, Result}, + forward::{ForwardRequest, ForwardResult}, + Context, Proxy, +}; + +pub enum SqlResponse { + Forwarded(SqlQueryResponse), + Local(Output), +} + +impl Proxy { + pub(crate) async fn handle_sql( + &self, + ctx: Context, + schema: &str, + sql: &str, + ) -> Result { + if let Some(resp) = self.maybe_forward_sql_query(schema, sql).await? 
{ + match resp { + ForwardResult::Forwarded(resp) => return Ok(SqlResponse::Forwarded(resp?)), + ForwardResult::Local => (), + } + }; + + Ok(SqlResponse::Local( + self.fetch_sql_query_output(ctx, schema, sql).await?, + )) + } + + pub(crate) async fn fetch_sql_query_output( + &self, + ctx: Context, + schema: &str, + sql: &str, + ) -> Result { + let request_id = RequestId::next_id(); + let begin_instant = Instant::now(); + let deadline = ctx.timeout.map(|t| begin_instant + t); + let catalog = self.instance.catalog_manager.default_catalog_name(); + + info!("Handle sql query, request_id:{request_id}, schema:{schema}, sql:{sql}"); + + let instance = &self.instance; + // TODO(yingwen): Privilege check, cannot access data of other tenant + // TODO(yingwen): Maybe move MetaProvider to instance + let provider = CatalogMetaProvider { + manager: instance.catalog_manager.clone(), + default_catalog: catalog, + default_schema: schema, + function_registry: &*instance.function_registry, + }; + let frontend = Frontend::new(provider); + + let mut sql_ctx = SqlContext::new(request_id, deadline); + // Parse sql, frontend error of invalid sql already contains sql + // TODO(yingwen): Maybe move sql from frontend error to outer error + let mut stmts = frontend + .parse_sql(&mut sql_ctx, sql) + .box_err() + .context(ErrWithCause { + code: StatusCode::BAD_REQUEST, + msg: "Failed to parse sql", + })?; + + ensure!( + !stmts.is_empty(), + ErrNoCause { + code: StatusCode::BAD_REQUEST, + msg: format!("No valid query statement provided, sql:{sql}",), + } + ); + + // TODO(yingwen): For simplicity, we only support executing one statement now + // TODO(yingwen): INSERT/UPDATE/DELETE can be batched + ensure!( + stmts.len() == 1, + ErrNoCause { + code: StatusCode::BAD_REQUEST, + msg: format!( + "Only support execute one statement now, current num:{}, sql:{}", + stmts.len(), + sql + ), + } + ); + + // Open partition table if needed. + let table_name = frontend::parse_table_name(&stmts); + if let Some(table_name) = table_name { + self.maybe_open_partition_table_if_not_exist(catalog, schema, &table_name) + .await?; + } + + // Create logical plan + // Note: Remember to store sql in error when creating logical plan + let plan = frontend + // TODO(yingwen): Check error, some error may indicate that the sql is invalid. 
Now we + // return internal server error in those cases + .statement_to_plan(&mut sql_ctx, stmts.remove(0)) + .box_err() + .with_context(|| ErrWithCause { + code: StatusCode::INTERNAL_SERVER_ERROR, + msg: format!("Failed to create plan, query:{sql}"), + })?; + + let output = self + .execute_plan(request_id, catalog, schema, plan, deadline) + .await + .box_err() + .with_context(|| ErrWithCause { + code: StatusCode::INTERNAL_SERVER_ERROR, + msg: format!("sql:{sql}"), + })?; + + let cost = begin_instant.saturating_elapsed(); + info!("Handle sql query success, catalog:{catalog}, schema:{schema}, request_id:{request_id}, cost:{cost:?}, sql:{sql:?}"); + + Ok(output) + } + + async fn maybe_forward_sql_query( + &self, + schema: &str, + sql: &str, + ) -> Result>> { + let table_name = frontend::parse_table_name_with_sql(sql) + .box_err() + .with_context(|| Internal { + msg: format!("Failed to parse table name with sql, sql:{sql}"), + })?; + if table_name.is_none() { + warn!("Unable to forward sql query without table name, sql:{sql}",); + return Ok(None); + } + + let sql_request = SqlQueryRequest { + context: Some(RequestContext { + database: schema.to_string(), + }), + tables: vec![], + sql: sql.to_string(), + }; + + let forward_req = ForwardRequest { + schema: schema.to_string(), + table: table_name.unwrap(), + req: sql_request.into_request(), + }; + let do_query = |mut client: StorageServiceClient, + request: tonic::Request, + _: &Endpoint| { + let query = async move { + client + .sql_query(request) + .await + .map(|resp| resp.into_inner()) + .box_err() + .context(ErrWithCause { + code: StatusCode::INTERNAL_SERVER_ERROR, + msg: "Forwarded sql query failed", + }) + } + .boxed(); + + Box::new(query) as _ + }; + + let forward_result = self.forwarder.forward(forward_req, do_query).await; + Ok(match forward_result { + Ok(forward_res) => Some(forward_res), + Err(e) => { + error!("Failed to forward sql req but the error is ignored, err:{e}"); + None + } + }) + } +} diff --git a/server/src/grpc/storage_service/mod.rs b/server/src/grpc/storage_service/mod.rs index 156ba2482e..aec38b0f0f 100644 --- a/server/src/grpc/storage_service/mod.rs +++ b/server/src/grpc/storage_service/mod.rs @@ -229,19 +229,10 @@ impl StorageServiceImpl { runtime: self.runtimes.read_runtime.clone(), timeout: self.timeout, }; - let join_handle = self.runtimes.read_runtime.spawn(async move { - if req.context.is_none() { - return SqlQueryResponse { - header: Some(error::build_err_header( - StatusCode::BAD_REQUEST.as_u16() as u32, - "database is not set".to_string(), - )), - ..Default::default() - }; - } - - proxy.handle_sql_query(ctx, req).await - }); + let join_handle = self + .runtimes + .read_runtime + .spawn(async move { proxy.handle_sql_query(ctx, req).await }); let resp = match join_handle.await { Ok(v) => v, @@ -407,19 +398,6 @@ impl StorageServiceImpl { let runtime = ctx.runtime.clone(); let join_handle = runtime.spawn(async move { - if query_req.context.is_none() { - return stream::once(async move { - Ok(SqlQueryResponse { - header: Some(error::build_err_header( - StatusCode::BAD_REQUEST.as_u16() as u32, - "database is not set".to_string(), - )), - ..Default::default() - }) - }) - .boxed(); - } - proxy .handle_stream_sql_query(ctx, query_req) .await From 39399683615b15efe81eccebfa666a45f95dc764 Mon Sep 17 00:00:00 2001 From: "chunshao.rcs" Date: Wed, 17 May 2023 11:12:04 +0800 Subject: [PATCH 2/2] refactor by CR --- .../cases/env/cluster/ddl/create_tables.result | 8 ++++---- .../cases/env/cluster/ddl/partition_table.result | 6 
+++--- .../cases/env/local/ddl/create_tables.result | 6 +++--- proxy/src/read.rs | 4 +++- proxy/src/write.rs | 2 ++ 5 files changed, 15 insertions(+), 11 deletions(-) diff --git a/integration_tests/cases/env/cluster/ddl/create_tables.result b/integration_tests/cases/env/cluster/ddl/create_tables.result index 5fb40c5746..846243300b 100644 --- a/integration_tests/cases/env/cluster/ddl/create_tables.result +++ b/integration_tests/cases/env/cluster/ddl/create_tables.result @@ -44,11 +44,11 @@ affected_rows: 0 CREATE TABLE IF NOT EXISTS `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; -Failed to execute query, err: Server(ServerError { code: 500, msg: "sql: CREATE TABLE IF NOT EXISTS `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to create table, msg:failed to create table by meta client, req:CreateTableRequest { schema_name: \"public\", name: \"05_create_tables_t\", engine: \"Analytic\", create_if_not_exist: true, options: {}, partition_table_info: None }, err:Missing table info, msg:created table is not found in the create table response." }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute plan, sql: CREATE TABLE IF NOT EXISTS `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to create table, msg:failed to create table by meta client, req:CreateTableRequest { schema_name: \"public\", name: \"05_create_tables_t\", engine: \"Analytic\", create_if_not_exist: true, options: {}, partition_table_info: None }, err:Missing table info, msg:created table is not found in the create table response." }) CREATE TABLE `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; -Failed to execute query, err: Server(ServerError { code: 500, msg: "sql: CREATE TABLE `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to create table, msg:failed to create table by meta client, req:CreateTableRequest { schema_name: \"public\", name: \"05_create_tables_t\", engine: \"Analytic\", create_if_not_exist: false, options: {}, partition_table_info: None }, err:Missing table info, msg:created table is not found in the create table response." }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute plan, sql: CREATE TABLE `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to create table, msg:failed to create table by meta client, req:CreateTableRequest { schema_name: \"public\", name: \"05_create_tables_t\", engine: \"Analytic\", create_if_not_exist: false, options: {}, partition_table_info: None }, err:Missing table info, msg:created table is not found in the create table response." 
}) create table `05_create_tables_t2`(a int, b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic with (enable_ttl='false'); @@ -67,11 +67,11 @@ Int32(4), create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; -Failed to execute query, err: Server(ServerError { code: 500, msg: "sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to create table, msg:failed to create table by meta client, req:CreateTableRequest { schema_name: \"public\", name: \"05_create_tables_t2\", engine: \"Analytic\", create_if_not_exist: false, options: {}, partition_table_info: None }, err:Missing table info, msg:created table is not found in the create table response." }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute plan, sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to create table, msg:failed to create table by meta client, req:CreateTableRequest { schema_name: \"public\", name: \"05_create_tables_t2\", engine: \"Analytic\", create_if_not_exist: false, options: {}, partition_table_info: None }, err:Missing table info, msg:created table is not found in the create table response." }) create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; -Failed to execute query, err: Server(ServerError { code: 500, msg: "sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to create table, msg:failed to create table by meta client, req:CreateTableRequest { schema_name: \"public\", name: \"05_create_tables_t2\", engine: \"Analytic\", create_if_not_exist: false, options: {}, partition_table_info: None }, err:Missing table info, msg:created table is not found in the create table response." }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute plan, sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to create table, msg:failed to create table by meta client, req:CreateTableRequest { schema_name: \"public\", name: \"05_create_tables_t2\", engine: \"Analytic\", create_if_not_exist: false, options: {}, partition_table_info: None }, err:Missing table info, msg:created table is not found in the create table response." 
}) create table `05_create_tables_t3`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; diff --git a/integration_tests/cases/env/cluster/ddl/partition_table.result b/integration_tests/cases/env/cluster/ddl/partition_table.result index 6076eed22d..fba53a2351 100644 --- a/integration_tests/cases/env/cluster/ddl/partition_table.result +++ b/integration_tests/cases/env/cluster/ddl/partition_table.result @@ -4,7 +4,7 @@ affected_rows: 0 DROP TABLE IF EXISTS `__partition_table_t_0`; -Failed to execute query, err: Server(ServerError { code: 500, msg: "sql: DROP TABLE IF EXISTS `__partition_table_t_0`;. Caused by: Internal error, msg:Failed to create interpreter, err:Failed to check permission, msg:only can process sub tables in table partition directly when enable partition table access" }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute plan, sql: DROP TABLE IF EXISTS `__partition_table_t_0`;. Caused by: Internal error, msg:Failed to create interpreter, err:Failed to check permission, msg:only can process sub tables in table partition directly when enable partition table access" }) CREATE TABLE `partition_table_t`( `name`string TAG, @@ -24,7 +24,7 @@ String("partition_table_t"),String("CREATE TABLE `partition_table_t` (`tsid` uin SHOW CREATE TABLE __partition_table_t_0; -Failed to execute query, err: Server(ServerError { code: 500, msg: "sql: SHOW CREATE TABLE __partition_table_t_0;. Caused by: Internal error, msg:Failed to create interpreter, err:Failed to check permission, msg:only can process sub tables in table partition directly when enable partition table access" }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute plan, sql: SHOW CREATE TABLE __partition_table_t_0;. Caused by: Internal error, msg:Failed to create interpreter, err:Failed to check permission, msg:only can process sub tables in table partition directly when enable partition table access" }) INSERT INTO partition_table_t (t, name, value) VALUES (1651737067000, "ceresdb0", 100), @@ -43,7 +43,7 @@ affected_rows: 11 SELECT * from __partition_table_t_0 where name = "ceresdb0"; -Failed to execute query, err: Server(ServerError { code: 500, msg: "sql: SELECT * from __partition_table_t_0 where name = \"ceresdb0\";. Caused by: Internal error, msg:Failed to create interpreter, err:Failed to check permission, msg:only can process sub tables in table partition directly when enable partition table access" }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute plan, sql: SELECT * from __partition_table_t_0 where name = \"ceresdb0\";. Caused by: Internal error, msg:Failed to create interpreter, err:Failed to check permission, msg:only can process sub tables in table partition directly when enable partition table access" }) SELECT * from partition_table_t where name = "ceresdb0"; diff --git a/integration_tests/cases/env/local/ddl/create_tables.result b/integration_tests/cases/env/local/ddl/create_tables.result index cd5fa6118c..0a8e5e0911 100644 --- a/integration_tests/cases/env/local/ddl/create_tables.result +++ b/integration_tests/cases/env/local/ddl/create_tables.result @@ -48,7 +48,7 @@ affected_rows: 0 CREATE TABLE `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; -Failed to execute query, err: Server(ServerError { code: 500, msg: "sql: CREATE TABLE `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. 
Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:failed to create table on shard, request:CreateTableRequest { catalog_name: \"ceresdb\", schema_name: \"public\", table_name: \"05_create_tables_t\", table_id: None, table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \"tsid\", data_type: UInt64, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"tsid\", default_value: None }, ColumnSchema { id: 2, name: \"t\", data_type: Timestamp, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"t\", default_value: None }, ColumnSchema { id: 3, name: \"c1\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"c1\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \"Analytic\", options: {}, state: Stable, shard_id: 0, partition_info: None }, err:Failed to create table, table already exists, table:05_create_tables_t." }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute plan, sql: CREATE TABLE `05_create_tables_t`(c1 int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:failed to create table on shard, request:CreateTableRequest { catalog_name: \"ceresdb\", schema_name: \"public\", table_name: \"05_create_tables_t\", table_id: None, table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \"tsid\", data_type: UInt64, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"tsid\", default_value: None }, ColumnSchema { id: 2, name: \"t\", data_type: Timestamp, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"t\", default_value: None }, ColumnSchema { id: 3, name: \"c1\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"c1\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \"Analytic\", options: {}, state: Stable, shard_id: 0, partition_info: None }, err:Failed to create table, table already exists, table:05_create_tables_t." }) create table `05_create_tables_t2`(a int, b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic with (enable_ttl='false'); @@ -67,11 +67,11 @@ Int32(4), create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; -Failed to execute query, err: Server(ServerError { code: 500, msg: "sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. 
Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:failed to create table on shard, request:CreateTableRequest { catalog_name: \"ceresdb\", schema_name: \"public\", table_name: \"05_create_tables_t2\", table_id: None, table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \"tsid\", data_type: UInt64, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"tsid\", default_value: None }, ColumnSchema { id: 2, name: \"t\", data_type: Timestamp, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"t\", default_value: None }, ColumnSchema { id: 3, name: \"a\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"a\", default_value: None }, ColumnSchema { id: 4, name: \"b\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"b\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \"Analytic\", options: {}, state: Stable, shard_id: 0, partition_info: None }, err:Failed to create table, table already exists, table:05_create_tables_t2." }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute plan, sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:failed to create table on shard, request:CreateTableRequest { catalog_name: \"ceresdb\", schema_name: \"public\", table_name: \"05_create_tables_t2\", table_id: None, table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \"tsid\", data_type: UInt64, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"tsid\", default_value: None }, ColumnSchema { id: 2, name: \"t\", data_type: Timestamp, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"t\", default_value: None }, ColumnSchema { id: 3, name: \"a\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"a\", default_value: None }, ColumnSchema { id: 4, name: \"b\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"b\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \"Analytic\", options: {}, state: Stable, shard_id: 0, partition_info: None }, err:Failed to create table, table already exists, table:05_create_tables_t2." }) create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; -Failed to execute query, err: Server(ServerError { code: 500, msg: "sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. 
Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:failed to create table on shard, request:CreateTableRequest { catalog_name: \"ceresdb\", schema_name: \"public\", table_name: \"05_create_tables_t2\", table_id: None, table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \"tsid\", data_type: UInt64, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"tsid\", default_value: None }, ColumnSchema { id: 2, name: \"t\", data_type: Timestamp, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"t\", default_value: None }, ColumnSchema { id: 3, name: \"a\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"a\", default_value: None }, ColumnSchema { id: 4, name: \"b\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"b\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \"Analytic\", options: {}, state: Stable, shard_id: 0, partition_info: None }, err:Failed to create table, table already exists, table:05_create_tables_t2." }) +Failed to execute query, err: Server(ServerError { code: 500, msg: "Failed to execute plan, sql: create table `05_create_tables_t2`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic;. Caused by: Internal error, msg:Failed to execute interpreter, err:Failed to execute create table, err:Failed to create table by table manipulator, err:Failed to operate table, err:Failed to operate table, msg:failed to create table on shard, request:CreateTableRequest { catalog_name: \"ceresdb\", schema_name: \"public\", table_name: \"05_create_tables_t2\", table_id: None, table_schema: Schema { timestamp_index: 1, tsid_index: Some(0), column_schemas: ColumnSchemas { columns: [ColumnSchema { id: 1, name: \"tsid\", data_type: UInt64, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"tsid\", default_value: None }, ColumnSchema { id: 2, name: \"t\", data_type: Timestamp, is_nullable: false, is_tag: false, comment: \"\", escaped_name: \"t\", default_value: None }, ColumnSchema { id: 3, name: \"a\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"a\", default_value: None }, ColumnSchema { id: 4, name: \"b\", data_type: Int32, is_nullable: true, is_tag: false, comment: \"\", escaped_name: \"b\", default_value: None }] }, version: 1, primary_key_indexes: [0, 1] }, engine: \"Analytic\", options: {}, state: Stable, shard_id: 0, partition_info: None }, err:Failed to create table, table already exists, table:05_create_tables_t2." }) create table `05_create_tables_t3`(a int,b int, t timestamp NOT NULL, TIMESTAMP KEY(t)) ENGINE = Analytic; diff --git a/proxy/src/read.rs b/proxy/src/read.rs index a2f0150df4..e9fcf7d86e 100644 --- a/proxy/src/read.rs +++ b/proxy/src/read.rs @@ -1,5 +1,7 @@ // Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. +//! Contains common methods used by the read process. 
+ use std::time::Instant; use ceresdbproto::storage::{ @@ -133,7 +135,7 @@ impl Proxy { .box_err() .with_context(|| ErrWithCause { code: StatusCode::INTERNAL_SERVER_ERROR, - msg: format!("sql:{sql}"), + msg: format!("Failed to execute plan, sql:{sql}"), })?; let cost = begin_instant.saturating_elapsed(); diff --git a/proxy/src/write.rs b/proxy/src/write.rs index 1aae990e18..989a5ccc6f 100644 --- a/proxy/src/write.rs +++ b/proxy/src/write.rs @@ -1,5 +1,7 @@ // Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. +//! Contains common methods used by the write process. + use std::{ cmp::max, collections::{BTreeMap, HashMap},