diff --git a/lib/db/clickhouse.js b/lib/db/clickhouse.js
index 008b70e4..f98bc4b8 100644
--- a/lib/db/clickhouse.js
+++ b/lib/db/clickhouse.js
@@ -1319,7 +1319,7 @@ const samplesReadTable = {
     }
     logger.info('checking last timestamp')
     const v1EndTime = await axios.post(`${getClickhouseUrl()}/?database=${UTILS.DATABASE_NAME()}`,
-      `SELECT max(timestamp_ns) as ts FROM ${tableName} format JSON`)
+      `SELECT max(timestamp_ns) as ts FROM ${UTILS.DATABASE_NAME()}.${tableName} format JSON`)
     if (!v1EndTime.data.rows) {
       samplesReadTable.v1 = false
       return
@@ -1339,7 +1339,7 @@ const samplesReadTable = {
   settingsVersions: async function () {
     const versions = await rawRequest(
      `SELECT argMax(name, inserted_at) as _name, argMax(value, inserted_at) as _value
-      FROM settings${dist} WHERE type == 'update' GROUP BY fingerprint HAVING _name != '' FORMAT JSON`,
+      FROM ${UTILS.DATABASE_NAME()}.settings${dist} WHERE type == 'update' GROUP BY fingerprint HAVING _name != '' FORMAT JSON`,
      null,
      UTILS.DATABASE_NAME()
    )
@@ -1385,7 +1385,7 @@ const getSettings = async (names, database) => {
     'short-hash'))
  const settings = await rawRequest(`SELECT argMax(name, inserted_at) as _name,
      argMax(value, inserted_at) as _value
-    FROM settings${dist} WHERE fingerprint IN (${fps.join(',')}) GROUP BY fingerprint HAVING _name != '' FORMAT JSON`,
+    FROM ${database}.settings${dist} WHERE fingerprint IN (${fps.join(',')}) GROUP BY fingerprint HAVING _name != '' FORMAT JSON`,
  null, database)
  return settings.data.data.reduce((sum, cur) => {
    sum[cur._name] = cur._value
@@ -1403,7 +1403,7 @@ const getSettings = async (names, database) => {
  */
 const addSetting = async (type, name, value, database) => {
   const fp = UTILS.fingerPrint(JSON.stringify({ type: type, name: name }), false, 'short-hash')
-  return rawRequest('INSERT INTO settings (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow',
+  return rawRequest(`INSERT INTO ${UTILS.DATABASE_NAME()}.settings (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow`,
     JSON.stringify({
       fingerprint: fp,
       type: type,
diff --git a/lib/db/clickhouse_alerting.js b/lib/db/clickhouse_alerting.js
index 5c99d642..f2b0765a 100644
--- a/lib/db/clickhouse_alerting.js
+++ b/lib/db/clickhouse_alerting.js
@@ -5,7 +5,7 @@
 const axios = require('axios')
 const { DATABASE_NAME } = require('../utils')
 const UTILS = require('../utils')
-const { getClickhouseUrl } = require('./clickhouse')
+const { getClickhouseUrl, rawRequest } = require('./clickhouse')
 const Sql = require('@cloki/clickhouse-sql')
 const { clusterName } = require('../../common')
 const onCluster = clusterName ? `ON CLUSTER ${clusterName}` : ''
@@ -19,14 +19,13 @@ const dist = clusterName ? '_dist' : ''
 module.exports.getAlertRule = async (ns, group, name) => {
   const fp = getRuleFP(ns, group, name)
   const mark = Math.random()
-  const res = await axios.post(getClickhouseUrl(),
+  const res = await rawRequest(
     'SELECT fingerprint, argMax(name, inserted_at) as name, argMax(value, inserted_at) as value ' +
     `FROM ${DATABASE_NAME()}.settings${dist} ` +
     `WHERE fingerprint = ${fp} AND ${mark} == ${mark} ` +
     'GROUP BY fingerprint ' +
     'HAVING name != \'\' ' +
-    'FORMAT JSON'
-  )
+    'FORMAT JSON', null, DATABASE_NAME())
   if (!res.data.data.length) {
     return undefined
   }
@@ -49,10 +48,11 @@ module.exports.putAlertRule = async (namespace, group, rule) => {
   const groupName = JSON.stringify({ type: 'alert_group', ns: namespace, group: group.name })
   const groupFp = getGroupFp(namespace, group.name)
   const groupVal = JSON.stringify({ name: group.name, interval: group.interval })
-  await axios.post(getClickhouseUrl(),
-    `INSERT INTO ${DATABASE_NAME()}.settings${dist} (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow \n` +
+  await rawRequest(
+    `INSERT INTO ${DATABASE_NAME()}.settings${dist} (fingerprint, type, name, value, inserted_at) FORMAT JSONEachRow`,
     JSON.stringify({ fingerprint: ruleFp, type: 'alert_rule', name: ruleName, value: JSON.stringify(ruleVal), inserted_at: Date.now() * 1000000 }) + '\n' +
-    JSON.stringify({ fingerprint: groupFp, type: 'alert_group', name: groupName, value: groupVal, inserted_at: Date.now() * 1000000 })
+    JSON.stringify({ fingerprint: groupFp, type: 'alert_group', name: groupName, value: groupVal, inserted_at: Date.now() * 1000000 }),
+    DATABASE_NAME()
   )
 }

@@ -66,8 +66,9 @@ module.exports.putAlertRule = async (namespace, group, rule) => {
 module.exports.getLastCheck = async (ns, group, rule, id) => {
   const fp = getRuleFP(ns, group, rule)
   id = id || 0
-  const resp = await axios.post(getClickhouseUrl(),
-    `SELECT max(mark) as maxmark FROM ${DATABASE_NAME()}._alert_view_${fp}_mark WHERE id = ${id} FORMAT JSON`
+  const resp = await rawRequest(
+    `SELECT max(mark) as maxmark FROM ${DATABASE_NAME()}._alert_view_${fp}_mark WHERE id = ${id} FORMAT JSON`,
+    null, DATABASE_NAME()
   )
   if (!resp.data.data[0]) {
     return 0
@@ -100,11 +101,12 @@ module.exports.getAlertRules = async (limit, offset) => {
   const _limit = limit ? `LIMIT ${limit}` : ''
   const _offset = offset ? `OFFSET ${offset}` : ''
   const mark = Math.random()
-  const res = await axios.post(getClickhouseUrl(),
+  const res = await rawRequest(
     'SELECT fingerprint, argMax(name, inserted_at) as name, argMax(value, inserted_at) as value ' +
     `FROM ${DATABASE_NAME()}.settings${dist} ` +
     `WHERE type == 'alert_rule' AND ${mark} == ${mark} ` +
-    `GROUP BY fingerprint HAVING name != '' ORDER BY name ${_limit} ${_offset} FORMAT JSON`)
+    `GROUP BY fingerprint HAVING name != '' ORDER BY name ${_limit} ${_offset} FORMAT JSON`,
+    null, DATABASE_NAME())
   return res.data.data.map(e => {
     return { rule: JSON.parse(e.value), name: JSON.parse(e.name) }
   })
diff --git a/lib/db/maintain/index.js b/lib/db/maintain/index.js
index c1c88faa..b66aaabd 100644
--- a/lib/db/maintain/index.js
+++ b/lib/db/maintain/index.js
@@ -110,15 +110,15 @@ module.exports.rotate = async (opts) => {
     return upgradeRequest({ db: db.db, useDefaultDB: true }, req)
   }
   if (db.samples_days + '' !== settings.v3_samples_days) {
-    const alterTable = 'ALTER TABLE samples_v3 {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
-    const rotateTable = `ALTER TABLE samples_v3 {{{OnCluster}}} MODIFY TTL toDateTime(timestamp_ns / 1000000000) + INTERVAL ${db.samples_days} DAY`
+    const alterTable = 'ALTER TABLE {{DB}}.samples_v3 {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
+    const rotateTable = `ALTER TABLE {{DB}}.samples_v3 {{{OnCluster}}} MODIFY TTL toDateTime(timestamp_ns / 1000000000) + INTERVAL ${db.samples_days} DAY`
     await _update(alterTable, null, db.db)
     await _update(rotateTable, null, db.db)
     await client.addSetting('rotate', 'v3_samples_days', db.samples_days + '', db.db)
   }
   if (db.time_series_days + '' !== settings.v3_time_series_days) {
-    const alterTable = 'ALTER TABLE time_series {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
-    const rotateTable = `ALTER TABLE time_series {{{OnCluster}}} MODIFY TTL "date" + INTERVAL ${db.time_series_days} DAY`
+    const alterTable = 'ALTER TABLE {{DB}}.time_series {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
+    const rotateTable = `ALTER TABLE {{DB}}.time_series {{{OnCluster}}} MODIFY TTL "date" + INTERVAL ${db.time_series_days} DAY`
     await _update(alterTable, null, db.db)
     await _update(rotateTable, null, db.db)
-    const alterView = 'ALTER TABLE time_series_gin {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
+    const alterView = 'ALTER TABLE {{DB}}.time_series_gin {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
@@ -129,54 +129,54 @@ module.exports.rotate = async (opts) => {
   }
   if (db.storage_policy && db.storage_policy !== settings.v3_storage_policy) {
     logger.debug(`Altering storage policy: ${db.storage_policy}`)
-    const alterTs = `ALTER TABLE time_series {{{OnCluster}}} MODIFY SETTING storage_policy='${db.storage_policy}'`
-    const alterTsVw = `ALTER TABLE time_series_gin {{{OnCluster}}} MODIFY SETTING storage_policy='${db.storage_policy}'`
-    const alterSm = `ALTER TABLE samples_v3 {{{OnCluster}}} MODIFY SETTING storage_policy='${db.storage_policy}'`
+    const alterTs = `ALTER TABLE {{DB}}.time_series {{{OnCluster}}} MODIFY SETTING storage_policy='${db.storage_policy}'`
+    const alterTsVw = `ALTER TABLE {{DB}}.time_series_gin {{{OnCluster}}} MODIFY SETTING storage_policy='${db.storage_policy}'`
+    const alterSm = `ALTER TABLE {{DB}}.samples_v3 {{{OnCluster}}} MODIFY SETTING storage_policy='${db.storage_policy}'`
     await _update(alterTs, null, db.db)
     await _update(alterTsVw, null, db.db)
     await _update(alterSm, null, db.db)
     await client.addSetting('rotate', 'v3_storage_policy', db.storage_policy, db.db)
   }
   if (db.samples_days + '' !== settings.v1_traces_days) {
-    let alterTable = 'ALTER TABLE tempo_traces {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
-    let rotateTable = `ALTER TABLE tempo_traces {{{OnCluster}}} MODIFY TTL toDateTime(timestamp_ns / 1000000000) + INTERVAL ${db.samples_days} DAY`
+    let alterTable = 'ALTER TABLE {{DB}}.tempo_traces {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
+    let rotateTable = `ALTER TABLE {{DB}}.tempo_traces {{{OnCluster}}} MODIFY TTL toDateTime(timestamp_ns / 1000000000) + INTERVAL ${db.samples_days} DAY`
     await _update(alterTable, null, db.db)
     await _update(rotateTable, null, db.db)
-    alterTable = 'ALTER TABLE tempo_traces_attrs_gin {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
-    rotateTable = `ALTER TABLE tempo_traces_attrs_gin {{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY`
+    alterTable = 'ALTER TABLE {{DB}}.tempo_traces_attrs_gin {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
+    rotateTable = `ALTER TABLE {{DB}}.tempo_traces_attrs_gin {{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY`
     await _update(alterTable, null, db.db)
     await _update(rotateTable, null, db.db)
-    alterTable = 'ALTER TABLE tempo_traces_kv {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
-    rotateTable = `ALTER TABLE tempo_traces_kv {{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY`
+    alterTable = 'ALTER TABLE {{DB}}.tempo_traces_kv {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
+    rotateTable = `ALTER TABLE {{DB}}.tempo_traces_kv {{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY`
     await _update(alterTable, null, db.db)
     await _update(rotateTable, null, db.db)
     await client.addSetting('rotate', 'v1_traces_days', db.samples_days + '', db.db)
   }
   if (db.storage_policy && db.storage_policy !== settings.v1_traces_storage_policy) {
     logger.debug(`Altering storage policy: ${db.storage_policy}`)
-    const alterTs = `ALTER TABLE tempo_traces MODIFY SETTING storage_policy='${db.storage_policy}'`
-    const alterTsVw = `ALTER TABLE tempo_traces_attrs_gin MODIFY SETTING storage_policy='${db.storage_policy}'`
-    const alterSm = `ALTER TABLE tempo_traces_kv MODIFY SETTING storage_policy='${db.storage_policy}'`
+    const alterTs = `ALTER TABLE {{DB}}.tempo_traces MODIFY SETTING storage_policy='${db.storage_policy}'`
+    const alterTsVw = `ALTER TABLE {{DB}}.tempo_traces_attrs_gin MODIFY SETTING storage_policy='${db.storage_policy}'`
+    const alterSm = `ALTER TABLE {{DB}}.tempo_traces_kv MODIFY SETTING storage_policy='${db.storage_policy}'`
     await _update(alterTs, null, db.db)
     await _update(alterTsVw, null, db.db)
     await _update(alterSm, null, db.db)
     await client.addSetting('rotate', 'v1_traces_storage_policy', db.storage_policy, db.db)
   }
   if (db.samples_days + '' !== settings.v1_profiles_days) {
-    let alterTable = 'ALTER TABLE profiles {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
-    let rotateTable = `ALTER TABLE profiles {{{OnCluster}}} MODIFY TTL toDateTime(timestamp_ns / 1000000000) + INTERVAL ${db.samples_days} DAY`
+    let alterTable = 'ALTER TABLE {{DB}}.profiles {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
+    let rotateTable = `ALTER TABLE {{DB}}.profiles {{{OnCluster}}} MODIFY TTL toDateTime(timestamp_ns / 1000000000) + INTERVAL ${db.samples_days} DAY`
     await _update(alterTable, null, db.db)
     await _update(rotateTable, null, db.db)
-    alterTable = 'ALTER TABLE profiles_series {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
-    rotateTable = `ALTER TABLE profiles_series {{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY`
+    alterTable = 'ALTER TABLE {{DB}}.profiles_series {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
+    rotateTable = `ALTER TABLE {{DB}}.profiles_series {{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY`
     await _update(alterTable, null, db.db)
     await _update(rotateTable, null, db.db)
-    alterTable = 'ALTER TABLE profiles_series_gin {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
-    rotateTable = `ALTER TABLE profiles_series_gin {{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY`
+    alterTable = 'ALTER TABLE {{DB}}.profiles_series_gin {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
+    rotateTable = `ALTER TABLE {{DB}}.profiles_series_gin {{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY`
     await _update(alterTable, null, db.db)
     await _update(rotateTable, null, db.db)
-    alterTable = 'ALTER TABLE profiles_series_keys {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
-    rotateTable = `ALTER TABLE profiles_series_keys {{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY`
+    alterTable = 'ALTER TABLE {{DB}}.profiles_series_keys {{{OnCluster}}} MODIFY SETTING ttl_only_drop_parts = 1, merge_with_ttl_timeout = 3600, index_granularity = 8192'
+    rotateTable = `ALTER TABLE {{DB}}.profiles_series_keys {{{OnCluster}}} MODIFY TTL date + INTERVAL ${db.samples_days} DAY`
     await _update(alterTable, null, db.db)
     await _update(rotateTable, null, db.db)
     await client.addSetting('rotate', 'v1_profiles_days', db.samples_days + '', db.db)
diff --git a/lib/db/maintain/scripts.js b/lib/db/maintain/scripts.js
index 7c2aa097..8baaf313 100644
--- a/lib/db/maintain/scripts.js
+++ b/lib/db/maintain/scripts.js
@@ -40,8 +40,8 @@ module.exports.overall = [
   AS SELECT date, pairs.1 as key, pairs.2 as val, fingerprint
   FROM time_series ARRAY JOIN JSONExtractKeysAndValues(time_series.labels, 'String') as pairs`,

-  "INSERT INTO settings (fingerprint, type, name, value, inserted_at) VALUES (cityHash64('update_v3_5'), 'update', " +
-  "'v3_1', toString(toUnixTimestamp(NOW())), NOW())",
+  `INSERT INTO {{DB}}.settings (fingerprint, type, name, value, inserted_at) VALUES (cityHash64('update_v3_5'), 'update',
+    'v3_1', toString(toUnixTimestamp(NOW())), NOW())`,

   `CREATE TABLE IF NOT EXISTS {{DB}}.metrics_15s {{{OnCluster}}} (
     fingerprint UInt64,
@@ -65,10 +65,10 @@ SELECT fingerprint,
        countState() as count,
        sumSimpleState(value) as sum,
        sumSimpleState(length(string)) as bytes
-FROM samples_v3 as samples
+FROM {{DB}}.samples_v3 as samples
 GROUP BY fingerprint, timestamp_ns;`,

-  "INSERT INTO settings (fingerprint, type, name, value, inserted_at) VALUES (cityHash64('update_v3_2'), 'update', " +
-  "'v3_2', toString(toUnixTimestamp(NOW())), NOW())",
+  `INSERT INTO {{DB}}.settings (fingerprint, type, name, value, inserted_at) VALUES (cityHash64('update_v3_2'), 'update',
+    'v3_2', toString(toUnixTimestamp(NOW())), NOW())`,

 `ALTER TABLE {{DB}}.time_series {{{OnCluster}}}
@@ -191,8 +191,8 @@ module.exports.traces = [
      duration_ns as duration
   FROM traces_input ARRAY JOIN tags`,

-  "INSERT INTO settings (fingerprint, type, name, value, inserted_at) VALUES (cityHash64('tempo_traces_v1'), 'update', " +
-  "'tempo_traces_v2', toString(toUnixTimestamp(NOW())), NOW())"
+  `INSERT INTO {{DB}}.settings (fingerprint, type, name, value, inserted_at) VALUES (cityHash64('tempo_traces_v1'), 'update',
+    'tempo_traces_v2', toString(toUnixTimestamp(NOW())), NOW())`
 ]

 module.exports.overall_dist = [
@@ -246,7 +246,7 @@ module.exports.overall_dist = [
 ]

 module.exports.traces_dist = [
-  `CREATE TABLE IF NOT EXISTS tempo_traces_kv_dist {{{OnCluster}}} (
+  `CREATE TABLE IF NOT EXISTS {{DB}}.tempo_traces_kv_dist {{{OnCluster}}} (
     oid String,
     date Date,
     key String,
@@ -254,7 +254,7 @@ module.exports.traces_dist = [
     val String
 ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces_kv', sipHash64(oid, key));`,

-  `CREATE TABLE IF NOT EXISTS tempo_traces_dist {{{OnCluster}}} (
+  `CREATE TABLE IF NOT EXISTS {{DB}}.tempo_traces_dist {{{OnCluster}}} (
     oid String,
     trace_id FixedString(16),
     span_id FixedString(8),
@@ -267,7 +267,7 @@ module.exports.traces_dist = [
     payload String,
 ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}', 'tempo_traces', sipHash64(oid, trace_id));`,

-  `CREATE TABLE IF NOT EXISTS tempo_traces_attrs_gin_dist {{{OnCluster}}} (
+  `CREATE TABLE IF NOT EXISTS {{DB}}.tempo_traces_attrs_gin_dist {{{OnCluster}}} (
     oid String,
     date Date,
     key String,
@@ -280,7 +280,7 @@ module.exports.traces_dist = [
 ]

 module.exports.profiles = [
-  `CREATE TABLE IF NOT EXISTS profiles_input {{{OnCluster}}} (
+  `CREATE TABLE IF NOT EXISTS {{DB}}.profiles_input {{{OnCluster}}} (
     timestamp_ns UInt64,
     type LowCardinality(String),
     service_name LowCardinality(String),
@@ -294,7 +294,7 @@ module.exports.profiles = [
     values_agg Array(Tuple(String, Int64, Int32)) CODEC(ZSTD(1))
 ) Engine=Null`,

-  `CREATE TABLE IF NOT EXISTS profiles {{{OnCluster}}} (
+  `CREATE TABLE IF NOT EXISTS {{DB}}.profiles {{{OnCluster}}} (
     timestamp_ns UInt64 CODEC(DoubleDelta, ZSTD(1)),
     fingerprint UInt64 CODEC(DoubleDelta, ZSTD(1)),
     type_id LowCardinality(String) CODEC(ZSTD(1)),
@@ -308,7 +308,7 @@ module.exports.profiles = [
   ORDER BY (type_id, service_name, timestamp_ns)
   PARTITION BY toDate(FROM_UNIXTIME(intDiv(timestamp_ns, 1000000000))) {{{CREATE_SETTINGS}}}`,

-  `CREATE MATERIALIZED VIEW IF NOT EXISTS profiles_mv {{{OnCluster}}} TO profiles AS
+  `CREATE MATERIALIZED VIEW IF NOT EXISTS {{DB}}.profiles_mv {{{OnCluster}}} TO profiles AS
   SELECT
     timestamp_ns,
     cityHash64(arraySort(arrayConcat(
@@ -326,7 +326,7 @@ module.exports.profiles = [
     values_agg
   FROM profiles_input`,

-  `CREATE TABLE IF NOT EXISTS profiles_series {{{OnCluster}}} (
+  `CREATE TABLE IF NOT EXISTS {{DB}}.profiles_series {{{OnCluster}}} (
     date Date CODEC(ZSTD(1)),
     type_id LowCardinality(String) CODEC(ZSTD(1)),
     sample_types_units Array(Tuple(String, String)) CODEC(ZSTD(1)),
@@ -337,7 +337,7 @@ module.exports.profiles = [
   ORDER BY (date, type_id, fingerprint)
   PARTITION BY date {{{CREATE_SETTINGS}}}`,

-  `CREATE MATERIALIZED VIEW IF NOT EXISTS profiles_series_mv {{{OnCluster}}} TO profiles_series AS
+  `CREATE MATERIALIZED VIEW IF NOT EXISTS {{DB}}.profiles_series_mv {{{OnCluster}}} TO profiles_series AS
   SELECT
     toDate(intDiv(timestamp_ns, 1000000000)) as date,
     concatWithSeparator(':', type, period_type, period_unit) as type_id,
@@ -352,7 +352,7 @@ module.exports.profiles = [
     arrayConcat(profiles_input.tags, [('service_name', service_name)]) as tags
   FROM profiles_input`,

-  `CREATE TABLE IF NOT EXISTS profiles_series_gin {{{OnCluster}}} (
+  `CREATE TABLE IF NOT EXISTS {{DB}}.profiles_series_gin {{{OnCluster}}} (
     date Date CODEC(ZSTD(1)),
     key String CODEC(ZSTD(1)),
     val String CODEC(ZSTD(1)),
@@ -364,7 +364,7 @@ module.exports.profiles = [
   ORDER BY (date, key, val, type_id, fingerprint)
   PARTITION BY date {{{CREATE_SETTINGS}}}`,

-  `CREATE MATERIALIZED VIEW IF NOT EXISTS profiles_series_gin_mv {{{OnCluster}}} TO profiles_series_gin AS
+  `CREATE MATERIALIZED VIEW IF NOT EXISTS {{DB}}.profiles_series_gin_mv {{{OnCluster}}} TO profiles_series_gin AS
   SELECT
     date,
     kv.1 as key,
@@ -375,7 +375,7 @@ module.exports.profiles = [
     fingerprint
   FROM profiles_series ARRAY JOIN tags as kv`,

-  `CREATE TABLE IF NOT EXISTS profiles_series_keys {{{OnCluster}}} (
+  `CREATE TABLE IF NOT EXISTS {{DB}}.profiles_series_keys {{{OnCluster}}} (
     date Date,
     key String,
     val String,
@@ -384,7 +384,7 @@ module.exports.profiles = [
   ORDER BY (date, key, val_id)
   PARTITION BY date {{{CREATE_SETTINGS}}}`,

-  `CREATE MATERIALIZED VIEW IF NOT EXISTS profiles_series_keys_mv {{{OnCluster}}} TO profiles_series_keys AS
+  `CREATE MATERIALIZED VIEW IF NOT EXISTS {{DB}}.profiles_series_keys_mv {{{OnCluster}}} TO profiles_series_keys AS
   SELECT
     date,
     key,
@@ -392,17 +392,17 @@ module.exports.profiles = [
     val,
     cityHash64(val) % 50000 as val_id
   FROM profiles_series_gin`,
-  `ALTER TABLE profiles_input {{{OnCluster}}}
+  `ALTER TABLE {{DB}}.profiles_input {{{OnCluster}}}
     ADD COLUMN IF NOT EXISTS \`tree\` Array(Tuple(UInt64, UInt64, UInt64, Array(Tuple(String, Int64, Int64)))),
     ADD COLUMN IF NOT EXISTS \`functions\` Array(Tuple(UInt64, String))`,

-  `ALTER TABLE profiles {{{OnCluster}}}
+  `ALTER TABLE {{DB}}.profiles {{{OnCluster}}}
     ADD COLUMN IF NOT EXISTS \`tree\` Array(Tuple(UInt64, UInt64, UInt64, Array(Tuple(String, Int64, Int64)))),
     ADD COLUMN IF NOT EXISTS \`functions\` Array(Tuple(UInt64, String))`,

-  'RENAME TABLE IF EXISTS profiles_mv TO profiles_mv_bak {{{OnCluster}}}',
+  'RENAME TABLE IF EXISTS {{DB}}.profiles_mv TO {{DB}}.profiles_mv_bak {{{OnCluster}}}',

-  `CREATE MATERIALIZED VIEW IF NOT EXISTS profiles_mv {{{OnCluster}}} TO profiles AS
+  `CREATE MATERIALIZED VIEW IF NOT EXISTS {{DB}}.profiles_mv {{{OnCluster}}} TO profiles AS
   SELECT
     timestamp_ns,
     cityHash64(arraySort(arrayConcat(
@@ -422,14 +422,14 @@ module.exports.profiles = [
     functions
   FROM profiles_input`,

-  'DROP TABLE IF EXISTS profiles_mv_bak {{{OnCluster}}}',
+  'DROP TABLE IF EXISTS {{DB}}.profiles_mv_bak {{{OnCluster}}}',

-  "INSERT INTO settings (fingerprint, type, name, value, inserted_at) VALUES (cityHash64('profiles_v2'), 'update', " +
+  "INSERT INTO {{DB}}.settings (fingerprint, type, name, value, inserted_at) VALUES (cityHash64('profiles_v2'), 'update', " +
     "'profiles_v2', toString(toUnixTimestamp(NOW())), NOW())"
 ]

 module.exports.profiles_dist = [
-  `CREATE TABLE IF NOT EXISTS profiles_dist {{{OnCluster}}} (
+  `CREATE TABLE IF NOT EXISTS {{DB}}.profiles_dist {{{OnCluster}}} (
     timestamp_ns UInt64,
     fingerprint UInt64,
     type_id LowCardinality(String),
@@ -440,7 +440,7 @@ module.exports.profiles_dist = [
     values_agg Array(Tuple(String, Int64, Int32))
 ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles', fingerprint);`,

-  `CREATE TABLE IF NOT EXISTS profiles_series_dist {{{OnCluster}}} (
+  `CREATE TABLE IF NOT EXISTS {{DB}}.profiles_series_dist {{{OnCluster}}} (
     date Date,
     type_id LowCardinality(String),
     service_name LowCardinality(String),
@@ -448,7 +448,7 @@ module.exports.profiles_dist = [
     tags Array(Tuple(String, String)) CODEC(ZSTD(1))
 ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series',fingerprint);`,

-  `CREATE TABLE IF NOT EXISTS profiles_series_gin_dist {{{OnCluster}}} (
+  `CREATE TABLE IF NOT EXISTS {{DB}}.profiles_series_gin_dist {{{OnCluster}}} (
     date Date,
     key String,
     val String,
@@ -457,23 +457,23 @@ module.exports.profiles_dist = [
     fingerprint UInt64 CODEC(DoubleDelta, ZSTD(1))
 ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series_gin',fingerprint);`,

-  `CREATE TABLE IF NOT EXISTS profiles_series_keys_dist {{{OnCluster}}} (
+  `CREATE TABLE IF NOT EXISTS {{DB}}.profiles_series_keys_dist {{{OnCluster}}} (
     date Date,
     key String,
     val String,
     val_id UInt64
 ) ENGINE = Distributed('{{CLUSTER}}','{{DB}}','profiles_series_keys', rand());`,

-  `ALTER TABLE profiles_dist {{{OnCluster}}}
+  `ALTER TABLE {{DB}}.profiles_dist {{{OnCluster}}}
     ADD COLUMN IF NOT EXISTS \`tree\` Array(Tuple(UInt64, UInt64, UInt64, Array(Tuple(String, Int64, Int64)))),
     ADD COLUMN IF NOT EXISTS \`functions\` Array(Tuple(UInt64, String))`,

-  `ALTER TABLE profiles_dist {{{OnCluster}}}
+  `ALTER TABLE {{DB}}.profiles_dist {{{OnCluster}}}
     ADD COLUMN IF NOT EXISTS \`sample_types_units\` Array(Tuple(String, String))`,

-  `ALTER TABLE profiles_series_dist {{{OnCluster}}}
+  `ALTER TABLE {{DB}}.profiles_series_dist {{{OnCluster}}}
     ADD COLUMN IF NOT EXISTS \`sample_types_units\` Array(Tuple(String, String))`,

-  `ALTER TABLE profiles_series_gin_dist {{{OnCluster}}}
+  `ALTER TABLE {{DB}}.profiles_series_gin_dist {{{OnCluster}}}
     ADD COLUMN IF NOT EXISTS \`sample_types_units\` Array(Tuple(String, String))`
 ]
diff --git a/lib/db/throttler.js b/lib/db/throttler.js
index c4ad3cf5..2320e0cc 100644
--- a/lib/db/throttler.js
+++ b/lib/db/throttler.js
@@ -1,8 +1,9 @@
 const { isMainThread, parentPort } = require('worker_threads')
 const axios = require('axios')
-const { getClickhouseUrl, samplesTableName } = require('./clickhouse_options')
-const clickhouseOptions = require('./clickhouse_options').databaseOptions
+const { getClickhouseUrl, samplesTableName, rawRequest } = require('./clickhouse')
+const clickhouseOptions = require('./clickhouse').databaseOptions
 const logger = require('../logger')
+const { DATABASE_NAME } = require('../utils')
 const clusterName = require('../../common').clusterName
 const dist = clusterName ? '_dist' : ''
 const { EventEmitter } = require('events')
@@ -54,12 +55,7 @@ class TimeoutThrottler {
     }
     const _queue = this.queue
     this.queue = []
-    await axios.post(`${getClickhouseUrl()}/?query=${this.statement}`,
-      _queue.join('\n'),
-      {
-        maxBodyLength: Infinity
-      }
-    )
+    await rawRequest(this.statement, _queue.join('\n'), DATABASE_NAME(), { maxBodyLength: Infinity })
   }

   stop () {
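Note on the helper this patch standardizes on: every rewritten call site funnels through `rawRequest(query, data, database, config)` exported by `lib/db/clickhouse.js`, so the database reaches ClickHouse even where a table name stays unqualified. The sketch below is only an assumed shape inferred from the call sites in this diff, not the actual qryn implementation; `getClickhouseUrl` here is a stand-in for the real config-driven helper.

```js
const axios = require('axios')

// Stand-in for qryn's config-driven URL helper (assumption for this sketch).
const getClickhouseUrl = () => process.env.CLICKHOUSE_URL || 'http://localhost:8123'

// Hypothetical rawRequest(query, data, database, config): the statement travels
// as the HTTP `query` parameter, any JSONEachRow payload goes in the POST body,
// and `database` scopes the session so unqualified names resolve consistently.
const rawRequest = async (query, data, database, config) => {
  const params = new URLSearchParams({ query })
  if (database) {
    params.append('database', database)
  }
  return axios.post(`${getClickhouseUrl()}/?${params.toString()}`, data || '', {
    maxBodyLength: Infinity,
    ...(config || {})
  })
}

// Usage mirroring the throttler change above:
// await rawRequest(statement, rows.join('\n'), DATABASE_NAME(), { maxBodyLength: Infinity })
```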