diff --git a/proxy/src/context.rs b/proxy/src/context.rs index 94e23749d1..80152de49d 100644 --- a/proxy/src/context.rs +++ b/proxy/src/context.rs @@ -16,6 +16,7 @@ use std::time::Duration; +use common_types::request_id::RequestId; use macros::define_result; use snafu::{ensure, Backtrace, Snafu}; @@ -42,15 +43,16 @@ define_result!(Error); /// Context for request, may contains /// 1. Request context and options /// 2. Info from http headers +#[derive(Debug)] pub struct RequestContext { /// Catalog of the request pub catalog: String, /// Schema of request pub schema: String, - /// Enable partition table_access flag - pub enable_partition_table_access: bool, /// Request timeout pub timeout: Option, + /// Request id + pub request_id: RequestId, } impl RequestContext { @@ -63,7 +65,6 @@ impl RequestContext { pub struct Builder { catalog: String, schema: String, - enable_partition_table_access: bool, timeout: Option, } @@ -78,11 +79,6 @@ impl Builder { self } - pub fn enable_partition_table_access(mut self, enable_partition_table_access: bool) -> Self { - self.enable_partition_table_access = enable_partition_table_access; - self - } - pub fn timeout(mut self, timeout: Option) -> Self { self.timeout = timeout; self @@ -95,8 +91,8 @@ impl Builder { Ok(RequestContext { catalog: self.catalog, schema: self.schema, - enable_partition_table_access: self.enable_partition_table_access, timeout: self.timeout, + request_id: RequestId::next_id(), }) } } diff --git a/proxy/src/grpc/prom_query.rs b/proxy/src/grpc/prom_query.rs index 09fdc3d547..9946c59898 100644 --- a/proxy/src/grpc/prom_query.rs +++ b/proxy/src/grpc/prom_query.rs @@ -26,7 +26,6 @@ use ceresdbproto::{ use common_types::{ datum::DatumKind, record_batch::RecordBatch, - request_id::RequestId, schema::{RecordSchema, TSID_COLUMN}, }; use generic_error::BoxError; @@ -76,7 +75,7 @@ impl Proxy { ctx: Context, req: PrometheusQueryRequest, ) -> Result { - let request_id = RequestId::next_id(); + let request_id = ctx.request_id; let begin_instant = Instant::now(); let deadline = ctx.timeout.map(|t| begin_instant + t); let req_ctx = req.context.context(ErrNoCause { diff --git a/proxy/src/grpc/sql_query.rs b/proxy/src/grpc/sql_query.rs index 9f22572838..43cad296dc 100644 --- a/proxy/src/grpc/sql_query.rs +++ b/proxy/src/grpc/sql_query.rs @@ -33,8 +33,6 @@ use log::{error, warn}; use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner}; use router::endpoint::Endpoint; use snafu::ResultExt; -use tokio::sync::mpsc; -use tokio_stream::wrappers::ReceiverStream; use tonic::{transport::Channel, IntoRequest}; use crate::{ @@ -45,8 +43,6 @@ use crate::{ Context, Proxy, }; -const STREAM_QUERY_CHANNEL_LEN: usize = 20; - impl Proxy { pub async fn handle_sql_query(&self, ctx: Context, req: SqlQueryRequest) -> SqlQueryResponse { // Incoming query maybe larger than query_failed + query_succeeded for some @@ -55,10 +51,11 @@ impl Proxy { GRPC_HANDLER_COUNTER_VEC.incoming_query.inc(); self.hotspot_recorder.inc_sql_query_reqs(&req).await; - match self.handle_sql_query_internal(ctx, req).await { + match self.handle_sql_query_internal(&ctx, &req).await { Err(e) => { + error!("Failed to handle sql query, ctx:{ctx:?}, err:{e}"); + GRPC_HANDLER_COUNTER_VEC.query_failed.inc(); - error!("Failed to handle sql query, err:{e}"); SqlQueryResponse { header: Some(error::build_err_header(e)), ..Default::default() @@ -73,8 +70,8 @@ impl Proxy { async fn handle_sql_query_internal( &self, - ctx: Context, - req: SqlQueryRequest, + ctx: &Context, + req: &SqlQueryRequest, ) -> Result { if req.context.is_none() { return ErrNoCause { @@ -86,7 +83,7 @@ impl Proxy { let req_context = req.context.as_ref().unwrap(); let schema = &req_context.database; - match self.handle_sql(ctx, schema, &req.sql).await? { + match self.handle_sql(ctx, schema, &req.sql, false).await? { SqlResponse::Forwarded(resp) => Ok(resp), SqlResponse::Local(output) => convert_output(&output, self.resp_compress_min_length), } @@ -99,7 +96,7 @@ impl Proxy { ) -> BoxStream<'static, SqlQueryResponse> { GRPC_HANDLER_COUNTER_VEC.stream_query.inc(); self.hotspot_recorder.inc_sql_query_reqs(&req).await; - match self.clone().handle_stream_query_internal(ctx, req).await { + match self.clone().handle_stream_query_internal(&ctx, &req).await { Err(e) => stream::once(async { error!("Failed to handle stream sql query, err:{e}"); GRPC_HANDLER_COUNTER_VEC.stream_query_failed.inc(); @@ -118,8 +115,8 @@ impl Proxy { async fn handle_stream_query_internal( self: Arc, - ctx: Context, - req: SqlQueryRequest, + ctx: &Context, + req: &SqlQueryRequest, ) -> Result> { if req.context.is_none() { return ErrNoCause { @@ -130,12 +127,8 @@ impl Proxy { } let req_context = req.context.as_ref().unwrap(); - let schema = req_context.database.clone(); - let req = match self - .clone() - .maybe_forward_stream_sql_query(ctx.clone(), &req) - .await - { + let schema = &req_context.database; + let req = match self.clone().maybe_forward_stream_sql_query(ctx, req).await { Some(resp) => match resp { ForwardResult::Forwarded(resp) => return resp, ForwardResult::Local => req, @@ -143,53 +136,47 @@ impl Proxy { None => req, }; - let (tx, rx) = mpsc::channel(STREAM_QUERY_CHANNEL_LEN); - let runtime = ctx.runtime.clone(); let resp_compress_min_length = self.resp_compress_min_length; let output = self .as_ref() - .fetch_sql_query_output(ctx, &schema, &req.sql) + .fetch_sql_query_output(ctx, schema, &req.sql, false) .await?; - runtime.spawn(async move { - match output { - Output::AffectedRows(rows) => { - let resp = - QueryResponseBuilder::with_ok_header().build_with_affected_rows(rows); - if tx.send(resp).await.is_err() { - error!("Failed to send affected rows resp in stream sql query"); - } - GRPC_HANDLER_COUNTER_VEC - .query_affected_row - .inc_by(rows as u64); - } - Output::Records(batches) => { - let mut num_rows = 0; - for batch in &batches { - let resp = { - let mut writer = QueryResponseWriter::new(resp_compress_min_length); - writer.write(batch)?; - writer.finish() - }?; - - if tx.send(resp).await.is_err() { - error!("Failed to send record batches resp in stream sql query"); - break; - } - num_rows += batch.num_rows(); - } - GRPC_HANDLER_COUNTER_VEC - .query_succeeded_row - .inc_by(num_rows as u64); + + match output { + Output::AffectedRows(rows) => { + GRPC_HANDLER_COUNTER_VEC + .query_affected_row + .inc_by(rows as u64); + + let resp = QueryResponseBuilder::with_ok_header().build_with_affected_rows(rows); + + Ok(Box::pin(stream::once(async { resp }))) + } + Output::Records(batches) => { + let mut num_rows = 0; + let mut results = Vec::with_capacity(batches.len()); + for batch in &batches { + let resp = { + let mut writer = QueryResponseWriter::new(resp_compress_min_length); + writer.write(batch)?; + writer.finish() + }?; + results.push(resp); + num_rows += batch.num_rows(); } + + GRPC_HANDLER_COUNTER_VEC + .query_succeeded_row + .inc_by(num_rows as u64); + + Ok(Box::pin(stream::iter(results))) } - Ok::<(), Error>(()) - }); - Ok(ReceiverStream::new(rx).boxed()) + } } async fn maybe_forward_stream_sql_query( self: Arc, - ctx: Context, + ctx: &Context, req: &SqlQueryRequest, ) -> Option, Error>> { if req.tables.len() != 1 { @@ -203,7 +190,7 @@ impl Proxy { schema: req_ctx.database.clone(), table: req.tables[0].clone(), req: req.clone().into_request(), - forwarded_from: ctx.forwarded_from, + forwarded_from: ctx.forwarded_from.clone(), }; let do_query = |mut client: StorageServiceClient, request: tonic::Request, diff --git a/proxy/src/http/prom.rs b/proxy/src/http/prom.rs index 6864db4dd5..b40debf214 100644 --- a/proxy/src/http/prom.rs +++ b/proxy/src/http/prom.rs @@ -31,13 +31,12 @@ use ceresdbproto::storage::{ }; use common_types::{ datum::DatumKind, - request_id::RequestId, schema::{RecordSchema, TSID_COLUMN}, }; use generic_error::BoxError; use http::StatusCode; use interpreters::interpreter::Output; -use log::{debug, error}; +use log::{error, info}; use prom_remote_api::types::{ Label, LabelMatcher, Query, QueryResult, RemoteStorage, Sample, TimeSeries, WriteRequest, }; @@ -85,12 +84,7 @@ impl Proxy { }), table_requests: write_table_requests, }; - let ctx = ProxyContext { - runtime: self.engine_runtimes.write_runtime.clone(), - timeout: ctx.timeout, - enable_partition_table_access: false, - forwarded_from: None, - }; + let ctx = ProxyContext::new(ctx.timeout, None); match self.handle_write_internal(ctx, table_request).await { Ok(result) => { @@ -127,15 +121,14 @@ impl Proxy { metric: String, query: Query, ) -> Result { - // Open partition table if needed. - self.maybe_open_partition_table_if_not_exist(&ctx.catalog, &ctx.schema, &metric) - .await?; - - let request_id = RequestId::next_id(); + let request_id = ctx.request_id; let begin_instant = Instant::now(); let deadline = ctx.timeout.map(|t| begin_instant + t); + info!("Handle prom remote query begin, ctx:{ctx:?}, metric:{metric}, request:{query:?}"); - debug!("Query handler try to process request, request_id:{request_id}, request:{query:?}"); + // Open partition table if needed. + self.maybe_open_partition_table_if_not_exist(&ctx.catalog, &ctx.schema, &metric) + .await?; let provider = CatalogMetaProvider { manager: self.instance.catalog_manager.clone(), @@ -171,7 +164,7 @@ impl Proxy { .await?; let cost = begin_instant.saturating_elapsed().as_millis(); - debug!("Query handler finished, request_id:{request_id}, cost:{cost}ms, query:{query:?}"); + info!("Handle prom remote query successfully, ctx:{ctx:?}, cost:{cost}ms"); convert_query_result(metric, timestamp_col_name, field_col_name, output) } diff --git a/proxy/src/http/sql.rs b/proxy/src/http/sql.rs index 9ddfabcdf9..16a3b8439b 100644 --- a/proxy/src/http/sql.rs +++ b/proxy/src/http/sql.rs @@ -26,6 +26,7 @@ use common_types::{ use generic_error::BoxError; use http::StatusCode; use interpreters::interpreter::Output; +use log::error; use query_engine::{ executor::{Executor as QueryExecutor, RecordBatchVec}, physical_planner::PhysicalPlanner, @@ -49,14 +50,16 @@ impl Proxy { ctx: &RequestContext, req: Request, ) -> Result { - let context = Context { - timeout: ctx.timeout, - runtime: self.engine_runtimes.read_runtime.clone(), - enable_partition_table_access: true, - forwarded_from: None, - }; + let schema = &ctx.schema; + let ctx = Context::new(ctx.timeout, None); - match self.handle_sql(context, &ctx.schema, &req.query).await? { + match self + .handle_sql(&ctx, schema, &req.query, true) + .await + .map_err(|e| { + error!("Handle sql query failed, ctx:{ctx:?}, req:{req:?}, err:{e}"); + e + })? { SqlResponse::Forwarded(resp) => convert_sql_response_to_output(resp), SqlResponse::Local(output) => Ok(output), } diff --git a/proxy/src/influxdb/mod.rs b/proxy/src/influxdb/mod.rs index 4bcdd8d788..640ddfcab6 100644 --- a/proxy/src/influxdb/mod.rs +++ b/proxy/src/influxdb/mod.rs @@ -23,7 +23,6 @@ use std::time::Instant; use ceresdbproto::storage::{ RequestContext as GrpcRequestContext, WriteRequest as GrpcWriteRequest, }; -use common_types::request_id::RequestId; use generic_error::BoxError; use http::StatusCode; use interpreters::interpreter::Output; @@ -80,12 +79,7 @@ impl Proxy { }), table_requests: write_table_requests, }; - let proxy_context = Context { - timeout: ctx.timeout, - runtime: self.engine_runtimes.write_runtime.clone(), - enable_partition_table_access: false, - forwarded_from: None, - }; + let proxy_context = Context::new(ctx.timeout, None); match self .handle_write_internal(proxy_context, table_request) @@ -126,7 +120,7 @@ impl Proxy { ctx: RequestContext, req: InfluxqlRequest, ) -> Result { - let request_id = RequestId::next_id(); + let request_id = ctx.request_id; let begin_instant = Instant::now(); let deadline = ctx.timeout.map(|t| begin_instant + t); diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 8a44d097a5..03431f8826 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -72,7 +72,6 @@ use log::{error, info}; use query_engine::{executor::Executor as QueryExecutor, physical_planner::PhysicalPlanner}; use query_frontend::plan::Plan; use router::{endpoint::Endpoint, Router}; -use runtime::Runtime; use snafu::{OptionExt, ResultExt}; use table_engine::{ engine::{EngineRuntimes, TableState}, @@ -554,10 +553,19 @@ impl Proxy { } } -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Context { - pub timeout: Option, - pub runtime: Arc, - pub enable_partition_table_access: bool, - pub forwarded_from: Option, + request_id: RequestId, + timeout: Option, + forwarded_from: Option, +} + +impl Context { + pub fn new(timeout: Option, forwarded_from: Option) -> Self { + Self { + request_id: RequestId::next_id(), + timeout, + forwarded_from, + } + } } diff --git a/proxy/src/opentsdb/mod.rs b/proxy/src/opentsdb/mod.rs index 14849f7758..bfbc6819e7 100644 --- a/proxy/src/opentsdb/mod.rs +++ b/proxy/src/opentsdb/mod.rs @@ -56,12 +56,7 @@ impl Proxy { }), table_requests: write_table_requests, }; - let proxy_context = Context { - timeout: ctx.timeout, - runtime: self.engine_runtimes.write_runtime.clone(), - enable_partition_table_access: false, - forwarded_from: None, - }; + let proxy_context = Context::new(ctx.timeout, None); match self .handle_write_internal(proxy_context, table_request) diff --git a/proxy/src/read.rs b/proxy/src/read.rs index c848609749..e53629a3d9 100644 --- a/proxy/src/read.rs +++ b/proxy/src/read.rs @@ -19,7 +19,6 @@ use std::time::Instant; use ceresdbproto::storage::{ storage_service_client::StorageServiceClient, RequestContext, SqlQueryRequest, SqlQueryResponse, }; -use common_types::request_id::RequestId; use futures::FutureExt; use generic_error::BoxError; use http::StatusCode; @@ -50,9 +49,10 @@ pub enum SqlResponse { impl Proxy { pub(crate) async fn handle_sql( &self, - ctx: Context, + ctx: &Context, schema: &str, sql: &str, + enable_partition_table_access: bool, ) -> Result { if let Some(resp) = self .maybe_forward_sql_query(ctx.clone(), schema, sql) @@ -65,22 +65,25 @@ impl Proxy { }; Ok(SqlResponse::Local( - self.fetch_sql_query_output(ctx, schema, sql).await?, + self.fetch_sql_query_output(ctx, schema, sql, enable_partition_table_access) + .await?, )) } pub(crate) async fn fetch_sql_query_output( &self, - ctx: Context, + ctx: &Context, + // TODO: maybe we can put params below input a new ReadRequest struct. schema: &str, sql: &str, + enable_partition_table_access: bool, ) -> Result { - let request_id = RequestId::next_id(); + let request_id = ctx.request_id; let begin_instant = Instant::now(); let deadline = ctx.timeout.map(|t| begin_instant + t); let catalog = self.instance.catalog_manager.default_catalog_name(); - info!("Handle sql query, request_id:{request_id}, deadline:{deadline:?}, schema:{schema}, sql:{sql}"); + info!("Handle sql query begin, catalog:{catalog}, schema:{schema}, deadline:{deadline:?}, ctx:{ctx:?}, sql:{sql}"); let instance = &self.instance; // TODO(yingwen): Privilege check, cannot access data of other tenant @@ -104,24 +107,14 @@ impl Proxy { msg: "Failed to parse sql", })?; + // TODO: For simplicity, we only support executing one statement + let stmts_len = stmts.len(); ensure!( - !stmts.is_empty(), - ErrNoCause { - code: StatusCode::BAD_REQUEST, - msg: format!("No valid query statement provided, sql:{sql}",), - } - ); - - // TODO(yingwen): For simplicity, we only support executing one statement now - // TODO(yingwen): INSERT/UPDATE/DELETE can be batched - ensure!( - stmts.len() == 1, + stmts_len == 1, ErrNoCause { code: StatusCode::BAD_REQUEST, msg: format!( - "Only support execute one statement now, current num:{}, sql:{}", - stmts.len(), - sql + "Only support execute one statement now, current num:{stmts_len}, sql:{sql}" ), } ); @@ -155,7 +148,7 @@ impl Proxy { } } - let output = if ctx.enable_partition_table_access { + let output = if enable_partition_table_access { self.execute_plan_involving_partition_table(request_id, catalog, schema, plan, deadline) .await } else { @@ -168,7 +161,7 @@ impl Proxy { })?; let cost = begin_instant.saturating_elapsed(); - info!("Handle sql query success, catalog:{catalog}, schema:{schema}, request_id:{request_id}, cost:{cost:?}, sql:{sql:?}"); + info!("Handle sql query successfully, catalog:{catalog}, schema:{schema}, cost:{cost:?}, ctx:{ctx:?}"); match &output { Output::AffectedRows(_) => Ok(output), diff --git a/proxy/src/write.rs b/proxy/src/write.rs index e65e324ba0..c445fb75f5 100644 --- a/proxy/src/write.rs +++ b/proxy/src/write.rs @@ -103,8 +103,7 @@ impl Proxy { ctx: Context, req: WriteRequest, ) -> Result { - let request_id = RequestId::next_id(); - + let request_id = ctx.request_id; let write_context = req.context.clone().context(ErrNoCause { msg: "Missing context", code: StatusCode::BAD_REQUEST, @@ -138,8 +137,7 @@ impl Proxy { ctx: Context, req: WriteRequest, ) -> Result { - let request_id = RequestId::next_id(); - + let request_id = ctx.request_id; let write_context = req.context.clone().context(ErrNoCause { msg: "Missing context", code: StatusCode::BAD_REQUEST, diff --git a/server/src/grpc/storage_service/mod.rs b/server/src/grpc/storage_service/mod.rs index d50a80a477..8421b33551 100644 --- a/server/src/grpc/storage_service/mod.rs +++ b/server/src/grpc/storage_service/mod.rs @@ -146,16 +146,9 @@ impl StorageService for StorageS ) -> Result, tonic::Status> { let begin_instant = Instant::now(); let proxy = self.proxy.clone(); - let ctx = Context { - runtime: self.runtimes.read_runtime.clone(), - timeout: self.timeout, - enable_partition_table_access: false, - forwarded_from: req - .metadata() - .get(FORWARDED_FROM) - .map(|value| value.to_str().unwrap().to_string()), - }; - let stream = Self::stream_sql_query_internal(ctx, proxy, req).await; + let ctx = Context::new(self.timeout, get_forwarded_from(&req)); + + let stream = self.stream_sql_query_internal(ctx, proxy, req).await; GRPC_HANDLER_DURATION_HISTOGRAM_VEC .handle_stream_sql_query @@ -165,21 +158,19 @@ impl StorageService for StorageS } } +fn get_forwarded_from(req: &tonic::Request) -> Option { + req.metadata() + .get(FORWARDED_FROM) + .map(|value| value.to_str().unwrap().to_string()) +} + // TODO: Use macros to simplify duplicate code impl StorageServiceImpl { async fn route_internal( &self, req: tonic::Request, ) -> Result, tonic::Status> { - let ctx = Context { - runtime: self.runtimes.read_runtime.clone(), - timeout: self.timeout, - enable_partition_table_access: false, - forwarded_from: req - .metadata() - .get(FORWARDED_FROM) - .map(|value| value.to_str().unwrap().to_string()), - }; + let ctx = Context::new(self.timeout, get_forwarded_from(&req)); let req = req.into_inner(); let proxy = self.proxy.clone(); @@ -206,15 +197,8 @@ impl StorageServiceImpl { &self, req: tonic::Request, ) -> Result, tonic::Status> { - let ctx = Context { - runtime: self.runtimes.write_runtime.clone(), - timeout: self.timeout, - enable_partition_table_access: false, - forwarded_from: req - .metadata() - .get(FORWARDED_FROM) - .map(|value| value.to_str().unwrap().to_string()), - }; + let ctx = Context::new(self.timeout, get_forwarded_from(&req)); + let req = req.into_inner(); let proxy = self.proxy.clone(); @@ -250,22 +234,13 @@ impl StorageServiceImpl { &self, req: tonic::Request, ) -> Result, tonic::Status> { - let ctx = Context { - runtime: self.runtimes.read_runtime.clone(), - timeout: self.timeout, - enable_partition_table_access: false, - forwarded_from: req - .metadata() - .get(FORWARDED_FROM) - .map(|value| value.to_str().unwrap().to_string()), - }; - let req = req.into_inner(); + let ctx = Context::new(self.timeout, get_forwarded_from(&req)); let proxy = self.proxy.clone(); let join_handle = self .runtimes .read_runtime - .spawn(async move { proxy.handle_sql_query(ctx, req).await }); + .spawn(async move { proxy.handle_sql_query(ctx, req.into_inner()).await }); let resp = match join_handle.await { Ok(v) => v, @@ -318,15 +293,8 @@ impl StorageServiceImpl { &self, req: tonic::Request, ) -> Result, tonic::Status> { - let ctx = Context { - runtime: self.runtimes.read_runtime.clone(), - timeout: self.timeout, - enable_partition_table_access: false, - forwarded_from: req - .metadata() - .get(FORWARDED_FROM) - .map(|value| value.to_str().unwrap().to_string()), - }; + let ctx = Context::new(self.timeout, get_forwarded_from(&req)); + let req = req.into_inner(); let proxy = self.proxy.clone(); @@ -361,20 +329,11 @@ impl StorageServiceImpl { &self, req: tonic::Request>, ) -> Result, tonic::Status> { - let mut total_success = 0; - - let ctx = Context { - runtime: self.runtimes.write_runtime.clone(), - timeout: self.timeout, - enable_partition_table_access: false, - forwarded_from: req - .metadata() - .get(FORWARDED_FROM) - .map(|value| value.to_str().unwrap().to_string()), - }; + let ctx = Context::new(self.timeout, get_forwarded_from(&req)); let mut stream = req.into_inner(); let proxy = self.proxy.clone(); + let mut total_success = 0; let join_handle = self.runtimes.write_runtime.spawn(async move { let mut resp = WriteResponse::default(); let mut has_err = false; @@ -431,6 +390,7 @@ impl StorageServiceImpl { } async fn stream_sql_query_internal( + &self, ctx: Context, proxy: Arc>, req: tonic::Request, @@ -439,9 +399,7 @@ impl StorageServiceImpl { tonic::Status, > { let query_req = req.into_inner(); - - let runtime = ctx.runtime.clone(); - let join_handle = runtime.spawn(async move { + let join_handle = self.runtimes.read_runtime.spawn(async move { proxy .handle_stream_sql_query(ctx, query_req) .await diff --git a/server/src/http.rs b/server/src/http.rs index e37aa08481..260154c187 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -638,7 +638,6 @@ impl Service { .catalog(catalog.unwrap_or(default_catalog)) .schema(schema) .timeout(timeout) - .enable_partition_table_access(true) .build() .context(CreateContext) .map_err(reject::custom) diff --git a/server/src/mysql/worker.rs b/server/src/mysql/worker.rs index 1646fffa0e..9521c379e4 100644 --- a/server/src/mysql/worker.rs +++ b/server/src/mysql/worker.rs @@ -147,7 +147,6 @@ where RequestContext::builder() .catalog(default_catalog) .schema(default_schema) - .enable_partition_table_access(false) .timeout(self.timeout) .build() .context(CreateContext) diff --git a/server/src/postgresql/handler.rs b/server/src/postgresql/handler.rs index 3d442f967c..a965d7a099 100644 --- a/server/src/postgresql/handler.rs +++ b/server/src/postgresql/handler.rs @@ -84,7 +84,6 @@ impl PostgresqlHandler { RequestContext::builder() .catalog(default_catalog) .schema(default_schema) - .enable_partition_table_access(false) .timeout(self.timeout) .build() .context(CreateContext)