Skip to content

Commit

Permalink
Merge pull request #483 from metrico/bun_fix
Browse files Browse the repository at this point in the history
fix: throttler -> clickhouse.js cross dependency
  • Loading branch information
akvlad authored Mar 21, 2024
2 parents 787a9f9 + 10b8f62 commit 3e04bde
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 28 deletions.
38 changes: 20 additions & 18 deletions lib/db/clickhouse.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,25 +89,27 @@ if (isMainThread && !bun()) {
})
} else if (isMainThread && !first) {
first = true
const _throttler = require('./throttler')
throttler = {
on: _throttler.on,
postMessage: _throttler.postMessage,
removeAllListeners: _throttler.removeAllListeners,
terminate: _throttler.terminate
}
_throttler.init()
throttler.on('message', (msg) => {
switch (msg.status) {
case 'ok':
resolvers[msg.id]()
break
case 'err':
rejectors[msg.id](new Error('Database push error'))
break
setTimeout(() => {
const _throttler = require('./throttler')
throttler = {
on: _throttler.on,
postMessage: _throttler.postMessage,
removeAllListeners: _throttler.removeAllListeners,
terminate: _throttler.terminate
}
delete resolvers[msg.id]
delete rejectors[msg.id]
_throttler.init()
throttler.on('message', (msg) => {
switch (msg.status) {
case 'ok':
resolvers[msg.id]()
break
case 'err':
rejectors[msg.id](new Error('Database push error'))
break
}
delete resolvers[msg.id]
delete rejectors[msg.id]
})
})
}
// timeSeriesv2Throttler.start();
Expand Down
33 changes: 23 additions & 10 deletions lib/db/throttler.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
const { isMainThread, parentPort } = require('worker_threads')
const axios = require('axios')
const { getClickhouseUrl, samplesTableName, rawRequest } = require('./clickhouse')
const clickhouseOptions = require('./clickhouse_options').databaseOptions
const logger = require('../logger')
const { DATABASE_NAME } = require('../utils')
const clusterName = require('../../common').clusterName
const dist = clusterName ? '_dist' : ''
const { EventEmitter } = require('events')

// variables to be initialized in the init() function due to the './clickhouse.js' cross-dependency & bun
let samplesThrottler
let timeSeriesThrottler
let tracesThottler
let samplesTableName
let rawRequest

const axiosError = async (err) => {
console.log('axiosError', err)
try {
const resp = err.response
if (resp) {
Expand All @@ -24,6 +30,7 @@ const axiosError = async (err) => {
(err.responseData ? ' Response data: ' + err.responseData : ''))
}
} catch (e) {
console.log(e)
return err
}
}
Expand Down Expand Up @@ -63,14 +70,6 @@ class TimeoutThrottler {
}
}

const samplesThrottler = new TimeoutThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.${samplesTableName}${dist}(fingerprint, timestamp_ns, value, string, type) FORMAT JSONEachRow`)
const timeSeriesThrottler = new TimeoutThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.time_series${dist}(date, fingerprint, labels, name, type) FORMAT JSONEachRow`)
const tracesThottler = new TimeoutThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.traces_input
(trace_id, span_id, parent_id, name, timestamp_ns, duration_ns, service_name, payload_type, payload, tags)
FORMAT JSONEachRow`)

const emitter = new EventEmitter()
let on = true
Expand Down Expand Up @@ -111,6 +110,20 @@ const postMessage = message => {
}

const init = () => {
[samplesTableName, rawRequest] = [
require('./clickhouse').samplesTableName,
require('./clickhouse').rawRequest
]

samplesThrottler = new TimeoutThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.${samplesTableName}${dist}(fingerprint, timestamp_ns, value, string, type) FORMAT JSONEachRow`)
timeSeriesThrottler = new TimeoutThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.time_series${dist}(date, fingerprint, labels, name, type) FORMAT JSONEachRow`)
tracesThottler = new TimeoutThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.traces_input
(trace_id, span_id, parent_id, name, timestamp_ns, duration_ns, service_name, payload_type, payload, tags)
FORMAT JSONEachRow`)

setTimeout(async () => {
// eslint-disable-next-line no-unmodified-loop-condition
while (on) {
Expand Down

0 comments on commit 3e04bde

Please sign in to comment.