Skip to content

Commit

Permalink
Merge pull request #550 from metrico/feature/grafana_profiles_plugin
Browse files Browse the repository at this point in the history
drop debug logs; fix memory leaks
  • Loading branch information
akvlad authored Aug 26, 2024
2 parents 3fe7915 + 28cf4c1 commit 87a42e6
Showing 1 changed file with 94 additions and 89 deletions.
183 changes: 94 additions & 89 deletions pyroscope/pyroscope.js
Original file line number Diff line number Diff line change
Expand Up @@ -121,101 +121,106 @@ const selectSeries = async (req, res) => {
}

const selectMergeProfile = async (req, res) => {
const _req = req.body
const fromTimeSec = Math.floor(req.getStart && req.getStart()
? parseInt(req.getStart()) / 1000
: Date.now() / 1000 - HISTORY_TIMESPAN)
const toTimeSec = Math.floor(req.getEnd && req.getEnd()
? parseInt(req.getEnd()) / 1000
: Date.now() / 1000)
let typeID = _req.getProfileTypeid && _req.getProfileTypeid()
if (!typeID) {
throw new QrynBadRequest('No type provided')
}
typeID = parseTypeId(typeID)
if (!typeID) {
throw new QrynBadRequest('Invalid type provided')
}
const dist = clusterName ? '_dist' : ''
// const sampleTypeId = typeID.sampleType + ':' + typeID.sampleUnit
const labelSelector = _req.getLabelSelector && _req.getLabelSelector()
const ctx = newCtxIdx()
try {
const _req = req.body
const fromTimeSec = Math.floor(req.getStart && req.getStart()
? parseInt(req.getStart()) / 1000
: Date.now() / 1000 - HISTORY_TIMESPAN)
const toTimeSec = Math.floor(req.getEnd && req.getEnd()
? parseInt(req.getEnd()) / 1000
: Date.now() / 1000)
let typeID = _req.getProfileTypeid && _req.getProfileTypeid()
if (!typeID) {
throw new QrynBadRequest('No type provided')
}
typeID = parseTypeId(typeID)
if (!typeID) {
throw new QrynBadRequest('Invalid type provided')
}
const dist = clusterName ? '_dist' : ''
// const sampleTypeId = typeID.sampleType + ':' + typeID.sampleUnit
const labelSelector = _req.getLabelSelector && _req.getLabelSelector()

const typeIdSelector = Sql.Eq(
'type_id',
Sql.val(`${typeID.type}:${typeID.periodType}:${typeID.periodUnit}`))
const serviceNameSelector = serviceNameSelectorQuery(labelSelector)
const typeIdSelector = Sql.Eq(
'type_id',
Sql.val(`${typeID.type}:${typeID.periodType}:${typeID.periodUnit}`))
const serviceNameSelector = serviceNameSelectorQuery(labelSelector)

const idxReq = (new Sql.Select())
.select(new Sql.Raw('fingerprint'))
.from(`${DATABASE_NAME()}.profiles_series_gin`)
.where(
Sql.And(
typeIdSelector,
serviceNameSelector,
Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`)),
Sql.Eq(
new Sql.Raw(
`has(sample_types_units, (${Sql.quoteVal(typeID.sampleType)}, ${Sql.quoteVal(typeID.sampleUnit)}))`
),
1
const idxReq = (new Sql.Select())
.select(new Sql.Raw('fingerprint'))
.from(`${DATABASE_NAME()}.profiles_series_gin`)
.where(
Sql.And(
typeIdSelector,
serviceNameSelector,
Sql.Gte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(fromTimeSec)}))`)),
Sql.Lte('date', new Sql.Raw(`toDate(FROM_UNIXTIME(${Math.floor(toTimeSec)}))`)),
Sql.Eq(
new Sql.Raw(
`has(sample_types_units, (${Sql.quoteVal(typeID.sampleType)}, ${Sql.quoteVal(typeID.sampleUnit)}))`
),
1
)
)
)
labelSelectorQuery(idxReq, labelSelector)
const withIdxReq = (new Sql.With('idx', idxReq, !!clusterName))
const mainReq = (new Sql.Select())
.with(withIdxReq)
.select([new Sql.Raw('payload'), 'payload'])
.from([`${DATABASE_NAME()}.profiles${dist}`, 'p'])
.where(Sql.And(
new Sql.In('p.fingerprint', 'IN', new Sql.WithReference(withIdxReq)),
Sql.Gte('p.timestamp_ns', new Sql.Raw(`${fromTimeSec}000000000`)),
Sql.Lt('p.timestamp_ns', new Sql.Raw(`${toTimeSec}000000000`))))
.orderBy(new Sql.Raw('timestamp_ns'))
const approxReq = (new Sql.Select())
.select(
[new Sql.Raw('sum(length(payload))'), 'size'],
[new Sql.Raw('count()'), 'count']
)
.from([new Sql.Raw('(' + mainReq.toString() + ')'), 'main'])
const approx = await clickhouse.rawRequest(
approxReq.toString() + ' FORMAT JSON', null, DATABASE_NAME()
)
labelSelectorQuery(idxReq, labelSelector)
const withIdxReq = (new Sql.With('idx', idxReq, !!clusterName))
const mainReq = (new Sql.Select())
.with(withIdxReq)
.select([new Sql.Raw('payload'), 'payload'])
.from([`${DATABASE_NAME()}.profiles${dist}`, 'p'])
.where(Sql.And(
new Sql.In('p.fingerprint', 'IN', new Sql.WithReference(withIdxReq)),
Sql.Gte('p.timestamp_ns', new Sql.Raw(`${fromTimeSec}000000000`)),
Sql.Lt('p.timestamp_ns', new Sql.Raw(`${toTimeSec}000000000`))))
.orderBy(new Sql.Raw('timestamp_ns'))
const approxReq = (new Sql.Select())
.select(
[new Sql.Raw('sum(length(payload))'), 'size'],
[new Sql.Raw('count()'), 'count']
)
.from([new Sql.Raw('(' + mainReq.toString() + ')'), 'main'])
console.log('!!!!!' + approxReq.toString() + ' FORMAT JSON')
const approx = await clickhouse.rawRequest(
approxReq.toString() + ' FORMAT JSON', null, DATABASE_NAME()
)
const approxData = approx.data.data[0]
logger.debug(`Approximate size: ${approxData.size} bytes, profiles count: ${approxData.count}`)
const chunksCount = Math.max(Math.ceil(approxData.size / (50 * 1024 * 1024)), 1)
logger.debug(`Request is processed in: ${chunksCount} chunks`)
const chunkSize = Math.ceil(approxData.count / chunksCount)
const promises = []
require('./pprof-bin/pkg/pprof_bin').init_panic_hook()
let processNs = BigInt(0)
const start = process.hrtime.bigint()
const ctx = newCtxIdx()
for (let i = 0; i < chunksCount; i++) {
promises.push((async (i) => {
logger.debug(`Chunk ${i}: ${mainReq.toString() + ` LIMIT ${chunkSize} OFFSET ${i * chunkSize} FORMAT RowBinary`}`)
const profiles = await clickhouse.rawRequest(mainReq.toString() + ` LIMIT ${chunkSize} OFFSET ${i * chunkSize} FORMAT RowBinary`,
null,
DATABASE_NAME(),
{
responseType: 'arraybuffer'
})
const binData = Uint8Array.from(profiles.data)
const start = process.hrtime.bigint()
pprofBin.merge_trees_pprof(ctx, binData)
const end = process.hrtime.bigint()
processNs += end - start
})(i))
}
await Promise.all(promises)
const response = pprofBin.export_trees_pprof(ctx)
const end = process.hrtime.bigint()
const approxData = approx.data.data[0]
logger.debug(`Approximate size: ${approxData.size} bytes, profiles count: ${approxData.count}`)
const chunksCount = Math.max(Math.ceil(approxData.size / (50 * 1024 * 1024)), 1)
logger.debug(`Request is processed in: ${chunksCount} chunks`)
const chunkSize = Math.ceil(approxData.count / chunksCount)
const promises = []
require('./pprof-bin/pkg/pprof_bin').init_panic_hook()
let processNs = BigInt(0)
const start = process.hrtime.bigint()

for (let i = 0; i < chunksCount; i++) {
promises.push((async (i) => {
logger.debug(`Processing chunk ${i}`)
const profiles = await clickhouse.rawRequest(mainReq.toString() + ` LIMIT ${chunkSize} OFFSET ${i * chunkSize} FORMAT RowBinary`,
null,
DATABASE_NAME(),
{
responseType: 'arraybuffer'
})
const binData = Uint8Array.from(profiles.data)
logger.debug(`Chunk ${i} - ${binData.length} bytes`)
const start = process.hrtime.bigint()
pprofBin.merge_trees_pprof(ctx, binData)
const end = process.hrtime.bigint()
processNs += end - start
})(i))
}
await Promise.all(promises)
const response = pprofBin.export_trees_pprof(ctx)
const end = process.hrtime.bigint()

logger.debug(`Pprof merge took ${processNs} nanoseconds`)
logger.debug(`Pprof load + merge took ${end - start} nanoseconds`)
return res.code(200).send(Buffer.from(response))
logger.debug(`Pprof merge took ${processNs} nanoseconds`)
logger.debug(`Pprof load + merge took ${end - start} nanoseconds`)
return res.code(200).send(Buffer.from(response))
} finally {
pprofBin.drop_tree(ctx)
}
}

const series = async (req, res) => {
Expand Down

0 comments on commit 87a42e6

Please sign in to comment.