Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: throttler -> clickhouse.js cross dependency #483

Merged
merged 1 commit into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 20 additions & 18 deletions lib/db/clickhouse.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,25 +89,27 @@ if (isMainThread && !bun()) {
})
} else if (isMainThread && !first) {
first = true
const _throttler = require('./throttler')
throttler = {
on: _throttler.on,
postMessage: _throttler.postMessage,
removeAllListeners: _throttler.removeAllListeners,
terminate: _throttler.terminate
}
_throttler.init()
throttler.on('message', (msg) => {
switch (msg.status) {
case 'ok':
resolvers[msg.id]()
break
case 'err':
rejectors[msg.id](new Error('Database push error'))
break
setTimeout(() => {
const _throttler = require('./throttler')
throttler = {
on: _throttler.on,
postMessage: _throttler.postMessage,
removeAllListeners: _throttler.removeAllListeners,
terminate: _throttler.terminate
}
delete resolvers[msg.id]
delete rejectors[msg.id]
_throttler.init()
throttler.on('message', (msg) => {
switch (msg.status) {
case 'ok':
resolvers[msg.id]()
break
case 'err':
rejectors[msg.id](new Error('Database push error'))
break
}
delete resolvers[msg.id]
delete rejectors[msg.id]
})
})
}
// timeSeriesv2Throttler.start();
Expand Down
33 changes: 23 additions & 10 deletions lib/db/throttler.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
const { isMainThread, parentPort } = require('worker_threads')
const axios = require('axios')
const { getClickhouseUrl, samplesTableName, rawRequest } = require('./clickhouse')
const clickhouseOptions = require('./clickhouse_options').databaseOptions
const logger = require('../logger')
const { DATABASE_NAME } = require('../utils')
const clusterName = require('../../common').clusterName
const dist = clusterName ? '_dist' : ''
const { EventEmitter } = require('events')

// variables to be initialized in the init() function due to the './clickhouse.js' cross-dependency & bun
let samplesThrottler
let timeSeriesThrottler
let tracesThottler
let samplesTableName
let rawRequest

const axiosError = async (err) => {
console.log('axiosError', err)
try {
const resp = err.response
if (resp) {
Expand All @@ -24,6 +30,7 @@ const axiosError = async (err) => {
(err.responseData ? ' Response data: ' + err.responseData : ''))
}
} catch (e) {
console.log(e)
return err
}
}
Expand Down Expand Up @@ -63,14 +70,6 @@ class TimeoutThrottler {
}
}

const samplesThrottler = new TimeoutThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.${samplesTableName}${dist}(fingerprint, timestamp_ns, value, string, type) FORMAT JSONEachRow`)
const timeSeriesThrottler = new TimeoutThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.time_series${dist}(date, fingerprint, labels, name, type) FORMAT JSONEachRow`)
const tracesThottler = new TimeoutThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.traces_input
(trace_id, span_id, parent_id, name, timestamp_ns, duration_ns, service_name, payload_type, payload, tags)
FORMAT JSONEachRow`)

const emitter = new EventEmitter()
let on = true
Expand Down Expand Up @@ -111,6 +110,20 @@ const postMessage = message => {
}

const init = () => {
[samplesTableName, rawRequest] = [
require('./clickhouse').samplesTableName,
require('./clickhouse').rawRequest
]

samplesThrottler = new TimeoutThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.${samplesTableName}${dist}(fingerprint, timestamp_ns, value, string, type) FORMAT JSONEachRow`)
timeSeriesThrottler = new TimeoutThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.time_series${dist}(date, fingerprint, labels, name, type) FORMAT JSONEachRow`)
tracesThottler = new TimeoutThrottler(
`INSERT INTO ${clickhouseOptions.queryOptions.database}.traces_input
(trace_id, span_id, parent_id, name, timestamp_ns, duration_ns, service_name, payload_type, payload, tags)
FORMAT JSONEachRow`)

setTimeout(async () => {
// eslint-disable-next-line no-unmodified-loop-condition
while (on) {
Expand Down
Loading