feat: row protocol support for opentsdb #2623

Merged: 9 commits, Oct 20, 2023
26 changes: 15 additions & 11 deletions src/frontend/src/instance/opentsdb.rs
@@ -12,13 +12,13 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use api::v1::InsertRequests;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_error::ext::BoxedError;
use servers::error as server_error;
use servers::error::AuthSnafu;
use servers::opentsdb::codec::DataPoint;
use servers::opentsdb::data_point_to_grpc_row_insert_requests;
use servers::query_handler::OpentsdbProtocolHandler;
use session::context::QueryContextRef;
use snafu::prelude::*;
@@ -27,23 +27,27 @@ use crate::instance::Instance;

#[async_trait]
impl OpentsdbProtocolHandler for Instance {
async fn exec(&self, data_point: &DataPoint, ctx: QueryContextRef) -> server_error::Result<()> {
async fn exec(
&self,
data_points: Vec<DataPoint>,
ctx: QueryContextRef,
) -> server_error::Result<usize> {
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::Opentsdb)
.context(AuthSnafu)?;

let requests = InsertRequests {
inserts: vec![data_point.as_grpc_insert()],
};
let _ = self
.handle_inserts(requests, ctx)
let (requests, _) = data_point_to_grpc_row_insert_requests(data_points)?;
let output = self
.handle_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.with_context(|_| server_error::ExecuteQuerySnafu {
query: format!("{data_point:?}"),
})?;
Ok(())
.context(servers::error::ExecuteGrpcQuerySnafu)?;

Ok(match output {
common_query::Output::AffectedRows(rows) => rows,
_ => unreachable!(),
})
}
}
33 changes: 17 additions & 16 deletions src/servers/src/http/opentsdb.rs
@@ -84,17 +84,19 @@ pub async fn put(
let summary = params.contains_key("summary");
let details = params.contains_key("details");

let data_points = parse_data_points(body).await?;
let data_point_requests = parse_data_points(body).await?;
let data_points = data_point_requests
.iter()
.map(|point| point.clone().into())
.collect::<Vec<_>>();

let response = if !summary && !details {
for data_point in data_points.into_iter() {
if let Err(e) = opentsdb_handler.exec(&data_point.into(), ctx.clone()).await {
// Not for debugging purposes; fail fast.
return error::InternalSnafu {
err_msg: e.to_string(),
}
.fail();
if let Err(e) = opentsdb_handler.exec(data_points, ctx.clone()).await {
// Not for debugging purposes; fail fast.
return error::InternalSnafu {
err_msg: e.to_string(),
}
.fail();
}
(HttpStatusCode::NO_CONTENT, Json(OpentsdbPutResponse::Empty))
} else {
@@ -108,14 +110,13 @@ pub async fn put(
},
};

for data_point in data_points.into_iter() {
let result = opentsdb_handler
.exec(&data_point.clone().into(), ctx.clone())
.await;
for (idx, data_point) in data_points.into_iter().enumerate() {
let result = opentsdb_handler.exec(vec![data_point], ctx.clone()).await;
match result {
Ok(()) => response.on_success(),
Ok(affected_rows) => response.on_success(affected_rows),
Err(e) => {
response.on_failed(data_point, e);
let data_point_request = data_point_requests.get(idx).unwrap();
response.on_failed(data_point_request.clone(), e);
}
}
}
@@ -151,8 +152,8 @@ pub struct OpentsdbDebuggingResponse {
}

impl OpentsdbDebuggingResponse {
fn on_success(&mut self) {
self.success += 1;
fn on_success(&mut self, affected_rows: usize) {
self.success += affected_rows as i32;
}

fn on_failed(&mut self, datapoint: DataPointRequest, error: impl ErrorExt) {
39 changes: 39 additions & 0 deletions src/servers/src/opentsdb.rs
@@ -20,16 +20,20 @@ use std::future::Future;
use std::net::SocketAddr;
use std::sync::Arc;

use api::v1::RowInsertRequests;
use async_trait::async_trait;
use common_runtime::Runtime;
use common_telemetry::logging::error;
use futures::StreamExt;
use tokio::sync::broadcast;

use self::codec::DataPoint;
use crate::error::Result;
use crate::opentsdb::connection::Connection;
use crate::opentsdb::handler::Handler;
use crate::prom_store::{FIELD_COLUMN_NAME, TIMESTAMP_COLUMN_NAME};
use crate::query_handler::OpentsdbProtocolHandlerRef;
use crate::row_writer::{self, MultiTableData};
use crate::server::{AbortableStream, BaseTcpServer, Server};
use crate::shutdown::Shutdown;

@@ -126,3 +130,38 @@ impl Server for OpentsdbServer {
OPENTSDB_SERVER
}
}

pub fn data_point_to_grpc_row_insert_requests(
data_points: Vec<DataPoint>,
) -> Result<(RowInsertRequests, usize)> {
let mut multi_table_data = MultiTableData::new();

for mut data_point in data_points {
let tags: Vec<(String, String)> = std::mem::take(data_point.tags_mut());
let table_name = data_point.metric();
let value = data_point.value();
let timestamp = data_point.ts_millis();
// length of tags + 2 extra columns for greptime_timestamp and the value
let num_columns = tags.len() + 2;

let table_data = multi_table_data.get_or_default_table_data(table_name, num_columns, 1);
let mut one_row = table_data.alloc_one_row();

// tags
row_writer::write_tags(table_data, tags.into_iter(), &mut one_row)?;

// value
row_writer::write_f64(table_data, FIELD_COLUMN_NAME, value, &mut one_row)?;
// timestamp
row_writer::write_ts_millis(
table_data,
TIMESTAMP_COLUMN_NAME,
Some(timestamp),
&mut one_row,
)?;

table_data.add_row(one_row);
}

Ok(multi_table_data.into_row_insert_requests())
}
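
For reference, a minimal sketch of how the new conversion helper could be exercised on its own. This is not taken from the PR; the metric name, tag values, and the assumption that the returned usize is the number of produced rows are illustrative only.

// Hypothetical usage of the batch conversion helper added above.
use servers::opentsdb::codec::DataPoint;
use servers::opentsdb::data_point_to_grpc_row_insert_requests;

fn example() -> servers::error::Result<()> {
    // Two points for the same metric; one carries a tag, one does not.
    let data_points = vec![
        DataPoint::new(
            "sys.cpu.user".to_string(),
            1_000,
            1.0,
            vec![("host".to_string(), "web01".to_string())],
        ),
        DataPoint::new("sys.cpu.user".to_string(), 2_000, 2.0, vec![]),
    ];

    // Convert the whole batch into row-based gRPC insert requests.
    // The second tuple element is assumed to be the total row count.
    let (_requests, rows) = data_point_to_grpc_row_insert_requests(data_points)?;
    assert_eq!(rows, 2);
    Ok(())
}

Since both points target the same table ("sys.cpu.user"), MultiTableData should fold them into a single RowInsertRequest whose schema is the union of the tag, value, and timestamp columns.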
6 changes: 5 additions & 1 deletion src/servers/src/opentsdb/codec.rs
@@ -19,7 +19,7 @@ use crate::error::{self, Result};
pub const OPENTSDB_TIMESTAMP_COLUMN_NAME: &str = "greptime_timestamp";
pub const OPENTSDB_FIELD_COLUMN_NAME: &str = "greptime_value";

#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct DataPoint {
metric: String,
ts_millis: i64,
@@ -115,6 +115,10 @@ impl DataPoint {
&self.tags
}

pub fn tags_mut(&mut self) -> &mut Vec<(String, String)> {
&mut self.tags
}

pub fn ts_millis(&self) -> i64 {
self.ts_millis
}
8 changes: 4 additions & 4 deletions src/servers/src/opentsdb/handler.rs
@@ -94,7 +94,7 @@ impl<S: AsyncWrite + AsyncRead + Unpin> Handler<S> {
match DataPoint::try_create(&line) {
Ok(data_point) => {
let _timer = timer!(crate::metrics::METRIC_TCP_OPENTSDB_LINE_WRITE_ELAPSED);
let result = self.query_handler.exec(&data_point, ctx.clone()).await;
let result = self.query_handler.exec(vec![data_point], ctx.clone()).await;
if let Err(e) = result {
self.connection.write_line(e.output_msg()).await?;
}
@@ -128,16 +128,16 @@ mod tests {

#[async_trait]
impl OpentsdbProtocolHandler for DummyQueryHandler {
async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> {
let metric = data_point.metric();
async fn exec(&self, data_points: Vec<DataPoint>, _ctx: QueryContextRef) -> Result<usize> {
let metric = data_points.first().unwrap().metric();
if metric == "should_failed" {
return error::InternalSnafu {
err_msg: "expected",
}
.fail();
}
self.tx.send(metric.to_string()).await.unwrap();
Ok(())
Ok(data_points.len())
}
}

2 changes: 1 addition & 1 deletion src/servers/src/query_handler.rs
@@ -74,7 +74,7 @@ pub trait InfluxdbLineProtocolHandler {
pub trait OpentsdbProtocolHandler {
/// A successful request will not return a response.
/// Only on error will the socket return a line of data.
async fn exec(&self, data_point: &DataPoint, ctx: QueryContextRef) -> Result<()>;
async fn exec(&self, data_points: Vec<DataPoint>, ctx: QueryContextRef) -> Result<usize>;
}

pub struct PromStoreResponse {
10 changes: 4 additions & 6 deletions src/servers/tests/http/opentsdb_test.rs
@@ -51,15 +51,16 @@ impl GrpcQueryHandler for DummyInstance {

#[async_trait]
impl OpentsdbProtocolHandler for DummyInstance {
async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> {
async fn exec(&self, data_points: Vec<DataPoint>, _ctx: QueryContextRef) -> Result<usize> {
let data_point = data_points.first().unwrap();
if data_point.metric() == "should_failed" {
return error::InternalSnafu {
err_msg: "expected",
}
.fail();
}
let _ = self.tx.send(data_point.metric().to_string()).await;
Ok(())
Ok(data_points.len())
}
}

@@ -172,10 +173,7 @@ async fn test_opentsdb_put() {
while let Ok(s) = rx.try_recv() {
metrics.push(s);
}
assert_eq!(
metrics,
vec!["m1".to_string(), "m2".to_string(), "m3".to_string()]
);
assert_eq!(metrics, vec!["m1".to_string(), "m2".to_string()]);
}

#[tokio::test]
6 changes: 3 additions & 3 deletions src/servers/tests/opentsdb.rs
@@ -37,8 +37,8 @@ struct DummyOpentsdbInstance {

#[async_trait]
impl OpentsdbProtocolHandler for DummyOpentsdbInstance {
async fn exec(&self, data_point: &DataPoint, _ctx: QueryContextRef) -> Result<()> {
let metric = data_point.metric();
async fn exec(&self, data_points: Vec<DataPoint>, _ctx: QueryContextRef) -> Result<usize> {
let metric = data_points.first().unwrap().metric();
if metric == "should_failed" {
return server_error::InternalSnafu {
err_msg: "expected",
@@ -47,7 +47,7 @@ impl OpentsdbProtocolHandler for DummyOpentsdbInstance {
}
let i = metric.parse::<i32>().unwrap();
let _ = self.tx.send(i * i).await;
Ok(())
Ok(data_points.len())
}
}

27 changes: 14 additions & 13 deletions tests-integration/src/opentsdb.rs
@@ -46,6 +46,8 @@ mod tests {

async fn test_exec(instance: &Arc<Instance>) {
let ctx = QueryContext::arc();

// should create new table "my_metric_1" directly
let data_point1 = DataPoint::new(
"my_metric_1".to_string(),
1000,
@@ -55,9 +57,8 @@ mod tests {
("tagk2".to_string(), "tagv2".to_string()),
],
);
// should create new table "my_metric_1" directly
instance.exec(&data_point1, ctx.clone()).await.unwrap();

// should create new column "tagk3" directly
let data_point2 = DataPoint::new(
"my_metric_1".to_string(),
2000,
@@ -67,12 +68,12 @@ mod tests {
("tagk3".to_string(), "tagv3".to_string()),
],
);
// should create new column "tagk3" directly
instance.exec(&data_point2, ctx.clone()).await.unwrap();

let data_point3 = DataPoint::new("my_metric_1".to_string(), 3000, 3.0, vec![]);
// should handle null tags properly
instance.exec(&data_point3, ctx.clone()).await.unwrap();
let data_point3 = DataPoint::new("my_metric_1".to_string(), 3000, 3.0, vec![]);

let data_points = vec![data_point1, data_point2, data_point3];
instance.exec(data_points, ctx.clone()).await.unwrap();

let output = instance
.do_query(
@@ -87,13 +88,13 @@ mod tests {
let recordbatches = RecordBatches::try_collect(stream).await.unwrap();
let pretty_print = recordbatches.pretty_print().unwrap();
let expected = vec![
"+---------------------+----------------+-------+-------+-------+",
"| greptime_timestamp | greptime_value | tagk1 | tagk2 | tagk3 |",
"+---------------------+----------------+-------+-------+-------+",
"| 1970-01-01T00:00:01 | 1.0 | tagv1 | tagv2 | |",
"| 1970-01-01T00:00:02 | 2.0 | | tagv2 | tagv3 |",
"| 1970-01-01T00:00:03 | 3.0 | | | |",
"+---------------------+----------------+-------+-------+-------+",
"+-------+-------+----------------+---------------------+-------+",
"| tagk1 | tagk2 | greptime_value | greptime_timestamp | tagk3 |",
"+-------+-------+----------------+---------------------+-------+",
"| tagv1 | tagv2 | 1.0 | 1970-01-01T00:00:01 | |",
"| | tagv2 | 2.0 | 1970-01-01T00:00:02 | tagv3 |",
"| | | 3.0 | 1970-01-01T00:00:03 | |",
"+-------+-------+----------------+---------------------+-------+",
]
.into_iter()
.join("\n");