feat: impl label_join and label_replace for promql #5153

Merged · 5 commits merged on Dec 18, 2024
Changes from 1 commit
217 changes: 217 additions & 0 deletions src/query/src/promql/planner.rs
@@ -1331,6 +1331,35 @@ impl PromPlanner {
exprs.push(date_part_expr);
ScalarFunc::GeneratedExpr
}

"label_join" => {
// Preserve the current columns
for value in &self.ctx.field_columns {
let expr = DfExpr::Column(Column::from_name(value));
exprs.push(expr);
}

let concat_expr =
Self::build_concat_labels_expr(&mut other_input_exprs, session_state)?;
// Add the new label expr
exprs.push(concat_expr);

ScalarFunc::GeneratedExpr
}
"label_replace" => {
// Preserve the current columns
for value in &self.ctx.field_columns {
let expr = DfExpr::Column(Column::from_name(value));
exprs.push(expr);
}

let replace_expr =
Self::build_regexp_replace_label_expr(&mut other_input_exprs, session_state)?;
// Add the new label expr
exprs.push(replace_expr);

ScalarFunc::GeneratedExpr
}
_ => {
if let Some(f) = session_state.scalar_functions().get(func.name) {
ScalarFunc::DataFusionBuiltin(f.clone())
@@ -1425,6 +1454,124 @@ impl PromPlanner {
Ok(exprs)
}

/// Build expr for `label_replace` function
fn build_regexp_replace_label_expr(
other_input_exprs: &mut VecDeque<DfExpr>,
session_state: &SessionState,
) -> Result<DfExpr> {
// label_replace(vector, dst_label, replacement, src_label, regex)
let dst_label = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
other => UnexpectedPlanExprSnafu {
desc: format!("expect dst_label string literal, but found {:?}", other),
}
.fail()?,
};
let replacement = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)))) => r,
other => UnexpectedPlanExprSnafu {
desc: format!("expect replacement string literal, but found {:?}", other),
}
.fail()?,
};
let src_label = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Utf8(Some(s)))) => s,
other => UnexpectedPlanExprSnafu {
desc: format!("expect src_label string literal, but found {:?}", other),
}
.fail()?,
};
let regex = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Utf8(Some(r)))) => r,
other => UnexpectedPlanExprSnafu {
desc: format!("expect regex string literal, but found {:?}", other),
}
.fail()?,
};

let func = session_state
.scalar_functions()
.get("regexp_replace")
.context(UnsupportedExprSnafu {
name: "regexp_replace",
})?;

// regexp_replace(src_label, regex, replacement)
let args = vec![
DfExpr::Column(Column::from_name(src_label)),
DfExpr::Literal(ScalarValue::Utf8(Some(regex))),
DfExpr::Literal(ScalarValue::Utf8(Some(replacement))),
];

Ok(DfExpr::ScalarFunction(ScalarFunction {
func: func.clone(),
args,
})
.alias(dst_label))
}

/// Build expr for `label_join` function
fn build_concat_labels_expr(
other_input_exprs: &mut VecDeque<DfExpr>,
session_state: &SessionState,
) -> Result<DfExpr> {
// label_join(vector, dst_label, separator, src_label_1, src_label_2, ...)

let dst_label = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
other => UnexpectedPlanExprSnafu {
desc: format!("expect dst_label string literal, but found {:?}", other),
}
.fail()?,
};
let separator = match other_input_exprs.pop_front() {
Some(DfExpr::Literal(ScalarValue::Utf8(Some(d)))) => d,
other => UnexpectedPlanExprSnafu {
desc: format!("expect separator string literal, but found {:?}", other),
}
.fail()?,
};
let src_labels = other_input_exprs
.clone()
.into_iter()
.map(|expr| {
// Cast source label into column
match expr {
DfExpr::Literal(ScalarValue::Utf8(Some(label))) => {
let expr = DfExpr::Column(Column::from_name(label));
Ok(expr)
}
other => UnexpectedPlanExprSnafu {
desc: format!("expect source label string literal, but found {:?}", other),
}
.fail(),
}
})
.collect::<Result<Vec<_>>>()?;
ensure!(
!src_labels.is_empty(),
FunctionInvalidArgumentSnafu {
fn_name: "label_join",
}
);

let func = session_state
.scalar_functions()
.get("concat_ws")
.context(UnsupportedExprSnafu { name: "concat_ws" })?;

// concat_ws(separator, src_label_1, src_label_2, ...) as dst_label
let mut args = Vec::with_capacity(1 + src_labels.len());
args.push(DfExpr::Literal(ScalarValue::Utf8(Some(separator))));
args.extend(src_labels);

Ok(DfExpr::ScalarFunction(ScalarFunction {
func: func.clone(),
args,
})
.alias(dst_label))
}

fn create_time_index_column_expr(&self) -> Result<DfExpr> {
Ok(DfExpr::Column(Column::from_name(
self.ctx
@@ -3267,4 +3414,74 @@ mod test {
\n TableScan: metrics [tag:Utf8, timestamp:Timestamp(Nanosecond, None), field:Float64;N]"
);
}

#[tokio::test]
async fn test_label_join() {
let prom_expr = parser::parse(
"label_join(up{tag_0='api-server'}, 'foo', ',', 'tag_1', 'tag_2', 'tag_3')",
)
.unwrap();
let eval_stmt = EvalStmt {
expr: prom_expr,
start: UNIX_EPOCH,
end: UNIX_EPOCH
.checked_add(Duration::from_secs(100_000))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
};

let table_provider =
build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 4, 1)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();

let expected = r#"Filter: field_0 IS NOT NULL AND foo IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
Projection: up.timestamp, up.field_0 AS field_0, concat_ws(Utf8(","), up.tag_1, up.tag_2, up.tag_3) AS foo AS foo, up.tag_0, up.tag_1, up.tag_2, up.tag_3 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8]
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
PromSeriesDivide: tags=["tag_0", "tag_1", "tag_2", "tag_3"] [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
Sort: up.tag_0 DESC NULLS LAST, up.tag_1 DESC NULLS LAST, up.tag_2 DESC NULLS LAST, up.tag_3 DESC NULLS LAST, up.timestamp DESC NULLS LAST [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
Filter: up.tag_0 = Utf8("api-server") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
TableScan: up [tag_0:Utf8, tag_1:Utf8, tag_2:Utf8, tag_3:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;

assert_eq!(plan.display_indent_schema().to_string(), expected);
}

#[tokio::test]
async fn test_label_replace() {
let prom_expr = parser::parse(
"label_replace(up{tag_0=\"a:c\"}, \"foo\", \"$1\", \"tag_0\", \"(.*):.*\")",
)
.unwrap();
let eval_stmt = EvalStmt {
expr: prom_expr,
start: UNIX_EPOCH,
end: UNIX_EPOCH
.checked_add(Duration::from_secs(100_000))
.unwrap(),
interval: Duration::from_secs(5),
lookback_delta: Duration::from_secs(1),
};

let table_provider =
build_test_table_provider(&[(DEFAULT_SCHEMA_NAME.to_string(), "up".to_string())], 1, 1)
.await;
let plan = PromPlanner::stmt_to_plan(table_provider, &eval_stmt, &build_session_state())
.await
.unwrap();

let expected = r#"Filter: field_0 IS NOT NULL AND foo IS NOT NULL [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
Projection: up.timestamp, up.field_0 AS field_0, regexp_replace(up.tag_0, Utf8("(.*):.*"), Utf8("$1")) AS foo AS foo, up.tag_0 [timestamp:Timestamp(Millisecond, None), field_0:Float64;N, foo:Utf8;N, tag_0:Utf8]
PromInstantManipulate: range=[0..100000000], lookback=[1000], interval=[5000], time index=[timestamp] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
PromSeriesNormalize: offset=[0], time index=[timestamp], filter NaN: [false] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
PromSeriesDivide: tags=["tag_0"] [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
Sort: up.tag_0 DESC NULLS LAST, up.timestamp DESC NULLS LAST [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
Filter: up.tag_0 = Utf8("a:c") AND up.timestamp >= TimestampMillisecond(-1000, None) AND up.timestamp <= TimestampMillisecond(100001000, None) [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]
TableScan: up [tag_0:Utf8, timestamp:Timestamp(Millisecond, None), field_0:Float64;N]"#;

assert_eq!(plan.display_indent_schema().to_string(), expected);
}
}
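
For reference, the row-level semantics that the generated `concat_ws` and `regexp_replace` expressions implement can be sketched in plain Rust. This is an illustrative sketch only, assuming the `regex` crate and a simple label map; the helper names and types below are hypothetical and are not part of this change.

```rust
use std::collections::BTreeMap;

use regex::Regex; // assumed dependency for this sketch only

type Labels = BTreeMap<String, String>;

/// Sketch of `label_join`: join the source label values with `separator` and
/// store the result under `dst_label`, mirroring
/// `concat_ws(separator, src_1, src_2, ...) AS dst_label`.
fn label_join(labels: &mut Labels, dst_label: &str, separator: &str, src_labels: &[&str]) {
    let joined = src_labels
        .iter()
        .map(|src| labels.get(*src).map(String::as_str).unwrap_or(""))
        .collect::<Vec<_>>()
        .join(separator);
    labels.insert(dst_label.to_string(), joined);
}

/// Sketch of `label_replace`: substitute the first regex match in the source
/// label value and store the result under `dst_label`, mirroring
/// `regexp_replace(src_label, regex, replacement) AS dst_label`.
/// Values that do not match pass through unchanged.
fn label_replace(
    labels: &mut Labels,
    dst_label: &str,
    replacement: &str,
    src_label: &str,
    regex: &str,
) {
    let re = Regex::new(regex).expect("invalid regex");
    let value = labels.get(src_label).cloned().unwrap_or_default();
    let replaced = re.replace(&value, replacement).into_owned();
    labels.insert(dst_label.to_string(), replaced);
}

fn main() {
    let mut labels: Labels = [
        ("host".to_string(), "host1".to_string()),
        ("idc".to_string(), "idc2:zone1".to_string()),
    ]
    .into_iter()
    .collect();

    // Same shapes as the sqlness cases below.
    label_join(&mut labels, "new_host", "-", &["idc", "host"]);
    label_replace(&mut labels, "new_idc", "$2", "idc", "(.*):(.*)");

    assert_eq!(labels["new_host"], "idc2:zone1-host1");
    assert_eq!(labels["new_idc"], "zone1");
}
```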
103 changes: 103 additions & 0 deletions tests/cases/standalone/common/promql/label.result
@@ -0,0 +1,103 @@
CREATE TABLE test (
ts timestamp(3) time index,
host STRING,
idc STRING,
val BIGINT,
PRIMARY KEY(host, idc),
);

Affected Rows: 0

INSERT INTO TABLE test VALUES
(0, 'host1', 'idc1', 1),
(0, 'host2', 'idc1', 2),
(5000, 'host1', 'idc2:zone1',3),
(5000, 'host2', 'idc2',4),
(10000, 'host1', 'idc3:zone2',5),
(10000, 'host2', 'idc3',6),
(15000, 'host1', 'idc4:zone3',7),
(15000, 'host2', 'idc4',8);

Affected Rows: 8

-- Missing source labels --
TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "new_host", "-");

Error: 1004(InvalidArguments), Invalid function argument for label_join

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "new_host", "-", "idc", "host");

+---------------------+-----+------------------+-------+------------+
| ts | val | new_host | host | idc |
+---------------------+-----+------------------+-------+------------+
| 1970-01-01T00:00:00 | 1 | idc1-host1 | host1 | idc1 |
| 1970-01-01T00:00:05 | 1 | idc1-host1 | host1 | idc1 |
| 1970-01-01T00:00:05 | 3 | idc2:zone1-host1 | host1 | idc2:zone1 |
| 1970-01-01T00:00:10 | 1 | idc1-host1 | host1 | idc1 |
| 1970-01-01T00:00:10 | 3 | idc2:zone1-host1 | host1 | idc2:zone1 |
| 1970-01-01T00:00:10 | 5 | idc3:zone2-host1 | host1 | idc3:zone2 |
| 1970-01-01T00:00:15 | 1 | idc1-host1 | host1 | idc1 |
| 1970-01-01T00:00:15 | 3 | idc2:zone1-host1 | host1 | idc2:zone1 |
| 1970-01-01T00:00:15 | 5 | idc3:zone2-host1 | host1 | idc3:zone2 |
| 1970-01-01T00:00:15 | 7 | idc4:zone3-host1 | host1 | idc4:zone3 |
+---------------------+-----+------------------+-------+------------+

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') label_replace(test{host="host1"}, "new_idc", "$2", "idc", "(.*):(.*)");

+---------------------+-----+---------+-------+------------+
| ts | val | new_idc | host | idc |
+---------------------+-----+---------+-------+------------+
| 1970-01-01T00:00:00 | 1 | idc1 | host1 | idc1 |
| 1970-01-01T00:00:05 | 1 | idc1 | host1 | idc1 |
| 1970-01-01T00:00:05 | 3 | zone1 | host1 | idc2:zone1 |
| 1970-01-01T00:00:10 | 1 | idc1 | host1 | idc1 |
| 1970-01-01T00:00:10 | 3 | zone1 | host1 | idc2:zone1 |
| 1970-01-01T00:00:10 | 5 | zone2 | host1 | idc3:zone2 |
| 1970-01-01T00:00:15 | 1 | idc1 | host1 | idc1 |
| 1970-01-01T00:00:15 | 3 | zone1 | host1 | idc2:zone1 |
| 1970-01-01T00:00:15 | 5 | zone2 | host1 | idc3:zone2 |
| 1970-01-01T00:00:15 | 7 | zone3 | host1 | idc4:zone3 |
+---------------------+-----+---------+-------+------------+

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') label_replace(test{host="host1"}, "new_idc", "idc99", "idc", "idc2.*");

+---------------------+-----+------------+-------+------------+
| ts | val | new_idc | host | idc |
+---------------------+-----+------------+-------+------------+
| 1970-01-01T00:00:00 | 1 | idc1 | host1 | idc1 |
| 1970-01-01T00:00:05 | 1 | idc1 | host1 | idc1 |
| 1970-01-01T00:00:05 | 3 | idc99 | host1 | idc2:zone1 |
| 1970-01-01T00:00:10 | 1 | idc1 | host1 | idc1 |
| 1970-01-01T00:00:10 | 3 | idc99 | host1 | idc2:zone1 |
| 1970-01-01T00:00:10 | 5 | idc3:zone2 | host1 | idc3:zone2 |
| 1970-01-01T00:00:15 | 1 | idc1 | host1 | idc1 |
| 1970-01-01T00:00:15 | 3 | idc99 | host1 | idc2:zone1 |
| 1970-01-01T00:00:15 | 5 | idc3:zone2 | host1 | idc3:zone2 |
| 1970-01-01T00:00:15 | 7 | idc4:zone3 | host1 | idc4:zone3 |
+---------------------+-----+------------+-------+------------+

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') label_replace(test{host="host2"}, "new_idc", "$2", "idc", "(.*):(.*)");

+---------------------+-----+---------+-------+------+
| ts | val | new_idc | host | idc |
+---------------------+-----+---------+-------+------+
| 1970-01-01T00:00:00 | 2 | idc1 | host2 | idc1 |
| 1970-01-01T00:00:05 | 2 | idc1 | host2 | idc1 |
| 1970-01-01T00:00:05 | 4 | idc2 | host2 | idc2 |
| 1970-01-01T00:00:10 | 2 | idc1 | host2 | idc1 |
| 1970-01-01T00:00:10 | 4 | idc2 | host2 | idc2 |
| 1970-01-01T00:00:10 | 6 | idc3 | host2 | idc3 |
| 1970-01-01T00:00:15 | 2 | idc1 | host2 | idc1 |
| 1970-01-01T00:00:15 | 4 | idc2 | host2 | idc2 |
| 1970-01-01T00:00:15 | 6 | idc3 | host2 | idc3 |
| 1970-01-01T00:00:15 | 8 | idc4 | host2 | idc4 |
+---------------------+-----+---------+-------+------+

DROP TABLE test;

Affected Rows: 0

35 changes: 35 additions & 0 deletions tests/cases/standalone/common/promql/label.sql
@@ -0,0 +1,35 @@
CREATE TABLE test (
ts timestamp(3) time index,
host STRING,
idc STRING,
val BIGINT,
PRIMARY KEY(host, idc),
);

INSERT INTO TABLE test VALUES
(0, 'host1', 'idc1', 1),
(0, 'host2', 'idc1', 2),
(5000, 'host1', 'idc2:zone1',3),
(5000, 'host2', 'idc2',4),
(10000, 'host1', 'idc3:zone2',5),
(10000, 'host2', 'idc3',6),
(15000, 'host1', 'idc4:zone3',7),
(15000, 'host2', 'idc4',8);

-- Missing source labels --
TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "new_host", "-");

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') label_join(test{host="host1"}, "new_host", "-", "idc", "host");

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') label_replace(test{host="host1"}, "new_idc", "$2", "idc", "(.*):(.*)");

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') label_replace(test{host="host1"}, "new_idc", "idc99", "idc", "idc2.*");

-- SQLNESS SORT_RESULT 3 1
TQL EVAL (0, 15, '5s') label_replace(test{host="host2"}, "new_idc", "$2", "idc", "(.*):(.*)");


DROP TABLE test;