From a55500fdebac430f9d65dcc384a726df56af0957 Mon Sep 17 00:00:00 2001 From: "chunshao.rcs" Date: Wed, 12 Apr 2023 17:11:25 +0800 Subject: [PATCH 1/2] feat: impl prom query with proxy --- server/src/handlers/mod.rs | 3 +- server/src/http.rs | 18 +- server/src/proxy/http/mod.rs | 1 + server/src/{handlers => proxy/http}/prom.rs | 425 +++++++++----------- 4 files changed, 199 insertions(+), 248 deletions(-) rename server/src/{handlers => proxy/http}/prom.rs (60%) diff --git a/server/src/handlers/mod.rs b/server/src/handlers/mod.rs index 6f7fe190a9..1ef3eafdeb 100644 --- a/server/src/handlers/mod.rs +++ b/server/src/handlers/mod.rs @@ -1,11 +1,10 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! Request handlers pub mod admin; pub mod error; pub mod influxdb; -pub mod prom; pub mod query; pub mod route; diff --git a/server/src/http.rs b/server/src/http.rs index bff19a0ee0..67cc11a37a 100644 --- a/server/src/http.rs +++ b/server/src/http.rs @@ -14,7 +14,7 @@ use handlers::query::QueryRequest as HandlerQueryRequest; use log::{error, info}; use logger::RuntimeLevel; use profile::Profiler; -use prom_remote_api::{types::RemoteStorageRef, web}; +use prom_remote_api::web; use query_engine::executor::Executor as QueryExecutor; use router::{endpoint::Endpoint, Router, RouterRef}; use serde::Serialize; @@ -36,7 +36,6 @@ use crate::{ handlers::{ self, influxdb::{self, InfluxDb, InfluxqlParams, InfluxqlRequest, WriteParams, WriteRequest}, - prom::CeresDBStorage, }, instance::InstanceRef, metrics, @@ -128,7 +127,6 @@ pub struct Service { engine_runtimes: Arc, log_runtime: Arc, profiler: Arc, - prom_remote_storage: RemoteStorageRef, influxdb: Arc>, tx: Sender<()>, rx: Option>, @@ -204,16 +202,12 @@ impl Service { &self, ) -> impl Filter + Clone { let write_api = warp::path!("write") - .and(web::warp::with_remote_storage( - self.prom_remote_storage.clone(), - )) + .and(web::warp::with_remote_storage(self.proxy.clone())) .and(self.with_context()) .and(web::warp::protobuf_body()) .and_then(web::warp::write); let query_api = warp::path!("read") - .and(web::warp::with_remote_storage( - self.prom_remote_storage.clone(), - )) + .and(web::warp::with_remote_storage(self.proxy.clone())) .and(self.with_context()) .and(web::warp::protobuf_body()) .and_then(web::warp::read); @@ -684,10 +678,7 @@ impl Builder { .context(MissingSchemaConfigProvider)?; let router = self.router.context(MissingRouter)?; let opened_wals = self.opened_wals.context(MissingWal)?; - let prom_remote_storage = Arc::new(CeresDBStorage::new( - instance.clone(), - schema_config_provider.clone(), - )); + let influxdb = Arc::new(InfluxDb::new(instance, schema_config_provider)); let (tx, rx) = oneshot::channel(); @@ -695,7 +686,6 @@ impl Builder { proxy, engine_runtimes, log_runtime, - prom_remote_storage, influxdb, profiler: Arc::new(Profiler::default()), tx, diff --git a/server/src/proxy/http/mod.rs b/server/src/proxy/http/mod.rs index 0710d7d35d..77528bcb75 100644 --- a/server/src/proxy/http/mod.rs +++ b/server/src/proxy/http/mod.rs @@ -1,3 +1,4 @@ // Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. +pub(crate) mod prom; pub(crate) mod query; diff --git a/server/src/handlers/prom.rs b/server/src/proxy/http/prom.rs similarity index 60% rename from server/src/handlers/prom.rs rename to server/src/proxy/http/prom.rs index 75211df71c..ef570fdab5 100644 --- a/server/src/handlers/prom.rs +++ b/server/src/proxy/http/prom.rs @@ -1,10 +1,10 @@ -// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. +// Copyright 2022-2023 CeresDB Project Authors. Licensed under Apache-2.0. //! This module implements prometheus remote storage API. //! It converts write request to gRPC write request, and //! translates query request to SQL for execution. -use std::{collections::HashMap, time::Instant}; +use std::{collections::HashMap, result::Result as StdResult, time::Instant}; use async_trait::async_trait; use ceresdbproto::storage::{ @@ -15,6 +15,7 @@ use common_types::{ request_id::RequestId, schema::{RecordSchema, TIMESTAMP_COLUMN, TSID_COLUMN}, }; +use common_util::error::BoxError; use interpreters::interpreter::Output; use log::debug; use prom_remote_api::types::{ @@ -22,219 +23,27 @@ use prom_remote_api::types::{ WriteRequest, }; use query_engine::executor::{Executor as QueryExecutor, RecordBatchVec}; -use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu}; +use snafu::{ensure, OptionExt, ResultExt}; use warp::reject; -use super::query::QueryRequest; use crate::{ context::RequestContext, - handlers, - instance::InstanceRef, - proxy::grpc::write::{execute_insert_plan, write_request_to_insert_plan, WriteContext}, - schema_config_provider::SchemaConfigProviderRef, -}; - -#[derive(Debug, Snafu)] -pub enum Error { - #[snafu(display("Metric name is not found.\nBacktrace:\n{}", backtrace))] - MissingName { backtrace: Backtrace }, - - #[snafu(display("Invalid matcher type, value:{}.\nBacktrace:\n{}", value, backtrace))] - InvalidMatcherType { value: i32, backtrace: Backtrace }, - - #[snafu(display("Read response must be Rows.\nBacktrace:\n{}", backtrace))] - ResponseMustRows { backtrace: Backtrace }, - - #[snafu(display("TSID column is missing in query response.\nBacktrace:\n{}", backtrace))] - MissingTSID { backtrace: Backtrace }, - - #[snafu(display( - "Timestamp column is missing in query response.\nBacktrace:\n{}", - backtrace - ))] - MissingTimestamp { backtrace: Backtrace }, - - #[snafu(display( - "Value column is missing in query response.\nBacktrace:\n{}", - backtrace - ))] - MissingValue { backtrace: Backtrace }, - - #[snafu(display("Handle sql failed, err:{}.", source))] - SqlHandle { - source: Box, - }, - - #[snafu(display("Tsid must be u64, current:{}.\nBacktrace:\n{}", kind, backtrace))] - TsidMustU64 { - kind: DatumKind, - backtrace: Backtrace, - }, - - #[snafu(display("Timestamp wrong type, current:{}.\nBacktrace:\n{}", kind, backtrace))] - MustTimestamp { - kind: DatumKind, - backtrace: Backtrace, + proxy::{ + error::{Error, Internal, InternalNoCause, Result}, + grpc::write::{execute_insert_plan, write_request_to_insert_plan, WriteContext}, + http::query::{QueryRequest, Request}, + Proxy, }, - - #[snafu(display( - "Value must be f64 compatible type, current:{}.\nBacktrace:\n{}", - kind, - backtrace - ))] - F64Castable { - kind: DatumKind, - backtrace: Backtrace, - }, - - #[snafu(display( - "Tag must be string type, current:{}.\nBacktrace:\n{}", - kind, - backtrace - ))] - TagMustString { - kind: DatumKind, - backtrace: Backtrace, - }, - - #[snafu(display("Failed to write via gRPC, source:{}.", source))] - GRPCWriteError { source: crate::proxy::error::Error }, - - #[snafu(display("Failed to get schema, source:{}.", source))] - SchemaError { - source: crate::schema_config_provider::Error, - }, -} - -define_result!(Error); - -impl reject::Reject for Error {} +}; const NAME_LABEL: &str = "__name__"; const VALUE_COLUMN: &str = "value"; -pub struct CeresDBStorage { - instance: InstanceRef, - schema_config_provider: SchemaConfigProviderRef, -} - -impl CeresDBStorage { - pub fn new(instance: InstanceRef, schema_config_provider: SchemaConfigProviderRef) -> Self { - Self { - instance, - schema_config_provider, - } - } -} - -impl CeresDBStorage { - /// Separate metric from labels, and sort labels by name. - fn normalize_labels(mut labels: Vec