Skip to content

Commit

Permalink
feat: auto add column (apache#749)
Browse files Browse the repository at this point in the history
* feat: introduce proxy module (apache#715)

* impl route service with proxy

* impl write service with proxy

* remove forward module in proxy

* refactor code

* add tests in write

* feat: impl query with proxy (apache#717)

* refactor: refactor proxy module (apache#726)

* refactor: refactor proxy module

* cargo fmt

* refactor based on code-review (CR) feedback

* Feat proxy prom query (apache#727)

* feat: impl prom query with proxy

* refactor code

* feat: impl stream write with proxy (apache#737)

* feat: impl stream query with proxy (apache#742)

* feat: impl stream query with proxy

* refactor by CR

* feat: introduce proxy module

* refactor code

* add header in storage service

* feat: impl storage service with proxy

* make CI happy

* refactor code

* refactor code

* refactor by CR

* feat: automatically create non-existent columns during insertion

* test: add autoAddColumns test in go sdk

* refactor code

* refactor by CR

* refactor by CR
  • Loading branch information
chunshao90 authored Mar 22, 2023
1 parent 54b72b0 commit aaa650a
Show file tree
Hide file tree
Showing 8 changed files with 259 additions and 150 deletions.
102 changes: 76 additions & 26 deletions integration_tests/sdk/go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,21 @@ func init() {
}
}

func write(ctx context.Context, client ceresdb.Client, ts int64) error {
func write(ctx context.Context, client ceresdb.Client, ts int64, addNewColumn bool) error {
points := make([]ceresdb.Point, 0, 2)
for i := 0; i < 2; i++ {
point, err := ceresdb.NewPointBuilder(table).
builder := ceresdb.NewPointBuilder(table).
SetTimestamp(ts).
AddTag("name", ceresdb.NewStringValue(fmt.Sprintf("tag-%d", i))).
AddField("value", ceresdb.NewInt64Value(int64(i))).
Build()
AddField("value", ceresdb.NewInt64Value(int64(i)))

if addNewColumn {
builder = builder.AddTag("new_tag", ceresdb.NewStringValue(fmt.Sprintf("new-tag-%d", i))).
AddField("new_field", ceresdb.NewInt64Value(int64(i)))
}

point, err := builder.Build()

if err != nil {
return err
}
Expand Down Expand Up @@ -57,10 +64,10 @@ func ensureRow(expectedVals []ceresdb.Value, actualRow []ceresdb.Column) error {

}

func query(ctx context.Context, client ceresdb.Client, ts int64) error {
func query(ctx context.Context, client ceresdb.Client, ts int64, addNewColumn bool) error {
resp, err := client.SQLQuery(ctx, ceresdb.SQLQueryRequest{
Tables: []string{table},
SQL: fmt.Sprintf("select * from %s where timestamp = %d", table, ts),
SQL: fmt.Sprintf("select * from %s where timestamp = %d order by name", table, ts),
})
if err != nil {
return err
Expand All @@ -70,21 +77,32 @@ func query(ctx context.Context, client ceresdb.Client, ts int64) error {
return fmt.Errorf("expect 2 rows, current: %+v", len(resp.Rows))
}

if err := ensureRow([]ceresdb.Value{
row0 := []ceresdb.Value{
ceresdb.NewUint64Value(4024844655630594205),
ceresdb.NewInt64Value(ts),
ceresdb.NewStringValue("tag-0"),
ceresdb.NewInt64Value(0),
}, resp.Rows[0].Columns()); err != nil {
return err
}
ceresdb.NewInt64Value(0)}

return ensureRow([]ceresdb.Value{
row1 := []ceresdb.Value{
ceresdb.NewUint64Value(14230010170561829440),
ceresdb.NewInt64Value(ts),
ceresdb.NewStringValue("tag-1"),
ceresdb.NewInt64Value(1),
}, resp.Rows[1].Columns())
}

if addNewColumn {
row0[0] = ceresdb.NewUint64Value(8341999341185504339)
row1[0] = ceresdb.NewUint64Value(4452331151453582498)
row0 = append(row0, ceresdb.NewInt64Value(0), ceresdb.NewStringValue("new-tag-0"))
row1 = append(row1, ceresdb.NewInt64Value(1), ceresdb.NewStringValue("new-tag-1"))
}

if err := ensureRow(row0,
resp.Rows[0].Columns()); err != nil {
return err
}

return ensureRow(row1, resp.Rows[1].Columns())
}

func ddl(ctx context.Context, client ceresdb.Client, sql string) (uint32, error) {
Expand All @@ -99,6 +117,48 @@ func ddl(ctx context.Context, client ceresdb.Client, sql string) (uint32, error)
return resp.AffectedRows, nil
}

// checkAutoCreateTable verifies that writing to a non-existent table
// implicitly creates it: the table is dropped first, then written to and
// read back with the extra-column mode disabled.
func checkAutoCreateTable(ctx context.Context, client ceresdb.Client) error {
	// Start from a clean slate so the write below has to create the table.
	_, err := ddl(ctx, client, "drop table if exists "+table)
	if err != nil {
		return err
	}

	ts := currentMS()
	if err = write(ctx, client, ts, false); err != nil {
		return err
	}

	return query(ctx, client, ts, false)
}

// checkAutoAddColumns verifies that inserting rows carrying previously
// unseen columns transparently adds those columns to the existing table.
// It assumes the table already exists (created by an earlier check).
func checkAutoAddColumns(ctx context.Context, client ceresdb.Client) error {
	ts := currentMS()

	// Write and read back with addNewColumn enabled; the query side checks
	// that the new tag/field columns show up in the result rows.
	if err := write(ctx, client, ts, true); err != nil {
		return err
	}
	return query(ctx, client, ts, true)
}

// dropTable removes the test table and verifies the DDL reports zero
// affected rows (DROP TABLE is expected to touch no data rows).
func dropTable(ctx context.Context, client ceresdb.Client) error {
	affected, err := ddl(ctx, client, "drop table "+table)
	if err != nil {
		return err
	}

	// Report the unexpected affected-row count through the error return
	// instead of panicking: this function already signals failure via
	// error, and mixing panic with an error return is inconsistent. The
	// caller panics on any returned error, so the failure still surfaces.
	if affected != 0 {
		return fmt.Errorf("drop table expected 0, actual is %d", affected)
	}
	return nil
}

func main() {
fmt.Printf("Begin test, endpoint %s...\n", endpoint)

Expand All @@ -110,28 +170,18 @@ func main() {
}

ctx := context.TODO()
if _, err := ddl(ctx, client, "drop table if exists "+table); err != nil {
panic(err)
}

ts := currentMS()
if err := write(ctx, client, ts); err != nil {
if err = checkAutoCreateTable(ctx, client); err != nil {
panic(err)
}

if err := query(ctx, client, ts); err != nil {
if err = checkAutoAddColumns(ctx, client); err != nil {
panic(err)
}

affected, err := ddl(ctx, client, "drop table "+table)
if err != nil {
if err = dropTable(ctx, client); err != nil {
panic(err)
}

if affected != 0 {
panic(fmt.Sprintf("drop table expected 0, actual is %d", affected))
}

fmt.Println("Test done")
}

Expand Down
4 changes: 2 additions & 2 deletions server/src/handlers/influxdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use crate::{
context::RequestContext,
handlers,
instance::InstanceRef,
proxy::grpc::write::{execute_plan, write_request_to_insert_plan, WriteContext},
proxy::grpc::write::{execute_insert_plan, write_request_to_insert_plan, WriteContext},
schema_config_provider::SchemaConfigProviderRef,
};

Expand Down Expand Up @@ -120,7 +120,7 @@ impl<Q: QueryExecutor + 'static> InfluxDb<Q> {

let mut success = 0;
for insert_plan in plans {
success += execute_plan(
success += execute_insert_plan(
request_id,
catalog,
schema,
Expand Down
4 changes: 2 additions & 2 deletions server/src/handlers/prom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use crate::{
context::RequestContext,
handlers,
instance::InstanceRef,
proxy::grpc::write::{execute_plan, write_request_to_insert_plan, WriteContext},
proxy::grpc::write::{execute_insert_plan, write_request_to_insert_plan, WriteContext},
schema_config_provider::SchemaConfigProviderRef,
};

Expand Down Expand Up @@ -257,7 +257,7 @@ impl<Q: QueryExecutor + 'static> RemoteStorage for CeresDBStorage<Q> {

let mut success = 0;
for insert_plan in plans {
success += execute_plan(
success += execute_insert_plan(
request_id,
catalog,
schema,
Expand Down
13 changes: 8 additions & 5 deletions server/src/proxy/grpc/prom_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use common_types::{
use common_util::error::BoxError;
use http::StatusCode;
use interpreters::{context::Context as InterpreterContext, factory::Factory, interpreter::Output};
use log::info;
use log::{error, info};
use query_engine::executor::{Executor as QueryExecutor, RecordBatchVec};
use snafu::{ensure, OptionExt, ResultExt};
use sql::{
Expand All @@ -42,10 +42,13 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
req: PrometheusQueryRequest,
) -> PrometheusQueryResponse {
match self.handle_prom_query_internal(ctx, req).await {
Err(e) => PrometheusQueryResponse {
header: Some(error::build_err_header(e)),
..Default::default()
},
Err(e) => {
error!("Failed to handle prom query, err:{e}");
PrometheusQueryResponse {
header: Some(error::build_err_header(e)),
..Default::default()
}
}
Ok(v) => v,
}
}
Expand Down
2 changes: 2 additions & 0 deletions server/src/proxy/grpc/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use ceresdbproto::storage::{RouteRequest, RouteResponse};
use common_util::error::BoxError;
use http::StatusCode;
use log::error;
use query_engine::executor::Executor as QueryExecutor;
use snafu::ResultExt;

Expand All @@ -23,6 +24,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
let mut resp = RouteResponse::default();
match routes {
Err(e) => {
error!("Failed to handle route, err:{e}");
resp.header = Some(error::build_err_header(e));
}
Ok(v) => {
Expand Down
12 changes: 8 additions & 4 deletions server/src/proxy/grpc/sql_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@ const STREAM_QUERY_CHANNEL_LEN: usize = 20;
impl<Q: QueryExecutor + 'static> Proxy<Q> {
pub async fn handle_sql_query(&self, ctx: Context, req: SqlQueryRequest) -> SqlQueryResponse {
match self.handle_sql_query_internal(ctx, req).await {
Err(e) => SqlQueryResponse {
header: Some(error::build_err_header(e)),
..Default::default()
},
Err(e) => {
error!("Failed to handle sql query, err:{e}");
SqlQueryResponse {
header: Some(error::build_err_header(e)),
..Default::default()
}
}
Ok(v) => v,
}
}
Expand All @@ -55,6 +58,7 @@ impl<Q: QueryExecutor + 'static> Proxy<Q> {
) -> BoxStream<'static, SqlQueryResponse> {
match self.clone().handle_stream_query_internal(ctx, req).await {
Err(e) => stream::once(async {
error!("Failed to handle stream sql query, err:{e}");
SqlQueryResponse {
header: Some(error::build_err_header(e)),
..Default::default()
Expand Down
Loading

0 comments on commit aaa650a

Please sign in to comment.