Skip to content

Commit

Permalink
fix: OOM in traceql requests
Browse files — browse the repository at this point in the history
  • Loading branch information
akvlad committed May 2, 2024
1 parent f48d607 commit 76635fe
Show file tree
Hide file tree
Showing 6 changed files with 151 additions and 20 deletions.
28 changes: 13 additions & 15 deletions traceql/clickhouse_transpiler/attr_condition.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const { getCompareFn, durationToNs, unquote } = require('./shared')
const { getCompareFn, durationToNs, unquote, bitSet } = require('./shared')
const Sql = require('@cloki/clickhouse-sql')
module.exports = class Builder {
constructor () {
Expand Down Expand Up @@ -71,6 +71,18 @@ module.exports = class Builder {
const having = self.getCond(self.conds)
self.aggregator(sel)
sel.conditions = Sql.And(sel.conditions, Sql.Or(...self.where))
if (Array.isArray(ctx.randomFilter) && Array.isArray(ctx.cachedTraceIds)) {
sel.conditions = Sql.And(
sel.conditions,
Sql.Or(
Sql.Eq(new Sql.Raw(`cityHash64(trace_id) % ${ctx.randomFilter[0]}`), Sql.val(ctx.randomFilter[1])),
new Sql.In('trace_id', 'in', ctx.cachedTraceIds.map(traceId => new Sql.Raw(`unhex('${traceId}')`)))
))
} else if (Array.isArray(ctx.randomFilter)) {
sel.conditions = Sql.And(
sel.conditions,
Sql.Eq(new Sql.Raw(`cityHash64(trace_id) % ${ctx.randomFilter[0]}`), Sql.val(ctx.randomFilter[1])))
}
sel.having(having)
return sel
}
Expand Down Expand Up @@ -248,20 +260,6 @@ function groupBitOr (left, alias) {
return res
}

/**
 * Packs a list of boolean SQL conditions into a single UInt64 bitmask
 * expression: term i contributes `bitShiftLeft(toUInt64(term), i)`, and all
 * terms are joined with `+`.
 *
 * @param terms {SQLObject[]} boolean SQL expressions, one per bit
 * @returns {SQLObject} raw SQL object whose toString() renders the bitmask sum
 */
function bitSet (terms) {
  const res = new Sql.Raw('')
  res.terms = terms
  res.toString = () => res.terms
    .map((term, bit) => `bitShiftLeft(toUInt64(${term.toString()}), ${bit})`)
    .join('+')
  return res
}

/**
*
* @param attr {string}
Expand Down
19 changes: 19 additions & 0 deletions traceql/clickhouse_transpiler/attr_condition_eval.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
const attrCondition = require('./attr_condition')
const {bitSet} = require('./shared')
const Sql = require('@cloki/clickhouse-sql')
module.exports = class Builder extends attrCondition {
build () {
const self = this
const superBuild = super.build()
/** @type {BuiltProcessFn} */
const res = (ctx) => {
const sel = superBuild(ctx)
sel.having_conditions = []
sel.aggregations = [bitSet(self.sqlConditions)]
sel.select_list = [[new Sql.Raw('count()'), 'count']]
sel.order_expressions = []
return sel
}
return res
}
}
23 changes: 22 additions & 1 deletion traceql/clickhouse_transpiler/index.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
const AttrConditionPlanner = require('./attr_condition')
const AttrConditionEvalPlanner = require('./attr_condition_eval')
const InitIndexPlanner = require('./init')
const IndexGroupByPlanner = require('./group_by')
const AggregatorPlanner = require('./aggregator')
Expand All @@ -8,10 +9,17 @@ const TracesDataPlanner = require('./traces_data')
/**
 * Compiles a parsed TraceQL script into the full search query builder.
 *
 * @param script {Token}
 */
module.exports.transpile = (script) => new Planner(script).plan()

/**
 * Compiles a parsed TraceQL script into the complexity-estimation
 * query builder (see planEval).
 *
 * @param script {Token}
 */
module.exports.evaluateCmpl = (script) => new Planner(script).planEval()

class Planner {
/**
*
Expand Down Expand Up @@ -53,6 +61,19 @@ class Planner {
return res
}

planEval () {
this.check()
this.analyze()
const res = (new AttrConditionEvalPlanner())
.withTerms(this.termIdx)
.withConditions(this.cond)
.withAggregatedAttr(this.aggregatedAttr)
.withMain((new InitIndexPlanner()).build())
.build()

return res
}

check () {
if (this.script.Children('SYNTAX').length > 1) {
throw new Error('more than one selector is not supported')
Expand Down
4 changes: 3 additions & 1 deletion traceql/clickhouse_transpiler/init.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ const { standardBuilder } = require('./shared')
* limit: number,
* isCluster: boolean,
* tracesTable: string,
* tracesDistTable: string
* tracesDistTable: string,
* randomFilter: number[]|undefined,
* cachedTraceIds: string[]|undefined,
* }} Context
*/
/**
Expand Down
14 changes: 14 additions & 0 deletions traceql/clickhouse_transpiler/shared.js
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,17 @@ module.exports.standardBuilder = (fn) => {
}
}
}

/**
 * Builds a raw SQL expression packing every term into one UInt64 bitmask:
 * term i contributes `bitShiftLeft(toUInt64(term), i)`, all joined with `+`.
 *
 * @param terms {SQLObject[]} boolean SQL expressions, one per bit
 * @returns {SQLObject}
 */
module.exports.bitSet = (terms) => {
  const res = new Sql.Raw('')
  res.terms = terms
  res.toString = () => res.terms
    .map((term, bit) => `bitShiftLeft(toUInt64(${term.toString()}), ${bit})`)
    .join('+')
  return res
}
83 changes: 80 additions & 3 deletions traceql/index.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
const parser = require('./parser')
const transpiler = require('./clickhouse_transpiler')
const { transpile, evaluateCmpl } = require('./clickhouse_transpiler')
const logger = require('../lib/logger')
const { DATABASE_NAME } = require('../lib/utils')
const { clusterName } = require('../common')
Expand All @@ -23,10 +23,87 @@ const search = async (query, limit, from, to) => {
tracesAttrsTable: `${_dbname}.tempo_traces_attrs_gin`,
from: from,
to: to,
limit: limit
limit: limit,
randomFilter: null
}
const scrpit = parser.ParseScript(query)
const planner = transpiler(scrpit.rootToken)
const complexity = await evaluateComplexity(ctx, scrpit.rootToken)
if (complexity > 10000000) {
return await processComplexResult(ctx, scrpit.rootToken, complexity)
}
return await processSmallResult(ctx, scrpit.rootToken)
}

/**
 * Runs the cheap "count matching index rows" query for the script and returns
 * the largest count seen across the result rows (0 for an empty result set).
 *
 * @param ctx {Context}
 * @param script {Token}
 * @returns {Promise<number>}
 */
const evaluateComplexity = async (ctx, script) => {
  const evaluator = evaluateCmpl(script)
  const sql = evaluator(ctx)
  const response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME())
  let complexity = 0
  for (const row of response.data.data) {
    // NOTE(review): ClickHouse JSON output may serialize UInt64 as a string;
    // Math.max coerces it to a number either way.
    complexity = Math.max(complexity, row.count)
  }
  return complexity
}

/**
 * Handles a search whose estimated complexity exceeds the 10M-row threshold
 * (mirrors the check in search()). The trace-id space is split into
 * `maxFilter` partitions via `cityHash64(trace_id) % maxFilter`, and each
 * partition is queried separately to keep per-query memory bounded.
 *
 * @param ctx {Context} mutated in place: randomFilter, cachedTraceIds, from
 * @param script {Token}
 * @param complexity {number} estimated row count from evaluateComplexity
 */
async function processComplexResult (ctx, script, complexity) {
  const planner = transpile(script)
  // Number of hash partitions: one per 10M estimated rows.
  const maxFilter = Math.floor(complexity / 10000000)
  let traces = []
  for (let i = 0; i < maxFilter; i++) {
    // First pass: restrict to partition i of the trace-id hash space.
    ctx.randomFilter = [maxFilter, i]
    let sql = planner(ctx)
    let response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME())
    if (response.data.data.length === parseInt(ctx.limit)) {
      // Partition alone filled the limit: narrow the time range to start at
      // the earliest trace returned and re-estimate; if the narrowed query is
      // now cheap, fall back to the simple path.
      const minStart = response.data.data.reduce((acc, row) =>
        acc === 0 ? row.start_time_unix_nano : Math.min(acc, row.start_time_unix_nano), 0
      )
      // start_time_unix_nano is nanoseconds; Date wants milliseconds.
      ctx.from = new Date(Math.floor(minStart / 1000000))
      ctx.randomFilter = null
      complexity = await evaluateComplexity(ctx, script)
      if (complexity <= 10000000) {
        return await processSmallResult(ctx, script)
      }
      ctx.randomFilter = [maxFilter, i]
    }
    // Second pass: include the trace ids found in the first pass explicitly,
    // alongside the partition filter (see attr_condition's OR of both).
    ctx.cachedTraceIds = response.data.data.map(row => row.trace_id)
    sql = planner(ctx)
    response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME())
    // NOTE(review): traces is reassigned on every partition iteration, so only
    // the last partition's rows survive the loop — confirm whether the results
    // should accumulate across partitions instead.
    traces = response.data.data.map(row => ({
      traceID: row.trace_id,
      rootServiceName: row.root_service_name,
      rootTraceName: row.root_trace_name,
      startTimeUnixNano: row.start_time_unix_nano,
      durationMs: row.duration_ms,
      spanSets: [
        {
          spans: row.span_id.map((spanId, i) => ({
            spanID: spanId,
            startTimeUnixNano: row.timestamp_ns[i],
            durationNanos: row.duration[i],
            attributes: []
          })),
          matched: row.span_id.length
        }
      ]
    }))
  }
  return traces
}

/**
*
* @param ctx {Context}
* @param script {Token}
*/
async function processSmallResult (ctx, script) {
const planner = transpile(script)
const sql = planner(ctx)
const response = await rawRequest(sql + ' FORMAT JSON', null, DATABASE_NAME())
const traces = response.data.data.map(row => ({
Expand Down

0 comments on commit 76635fe

Please sign in to comment.