Skip to content

Commit

Permalink
refactor mysql & pgsql sql query
Browse files Browse the repository at this point in the history
  • Loading branch information
baojinri committed Aug 29, 2023
1 parent 7518577 commit 17e06fe
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 29 deletions.
4 changes: 2 additions & 2 deletions proxy/src/grpc/sql_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl Proxy {
ctx,
schema,
&req.sql,
self.access_partition_table.enable_other,
self.access_partition_table.enable_others,
)
.await?
{
Expand Down Expand Up @@ -150,7 +150,7 @@ impl Proxy {
ctx,
schema,
&req.sql,
self.access_partition_table.enable_other,
self.access_partition_table.enable_others,
)
.await?;

Expand Down
2 changes: 1 addition & 1 deletion proxy/src/http/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ fn convert_records(records: RecordBatchVec) -> Response {
})
}

fn convert_sql_response_to_output(sql_query_response: SqlQueryResponse) -> Result<Output> {
pub fn convert_sql_response_to_output(sql_query_response: SqlQueryResponse) -> Result<Output> {
if let Some(header) = sql_query_response.header {
if header.code as u16 != StatusCode::OK.as_u16() {
return ErrNoCause {
Expand Down
15 changes: 8 additions & 7 deletions proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pub mod instance;
pub mod limiter;
mod metrics;
pub mod opentsdb;
mod read;
pub mod read;
pub mod schema_config_provider;
mod util;
mod write;
Expand Down Expand Up @@ -94,16 +94,17 @@ use crate::{
const QUERY_EXPIRED_BUFFER: Duration = Duration::from_secs(60 * 60);

#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct AccessPartitionTableConfig {
pub struct SubTableAccessPerm {
pub enable_http: bool,
pub enable_other: bool,
// Enable protocols like grpc, mysql, pgsql to access sub table.
pub enable_others: bool,
}

impl Default for AccessPartitionTableConfig {
impl Default for SubTableAccessPerm {
fn default() -> Self {
Self {
enable_http: true,
enable_other: false,
enable_others: false,
}
}
}
Expand All @@ -118,7 +119,7 @@ pub struct Proxy {
hotspot_recorder: Arc<HotspotRecorder>,
engine_runtimes: Arc<EngineRuntimes>,
cluster_with_meta: bool,
access_partition_table: AccessPartitionTableConfig,
pub access_partition_table: SubTableAccessPerm,
}

impl Proxy {
Expand All @@ -134,7 +135,7 @@ impl Proxy {
hotspot_recorder: Arc<HotspotRecorder>,
engine_runtimes: Arc<EngineRuntimes>,
cluster_with_meta: bool,
access_partition_table: AccessPartitionTableConfig,
access_partition_table: SubTableAccessPerm,
) -> Self {
let forwarder = Arc::new(Forwarder::new(
forward_config,
Expand Down
2 changes: 1 addition & 1 deletion proxy/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub enum SqlResponse {
}

impl Proxy {
pub(crate) async fn handle_sql(
pub async fn handle_sql(
&self,
ctx: &Context,
schema: &str,
Expand Down
6 changes: 3 additions & 3 deletions server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use std::collections::HashMap;
use cluster::config::SchemaConfig;
use common_types::schema::TIMESTAMP_COLUMN;
use meta_client::types::ShardId;
use proxy::{forward, hotspot, AccessPartitionTableConfig};
use proxy::{forward, hotspot, SubTableAccessPerm};
use router::{
endpoint::Endpoint,
rule_based::{ClusterView, RuleList},
Expand Down Expand Up @@ -139,7 +139,7 @@ pub struct ServerConfig {
pub enable_query_dedup: bool,

/// Whether enable to access partition table
pub enable_partition_table_access: AccessPartitionTableConfig,
pub sub_table_access_perm: SubTableAccessPerm,
}

impl Default for ServerConfig {
Expand All @@ -161,7 +161,7 @@ impl Default for ServerConfig {
hotspot: hotspot::Config::default(),
remote_client: remote_engine_client::Config::default(),
enable_query_dedup: false,
enable_partition_table_access: AccessPartitionTableConfig::default(),
sub_table_access_perm: SubTableAccessPerm::default(),
}
}
}
Expand Down
34 changes: 27 additions & 7 deletions server/src/mysql/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ use generic_error::BoxError;
use interpreters::interpreter::Output;
use log::{error, info};
use opensrv_mysql::{AsyncMysqlShim, ErrorKind, QueryResultWriter, StatementMetaWriter};
use proxy::{context::RequestContext, http::sql::Request, Proxy};
use proxy::{
context::RequestContext, http::sql::convert_sql_response_to_output, read::SqlResponse, Context,
Proxy,
};
use snafu::ResultExt;

use crate::mysql::{
Expand Down Expand Up @@ -107,11 +110,17 @@ where
{
async fn do_query<'a>(&'a mut self, sql: &'a str) -> Result<Output> {
let ctx = self.create_ctx()?;
let req = Request {
query: sql.to_string(),
};
self.proxy
.handle_http_sql_query(&ctx, req)
let schema = &ctx.schema;
let ctx = Context::new(ctx.timeout, None);

match self
.proxy
.handle_sql(
&ctx,
schema,
sql,
self.proxy.access_partition_table.enable_others,
)
.await
.map_err(|e| {
error!("Mysql service Failed to handle sql, err: {}", e);
Expand All @@ -120,7 +129,18 @@ where
.box_err()
.context(HandleSql {
sql: sql.to_string(),
})
})? {
SqlResponse::Forwarded(resp) => convert_sql_response_to_output(resp),
SqlResponse::Local(output) => Ok(output),
}
.map_err(|e| {
error!("Mysql service Failed to handle sql, err: {}", e);
e
})
.box_err()
.context(HandleSql {
sql: sql.to_string(),
})
}

fn create_ctx(&self) -> Result<RequestContext> {
Expand Down
28 changes: 21 additions & 7 deletions server/src/postgresql/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ use pgwire::{
},
error::{PgWireError, PgWireResult},
};
use proxy::{context::RequestContext, http::sql::Request, Proxy};
use proxy::{
context::RequestContext, http::sql::convert_sql_response_to_output, read::SqlResponse, Context,
Proxy,
};
use snafu::ResultExt;

use crate::postgresql::error::{CreateContext, Result};
Expand All @@ -46,18 +49,29 @@ impl SimpleQueryHandler for PostgresqlHandler {
let ctx = self
.create_ctx()
.map_err(|e| PgWireError::ApiError(Box::new(e)))?;
let schema = &ctx.schema;
let ctx = Context::new(ctx.timeout, None);

let req = Request {
query: sql.to_string(),
};
let results = self
let results = match self
.proxy
.handle_http_sql_query(&ctx, req)
.handle_sql(
&ctx,
schema,
sql,
self.proxy.access_partition_table.enable_others,
)
.await
.map_err(|e| {
error!("PostgreSQL service Failed to handle sql, err: {}", e);
PgWireError::ApiError(Box::new(e))
})?;
})? {
SqlResponse::Forwarded(resp) => convert_sql_response_to_output(resp),
SqlResponse::Local(output) => Ok(output),
}
.map_err(|e| {
error!("PostgreSQL service Failed to handle sql, err: {}", e);
PgWireError::ApiError(Box::new(e))
})?;

Ok(vec![into_pg_reponse(results)?])
}
Expand Down
2 changes: 1 addition & 1 deletion server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ impl Builder {
hotspot_recorder.clone(),
engine_runtimes.clone(),
self.cluster.is_some(),
self.server_config.enable_partition_table_access,
self.server_config.sub_table_access_perm,
));

let http_service = http::Builder::new(http_config)
Expand Down

0 comments on commit 17e06fe

Please sign in to comment.