From e5a3013007f8aa8f8723c213bc4dfaaa051b5e56 Mon Sep 17 00:00:00 2001 From: akvlad Date: Wed, 12 Jun 2024 13:20:12 +0300 Subject: [PATCH] init --- lib/db/throttler.js | 68 ++++++++++++++++++++++++++++++++------------- 1 file changed, 48 insertions(+), 20 deletions(-) diff --git a/lib/db/throttler.js b/lib/db/throttler.js index 6e50db85..59104cbe 100644 --- a/lib/db/throttler.js +++ b/lib/db/throttler.js @@ -35,16 +35,42 @@ const axiosError = async (err) => { } } -class TimeoutThrottler { - constructor (statement) { +class TimeoutOrSizeThrottler { + constructor (statement, maxSizeB, maxAgeMS) { this.statement = statement this.queue = [] this.resolvers = [] this.rejects = [] + this.size = 0 + + this.maxSizeB = maxSizeB + this.maxAgeMs = maxAgeMS + this.lastRequest = 0 + } + + /** + * @param message {string} + */ + queuePush (message) { + this.queue.push(message) + this.size += message.length + } + + willFlush () { + return (this.maxSizeB && this.size > this.maxSizeB) || + (this.maxAgeMs && Date.now() - this.lastRequest > this.maxAgeMs) } - async flush () { + /** + * @param force {boolean} + * @returns {Promise} + */ + async flush (force) { try { + if (!force && !this.willFlush()) { + return + } + this.lastRequest = Date.now() await this._flush() this.resolvers.forEach(r => r()) } catch (err) { @@ -70,12 +96,11 @@ class TimeoutThrottler { } } - const emitter = new EventEmitter() let on = true const postMessage = message => { const genericRequest = (throttler) => { - throttler.queue.push(message.data) + throttler.queuePush(message.data) throttler.resolvers.push(() => { if (isMainThread) { emitter.emit('message', { status: 'ok', id: message.id }) @@ -115,30 +140,33 @@ const init = () => { require('./clickhouse').rawRequest ] - samplesThrottler = new TimeoutThrottler( - `INSERT INTO ${clickhouseOptions.queryOptions.database}.${samplesTableName}${dist}(fingerprint, timestamp_ns, value, string, type) FORMAT JSONEachRow`) - timeSeriesThrottler = new TimeoutThrottler( - `INSERT INTO ${clickhouseOptions.queryOptions.database}.time_series${dist}(date, fingerprint, labels, name, type) FORMAT JSONEachRow`) - tracesThottler = new TimeoutThrottler( + samplesThrottler = new TimeoutOrSizeThrottler( + `INSERT INTO ${clickhouseOptions.queryOptions.database}.${samplesTableName}${dist}(fingerprint, timestamp_ns, value, string, type) FORMAT JSONEachRow`, + parseInt(process.env.BULK_MAX_SIZE_BYTES || 0), parseInt(process.env.BULK_MAX_AGE_MS || 100)) + timeSeriesThrottler = new TimeoutOrSizeThrottler( + `INSERT INTO ${clickhouseOptions.queryOptions.database}.time_series${dist}(date, fingerprint, labels, name, type) FORMAT JSONEachRow`, + parseInt(process.env.BULK_MAX_SIZE_BYTES || 0), parseInt(process.env.BULK_MAX_AGE_MS || 100)) + tracesThottler = new TimeoutOrSizeThrottler( `INSERT INTO ${clickhouseOptions.queryOptions.database}.traces_input (trace_id, span_id, parent_id, name, timestamp_ns, duration_ns, service_name, payload_type, payload, tags) - FORMAT JSONEachRow`) + FORMAT JSONEachRow`, + parseInt(process.env.BULK_MAX_SIZE_BYTES || 0), parseInt(process.env.BULK_MAX_AGE_MS || 100)) setTimeout(async () => { // eslint-disable-next-line no-unmodified-loop-condition while (on) { - const ts = Date.now() try { - await timeSeriesThrottler.flush() - await samplesThrottler.flush() - await tracesThottler.flush() + await Promise.all([ + (async () => { + await timeSeriesThrottler.flush(samplesThrottler.willFlush()) + await samplesThrottler.flush(false) + })(), + tracesThottler.flush(false) + ]) } catch (err) { logger.error(await axiosError(err), 'AXIOS ERROR') } - const p = Date.now() - ts - if (p < 100) { - await new Promise((resolve) => setTimeout(resolve, 100 - p)) - } + await new Promise((resolve) => setTimeout(resolve, 100)) } }, 0) } @@ -148,7 +176,7 @@ if (isMainThread) { samplesThrottler, timeSeriesThrottler, tracesThottler, - TimeoutThrottler, + TimeoutThrottler: TimeoutOrSizeThrottler, postMessage, on: emitter.on.bind(emitter), removeAllListeners: emitter.removeAllListeners.bind(emitter),