diff --git a/packages/cubejs-backend-shared/src/env.ts b/packages/cubejs-backend-shared/src/env.ts index a8021d303d9ad..b736f26ec004e 100644 --- a/packages/cubejs-backend-shared/src/env.ts +++ b/packages/cubejs-backend-shared/src/env.ts @@ -9,7 +9,7 @@ export class InvalidConfiguration extends Error { } } -export function convertTimeStrToMs( +export function convertTimeStrToSeconds( input: string, envName: string, description: string = 'Must be a number in seconds or duration string (1s, 1m, 1h).', @@ -126,7 +126,7 @@ function asBoolOrTime(input: string, envName: string): number | boolean { return false; } - return convertTimeStrToMs( + return convertTimeStrToSeconds( input, envName, 'Should be boolean or number (in seconds) or string in time format (1s, 1m, 1h)' @@ -510,7 +510,7 @@ const variables: Record any> = { }) => { const key = keyByDataSource('CUBEJS_DB_POLL_MAX_INTERVAL', dataSource); const value = process.env[key] || '5s'; - return convertTimeStrToMs(value, key); + return convertTimeStrToSeconds(value, key); }, /** @@ -525,14 +525,14 @@ const variables: Record any> = { const key = keyByDataSource('CUBEJS_DB_POLL_TIMEOUT', dataSource); const value = process.env[key]; if (value) { - return convertTimeStrToMs(value, key); + return convertTimeStrToSeconds(value, key); } else { return null; } }, /** - * Query timeout. Currently used in BigQuery, Dremio, Postgres, Snowflake + * Query timeout. Currently used in BigQuery, ClickHouse, Dremio, Postgres, Snowflake * and Athena drivers and the orchestrator (queues, pre-aggs). For the * orchestrator this variable did not split by the datasource. * @@ -546,7 +546,7 @@ const variables: Record any> = { } = {}) => { const key = keyByDataSource('CUBEJS_DB_QUERY_TIMEOUT', dataSource); const value = process.env[key] || '10m'; - return convertTimeStrToMs(value, key); + return convertTimeStrToSeconds(value, key); }, /** diff --git a/packages/cubejs-backend-shared/test/env.test.ts b/packages/cubejs-backend-shared/test/env.test.ts index 26a5ef6b434a2..8b15eca83e267 100644 --- a/packages/cubejs-backend-shared/test/env.test.ts +++ b/packages/cubejs-backend-shared/test/env.test.ts @@ -1,17 +1,17 @@ -import { getEnv, convertTimeStrToMs } from '../src/env'; +import { getEnv, convertTimeStrToSeconds } from '../src/env'; test('convertTimeStrToMs', () => { - expect(convertTimeStrToMs('1', 'VARIABLE_ENV')).toBe(1); - expect(convertTimeStrToMs('1s', 'VARIABLE_ENV')).toBe(1); - expect(convertTimeStrToMs('5s', 'VARIABLE_ENV')).toBe(5); - expect(convertTimeStrToMs('1m', 'VARIABLE_ENV')).toBe(1 * 60); - expect(convertTimeStrToMs('10m', 'VARIABLE_ENV')).toBe(10 * 60); - expect(convertTimeStrToMs('1h', 'VARIABLE_ENV')).toBe(60 * 60); - expect(convertTimeStrToMs('2h', 'VARIABLE_ENV')).toBe(2 * 60 * 60); + expect(convertTimeStrToSeconds('1', 'VARIABLE_ENV')).toBe(1); + expect(convertTimeStrToSeconds('1s', 'VARIABLE_ENV')).toBe(1); + expect(convertTimeStrToSeconds('5s', 'VARIABLE_ENV')).toBe(5); + expect(convertTimeStrToSeconds('1m', 'VARIABLE_ENV')).toBe(1 * 60); + expect(convertTimeStrToSeconds('10m', 'VARIABLE_ENV')).toBe(10 * 60); + expect(convertTimeStrToSeconds('1h', 'VARIABLE_ENV')).toBe(60 * 60); + expect(convertTimeStrToSeconds('2h', 'VARIABLE_ENV')).toBe(2 * 60 * 60); }); test('convertTimeStrToMs(exception)', () => { - expect(() => convertTimeStrToMs('', 'VARIABLE_ENV')).toThrowError( + expect(() => convertTimeStrToSeconds('', 'VARIABLE_ENV')).toThrowError( `Value "" is not valid for VARIABLE_ENV. Must be a number in seconds or duration string (1s, 1m, 1h).` ); }); diff --git a/packages/cubejs-base-driver/src/BaseDriver.ts b/packages/cubejs-base-driver/src/BaseDriver.ts index 54d1018635c70..91a44e15c0a45 100644 --- a/packages/cubejs-base-driver/src/BaseDriver.ts +++ b/packages/cubejs-base-driver/src/BaseDriver.ts @@ -614,9 +614,14 @@ export abstract class BaseDriver implements DriverInterface { return []; } - public createTable(quotedTableName: string, columns: TableColumn[]) { + // This is only for use in tests + public async createTableRaw(query: string): Promise { + await this.query(query); + } + + public async createTable(quotedTableName: string, columns: TableColumn[]): Promise { const createTableSql = this.createTableSql(quotedTableName, columns); - return this.query(createTableSql, []).catch(e => { + await this.query(createTableSql, []).catch(e => { e.message = `Error during create table: ${createTableSql}: ${e.message}`; throw e; }); diff --git a/packages/cubejs-clickhouse-driver/package.json b/packages/cubejs-clickhouse-driver/package.json index 7f30360694fbc..4da6b8d15395f 100644 --- a/packages/cubejs-clickhouse-driver/package.json +++ b/packages/cubejs-clickhouse-driver/package.json @@ -27,10 +27,9 @@ "integration:clickhouse": "jest dist/test" }, "dependencies": { - "@cubejs-backend/apla-clickhouse": "^1.7", + "@clickhouse/client": "^1.7.0", "@cubejs-backend/base-driver": "1.1.4", "@cubejs-backend/shared": "1.1.4", - "generic-pool": "^3.6.0", "moment": "^2.24.0", "sqlstring": "^2.3.1", "uuid": "^8.3.2" diff --git a/packages/cubejs-clickhouse-driver/src/ClickHouseDriver.ts b/packages/cubejs-clickhouse-driver/src/ClickHouseDriver.ts index 4cd1c5049426b..0a55468a1e8f8 100644 --- a/packages/cubejs-clickhouse-driver/src/ClickHouseDriver.ts +++ b/packages/cubejs-clickhouse-driver/src/ClickHouseDriver.ts @@ -15,19 +15,23 @@ import { DownloadTableCSVData, DriverCapabilities, DriverInterface, + QueryOptions, QuerySchemasResult, StreamOptions, StreamTableDataWithTypes, + TableColumn, + TableQueryResult, TableStructure, UnloadOptions, } from '@cubejs-backend/base-driver'; -import genericPool, { Pool } from 'generic-pool'; + +import { Readable } from 'node:stream'; +import { ClickHouseClient, createClient } from '@clickhouse/client'; +import type { ClickHouseSettings, ResponseJSON } from '@clickhouse/client'; import { v4 as uuidv4 } from 'uuid'; import sqlstring from 'sqlstring'; -import { HydrationStream, transformRow } from './HydrationStream'; - -const ClickHouse = require('@cubejs-backend/apla-clickhouse'); +import { transformRow, transformStreamRow } from './HydrationStream'; const ClickhouseTypeToGeneric: Record = { enum: 'text', @@ -54,14 +58,35 @@ const ClickhouseTypeToGeneric: Record = { enum16: 'text', }; -interface ClickHouseDriverOptions { +export interface ClickHouseDriverOptions { host?: string, port?: string, - auth?: string, + username?: string, + password?: string, protocol?: string, database?: string, readOnly?: boolean, - queryOptions?: object, + /** + * Timeout in milliseconds for requests to ClickHouse. + * Default is 10 minutes + */ + requestTimeout?: number, + + /** + * Data source name. + */ + dataSource?: string, + + /** + * Max pool size value for the [cube]<-->[db] pool. + */ + maxPoolSize?: number, + + /** + * Time to wait for a response from a connection after validation + * request before determining it as not valid. Default - 10000 ms. + */ + testConnectionTimeout?: number, } interface ClickhouseDriverExportRequiredAWS { @@ -71,13 +96,24 @@ interface ClickhouseDriverExportRequiredAWS { } interface ClickhouseDriverExportKeySecretAWS extends ClickhouseDriverExportRequiredAWS { - keyId?: string, - secretKey?: string, + keyId: string, + secretKey: string, } interface ClickhouseDriverExportAWS extends ClickhouseDriverExportKeySecretAWS { } +type ClickHouseDriverConfig = { + url: string, + username: string, + password: string, + readOnly: boolean, + database: string, + requestTimeout: number, + exportBucket: ClickhouseDriverExportAWS | null, + clickhouseSettings: ClickHouseSettings, +}; + export class ClickHouseDriver extends BaseDriver implements DriverInterface { /** * Returns default concurrency value. @@ -86,132 +122,105 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface { return 5; } - protected readonly pool: Pool; + // ClickHouseClient has internal pool of several sockets, no need for generic-pool + protected readonly client: ClickHouseClient; protected readonly readOnlyMode: boolean; - protected readonly config: any; + protected readonly config: ClickHouseDriverConfig; /** * Class constructor. */ public constructor( - config: ClickHouseDriverOptions & { - /** - * Data source name. - */ - dataSource?: string, - - /** - * Max pool size value for the [cube]<-->[db] pool. - */ - maxPoolSize?: number, - - /** - * Time to wait for a response from a connection after validation - * request before determining it as not valid. Default - 10000 ms. - */ - testConnectionTimeout?: number, - } = {}, + config: ClickHouseDriverOptions = {}, ) { super({ testConnectionTimeout: config.testConnectionTimeout, }); - const dataSource = - config.dataSource || - assertDataSource('default'); + const dataSource = config.dataSource ?? assertDataSource('default'); + const host = config.host ?? getEnv('dbHost', { dataSource }); + const port = config.port ?? getEnv('dbPort', { dataSource }) ?? 8123; + const protocol = config.protocol ?? getEnv('dbSsl', { dataSource }) ? 'https:' : 'http:'; + const url = `${protocol}//${host}:${port}`; + + const username = getEnv('dbUser', { dataSource }); + const password = getEnv('dbPass', { dataSource }); + const database = config.database ?? (getEnv('dbName', { dataSource }) as string) ?? 'default'; + + // TODO this is a bit inconsistent with readOnly + this.readOnlyMode = + getEnv('clickhouseReadOnly', { dataSource }) === 'true'; + + // Expect that getEnv('dbQueryTimeout') will always return a value + const requestTimeoutEnv: number = getEnv('dbQueryTimeout', { dataSource }) * 1000; + const requestTimeout = config.requestTimeout ?? requestTimeoutEnv; this.config = { - host: getEnv('dbHost', { dataSource }), - port: getEnv('dbPort', { dataSource }), - auth: - getEnv('dbUser', { dataSource }) || - getEnv('dbPass', { dataSource }) - ? `${ - getEnv('dbUser', { dataSource }) - }:${ - getEnv('dbPass', { dataSource }) - }` - : '', - protocol: getEnv('dbSsl', { dataSource }) ? 'https:' : 'http:', - queryOptions: { - database: - getEnv('dbName', { dataSource }) || - config && config.database || - 'default' - }, + url, + username, + password, + database, exportBucket: this.getExportBucket(dataSource), - ...config + readOnly: !!config.readOnly, + requestTimeout, + clickhouseSettings: { + // If ClickHouse user's permissions are restricted with "readonly = 1", + // change settings queries are not allowed. Thus, "join_use_nulls" setting + // can not be changed + ...(this.readOnlyMode ? {} : { join_use_nulls: 1 }), + }, }; - this.readOnlyMode = - getEnv('clickhouseReadOnly', { dataSource }) === 'true'; + const maxPoolSize = config.maxPoolSize ?? getEnv("dbMaxPoolSize", { dataSource }) ?? 8; - this.pool = genericPool.createPool({ - create: async () => new ClickHouse({ - ...this.config, - queryOptions: { - // - // - // If ClickHouse user's permissions are restricted with "readonly = 1", - // change settings queries are not allowed. Thus, "join_use_nulls" setting - // can not be changed - // - // - ...(this.readOnlyMode ? {} : { join_use_nulls: 1 }), - session_id: uuidv4(), - ...this.config.queryOptions, - } - }), - destroy: () => Promise.resolve() - }, { - min: 0, - max: - config.maxPoolSize || - getEnv('dbMaxPoolSize', { dataSource }) || - 8, - evictionRunIntervalMillis: 10000, - softIdleTimeoutMillis: 30000, - idleTimeoutMillis: 30000, - acquireTimeoutMillis: 20000 - }); + this.client = this.createClient(maxPoolSize); } - protected withConnection(fn: (con: any, queryId: string) => Promise) { - const self = this; - const connectionPromise = this.pool.acquire(); + protected withCancel(fn: (con: ClickHouseClient, queryId: string, signal: AbortSignal) => Promise): Promise { const queryId = uuidv4(); - let cancelled = false; - const cancelObj: any = {}; - - const promise: any = connectionPromise.then((connection: any) => { - cancelObj.cancel = async () => { - cancelled = true; - await self.withConnection(async conn => { - await conn.querying(`KILL QUERY WHERE query_id = '${queryId}'`); + const abortController = new AbortController(); + const { signal } = abortController; + + const promise = (async () => { + await this.client.ping(); + signal.throwIfAborted(); + // Queries sent by `fn` can hit a timeout error, would _not_ get killed, and continue running in ClickHouse + // TODO should we kill those as well? + const result = await fn(this.client, queryId, signal); + signal.throwIfAborted(); + return result; + })(); + (promise as any).cancel = async () => { + abortController.abort(); + // Use separate client for kill query, usual pool may be busy + const killClient = this.createClient(1); + try { + await killClient.command({ + query: `KILL QUERY WHERE query_id = '${queryId}'`, }); - }; - return fn(connection, queryId) - .then(res => this.pool.release(connection).then(() => { - if (cancelled) { - throw new Error('Query cancelled'); - } - return res; - })) - .catch((err) => this.pool.release(connection).then(() => { - if (cancelled) { - throw new Error('Query cancelled'); - } - throw err; - })); - }); - promise.cancel = () => cancelObj.cancel(); + } finally { + await killClient.close(); + } + }; return promise; } + protected createClient(maxPoolSize: number): ClickHouseClient { + return createClient({ + url: this.config.url, + username: this.config.username, + password: this.config.password, + database: this.config.database, + clickhouse_settings: this.config.clickhouseSettings, + request_timeout: this.config.requestTimeout, + max_open_connections: maxPoolSize, + }); + } + public async testConnection() { await this.query('SELECT 1', []); } @@ -222,46 +231,56 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface { true; } - public async query(query: string, values: unknown[]) { - return this.queryResponse(query, values).then((res: any) => this.normaliseResponse(res)); + public async query(query: string, values: unknown[]): Promise { + const response = await this.queryResponse(query, values); + return this.normaliseResponse(response); } - protected queryResponse(query: string, values: unknown[]) { + protected queryResponse(query: string, values: unknown[]): Promise>> { const formattedQuery = sqlstring.format(query, values); - return this.withConnection((connection, queryId) => connection.querying(formattedQuery, { - dataObjects: true, - queryOptions: { - query_id: queryId, - // - // - // If ClickHouse user's permissions are restricted with "readonly = 1", - // change settings queries are not allowed. Thus, "join_use_nulls" setting - // can not be changed - // - // - ...(this.readOnlyMode ? {} : { join_use_nulls: 1 }), + return this.withCancel(async (connection, queryId, signal) => { + try { + const format = 'JSON'; + + const resultSet = await connection.query({ + query: formattedQuery, + query_id: queryId, + format, + clickhouse_settings: this.config.clickhouseSettings, + abort_signal: signal, + }); + + if (resultSet.response_headers['x-clickhouse-format'] !== format) { + throw new Error(`Unexpected x-clickhouse-format in response: expected ${format}, received ${resultSet.response_headers['x-clickhouse-format']}`); + } + + // We used format JSON, so we expect each row to be Record with column names as keys + const results = await resultSet.json>(); + return results; + } catch (e) { + throw new Error(`Query failed; query id: ${queryId}, SQL: ${query}`, { cause: e }); } - })); + }); } - protected normaliseResponse(res: any) { + protected normaliseResponse(res: ResponseJSON>): Array { if (res.data) { - const meta = res.meta.reduce( - (state: any, element: any) => ({ [element.name]: element, ...state }), + const meta = (res.meta ?? []).reduce>( + (state, element) => ({ [element.name]: element, ...state }), {} ); - res.data.forEach((row: any) => { + // TODO maybe use row-based format here as well? + res.data.forEach((row) => { transformRow(row, meta); }); } - return res.data; + return res.data as Array; } public async release() { - await this.pool.drain(); - await this.pool.clear(); + await this.client.close(); } public informationSchemaQuery() { @@ -271,7 +290,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface { database as table_schema, type as data_type FROM system.columns - WHERE database = '${this.config.queryOptions.database}' + WHERE database = '${this.config.database}' `; } @@ -306,7 +325,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface { } public override async getSchemas(): Promise { - return [{ schema_name: this.config.queryOptions.database }]; + return [{ schema_name: this.config.database }]; } public async stream( @@ -315,53 +334,80 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface { // eslint-disable-next-line @typescript-eslint/no-unused-vars { highWaterMark }: StreamOptions ): Promise { - // eslint-disable-next-line no-underscore-dangle - const conn = await ( this.pool)._factory.create(); + // Use separate client for this long-living query + const client = this.createClient(1); + const queryId = uuidv4(); try { const formattedQuery = sqlstring.format(query, values); - return await new Promise((resolve, reject) => { - const options = { - queryOptions: { - query_id: uuidv4(), - // - // - // If ClickHouse user's permissions are restricted with "readonly = 1", - // change settings queries are not allowed. Thus, "join_use_nulls" setting - // can not be changed - // - // - ...(this.readOnlyMode ? {} : { join_use_nulls: 1 }), - } - }; - - const originalStream = conn.query(formattedQuery, options, (err: Error | null, result: any) => { - if (err) { - reject(err); - } else { - const rowStream = new HydrationStream(result.meta); - originalStream.pipe(rowStream); - - resolve({ - rowStream, - types: result.meta.map((field: any) => ({ - name: field.name, - type: this.toGenericType(field.type), - })), - release: async () => { - // eslint-disable-next-line no-underscore-dangle - await ( this.pool)._factory.destroy(conn); - } - }); - } - }); + const format = 'JSONCompactEachRowWithNamesAndTypes'; + + const resultSet = await client.query({ + query: formattedQuery, + query_id: queryId, + format, + clickhouse_settings: this.config.clickhouseSettings, }); - } catch (e) { - // eslint-disable-next-line no-underscore-dangle - await ( this.pool)._factory.destroy(conn); - throw e; + if (resultSet.response_headers['x-clickhouse-format'] !== format) { + throw new Error(`Unexpected x-clickhouse-format in response: expected ${format}, received ${resultSet.response_headers['x-clickhouse-format']}`); + } + + // Array is okay, because we use fixed JSONCompactEachRowWithNamesAndTypes format + // And each row after first two will look like this: [42, "hello", [0,1]] + // https://clickhouse.com/docs/en/interfaces/formats#jsoncompacteachrowwithnamesandtypes + const resultSetStream = resultSet.stream>(); + + const allRowsIter = (async function* allRowsIter() { + for await (const rowsBatch of resultSetStream) { + for (const row of rowsBatch) { + yield row.json(); + } + } + }()); + + const first = await allRowsIter.next(); + if (first.done) { + throw new Error('Unexpected stream end before row with names'); + } + // JSONCompactEachRowWithNamesAndTypes: expect first row to be column names as string + const names = first.value as Array; + + const second = await allRowsIter.next(); + if (second.done) { + throw new Error('Unexpected stream end before row with types'); + } + // JSONCompactEachRowWithNamesAndTypes: expect first row to be column names as string + const types = second.value as Array; + + if (names.length !== types.length) { + throw new Error(`Unexpected names and types length mismatch; names ${names.length} vs types ${types.length}`); + } + + const dataRowsIter = (async function* () { + for await (const row of allRowsIter) { + yield transformStreamRow(row, names, types); + } + }()); + const rowStream = Readable.from(dataRowsIter); + + return { + rowStream, + types: names.map((name, idx) => { + const type = types[idx]; + return { + name, + type: this.toGenericType(type), + }; + }), + release: async () => { + await client.close(); + } + }; + } catch (e) { + await client.close(); + throw new Error(`Stream query failed; query id: ${queryId}, SQL: ${query}`, { cause: e }); } } @@ -370,7 +416,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface { values: unknown[], options: DownloadQueryResultsOptions ): Promise { - if ((options || {}).streamImport) { + if ((options ?? {}).streamImport) { return this.stream(query, values, options); } @@ -378,7 +424,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface { return { rows: this.normaliseResponse(response), - types: response.meta.map((field: any) => ({ + types: (response.meta ?? []).map((field) => ({ name: field.name, type: this.toGenericType(field.type), })), @@ -412,16 +458,20 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface { } public async createSchemaIfNotExists(schemaName: string): Promise { - await this.query(`CREATE DATABASE IF NOT EXISTS ${schemaName}`, []); + await this.command(`CREATE DATABASE IF NOT EXISTS ${schemaName}`); } - public getTablesQuery(schemaName: string) { + public getTablesQuery(schemaName: string): Promise { return this.query('SELECT name as table_name FROM system.tables WHERE database = ?', [schemaName]); } + public override async dropTable(tableName: string, _options?: QueryOptions): Promise { + await this.command(`DROP TABLE ${tableName}`); + } + protected getExportBucket( dataSource: string, - ): ClickhouseDriverExportAWS | undefined { + ): ClickhouseDriverExportAWS | null { const supportedBucketTypes = ['s3']; const requiredExportBucket: ClickhouseDriverExportRequiredAWS = { @@ -433,7 +483,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface { region: getEnv('dbExportBucketAwsRegion', { dataSource }), }; - const exportBucket: Partial = { + const exportBucket: ClickhouseDriverExportAWS = { ...requiredExportBucket, keyId: getEnv('dbExportBucketAwsKey', { dataSource }), secretKey: getEnv('dbExportBucketAwsSecret', { dataSource }), @@ -455,10 +505,10 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface { ); } - return exportBucket as ClickhouseDriverExportAWS; + return exportBucket; } - return undefined; + return null; } public async isUnloadSupported() { @@ -473,17 +523,38 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface { * Returns an array of queried fields meta info. */ public async queryColumnTypes(sql: string, params: unknown[]): Promise { - const columns = await this.query(`DESCRIBE ${sql}`, params); + // For DESCRIBE we expect that each row would have special structure + // See https://clickhouse.com/docs/en/sql-reference/statements/describe-table + // TODO complete this type + type DescribeRow = { + name: string, + type: string + }; + const columns = await this.query(`DESCRIBE ${sql}`, params); if (!columns) { throw new Error('Unable to describe table'); } - return columns.map((column: any) => ({ + return columns.map((column) => ({ name: column.name, type: this.toGenericType(column.type), })); } + // This is only for use in tests + public override async createTableRaw(query: string): Promise { + await this.command(query); + } + + public override async createTable(quotedTableName: string, columns: TableColumn[]) { + const createTableSql = this.createTableSql(quotedTableName, columns); + try { + await this.command(createTableSql); + } catch (e) { + throw new Error(`Error during create table: ${createTableSql}`, { cause: e }); + } + } + /** * We use unloadWithoutTempTable strategy */ @@ -507,7 +578,7 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface { const types = await this.queryColumnTypes(`(${sql})`, params); const exportPrefix = uuidv4(); - await this.queryResponse(` + const formattedQuery = sqlstring.format(` INSERT INTO FUNCTION s3( 'https://${this.config.exportBucket.bucketName}.s3.${this.config.exportBucket.region}.amazonaws.com/${exportPrefix}/export.csv.gz', @@ -518,6 +589,8 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface { ${sql} `, params); + await this.command(formattedQuery); + const csvFile = await this.extractUnloadedFilesFromS3( { credentials: { @@ -545,4 +618,28 @@ export class ClickHouseDriver extends BaseDriver implements DriverInterface { incrementalSchemaLoading: true, }; } + + // This is not part of a driver interface, and marked public only for testing + public async command(query: string): Promise { + await this.withCancel(async (connection, queryId, signal) => { + await connection.command({ + query, + query_id: queryId, + abort_signal: signal, + }); + }); + } + + // This is not part of a driver interface, and marked public only for testing + public async insert(table: string, values: Array>): Promise { + await this.withCancel(async (connection, queryId, signal) => { + await connection.insert({ + table, + values, + format: 'JSONCompactEachRow', + query_id: queryId, + abort_signal: signal, + }); + }); + } } diff --git a/packages/cubejs-clickhouse-driver/src/HydrationStream.ts b/packages/cubejs-clickhouse-driver/src/HydrationStream.ts index cb7bcc97dd5e5..8402e2e40ef23 100644 --- a/packages/cubejs-clickhouse-driver/src/HydrationStream.ts +++ b/packages/cubejs-clickhouse-driver/src/HydrationStream.ts @@ -1,41 +1,48 @@ -import stream, { TransformCallback } from 'stream'; import * as moment from 'moment'; // ClickHouse returns DateTime as strings in format "YYYY-DD-MM HH:MM:SS" // cube.js expects them in format "YYYY-DD-MMTHH:MM:SS.000", so translate them based on the metadata returned // // ClickHouse returns some number types as js numbers, others as js string, normalise them all to strings -export function transformRow(row: Record, meta: any) { - for (const [fieldName, value] of Object.entries(row)) { - if (value !== null) { - const metaForField = meta[fieldName]; - if (metaForField.type.includes('DateTime64')) { - row[fieldName] = moment.utc(value).format(moment.HTML5_FMT.DATETIME_LOCAL_MS); - } else if (metaForField.type.includes('DateTime') /** Can be DateTime or DateTime('timezone') */) { - row[fieldName] = `${value.substring(0, 10)}T${value.substring(11, 22)}.000`; - } else if (metaForField.type.includes('Date')) { - row[fieldName] = `${value}T00:00:00.000`; - } else if (metaForField.type.includes('Int') - || metaForField.type.includes('Float') - || metaForField.type.includes('Decimal') - ) { - // convert all numbers into strings - row[fieldName] = `${value}`; - } +function transformValue(type: string, value: unknown) { + if (value !== null) { + if (type.includes('DateTime64')) { + return moment.utc(value).format(moment.HTML5_FMT.DATETIME_LOCAL_MS); + } else if (type.includes('DateTime') /** Can be DateTime or DateTime('timezone') */) { + // expect DateTime to always be string + const valueStr = value as string; + return `${valueStr.substring(0, 10)}T${valueStr.substring(11, 22)}.000`; + } else if (type.includes('Date')) { + return `${value}T00:00:00.000`; + } else if (type.includes('Int') + || type.includes('Float') + || type.includes('Decimal') + ) { + // convert all numbers into strings + return `${value}`; } } + + return value; } -export class HydrationStream extends stream.Transform { - public constructor(meta: any) { - super({ - objectMode: true, - transform(row: any[], encoding: BufferEncoding, callback: TransformCallback) { - transformRow(row, meta); +export function transformRow(row: Record, meta: any) { + for (const [fieldName, value] of Object.entries(row)) { + const metaForField = meta[fieldName]; + row[fieldName] = transformValue(metaForField.type, value); + } +} - this.push(row); - callback(); - } - }); +export function transformStreamRow(row: Array, names: Array, types: Array): Record { + if (row.length !== names.length) { + throw new Error(`Unexpected row and names/types length mismatch; row ${row.length} vs names ${names.length}`); } + + return row.reduce>((rowObj, value, idx) => { + const name = names[idx]; + const type = types[idx]; + rowObj[name] = transformValue(type, value); + return rowObj; + // TODO do we actually want Object.create(null) safety? or is it ok to use {} + }, Object.create(null)); } diff --git a/packages/cubejs-clickhouse-driver/test/ClickHouseDriver.test.ts b/packages/cubejs-clickhouse-driver/test/ClickHouseDriver.test.ts index c501bd3568b22..9db85c8ea7cf1 100644 --- a/packages/cubejs-clickhouse-driver/test/ClickHouseDriver.test.ts +++ b/packages/cubejs-clickhouse-driver/test/ClickHouseDriver.test.ts @@ -2,18 +2,23 @@ import { ClickhouseDBRunner } from '@cubejs-backend/testing-shared'; import { streamToArray } from '@cubejs-backend/shared'; import { ClickHouseDriver } from '../src'; +import type { ClickHouseDriverOptions } from '../src'; describe('ClickHouseDriver', () => { jest.setTimeout(20 * 1000); let container: any; - let config: any; + let config: ClickHouseDriverOptions; const doWithDriver = async (cb: (driver: ClickHouseDriver) => Promise) => { const driver = new ClickHouseDriver(config); try { await cb(driver); + } catch (e) { + const newError = new Error('doWithDriver failed', { cause: e }); + console.log(newError); + throw newError; } finally { await driver.release(); } @@ -30,7 +35,7 @@ describe('ClickHouseDriver', () => { await doWithDriver(async (driver) => { await driver.createSchemaIfNotExists('test'); - await driver.query( + await driver.command( ` CREATE TABLE test.types_test ( date Date, @@ -54,18 +59,17 @@ describe('ClickHouseDriver', () => { enum8 Enum('hello' = 1, 'world' = 2), enum16 Enum('hello' = 1, 'world' = 1000) ) ENGINE Log - `, - [] + ` ); - await driver.query('INSERT INTO test.types_test VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', [ - '2020-01-01', '2020-01-01 00:00:00', '2020-01-01 00:00:00.000', '2020-01-01 00:00:00.000000', '2020-01-01 00:00:00.000000000', 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1.01, 1.01, 1.01, 'hello', 'world' + await driver.insert('test.types_test', [ + ['2020-01-01', '2020-01-01 00:00:00', '2020-01-01 00:00:00.000', '2020-01-01 00:00:00.000000', '2020-01-01 00:00:00.000000000', 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1.01, 1.01, 1.01, 'hello', 'world'] ]); - await driver.query('INSERT INTO test.types_test VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', [ - '2020-01-02', '2020-01-02 00:00:00', '2020-01-02 00:00:00.123', '2020-01-02 00:00:00.123456', '2020-01-02 00:00:00.123456789', 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2.02, 2.02, 2.02, 'hello', 'world' + await driver.insert('test.types_test', [ + ['2020-01-02', '2020-01-02 00:00:00', '2020-01-02 00:00:00.123', '2020-01-02 00:00:00.123456', '2020-01-02 00:00:00.123456789', 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2.02, 2.02, 2.02, 'hello', 'world'] ]); - await driver.query('INSERT INTO test.types_test VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)', [ - '2020-01-03', '2020-01-03 00:00:00', '2020-01-03 00:00:00.234', '2020-01-03 00:00:00.234567', '2020-01-03 00:00:00.234567890', 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3.03, 3.03, 3.03, 'hello', 'world' + await driver.insert('test.types_test', [ + ['2020-01-03', '2020-01-03 00:00:00', '2020-01-03 00:00:00.234', '2020-01-03 00:00:00.234567', '2020-01-03 00:00:00.234567890', 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3.03, 3.03, 3.03, 'hello', 'world'] ]); }); }, 30 * 1000); @@ -75,7 +79,7 @@ describe('ClickHouseDriver', () => { jest.setTimeout(10 * 1000); await doWithDriver(async (driver) => { - await driver.query('DROP DATABASE test', []); + await driver.command('DROP DATABASE test'); }); if (container) { @@ -147,7 +151,7 @@ describe('ClickHouseDriver', () => { try { await driver.createSchemaIfNotExists(name); } finally { - await driver.query(`DROP DATABASE ${name}`, []); + await driver.command(`DROP DATABASE ${name}`); } }); }); @@ -185,8 +189,8 @@ describe('ClickHouseDriver', () => { const name = `temp_${Date.now()}`; try { await driver.createSchemaIfNotExists(name); - await driver.query(`CREATE TABLE ${name}.a (dateTime DateTime, date Date) ENGINE Log`, []); - await driver.query(`INSERT INTO ${name}.a VALUES ('2019-04-30 11:55:00', '2019-04-30')`, []); + await driver.command(`CREATE TABLE ${name}.a (dateTime DateTime, date Date) ENGINE Log`); + await driver.insert(`${name}.a`, [['2019-04-30 11:55:00', '2019-04-30']]); const values = await driver.query(`SELECT * FROM ${name}.a`, []); expect(values).toEqual([{ @@ -194,7 +198,7 @@ describe('ClickHouseDriver', () => { date: '2019-04-30T00:00:00.000', }]); } finally { - await driver.query(`DROP DATABASE ${name}`, []); + await driver.command(`DROP DATABASE ${name}`); } }); }); @@ -204,12 +208,12 @@ describe('ClickHouseDriver', () => { const name = `temp_${Date.now()}`; try { await driver.createSchemaIfNotExists(name); - await driver.query(`CREATE TABLE ${name}.test (x Int32, s String) ENGINE Log`, []); - await driver.query(`INSERT INTO ${name}.test VALUES (?, ?), (?, ?), (?, ?)`, [1, 'str1', 2, 'str2', 3, 'str3']); + await driver.command(`CREATE TABLE ${name}.test (x Int32, s String) ENGINE Log`); + await driver.insert(`${name}.test`, [[1, 'str1'], [2, 'str2'], [3, 'str3']]); const values = await driver.query(`SELECT * FROM ${name}.test WHERE x = ?`, [2]); expect(values).toEqual([{ x: '2', s: 'str2' }]); } finally { - await driver.query(`DROP DATABASE ${name}`, []); + await driver.command(`DROP DATABASE ${name}`); } }); }); @@ -219,11 +223,11 @@ describe('ClickHouseDriver', () => { const name = `temp_${Date.now()}`; try { await driver.createSchemaIfNotExists(name); - await driver.query(`CREATE TABLE ${name}.a (x Int32, s String) ENGINE Log`, []); - await driver.query(`INSERT INTO ${name}.a VALUES (?, ?), (?, ?), (?, ?)`, [1, 'str1', 2, 'str2', 3, 'str3']); + await driver.command(`CREATE TABLE ${name}.a (x Int32, s String) ENGINE Log`); + await driver.insert(`${name}.a`, [[1, 'str1'], [2, 'str2'], [3, 'str3']]); - await driver.query(`CREATE TABLE ${name}.b (x Int32, s String) ENGINE Log`, []); - await driver.query(`INSERT INTO ${name}.b VALUES (?, ?), (?, ?), (?, ?)`, [2, 'str2', 3, 'str3', 4, 'str4']); + await driver.command(`CREATE TABLE ${name}.b (x Int32, s String) ENGINE Log`); + await driver.insert(`${name}.b`, [[2, 'str2'], [3, 'str3'], [4, 'str4']]); const values = await driver.query(`SELECT * FROM ${name}.a LEFT OUTER JOIN ${name}.b ON a.x = b.x`, []); expect(values).toEqual([ @@ -238,7 +242,7 @@ describe('ClickHouseDriver', () => { } ]); } finally { - await driver.query(`DROP DATABASE ${name}`, []); + await driver.command(`DROP DATABASE ${name}`); } }); }); @@ -307,9 +311,9 @@ describe('ClickHouseDriver', () => { { name: 'enum16', type: 'text' }, ]); expect(await streamToArray(tableData.rowStream as any)).toEqual([ - ['2020-01-01T00:00:00.000', '2020-01-01T00:00:00.000', '2020-01-01T00:00:00.000', '2020-01-01T00:00:00.000', '2020-01-01T00:00:00.000', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1', '1.01', '1.01', '1.01', 'hello', 'world'], - ['2020-01-02T00:00:00.000', '2020-01-02T00:00:00.000', '2020-01-02T00:00:00.123', '2020-01-02T00:00:00.123', '2020-01-02T00:00:00.123', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2', '2.02', '2.02', '2.02', 'hello', 'world'], - ['2020-01-03T00:00:00.000', '2020-01-03T00:00:00.000', '2020-01-03T00:00:00.234', '2020-01-03T00:00:00.234', '2020-01-03T00:00:00.234', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3', '3.03', '3.03', '3.03', 'hello', 'world'], + { date: '2020-01-01T00:00:00.000', datetime: '2020-01-01T00:00:00.000', datetime64_millis: '2020-01-01T00:00:00.000', datetime64_micros: '2020-01-01T00:00:00.000', datetime64_nanos: '2020-01-01T00:00:00.000', int8: '1', int16: '1', int32: '1', int64: '1', uint8: '1', uint16: '1', uint32: '1', uint64: '1', float32: '1', float64: '1', decimal32: '1.01', decimal64: '1.01', decimal128: '1.01', enum8: 'hello', enum16: 'world' }, + { date: '2020-01-02T00:00:00.000', datetime: '2020-01-02T00:00:00.000', datetime64_millis: '2020-01-02T00:00:00.123', datetime64_micros: '2020-01-02T00:00:00.123', datetime64_nanos: '2020-01-02T00:00:00.123', int8: '2', int16: '2', int32: '2', int64: '2', uint8: '2', uint16: '2', uint32: '2', uint64: '2', float32: '2', float64: '2', decimal32: '2.02', decimal64: '2.02', decimal128: '2.02', enum8: 'hello', enum16: 'world' }, + { date: '2020-01-03T00:00:00.000', datetime: '2020-01-03T00:00:00.000', datetime64_millis: '2020-01-03T00:00:00.234', datetime64_micros: '2020-01-03T00:00:00.234', datetime64_nanos: '2020-01-03T00:00:00.234', int8: '3', int16: '3', int32: '3', int64: '3', uint8: '3', uint16: '3', uint32: '3', uint64: '3', float32: '3', float64: '3', decimal32: '3.03', decimal64: '3.03', decimal128: '3.03', enum8: 'hello', enum16: 'world' }, ]); } finally { // @ts-ignore diff --git a/packages/cubejs-testing-drivers/src/tests/testQueries.ts b/packages/cubejs-testing-drivers/src/tests/testQueries.ts index 5ca9133a30512..b755aea1772be 100644 --- a/packages/cubejs-testing-drivers/src/tests/testQueries.ts +++ b/packages/cubejs-testing-drivers/src/tests/testQueries.ts @@ -117,7 +117,7 @@ export function testQueries(type: string, { includeIncrementalSchemaSuite, exten console.log(`Creating ${queries.length} fixture tables`); try { for (const q of queries) { - await driver.query(q); + await driver.createTableRaw(q); } console.log(`Creating ${queries.length} fixture tables completed`); } catch (e: any) { diff --git a/yarn.lock b/yarn.lock index 52b737c54400a..bbc90068904a5 100644 --- a/yarn.lock +++ b/yarn.lock @@ -4222,6 +4222,18 @@ resolved "https://registry.yarnpkg.com/@bcoe/v8-coverage/-/v8-coverage-0.2.3.tgz#75a2e8b51cb758a7553d6804a5932d7aace75c39" integrity sha512-0hYQ8SB4Db5zvZB4axdMHGwEaQjkZzFjQiN9LVYvIFB2nSUHW9tYpxWriPrWDASIxiaXax83REcLxuSdnGPZtw== +"@clickhouse/client-common@1.7.0": + version "1.7.0" + resolved "https://registry.yarnpkg.com/@clickhouse/client-common/-/client-common-1.7.0.tgz#4d0315158d275ea8d55ed8e04d69871832f4d8ba" + integrity sha512-RkHYf23/wyv/6C0KcVD4nRX4JAn/Y+9AZBQPlrSId2JwXsmAnjDkkKpuPLwZPNVH6J3BkW+y8bQCEk3VHQzArw== + +"@clickhouse/client@^1.7.0": + version "1.7.0" + resolved "https://registry.yarnpkg.com/@clickhouse/client/-/client-1.7.0.tgz#a6b7b72db825162b1f54c2fe383f349dbf437fbd" + integrity sha512-2aESIFRbSPWEZIU++sXt1RYWgEKZH75C3jyXLcRBeafMDjq7bKV2AX1X9n9xscN+Y4VvnkBzkjFxcbuqFSBk6w== + dependencies: + "@clickhouse/client-common" "1.7.0" + "@codemirror/highlight@^0.19.0": version "0.19.6" resolved "https://registry.yarnpkg.com/@codemirror/highlight/-/highlight-0.19.6.tgz#7f2e066f83f5649e8e0748a3abe0aaeaf64b8ac2" @@ -4353,7 +4365,7 @@ tiny-invariant "^1.3.3" valid-url "^1.0.9" -"@cubejs-backend/apla-clickhouse@^1.7", "@cubejs-backend/apla-clickhouse@^1.7.0": +"@cubejs-backend/apla-clickhouse@^1.7.0": version "1.7.0" resolved "https://registry.yarnpkg.com/@cubejs-backend/apla-clickhouse/-/apla-clickhouse-1.7.0.tgz#6359f46c56492d1704d18be0210c7546fdac5f5e" integrity sha512-qwXapTC/qosA6RprElRjnl8gmlDQaxtJPtbgcdjyNvkmiyao1HI+w5QkjHWCiVm6aTzE0gjFr6/2y87TZ9fojg==