Skip to content

Commit

Permalink
init
Browse files Browse the repository at this point in the history
  • Loading branch information
akvlad committed Jun 12, 2024
1 parent 2944762 commit e5a3013
Showing 1 changed file with 48 additions and 20 deletions.
68 changes: 48 additions & 20 deletions lib/db/throttler.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,42 @@ const axiosError = async (err) => {
}
}

class TimeoutThrottler {
constructor (statement) {
class TimeoutOrSizeThrottler {
constructor (statement, maxSizeB, maxAgeMS) {
this.statement = statement
this.queue = []
this.resolvers = []
this.rejects = []
this.size = 0

this.maxSizeB = maxSizeB
this.maxAgeMs = maxAgeMS
this.lastRequest = 0
}

/**
* @param message {string}
*/
queuePush (message) {
this.queue.push(message)
this.size += message.length
}

willFlush () {
return (this.maxSizeB && this.size > this.maxSizeB) ||
(this.maxAgeMs && Date.now() - this.lastRequest > this.maxAgeMs)
}

async flush () {
/**
* @param force {boolean}
* @returns {Promise<void>}
*/
async flush (force) {
try {
if (!force && !this.willFlush()) {
return
}
this.lastRequest = Date.now()
await this._flush()
this.resolvers.forEach(r => r())
} catch (err) {
Expand All @@ -70,12 +96,11 @@ class TimeoutThrottler {
}
}


const emitter = new EventEmitter()
let on = true
const postMessage = message => {
const genericRequest = (throttler) => {
throttler.queue.push(message.data)
throttler.queuePush(message.data)
throttler.resolvers.push(() => {
if (isMainThread) {
emitter.emit('message', { status: 'ok', id: message.id })
Expand Down Expand Up @@ -115,30 +140,33 @@ const init = () => {
require('./clickhouse').rawRequest
]

samplesThrottler = new TimeoutThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.${samplesTableName}${dist}(fingerprint, timestamp_ns, value, string, type) FORMAT JSONEachRow`)
timeSeriesThrottler = new TimeoutThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.time_series${dist}(date, fingerprint, labels, name, type) FORMAT JSONEachRow`)
tracesThottler = new TimeoutThrottler(
samplesThrottler = new TimeoutOrSizeThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.${samplesTableName}${dist}(fingerprint, timestamp_ns, value, string, type) FORMAT JSONEachRow`,
parseInt(process.env.BULK_MAX_SIZE_BYTES || 0), parseInt(process.env.BULK_MAX_AGE_MS || 100))
timeSeriesThrottler = new TimeoutOrSizeThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.time_series${dist}(date, fingerprint, labels, name, type) FORMAT JSONEachRow`,
parseInt(process.env.BULK_MAX_SIZE_BYTES || 0), parseInt(process.env.BULK_MAX_AGE_MS || 100))
tracesThottler = new TimeoutOrSizeThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.traces_input
(trace_id, span_id, parent_id, name, timestamp_ns, duration_ns, service_name, payload_type, payload, tags)
FORMAT JSONEachRow`)
FORMAT JSONEachRow`,
parseInt(process.env.BULK_MAX_SIZE_BYTES || 0), parseInt(process.env.BULK_MAX_AGE_MS || 100))

setTimeout(async () => {
// eslint-disable-next-line no-unmodified-loop-condition
while (on) {
const ts = Date.now()
try {
await timeSeriesThrottler.flush()
await samplesThrottler.flush()
await tracesThottler.flush()
await Promise.all([
(async () => {
await timeSeriesThrottler.flush(samplesThrottler.willFlush())
await samplesThrottler.flush(false)
})(),
tracesThottler.flush(false)
])
} catch (err) {
logger.error(await axiosError(err), 'AXIOS ERROR')
}
const p = Date.now() - ts
if (p < 100) {
await new Promise((resolve) => setTimeout(resolve, 100 - p))
}
await new Promise((resolve) => setTimeout(resolve, 100))
}
}, 0)
}
Expand All @@ -148,7 +176,7 @@ if (isMainThread) {
samplesThrottler,
timeSeriesThrottler,
tracesThottler,
TimeoutThrottler,
TimeoutThrottler: TimeoutOrSizeThrottler,
postMessage,
on: emitter.on.bind(emitter),
removeAllListeners: emitter.removeAllListeners.bind(emitter),
Expand Down

0 comments on commit e5a3013

Please sign in to comment.