diff --git a/Cargo.lock b/Cargo.lock
index add93025d3..be6df5f0e8 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -83,7 +83,7 @@ dependencies = [
  "arrow 32.0.0",
  "async-trait",
  "base64 0.13.0",
- "bytes 1.2.1",
+ "bytes 1.4.0",
  "ceresdbproto",
  "common_types",
  "common_util",
@@ -558,7 +558,7 @@ dependencies = [
  "async-trait",
  "axum-core",
  "bitflags",
- "bytes 1.2.1",
+ "bytes 1.4.0",
  "futures-util",
  "http",
  "http-body",
@@ -585,7 +585,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d9f0c0a60006f2a293d82d571f635042a72edf927539b7685bd62d361963839b"
 dependencies = [
  "async-trait",
- "bytes 1.2.1",
+ "bytes 1.4.0",
  "futures-util",
  "http",
  "http-body",
@@ -921,15 +921,15 @@ dependencies = [

 [[package]]
 name = "bytes"
-version = "1.2.1"
+version = "1.4.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ec8a7b6a70fde80372154c65702f00a0f56f3e1c36abbc6c440484be248856db"
+checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be"

 [[package]]
 name = "bytes_ext"
 version = "1.0.0"
 dependencies = [
- "bytes 1.2.1",
+ "bytes 1.4.0",
  "snafu 0.6.10",
 ]

@@ -1738,7 +1738,7 @@ dependencies = [
  "arrow 32.0.0",
  "async-compression",
  "async-trait",
- "bytes 1.2.1",
+ "bytes 1.4.0",
  "bzip2",
  "chrono",
  "dashmap 5.4.0",
@@ -2481,7 +2481,7 @@ version = "0.3.13"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "37a82c6d637fc9515a4694bbf1cb2457b79d81ce52b3108bdeea58b07dd34a57"
 dependencies = [
- "bytes 1.2.1",
+ "bytes 1.4.0",
  "fnv",
  "futures-core",
  "futures-sink",
@@ -2542,7 +2542,7 @@ checksum = "4cff78e5788be1e0ab65b04d306b2ed5092c815ec97ec70f4ebd5aee158aa55d"
 dependencies = [
  "base64 0.13.0",
  "bitflags",
- "bytes 1.2.1",
+ "bytes 1.4.0",
  "headers-core",
  "http",
  "httpdate",
@@ -2607,7 +2607,7 @@ version = "0.2.8"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399"
 dependencies = [
- "bytes 1.2.1",
+ "bytes 1.4.0",
  "fnv",
  "itoa 1.0.3",
 ]
@@ -2618,7 +2618,7 @@ version = "0.4.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1"
 dependencies = [
- "bytes 1.2.1",
+ "bytes 1.4.0",
  "http",
  "pin-project-lite",
 ]
@@ -2656,7 +2656,7 @@ version = "0.14.20"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "02c929dc5c39e335a03c405292728118860721b10190d98c2a0f0efd5baafbac"
 dependencies = [
- "bytes 1.2.1",
+ "bytes 1.4.0",
  "futures-channel",
  "futures-core",
  "futures-util",
@@ -2791,6 +2791,17 @@ dependencies = [
  "once_cell",
 ]

+[[package]]
+name = "influxdb_line_protocol"
+version = "0.1.0"
+source = "git+https://github.com/jiacai2050/influxdb_line_protocol#14e00a3dbc99a5edff226b92e3496314b086acf4"
+dependencies = [
+ "bytes 1.4.0",
+ "nom 7.1.1",
+ "smallvec",
+ "snafu 0.7.1",
+]
+
 [[package]]
 name = "instant"
 version = "0.1.12"
@@ -3499,7 +3510,7 @@ dependencies = [
  "bitflags",
  "bitvec",
  "byteorder",
- "bytes 1.2.1",
+ "bytes 1.4.0",
  "cc",
  "chrono",
  "cmake",
@@ -3743,7 +3754,7 @@ checksum = "e1ea8f683b4f89a64181393742c041520a1a87e9775e6b4c0dd5a3281af05fc6"
 dependencies = [
  "async-trait",
  "base64 0.21.0",
- "bytes 1.2.1",
+ "bytes 1.4.0",
  "chrono",
  "futures 0.3.25",
  "itertools",
@@ -3767,7 +3778,7 @@ name = "object_store"
 version = "1.0.0"
 dependencies = [
  "async-trait",
- "bytes 1.2.1",
+ "bytes 1.4.0",
  "ceresdbproto",
  "chrono",
  "clru",
@@ -3886,7 +3897,7 @@ checksum = "a0d9aab6ebed77bd0998c728fbef20d6afc63db38c8fe85e0923b624c1b6bfab"
 dependencies = [
  "async-trait",
  "base64 0.13.0",
- "bytes 1.2.1",
+ "bytes 1.4.0",
  "chrono",
  "derive_more",
  "hmac",
@@ -3961,7 +3972,7 @@ dependencies = [
  "arrow-select",
  "base64 0.21.0",
  "brotli",
- "bytes 1.2.1",
+ "bytes 1.4.0",
  "chrono",
  "flate2",
  "futures 0.3.25",
@@ -4018,7 +4029,7 @@ dependencies = [
  "arrow 32.0.0",
  "arrow_ext",
  "async-trait",
- "bytes 1.2.1",
+ "bytes 1.4.0",
  "common_util",
  "datafusion",
  "datafusion-expr",
@@ -4424,7 +4435,7 @@ version = "0.11.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "21dc42e00223fc37204bd4aa177e69420c604ca4a183209a8f9de30c6d934698"
 dependencies = [
- "bytes 1.2.1",
+ "bytes 1.4.0",
  "prost-derive",
 ]

@@ -4434,7 +4445,7 @@ version = "0.11.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a3f8ad728fb08fe212df3c05169e940fbb6d9d16a877ddde14644a983ba2012e"
 dependencies = [
- "bytes 1.2.1",
+ "bytes 1.4.0",
  "heck 0.4.0",
  "itertools",
  "lazy_static",
@@ -4469,7 +4480,7 @@ version = "0.11.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "a5e0526209433e96d83d750dd81a99118edbc55739e7e61a46764fd2ad537788"
 dependencies = [
- "bytes 1.2.1",
+ "bytes 1.4.0",
  "prost",
 ]

@@ -4916,7 +4927,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "68cc60575865c7831548863cc02356512e3f1dc2f3f82cb837d7fc4cc8f3c97c"
 dependencies = [
  "base64 0.13.0",
- "bytes 1.2.1",
+ "bytes 1.4.0",
  "encoding_rs",
  "futures-core",
  "futures-util",
@@ -5004,7 +5015,7 @@ version = "0.3.0"
 source = "git+https://github.com/influxdata/rskafka.git?rev=00988a564b1db0249d858065fc110476c075efad#00988a564b1db0249d858065fc110476c075efad"
 dependencies = [
  "async-trait",
- "bytes 1.2.1",
+ "bytes 1.4.0",
  "chrono",
  "crc32c",
  "flate2",
@@ -5310,7 +5321,7 @@ dependencies = [
  "arrow 32.0.0",
  "arrow_ext",
  "async-trait",
- "bytes 1.2.1",
+ "bytes 1.4.0",
  "catalog",
  "ceresdbproto",
  "cluster",
@@ -5321,6 +5332,7 @@ dependencies = [
  "df_operator",
  "futures 0.3.25",
  "http",
+ "influxdb_line_protocol",
  "interpreters",
  "lazy_static",
  "log",
@@ -5469,7 +5481,7 @@ name = "skiplist"
 version = "1.0.0"
 dependencies = [
  "arena",
- "bytes 1.2.1",
+ "bytes 1.4.0",
  "criterion",
  "rand 0.7.3",
  "yatp",
@@ -5539,9 +5551,9 @@ dependencies = [

 [[package]]
 name = "smallvec"
-version = "1.9.0"
+version = "1.10.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2fd0db749597d91ff862fd1d55ea87f7855a744a8425a64695b6fca237d1dad1"
+checksum = "a507befe795404456341dfab10cef66ead4c041f62b8b11bbb92bffe5d0953e0"

 [[package]]
 name = "snafu"
@@ -6101,7 +6113,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "c8e00990ebabbe4c14c08aca901caed183ecd5c09562a12c824bb53d3c3fd3af"
 dependencies = [
  "autocfg 1.1.0",
- "bytes 1.2.1",
+ "bytes 1.4.0",
  "libc",
  "memchr",
  "mio",
@@ -6186,7 +6198,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "53474327ae5e166530d17f2d956afcb4f8a004de581b3cae10f12006bc8163e3"
 dependencies = [
  "async-stream",
- "bytes 1.2.1",
+ "bytes 1.4.0",
  "futures-core",
  "tokio",
  "tokio-stream",
@@ -6210,7 +6222,7 @@ version = "0.7.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "0bb2e075f03b3d66d8d8785356224ba688d2906a371015e225beeb65ca92c740"
 dependencies = [
- "bytes 1.2.1",
+ "bytes 1.4.0",
  "futures-core",
  "futures-sink",
  "pin-project-lite",
@@ -6237,7 +6249,7 @@ dependencies = [
  "async-trait",
  "axum",
  "base64 0.13.0",
- "bytes 1.2.1",
+ "bytes 1.4.0",
  "futures-core",
  "futures-util",
  "h2",
1.2.1", + "bytes 1.4.0", "futures-core", "futures-util", "h2", @@ -6317,7 +6329,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c530c8675c1dbf98facee631536fa116b5fb6382d7dd6dc1b118d970eafe3ba" dependencies = [ "bitflags", - "bytes 1.2.1", + "bytes 1.4.0", "futures-core", "futures-util", "http", @@ -6464,7 +6476,7 @@ checksum = "e27992fd6a8c29ee7eef28fc78349aa244134e10ad447ce3b9f0ac0ed0fa4ce0" dependencies = [ "base64 0.13.0", "byteorder", - "bytes 1.2.1", + "bytes 1.4.0", "http", "httparse", "log", @@ -6695,7 +6707,7 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed7b8be92646fc3d18b06147664ebc5f48d222686cb11a8755e561a735aacc6d" dependencies = [ - "bytes 1.2.1", + "bytes 1.4.0", "futures-channel", "futures-util", "headers", diff --git a/integration_tests/src/database.rs b/integration_tests/src/database.rs index 06aa2f1f17..4addf9c817 100644 --- a/integration_tests/src/database.rs +++ b/integration_tests/src/database.rs @@ -18,7 +18,6 @@ use ceresdb_client::{ RpcContext, }; use reqwest::ClientBuilder; -use serde::Serialize; use sql::{ ast::{Statement, TableName}, parser::Parser, @@ -46,11 +45,6 @@ struct HttpClient { endpoint: String, } -#[derive(Clone, Serialize)] -struct InfluxQLRequest { - query: String, -} - impl HttpClient { fn new(endpoint: String) -> Self { let client = ClientBuilder::new() @@ -178,12 +172,11 @@ impl CeresDB { } async fn execute_influxql(query: String, http_client: HttpClient) -> Box { - let url = format!("http://{}/influxql", http_client.endpoint); - let query_request = InfluxQLRequest { query }; + let url = format!("http://{}/influxdb/v1/query", http_client.endpoint); let resp = http_client .client .post(url) - .json(&query_request) + .body(query) .send() .await .unwrap(); diff --git a/server/Cargo.toml b/server/Cargo.toml index 8aa7275a35..97e2c82baa 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -26,6 +26,7 @@ datafusion-expr = { workspace = true } df_operator = { workspace = true } futures = { workspace = true } http = "0.2" +influxdb_line_protocol = { git = "https://github.com/jiacai2050/influxdb_line_protocol" } interpreters = { workspace = true } lazy_static = { workspace = true } log = { workspace = true } diff --git a/server/src/handlers/error.rs b/server/src/handlers/error.rs index 1b0362623e..16c2355a53 100644 --- a/server/src/handlers/error.rs +++ b/server/src/handlers/error.rs @@ -2,7 +2,9 @@ //! Error of handlers +use common_util::error::GenericError; use snafu::{Backtrace, Snafu}; +use warp::reject::Reject; use crate::limiter; // TODO(yingwen): Avoid printing huge sql string @@ -70,6 +72,11 @@ pub enum Error { source: tokio::time::error::Elapsed, backtrace: Backtrace, }, + + #[snafu(display("InfluxDb handler failed, msg:{}, source:{}", msg, source))] + InfluxDbHandler { msg: String, source: GenericError }, } define_result!(Error); + +impl Reject for Error {} diff --git a/server/src/handlers/influxdb.rs b/server/src/handlers/influxdb.rs new file mode 100644 index 0000000000..8a057a2e5c --- /dev/null +++ b/server/src/handlers/influxdb.rs @@ -0,0 +1,366 @@ +// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0. + +//! This module implements [write][1] and [query][2] for InfluxDB. +//! [1]: https://docs.influxdata.com/influxdb/v1.8/tools/api/#write-http-endpoint +//! 
+    fn normalize(&self, ts: i64) -> i64 {
+        match self {
+            Self::Millisecond => ts,
+            Self::Second => ts * 1000,
+        }
+    }
+}
+
+/// Line protocol
+#[derive(Debug)]
+pub struct WriteRequest {
+    pub lines: String,
+    pub precision: Precision,
+}
+
+impl From<Bytes> for WriteRequest {
+    fn from(bytes: Bytes) -> Self {
+        WriteRequest {
+            lines: String::from_utf8_lossy(&bytes).to_string(),
+            precision: Default::default(),
+        }
+    }
+}
+
+pub type WriteResponse = ();
+
+impl<Q: QueryExecutor + 'static> InfluxDb<Q> {
+    pub fn new(instance: InstanceRef<Q>, schema_config_provider: SchemaConfigProviderRef) -> Self {
+        Self {
+            instance,
+            schema_config_provider,
+        }
+    }
+
+    async fn query(
+        &self,
+        ctx: RequestContext,
+        req: QueryRequest,
+    ) -> Result<handlers::query::Response> {
+        handlers::query::handle_query(&ctx, self.instance.clone(), req)
+            .await
+            .map(handlers::query::convert_output)
+    }
+
+    async fn write(&self, ctx: RequestContext, req: WriteRequest) -> Result<WriteResponse> {
+        let request_id = RequestId::next_id();
+        let deadline = ctx.timeout.map(|t| Instant::now() + t);
+        let catalog = &ctx.catalog;
+        self.instance.catalog_manager.default_catalog_name();
+        let schema = &ctx.schema;
+        let schema_config = self
+            .schema_config_provider
+            .schema_config(schema)
+            .box_err()
+            .with_context(|| InfluxDbHandler {
+                msg: format!("get schema config failed, schema:{schema}"),
+            })?;
+
+        let write_context =
+            WriteContext::new(request_id, deadline, catalog.clone(), schema.clone());
+
+        let plans = crate::grpc::storage_service::write::write_request_to_insert_plan(
+            self.instance.clone(),
+            convert_write_request(req)?,
+            schema_config,
+            write_context,
+        )
+        .await
+        .box_err()
+        .with_context(|| InfluxDbHandler {
+            msg: "write request to insert plan",
+        })?;
+
+        let mut success = 0;
+        for insert_plan in plans {
+            success += crate::grpc::storage_service::write::execute_plan(
+                request_id,
+                catalog,
+                schema,
+                self.instance.clone(),
+                insert_plan,
+                deadline,
+            )
+            .await
+            .box_err()
+            .with_context(|| InfluxDbHandler {
+                msg: "execute plan",
+            })?;
+        }
+        debug!(
+            "Influxdb write finished, catalog:{}, schema:{}, success:{}",
+            catalog, schema, success
+        );
+
+        Ok(())
+    }
+}
+
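+/// Group the parsed line-protocol rows into one `WriteTableRequest` per
+/// measurement; rows that share an identical (sorted) tag list are merged
+/// into a single `WriteSeriesEntry` holding multiple field groups.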
line + .timestamp + .map_or_else(|| default_ts, |ts| req.precision.normalize(ts)); + let mut tag_set = line.series.tag_set.unwrap_or_default(); + // sort by tag key + tag_set.sort_unstable_by(|a, b| a.0.cmp(&b.0)); + // sort by field key + line.field_set.sort_unstable_by(|a, b| a.0.cmp(&b.0)); + + let req_for_one_measurement = req_by_measurement + .entry(line.series.measurement.to_string()) + .or_insert_with(|| WriteTableRequest { + table: line.series.measurement.to_string(), + tag_names: tag_set.iter().map(|(tagk, _)| tagk.to_string()).collect(), + field_names: line + .field_set + .iter() + .map(|(tagk, _)| tagk.to_string()) + .collect(), + entries: Vec::new(), + }); + + let tags: Vec<_> = tag_set + .iter() + .enumerate() + .map(|(idx, (_, tagv))| Tag { + name_index: idx as u32, + value: Some(Value { + value: Some(value::Value::StringValue(tagv.to_string())), + }), + }) + .collect(); + let field_group = FieldGroup { + timestamp, + fields: line + .field_set + .iter() + .cloned() + .enumerate() + .map(|(idx, (_, fieldv))| Field { + name_index: idx as u32, + value: Some(convert_influx_value(fieldv)), + }) + .collect(), + }; + let mut found = false; + for entry in &mut req_for_one_measurement.entries { + if entry.tags == tags { + // TODO: remove clone? + entry.field_groups.push(field_group.clone()); + found = true; + break; + } + } + if !found { + req_for_one_measurement.entries.push(WriteSeriesEntry { + tags, + field_groups: vec![field_group], + }) + } + } + + Ok(req_by_measurement.into_values().collect()) +} + +/// Convert influxdb's FieldValue to ceresdbproto's Value +fn convert_influx_value(field_value: FieldValue) -> Value { + let v = match field_value { + FieldValue::I64(v) => value::Value::Int64Value(v), + FieldValue::U64(v) => value::Value::Uint64Value(v), + FieldValue::F64(v) => value::Value::Float64Value(v), + FieldValue::String(v) => value::Value::StringValue(v.to_string()), + FieldValue::Boolean(v) => value::Value::BoolValue(v), + }; + + Value { value: Some(v) } +} + +// TODO: Request and response type don't match influxdb's API now. +pub async fn query( + ctx: RequestContext, + db: Arc>, + req: QueryRequest, +) -> std::result::Result { + db.query(ctx, req) + .await + .map_err(reject::custom) + .map(|v| reply::json(&v)) +} + +// TODO: Request and response type don't match influxdb's API now. 
+        let mut found = false;
+        for entry in &mut req_for_one_measurement.entries {
+            if entry.tags == tags {
+                // TODO: remove clone?
+                entry.field_groups.push(field_group.clone());
+                found = true;
+                break;
+            }
+        }
+        if !found {
+            req_for_one_measurement.entries.push(WriteSeriesEntry {
+                tags,
+                field_groups: vec![field_group],
+            })
+        }
+    }
+
+    Ok(req_by_measurement.into_values().collect())
+}
+
+/// Convert influxdb's FieldValue to ceresdbproto's Value
+fn convert_influx_value(field_value: FieldValue) -> Value {
+    let v = match field_value {
+        FieldValue::I64(v) => value::Value::Int64Value(v),
+        FieldValue::U64(v) => value::Value::Uint64Value(v),
+        FieldValue::F64(v) => value::Value::Float64Value(v),
+        FieldValue::String(v) => value::Value::StringValue(v.to_string()),
+        FieldValue::Boolean(v) => value::Value::BoolValue(v),
+    };
+
+    Value { value: Some(v) }
+}
+
+// TODO: Request and response types don't match influxdb's API now.
+pub async fn query<Q: QueryExecutor + 'static>(
+    ctx: RequestContext,
+    db: Arc<InfluxDb<Q>>,
+    req: QueryRequest,
+) -> std::result::Result<impl Reply, Rejection> {
+    db.query(ctx, req)
+        .await
+        .map_err(reject::custom)
+        .map(|v| reply::json(&v))
+}
+
+// TODO: Request and response types don't match influxdb's API now.
+pub async fn write<Q: QueryExecutor + 'static>(
+    ctx: RequestContext,
+    db: Arc<InfluxDb<Q>>,
+    req: WriteRequest,
+) -> std::result::Result<impl Reply, Rejection> {
+    db.write(ctx, req)
+        .await
+        .map_err(reject::custom)
+        .map(|_| warp::http::StatusCode::NO_CONTENT)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    #[test]
+    fn test_convert_influxdb_write_req() {
+        let lines = r#"
+demo,tag1=t1,tag2=t2 field1=90,field2=100 1678675992000
+demo,tag1=t1,tag2=t2 field1=91,field2=101 1678675993000
+demo,tag1=t11,tag2=t22 field1=900,field2=1000 1678675992000
+demo,tag1=t11,tag2=t22 field1=901,field2=1001 1678675993000
+"#
+        .to_string();
+        let req = WriteRequest {
+            lines,
+            precision: Precision::Millisecond,
+        };
+
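+        // Four rows of one measurement with two distinct tag combinations:
+        // expect a single WriteTableRequest containing two series entries.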
+        let pb_req = convert_write_request(req).unwrap();
+        assert_eq!(1, pb_req.len());
+        assert_eq!(
+            pb_req[0],
+            WriteTableRequest {
+                table: "demo".to_string(),
+                tag_names: vec!["tag1".to_string(), "tag2".to_string()],
+                field_names: vec!["field1".to_string(), "field2".to_string()],
+                entries: vec![
+                    // First series
+                    WriteSeriesEntry {
+                        tags: vec![
+                            Tag {
+                                name_index: 0,
+                                value: Some(convert_influx_value(FieldValue::String("t1".into()))),
+                            },
+                            Tag {
+                                name_index: 1,
+                                value: Some(convert_influx_value(FieldValue::String("t2".into()))),
+                            },
+                        ],
+                        field_groups: vec![
+                            FieldGroup {
+                                timestamp: 1678675992000,
+                                fields: vec![
+                                    Field {
+                                        name_index: 0,
+                                        value: Some(convert_influx_value(FieldValue::F64(90.0))),
+                                    },
+                                    Field {
+                                        name_index: 1,
+                                        value: Some(convert_influx_value(FieldValue::F64(100.0))),
+                                    }
+                                ]
+                            },
+                            FieldGroup {
+                                timestamp: 1678675993000,
+                                fields: vec![
+                                    Field {
+                                        name_index: 0,
+                                        value: Some(convert_influx_value(FieldValue::F64(91.0))),
+                                    },
+                                    Field {
+                                        name_index: 1,
+                                        value: Some(convert_influx_value(FieldValue::F64(101.0))),
+                                    }
+                                ]
+                            },
+                        ]
+                    },
+                    // Second series
+                    WriteSeriesEntry {
+                        tags: vec![
+                            Tag {
+                                name_index: 0,
+                                value: Some(convert_influx_value(FieldValue::String("t11".into()))),
+                            },
+                            Tag {
+                                name_index: 1,
+                                value: Some(convert_influx_value(FieldValue::String("t22".into()))),
+                            },
+                        ],
+                        field_groups: vec![
+                            FieldGroup {
+                                timestamp: 1678675992000,
+                                fields: vec![
+                                    Field {
+                                        name_index: 0,
+                                        value: Some(convert_influx_value(FieldValue::F64(900.0))),
+                                    },
+                                    Field {
+                                        name_index: 1,
+                                        value: Some(convert_influx_value(FieldValue::F64(1000.0))),
+                                    }
+                                ]
+                            },
+                            FieldGroup {
+                                timestamp: 1678675993000,
+                                fields: vec![
+                                    Field {
+                                        name_index: 0,
+                                        value: Some(convert_influx_value(FieldValue::F64(901.0))),
+                                    },
+                                    Field {
+                                        name_index: 1,
+                                        value: Some(convert_influx_value(FieldValue::F64(1001.0))),
+                                    }
+                                ]
+                            },
+                        ]
+                    }
+                ]
+            }
+        );
+    }
+}
diff --git a/server/src/handlers/mod.rs b/server/src/handlers/mod.rs
index 229f00c2c0..cf0f264c67 100644
--- a/server/src/handlers/mod.rs
+++ b/server/src/handlers/mod.rs
@@ -4,6 +4,7 @@

 pub mod admin;
 pub mod error;
+pub mod influxdb;
 pub mod prom;
 pub mod query;

diff --git a/server/src/http.rs b/server/src/http.rs
index c5201fd25b..f54db35dc6 100644
--- a/server/src/http.rs
+++ b/server/src/http.rs
@@ -31,7 +31,12 @@ use crate::{
     consts,
     context::RequestContext,
     error_util,
-    handlers::{self, prom::CeresDBStorage, query::Request},
+    handlers::{
+        self,
+        influxdb::{self, InfluxDb},
+        prom::CeresDBStorage,
+        query::Request,
+    },
     instance::InstanceRef,
     metrics,
     schema_config_provider::SchemaConfigProviderRef,
@@ -109,6 +114,7 @@ pub struct Service<Q> {
     instance: InstanceRef<Q>,
     profiler: Arc<Profiler>,
     prom_remote_storage: RemoteStorageRef,
+    influxdb: Arc<InfluxDb<Q>>,
     tx: Sender<()>,
     config: HttpConfig,
 }
@@ -125,14 +131,17 @@ impl<Q: QueryExecutor + 'static> Service<Q> {
         &self,
     ) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
         self.home()
+            // public APIs
            .or(self.metrics())
             .or(self.sql())
-            .or(self.influxql())
-            .or(self.heap_profile())
+            .or(self.influxdb_api())
+            .or(self.prom_api())
+            // admin APIs
             .or(self.admin_block())
+            // debug APIs
             .or(self.flush_memtable())
             .or(self.update_log_level())
-            .or(self.prom_api())
+            .or(self.heap_profile())
     }

     /// Expose `/prom/v1/read` and `/prom/v1/write` to serve Prometheus remote
@@ -157,6 +166,7 @@

         warp::path!("prom" / "v1" / ..)
             .and(warp::post())
+            .and(warp::body::content_length_limit(self.config.max_body_size))
             .and(write_api.or(query_api))
     }

@@ -200,41 +210,25 @@
             })
     }

-    // POST /influxql
-    // this request type is not what influxdb API expected, the one in influxdb:
-    // https://docs.influxdata.com/influxdb/v1.8/tools/api/#query-http-endpoint
-    fn influxql(
+    /// POST `/influxdb/v1/query` and `/influxdb/v1/write`
+    fn influxdb_api(
         &self,
     ) -> impl Filter<Extract = (impl Reply,), Error = Rejection> + Clone {
-        // accept json or plain text
-        let extract_request = warp::body::json()
-            .or(warp::body::bytes().map(Request::from))
-            .unify();
+        let write_api = warp::path!("write")
+            .and(self.with_context())
+            .and(self.with_influxdb())
+            .and(warp::body::bytes().map(influxdb::WriteRequest::from))
+            .and_then(influxdb::write);
+        let query_api = warp::path!("query")
+            .and(self.with_context())
+            .and(self.with_influxdb())
+            .and(warp::body::bytes().map(|bytes| QueryRequest::Influxql(Request::from(bytes))))
+            .and_then(influxdb::query);

-        warp::path!("influxql")
+        warp::path!("influxdb" / "v1" / ..)
             .and(warp::post())
             .and(warp::body::content_length_limit(self.config.max_body_size))
-            .and(extract_request)
-            .and(self.with_context())
-            .and(self.with_instance())
-            .and_then(|req, ctx, instance| async move {
-                let req = QueryRequest::Influxql(req);
-                let result = handlers::query::handle_query(&ctx, instance, req)
-                    .await
-                    // TODO: the sql's `convert_output` function may be not suitable to influxql.
-                    // We should implement influxql's related function in later.
-                    .map(handlers::query::convert_output)
-                    .map_err(|e| {
-                        // TODO(yingwen): Maybe truncate and print the sql
-                        error!("Http service Failed to handle sql, err:{}", e);
-                        Box::new(e)
-                    })
-                    .context(HandleRequest);
-                match result {
-                    Ok(res) => Ok(reply::json(&res)),
-                    Err(e) => Err(reject::custom(e)),
-                }
-            })
+            .and(write_api.or(query_api))
     }

     // POST /debug/flush_memtable
@@ -407,6 +401,13 @@
         warp::any().map(move || profiler.clone())
     }

+    fn with_influxdb(
+        &self,
+    ) -> impl Filter<Extract = (Arc<InfluxDb<Q>>,), Error = Infallible> + Clone {
+        let influxdb = self.influxdb.clone();
+        warp::any().map(move || influxdb.clone())
+    }
+
     fn with_instance(
         &self,
     ) -> impl Filter<Extract = (InstanceRef<Q>,), Error = Infallible> + Clone {
@@ -474,8 +475,9 @@
             .context(MissingSchemaConfigProvider)?;
         let prom_remote_storage = Arc::new(CeresDBStorage::new(
             instance.clone(),
-            schema_config_provider,
+            schema_config_provider.clone(),
         ));
+        let influxdb = Arc::new(InfluxDb::new(instance.clone(), schema_config_provider));

         let (tx, rx) = oneshot::channel();
         let service = Service {
@@ -483,6 +485,7 @@
             log_runtime,
             instance,
             prom_remote_storage,
+            influxdb,
             profiler: Arc::new(Profiler::default()),
             tx,
             config: self.config.clone(),