Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: impl mysql query with proxy #886

Merged
merged 3 commits into from
May 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions integration_tests/mysql/basic.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,9 @@
mysql -h 127.0.0.1 -P 3307 -e 'show tables'

mysql -h 127.0.0.1 -P 3307 -e 'select 1, now()'

mysql -h 127.0.0.1 -P 3307 -e 'CREATE TABLE `demo`(`name`string TAG,`id` int TAG,`value` double NOT NULL,`t` timestamp NOT NULL,TIMESTAMP KEY(t)) ENGINE = Analytic with(enable_ttl=false)'

mysql -h 127.0.0.1 -P 3307 -e 'insert into demo (name,value,t)values("ceresdb",1,1683280523000)'

mysql -h 127.0.0.1 -P 3307 -e 'select * from demo'
92 changes: 3 additions & 89 deletions proxy/src/grpc/write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ use std::{cmp::max, collections::HashMap, time::Instant};

use ceresdbproto::storage::{
storage_service_client::StorageServiceClient, RouteRequest, WriteRequest, WriteResponse,
WriteTableRequest,
};
use cluster::config::SchemaConfig;
use common_types::request_id::RequestId;
use common_util::error::BoxError;
use futures::{future::try_join_all, FutureExt};
Expand All @@ -20,12 +18,12 @@ use snafu::{OptionExt, ResultExt};
use tonic::transport::Channel;

use crate::{
create_table, error,
error,
error::{build_ok_header, ErrNoCause, ErrWithCause, InternalNoCause, Result},
execute_add_columns_plan, execute_plan, find_new_columns,
execute_plan,
forward::{ForwardResult, ForwarderRef},
instance::InstanceRef,
try_get_table, write_table_request_to_insert_plan, Context, Proxy,
Context, Proxy,
};

#[derive(Debug)]
Expand Down Expand Up @@ -302,90 +300,6 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
}
}

// TODO: use write_request_to_insert_plan in proxy, and remove following code.
pub async fn write_request_to_insert_plan<Q: QueryExecutor + 'static>(
instance: InstanceRef<Q>,
table_requests: Vec<WriteTableRequest>,
schema_config: Option<&SchemaConfig>,
write_context: WriteContext,
) -> Result<Vec<InsertPlan>> {
let mut plan_vec = Vec::with_capacity(table_requests.len());

let WriteContext {
request_id,
catalog,
schema,
deadline,
auto_create_table,
} = write_context;
let schema_config = schema_config.cloned().unwrap_or_default();
for write_table_req in table_requests {
let table_name = &write_table_req.table;
let mut table = try_get_table(&catalog, &schema, instance.clone(), table_name)?;

match table.clone() {
None => {
if auto_create_table {
create_table(
request_id,
&catalog,
&schema,
instance.clone(),
&write_table_req,
&schema_config,
deadline,
)
.await?;
// try to get table again
table = try_get_table(&catalog, &schema, instance.clone(), table_name)?;
}
}
Some(t) => {
if auto_create_table {
// The reasons for making the decision to add columns before writing are as
// follows:
// * If judged based on the error message returned, it may cause data that has
// already been successfully written to be written again and affect the
// accuracy of the data.
// * Currently, the decision to add columns is made at the request level, not at
// the row level, so the cost is relatively small.
let table_schema = t.schema();
let columns =
find_new_columns(&table_schema, &schema_config, &write_table_req)?;
if !columns.is_empty() {
execute_add_columns_plan(
request_id,
&catalog,
&schema,
instance.clone(),
t,
columns,
deadline,
)
.await?;
}
}
}
}

match table {
Some(table) => {
let plan = write_table_request_to_insert_plan(table, write_table_req)?;
plan_vec.push(plan);
}
None => {
return ErrNoCause {
code: StatusCode::BAD_REQUEST,
msg: format!("Table not found, schema:{schema}, table:{table_name}"),
}
.fail();
}
}
}

Ok(plan_vec)
}

pub async fn execute_insert_plan<Q: QueryExecutor + 'static>(
request_id: RequestId,
catalog: &str,
Expand Down
1 change: 0 additions & 1 deletion proxy/src/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

pub mod admin;
pub(crate) mod error;
pub mod query;
pub mod route;

mod prelude {
Expand Down
218 changes: 0 additions & 218 deletions proxy/src/handlers/query.rs

This file was deleted.

20 changes: 8 additions & 12 deletions proxy/src/influxdb/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use crate::{
context::RequestContext,
error::{ErrNoCause, ErrWithCause, Internal, Result},
execute_plan,
grpc::write::{execute_insert_plan, write_request_to_insert_plan, WriteContext},
grpc::write::{execute_insert_plan, WriteContext},
influxdb::types::{
convert_influxql_output, convert_write_request, InfluxqlRequest, InfluxqlResponse,
WriteRequest, WriteResponse,
Expand Down Expand Up @@ -63,17 +63,13 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
let write_context =
WriteContext::new(request_id, deadline, catalog.clone(), schema.clone());

let plans = write_request_to_insert_plan(
self.instance.clone(),
convert_write_request(req)?,
schema_config,
write_context,
)
.await
.box_err()
.with_context(|| Internal {
msg: "write request to insert plan",
})?;
let plans = self
.write_request_to_insert_plan(convert_write_request(req)?, schema_config, write_context)
.await
.box_err()
.with_context(|| Internal {
msg: "write request to insert plan",
})?;

let mut success = 0;
for insert_plan in plans {
Expand Down
Loading