commit c152472

refactor: move http event handler to a separate file

shuiyisong committed Jun 4, 2024
1 parent: 061b14e

Showing 6 changed files with 162 additions and 134 deletions.
25 changes: 9 additions & 16 deletions src/frontend/src/instance/log_handler.rs

@@ -18,7 +18,7 @@ use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
 use client::Output;
 use common_error::ext::BoxedError;
 use pipeline::{GreptimeTransformer, Pipeline};
-use servers::error::{AuthSnafu, ExecuteGrpcRequestSnafu};
+use servers::error::{AuthSnafu, ExecuteGrpcRequestSnafu, Result as ServerResult};
 use servers::query_handler::LogHandler;
 use session::context::QueryContextRef;
 use snafu::ResultExt;
@@ -31,7 +31,7 @@ impl LogHandler for Instance {
         &self,
         log: RowInsertRequests,
         ctx: QueryContextRef,
-    ) -> servers::error::Result<Output> {
+    ) -> ServerResult<Output> {
         self.plugins
             .get::<PermissionCheckerRef>()
             .as_ref()
@@ -43,9 +43,9 @@

     async fn get_pipeline(
         &self,
-        query_ctx: QueryContextRef,
         name: &str,
-    ) -> servers::error::Result<Pipeline<GreptimeTransformer>> {
+        query_ctx: QueryContextRef,
+    ) -> ServerResult<Pipeline<GreptimeTransformer>> {
         self.pipeline_operator
             .get_pipeline(query_ctx, name)
             .await
@@ -55,24 +55,17 @@

     async fn insert_pipeline(
         &self,
-        query_ctx: QueryContextRef,
         name: &str,
         content_type: &str,
         pipeline: &str,
-    ) -> servers::error::Result<()> {
+        query_ctx: QueryContextRef,
+    ) -> ServerResult<()> {
         self.pipeline_operator
-            .insert_pipeline(query_ctx, name, content_type, pipeline)
+            .insert_pipeline(name, content_type, pipeline, query_ctx)
             .await
             .map_err(BoxedError::new)
             .context(servers::error::InsertPipelineSnafu { name })?;
         Ok(())
     }

-    async fn delete_pipeline(
-        &self,
-        _query_ctx: QueryContextRef,
-        _name: &str,
-    ) -> servers::error::Result<()> {
+    async fn delete_pipeline(&self, _name: &str, _query_ctx: QueryContextRef) -> ServerResult<()> {
         todo!("delete_pipeline")
     }
 }
@@ -82,7 +75,7 @@ impl Instance {
         &self,
         log: RowInsertRequests,
         ctx: QueryContextRef,
-    ) -> servers::error::Result<Output> {
+    ) -> ServerResult<Output> {
         self.inserter
             .handle_log_inserts(log, ctx, self.statement_executor.as_ref())
             .await
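Pieced together from the impl above: after this commit every `LogHandler` method takes `query_ctx` as its trailing parameter and returns the `ServerResult` alias. A condensed sketch of the trait as the commit leaves it (an assumption for readability; the canonical definition lives in src/servers/src/query_handler.rs, plausibly the sixth changed file that did not load below, and may differ in attributes and doc comments):

// Assumed shape of the trait, inferred from the impl in log_handler.rs.
#[async_trait::async_trait]
pub trait LogHandler {
    async fn insert_log(&self, log: RowInsertRequests, ctx: QueryContextRef)
        -> ServerResult<Output>;

    async fn get_pipeline(
        &self,
        name: &str,
        query_ctx: QueryContextRef,
    ) -> ServerResult<Pipeline<GreptimeTransformer>>;

    async fn insert_pipeline(
        &self,
        name: &str,
        content_type: &str,
        pipeline: &str,
        query_ctx: QueryContextRef,
    ) -> ServerResult<()>;

    async fn delete_pipeline(&self, name: &str, query_ctx: QueryContextRef) -> ServerResult<()>;
}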
5 changes: 3 additions & 2 deletions src/frontend/src/pipeline.rs

@@ -26,6 +26,7 @@ use operator::statement::StatementExecutorRef;
 use pipeline::table::{PipelineTable, PipelineTableRef};
 use pipeline::{GreptimeTransformer, Pipeline};
 use query::QueryEngineRef;
+use servers::error::Result as ServerResult;
 use session::context::{QueryContext, QueryContextRef};
 use snafu::{OptionExt, ResultExt};
 use table::TableRef;
@@ -210,11 +211,11 @@ impl PipelineOperator {

     pub async fn insert_pipeline(
         &self,
-        query_ctx: QueryContextRef,
         name: &str,
         content_type: &str,
         pipeline: &str,
-    ) -> servers::error::Result<()> {
+        query_ctx: QueryContextRef,
+    ) -> ServerResult<()> {
         self.create_pipeline_table_if_not_exists(query_ctx.current_catalog())
             .await
             .map_err(|e| {
5 changes: 3 additions & 2 deletions src/servers/src/http.rs

@@ -73,6 +73,7 @@ use crate::query_handler::{
 use crate::server::Server;

 pub mod authorize;
+pub mod event;
 pub mod handler;
 pub mod header;
 pub mod influxdb;
@@ -711,8 +712,8 @@ impl HttpServer {

     fn route_log<S>(log_handler: LogHandlerRef) -> Router<S> {
         Router::new()
-            .route("/logs", routing::post(handler::log_ingester))
-            .route("/pipelines", routing::post(handler::add_pipeline))
+            .route("/logs", routing::post(event::log_ingester))
+            .route("/pipelines", routing::post(event::add_pipeline))
             .layer(
                 ServiceBuilder::new()
                     .layer(HandleErrorLayer::new(handle_error))
140 changes: 140 additions & 0 deletions src/servers/src/http/event.rs

@@ -0,0 +1,140 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use api::v1::{RowInsertRequest, RowInsertRequests, Rows};
use axum::extract::{Json, Query, State, TypedHeader};
use axum::headers::ContentType;
use axum::Extension;
use common_telemetry::error;
use pipeline::Value as PipelineValue;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use session::context::QueryContextRef;
use snafu::{OptionExt, ResultExt};

use crate::error::{
    InsertLogSnafu, InvalidParameterSnafu, ParseJsonSnafu, Result, UnsupportedContentTypeSnafu,
};
use crate::http::greptime_result_v1::GreptimedbV1Response;
use crate::http::HttpResponse;
use crate::query_handler::LogHandlerRef;

#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
pub struct LogIngesterQueryParams {
    pub table_name: Option<String>,
    pub db: Option<String>,
    pub pipeline_name: Option<String>,
}

#[axum_macros::debug_handler]
pub async fn add_pipeline(
    State(handler): State<LogHandlerRef>,
    Extension(query_ctx): Extension<QueryContextRef>,
    Json(payload): Json<Value>,
) -> Result<String> {
    let name = payload["name"].as_str().context(InvalidParameterSnafu {
        reason: "name is required in payload",
    })?;
    let pipeline = payload["pipeline"]
        .as_str()
        .context(InvalidParameterSnafu {
            reason: "pipeline is required in payload",
        })?;

    let content_type = "yaml";
    let result = handler
        .insert_pipeline(name, content_type, pipeline, query_ctx)
        .await;

    result.map(|_| "ok".to_string()).map_err(|e| {
        error!(e; "failed to insert pipeline");
        e
    })
}

#[axum_macros::debug_handler]
pub async fn log_ingester(
    State(state): State<LogHandlerRef>,
    Query(query_params): Query<LogIngesterQueryParams>,
    Extension(query_ctx): Extension<QueryContextRef>,
    TypedHeader(content_type): TypedHeader<ContentType>,
    payload: String,
) -> Result<HttpResponse> {
    let value;
    // TODO (qtang): we should decide json or jsonl
    if content_type == ContentType::json() {
        value = serde_json::from_str(&payload).context(ParseJsonSnafu)?;
        // TODO (qtang): we should decide which content type to support
        // form_url_encoded type is only a placeholder
    } else if content_type == ContentType::form_url_encoded() {
        value = parse_space_separated_log(payload)?;
    } else {
        return UnsupportedContentTypeSnafu { content_type }.fail();
    }
    log_ingester_inner(state, query_params, query_ctx, value)
        .await
        .or_else(|e| InsertLogSnafu { msg: e }.fail())
}

fn parse_space_separated_log(payload: String) -> Result<Value> {
    // ToStructuredLogSnafu
    let _log = payload.split_whitespace().collect::<Vec<&str>>();
    // TODO (qtang): implement this
    todo!()
}

async fn log_ingester_inner(
    state: LogHandlerRef,
    query_params: LogIngesterQueryParams,
    query_ctx: QueryContextRef,
    payload: Value,
) -> std::result::Result<HttpResponse, String> {
    let pipeline_id = query_params
        .pipeline_name
        .ok_or("pipeline_name is required".to_string())?;

    let pipeline_data = PipelineValue::try_from(payload)?;

    let pipeline = state
        .get_pipeline(&pipeline_id, query_ctx.clone())
        .await
        .map_err(|e| e.to_string())?;
    let transformed_data: Rows = pipeline.exec(pipeline_data)?;

    let table_name = query_params
        .table_name
        .ok_or("table_name is required".to_string())?;

    let insert_request = RowInsertRequest {
        rows: Some(transformed_data),
        table_name: table_name.clone(),
    };
    let insert_requests = RowInsertRequests {
        inserts: vec![insert_request],
    };
    state
        .insert_log(insert_requests, query_ctx)
        .await
        .map(|_| {
            HttpResponse::GreptimedbV1(GreptimedbV1Response {
                output: vec![],
                execution_time_ms: 0,
                resp_metrics: HashMap::new(),
            })
        })
        .map_err(|e| e.to_string())
}
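For orientation, here is how a client might exercise the two handlers above once they are routed at /logs and /pipelines (see the route_log change in src/servers/src/http.rs). This is a hypothetical sketch, not part of the commit: the base URL, port, mount prefix, and pipeline YAML body are all assumptions, and it presumes reqwest (with the json feature), tokio, and serde_json as dependencies.

use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();
    // Assumed mount point and port; adjust to however route_log is nested.
    let base = "http://127.0.0.1:4000/v1/events";

    // Register a pipeline: add_pipeline expects a JSON body with string
    // fields "name" and "pipeline" (the latter holding YAML) and stores it
    // with content_type "yaml".
    let resp = client
        .post(format!("{base}/pipelines"))
        .json(&json!({
            "name": "my_pipeline",
            "pipeline": "processors: []\ntransform: []", // placeholder YAML
        }))
        .send()
        .await?;
    println!("add_pipeline: {}", resp.status());

    // Ingest a log line: log_ingester requires pipeline_name and table_name
    // as query parameters and currently only accepts a JSON body.
    let resp = client
        .post(format!(
            "{base}/logs?pipeline_name=my_pipeline&table_name=my_table"
        ))
        .header("content-type", "application/json")
        .body(r#"{"message": "hello world"}"#)
        .send()
        .await?;
    println!("log_ingester: {}", resp.status());

    Ok(())
}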
112 changes: 1 addition & 111 deletions src/servers/src/http/handler.rs

@@ -17,9 +17,7 @@ use std::env;
 use std::time::Instant;

 use aide::transform::TransformOperation;
-use api::v1::{RowInsertRequest, RowInsertRequests, Rows};
-use axum::extract::{Json, Query, State, TypedHeader};
-use axum::headers::ContentType;
+use axum::extract::{Json, Query, State};
 use axum::response::{IntoResponse, Response};
 use axum::{Extension, Form};
 use common_error::ext::ErrorExt;
@@ -28,16 +26,13 @@ use common_plugins::GREPTIME_EXEC_WRITE_COST;
 use common_query::{Output, OutputData};
 use common_recordbatch::util;
 use common_telemetry::tracing;
-use pipeline::Value as PipelineValue;
 use query::parser::{PromQuery, DEFAULT_LOOKBACK_STRING};
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
-use serde_json::Value;
 use session::context::QueryContextRef;
 use snafu::ResultExt;

 use super::header::collect_plan_metrics;
-use crate::error::{Error, InsertLogSnafu, ParseJsonSnafu, UnsupportedContentTypeSnafu};
 use crate::http::arrow_result::ArrowResponse;
 use crate::http::csv_result::CsvResponse;
 use crate::http::error_result::ErrorResponse;
@@ -50,7 +45,6 @@ use crate::http::{
 };
 use crate::metrics_handler::MetricsHandler;
 use crate::query_handler::sql::ServerSqlQueryHandlerRef;
-use crate::query_handler::LogHandlerRef;

 #[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
 pub struct SqlQuery {
@@ -71,110 +65,6 @@ pub struct SqlQuery {
     pub limit: Option<usize>,
 }

-#[derive(Debug, Default, Serialize, Deserialize, JsonSchema)]
-pub struct LogIngesterQueryParams {
-    pub table_name: Option<String>,
-    pub db: Option<String>,
-    pub pipeline_name: Option<String>,
-}
-
-fn parse_space_separated_log(payload: String) -> Result<Value, Error> {
-    // ToStructuredLogSnafu
-    let _log = payload.split_whitespace().collect::<Vec<&str>>();
-    // TODO (qtang): implement this
-    todo!()
-}
-
-async fn log_ingester_inner(
-    state: LogHandlerRef,
-    query_params: LogIngesterQueryParams,
-    query_ctx: QueryContextRef,
-    payload: Value,
-) -> Result<HttpResponse, String> {
-    let pipeline_id = query_params
-        .pipeline_name
-        .ok_or("pipeline_name is required".to_string())?;
-
-    let pipeline_data = PipelineValue::try_from(payload)?;
-
-    let pipeline = state
-        .get_pipeline(query_ctx.clone(), &pipeline_id)
-        .await
-        .map_err(|e| e.to_string())?;
-    let transformed_data: Rows = pipeline.exec(pipeline_data)?;
-
-    let table_name = query_params
-        .table_name
-        .ok_or("table_name is required".to_string())?;
-
-    let insert_request = RowInsertRequest {
-        rows: Some(transformed_data),
-        table_name: table_name.clone(),
-    };
-    let insert_requests = RowInsertRequests {
-        inserts: vec![insert_request],
-    };
-    state
-        .insert_log(insert_requests, query_ctx)
-        .await
-        .map(|_| {
-            HttpResponse::GreptimedbV1(GreptimedbV1Response {
-                output: vec![],
-                execution_time_ms: 0,
-                resp_metrics: HashMap::new(),
-            })
-        })
-        .map_err(|e| e.to_string())
-}
-
-/// handler to log ingester
-#[axum_macros::debug_handler]
-pub async fn log_ingester(
-    State(state): State<LogHandlerRef>,
-    Query(query_params): Query<LogIngesterQueryParams>,
-    Extension(query_ctx): Extension<QueryContextRef>,
-    TypedHeader(content_type): TypedHeader<ContentType>,
-    payload: String,
-) -> Result<HttpResponse, Error> {
-    let value;
-    // TODO (qtang): we should decide json or jsonl
-    if content_type == ContentType::json() {
-        value = serde_json::from_str(&payload).context(ParseJsonSnafu)?;
-        // TODO (qtang): we should decide which content type to support
-        // form_url_cncoded type is only placeholder
-    } else if content_type == ContentType::form_url_encoded() {
-        value = parse_space_separated_log(payload)?;
-    } else {
-        return UnsupportedContentTypeSnafu { content_type }.fail();
-    }
-    log_ingester_inner(state, query_params, query_ctx, value)
-        .await
-        .or_else(|e| InsertLogSnafu { msg: e }.fail())
-}
-
-#[axum_macros::debug_handler]
-pub async fn add_pipeline(
-    State(_state): State<LogHandlerRef>,
-    Query(_query_params): Query<LogIngesterQueryParams>,
-    Extension(_query_ctx): Extension<QueryContextRef>,
-    TypedHeader(_content_type): TypedHeader<ContentType>,
-    Json(paylod): Json<Value>,
-) -> String {
-    let name = paylod["name"].as_str().unwrap();
-    let pipeline = paylod["pipeline"].as_str().unwrap();
-    let content_type = "yaml";
-    let result = _state
-        .insert_pipeline(_query_ctx, name, content_type, pipeline)
-        .await;
-    match result {
-        Ok(_) => String::from("ok"),
-        Err(e) => {
-            common_telemetry::error!("failed to insert pipeline.{e:?}");
-            e.to_string()
-        }
-    }
-}
-
 /// Handler to execute sql
 #[axum_macros::debug_handler]
 #[tracing::instrument(skip_all, fields(protocol = "http", request_type = "sql"))]
1 more changed file (6 additions & 3 deletions) did not load in this view.
