From 17e06fec0b4707d3f293edac6bb31bc6e752b1aa Mon Sep 17 00:00:00 2001 From: baojinri Date: Tue, 29 Aug 2023 16:41:26 +0800 Subject: [PATCH] refactor mysql & pgsql sql query --- proxy/src/grpc/sql_query.rs | 4 ++-- proxy/src/http/sql.rs | 2 +- proxy/src/lib.rs | 15 +++++++------- proxy/src/read.rs | 2 +- server/src/config.rs | 6 +++--- server/src/mysql/worker.rs | 34 +++++++++++++++++++++++++------- server/src/postgresql/handler.rs | 28 +++++++++++++++++++------- server/src/server.rs | 2 +- 8 files changed, 64 insertions(+), 29 deletions(-) diff --git a/proxy/src/grpc/sql_query.rs b/proxy/src/grpc/sql_query.rs index 8584531cfc..87816d7907 100644 --- a/proxy/src/grpc/sql_query.rs +++ b/proxy/src/grpc/sql_query.rs @@ -87,7 +87,7 @@ impl Proxy { ctx, schema, &req.sql, - self.access_partition_table.enable_other, + self.access_partition_table.enable_others, ) .await? { @@ -150,7 +150,7 @@ impl Proxy { ctx, schema, &req.sql, - self.access_partition_table.enable_other, + self.access_partition_table.enable_others, ) .await?; diff --git a/proxy/src/http/sql.rs b/proxy/src/http/sql.rs index aa24fa92ca..7e31512bf3 100644 --- a/proxy/src/http/sql.rs +++ b/proxy/src/http/sql.rs @@ -181,7 +181,7 @@ fn convert_records(records: RecordBatchVec) -> Response { }) } -fn convert_sql_response_to_output(sql_query_response: SqlQueryResponse) -> Result { +pub fn convert_sql_response_to_output(sql_query_response: SqlQueryResponse) -> Result { if let Some(header) = sql_query_response.header { if header.code as u16 != StatusCode::OK.as_u16() { return ErrNoCause { diff --git a/proxy/src/lib.rs b/proxy/src/lib.rs index 8eb70820f9..8c865d2b91 100644 --- a/proxy/src/lib.rs +++ b/proxy/src/lib.rs @@ -31,7 +31,7 @@ pub mod instance; pub mod limiter; mod metrics; pub mod opentsdb; -mod read; +pub mod read; pub mod schema_config_provider; mod util; mod write; @@ -94,16 +94,17 @@ use crate::{ const QUERY_EXPIRED_BUFFER: Duration = Duration::from_secs(60 * 60); #[derive(Clone, Debug, Deserialize, Serialize)] -pub struct AccessPartitionTableConfig { +pub struct SubTableAccessPerm { pub enable_http: bool, - pub enable_other: bool, + // Enable protocols like grpc, mysql, pgsql to access sub table. + pub enable_others: bool, } -impl Default for AccessPartitionTableConfig { +impl Default for SubTableAccessPerm { fn default() -> Self { Self { enable_http: true, - enable_other: false, + enable_others: false, } } } @@ -118,7 +119,7 @@ pub struct Proxy { hotspot_recorder: Arc, engine_runtimes: Arc, cluster_with_meta: bool, - access_partition_table: AccessPartitionTableConfig, + pub access_partition_table: SubTableAccessPerm, } impl Proxy { @@ -134,7 +135,7 @@ impl Proxy { hotspot_recorder: Arc, engine_runtimes: Arc, cluster_with_meta: bool, - access_partition_table: AccessPartitionTableConfig, + access_partition_table: SubTableAccessPerm, ) -> Self { let forwarder = Arc::new(Forwarder::new( forward_config, diff --git a/proxy/src/read.rs b/proxy/src/read.rs index a9b1750ea3..81648c030e 100644 --- a/proxy/src/read.rs +++ b/proxy/src/read.rs @@ -46,7 +46,7 @@ pub enum SqlResponse { } impl Proxy { - pub(crate) async fn handle_sql( + pub async fn handle_sql( &self, ctx: &Context, schema: &str, diff --git a/server/src/config.rs b/server/src/config.rs index 793109a88b..ca592639db 100644 --- a/server/src/config.rs +++ b/server/src/config.rs @@ -19,7 +19,7 @@ use std::collections::HashMap; use cluster::config::SchemaConfig; use common_types::schema::TIMESTAMP_COLUMN; use meta_client::types::ShardId; -use proxy::{forward, hotspot, AccessPartitionTableConfig}; +use proxy::{forward, hotspot, SubTableAccessPerm}; use router::{ endpoint::Endpoint, rule_based::{ClusterView, RuleList}, @@ -139,7 +139,7 @@ pub struct ServerConfig { pub enable_query_dedup: bool, /// Whether enable to access partition table - pub enable_partition_table_access: AccessPartitionTableConfig, + pub sub_table_access_perm: SubTableAccessPerm, } impl Default for ServerConfig { @@ -161,7 +161,7 @@ impl Default for ServerConfig { hotspot: hotspot::Config::default(), remote_client: remote_engine_client::Config::default(), enable_query_dedup: false, - enable_partition_table_access: AccessPartitionTableConfig::default(), + sub_table_access_perm: SubTableAccessPerm::default(), } } } diff --git a/server/src/mysql/worker.rs b/server/src/mysql/worker.rs index 3f36fe776d..2d38ff82eb 100644 --- a/server/src/mysql/worker.rs +++ b/server/src/mysql/worker.rs @@ -18,7 +18,10 @@ use generic_error::BoxError; use interpreters::interpreter::Output; use log::{error, info}; use opensrv_mysql::{AsyncMysqlShim, ErrorKind, QueryResultWriter, StatementMetaWriter}; -use proxy::{context::RequestContext, http::sql::Request, Proxy}; +use proxy::{ + context::RequestContext, http::sql::convert_sql_response_to_output, read::SqlResponse, Context, + Proxy, +}; use snafu::ResultExt; use crate::mysql::{ @@ -107,11 +110,17 @@ where { async fn do_query<'a>(&'a mut self, sql: &'a str) -> Result { let ctx = self.create_ctx()?; - let req = Request { - query: sql.to_string(), - }; - self.proxy - .handle_http_sql_query(&ctx, req) + let schema = &ctx.schema; + let ctx = Context::new(ctx.timeout, None); + + match self + .proxy + .handle_sql( + &ctx, + schema, + sql, + self.proxy.access_partition_table.enable_others, + ) .await .map_err(|e| { error!("Mysql service Failed to handle sql, err: {}", e); @@ -120,7 +129,18 @@ where .box_err() .context(HandleSql { sql: sql.to_string(), - }) + })? { + SqlResponse::Forwarded(resp) => convert_sql_response_to_output(resp), + SqlResponse::Local(output) => Ok(output), + } + .map_err(|e| { + error!("Mysql service Failed to handle sql, err: {}", e); + e + }) + .box_err() + .context(HandleSql { + sql: sql.to_string(), + }) } fn create_ctx(&self) -> Result { diff --git a/server/src/postgresql/handler.rs b/server/src/postgresql/handler.rs index 8743c9e7e2..ed643cab0d 100644 --- a/server/src/postgresql/handler.rs +++ b/server/src/postgresql/handler.rs @@ -28,7 +28,10 @@ use pgwire::{ }, error::{PgWireError, PgWireResult}, }; -use proxy::{context::RequestContext, http::sql::Request, Proxy}; +use proxy::{ + context::RequestContext, http::sql::convert_sql_response_to_output, read::SqlResponse, Context, + Proxy, +}; use snafu::ResultExt; use crate::postgresql::error::{CreateContext, Result}; @@ -46,18 +49,29 @@ impl SimpleQueryHandler for PostgresqlHandler { let ctx = self .create_ctx() .map_err(|e| PgWireError::ApiError(Box::new(e)))?; + let schema = &ctx.schema; + let ctx = Context::new(ctx.timeout, None); - let req = Request { - query: sql.to_string(), - }; - let results = self + let results = match self .proxy - .handle_http_sql_query(&ctx, req) + .handle_sql( + &ctx, + schema, + sql, + self.proxy.access_partition_table.enable_others, + ) .await .map_err(|e| { error!("PostgreSQL service Failed to handle sql, err: {}", e); PgWireError::ApiError(Box::new(e)) - })?; + })? { + SqlResponse::Forwarded(resp) => convert_sql_response_to_output(resp), + SqlResponse::Local(output) => Ok(output), + } + .map_err(|e| { + error!("PostgreSQL service Failed to handle sql, err: {}", e); + PgWireError::ApiError(Box::new(e)) + })?; Ok(vec![into_pg_reponse(results)?]) } diff --git a/server/src/server.rs b/server/src/server.rs index c959f78ba6..8ceb9caa06 100644 --- a/server/src/server.rs +++ b/server/src/server.rs @@ -401,7 +401,7 @@ impl Builder { hotspot_recorder.clone(), engine_runtimes.clone(), self.cluster.is_some(), - self.server_config.enable_partition_table_access, + self.server_config.sub_table_access_perm, )); let http_service = http::Builder::new(http_config)