Skip to content

Commit

Permalink
feat(clickhouse-driver): Switch from apla-clickhouse to @clickhouse/c…
Browse files Browse the repository at this point in the history
…lient (#8928)

Switch client library used to talk to ClickHouse to upstream one. Lots of related changes:

* Streaming no longer uses a streaming JSON parser, so we can't rely on the `meta` field in JSON format. Instead it relies on `JSONCompactEachRowWithNamesAndTypes`: the first two rows returned should contain names and types. https://clickhouse.com/docs/en/sql-reference/formats#jsoncompacteachrowwithnamesandtypes
* Streaming now uses async iterators instead of Node.js streams internally. The external API still returns a stream, as before
* Pooling moved completely to the client library. `generic-pool` is not used at all; `dbMaxPoolSize` is passed to the client library to limit open sockets. The new client maintains an `http.Agent` internally and has its own idle timers, which looks fine for us.
* Queries no longer send `session_id`, as we expect queries to be independent anyway and don't use session-bound features such as temporary tables. The previous behaviour was kind of weird: session ids were attached to clients in the pool, but every query would acquire a new client from the pool, so nothing could actually reuse the same session.
* `KILL QUERY` on cancellation now uses separate client instance, to avoid getting stuck on busy pool
* The `query` method supports only `SELECT` queries, or other queries that return result sets. For DDL queries with this client library one has to use other methods. Because of that, more overrides were necessary, like `dropTable`, `createSchemaIfNotExists` or `createTable`.
* Driver now respects per-datasource `dbQueryTimeout` config
* fix(backend-shared): Rename `convertTimeStrToMs` to `convertTimeStrToSeconds`. It returns input number (which should be seconds) as it is, and 5 for '5s'
  • Loading branch information
mcheshkov authored Nov 19, 2024
1 parent 137de67 commit e25e65f
Show file tree
Hide file tree
Showing 9 changed files with 383 additions and 259 deletions.
12 changes: 6 additions & 6 deletions packages/cubejs-backend-shared/src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ export class InvalidConfiguration extends Error {
}
}

export function convertTimeStrToMs(
export function convertTimeStrToSeconds(
input: string,
envName: string,
description: string = 'Must be a number in seconds or duration string (1s, 1m, 1h).',
Expand Down Expand Up @@ -126,7 +126,7 @@ function asBoolOrTime(input: string, envName: string): number | boolean {
return false;
}

return convertTimeStrToMs(
return convertTimeStrToSeconds(
input,
envName,
'Should be boolean or number (in seconds) or string in time format (1s, 1m, 1h)'
Expand Down Expand Up @@ -510,7 +510,7 @@ const variables: Record<string, (...args: any) => any> = {
}) => {
const key = keyByDataSource('CUBEJS_DB_POLL_MAX_INTERVAL', dataSource);
const value = process.env[key] || '5s';
return convertTimeStrToMs(value, key);
return convertTimeStrToSeconds(value, key);
},

/**
Expand All @@ -525,14 +525,14 @@ const variables: Record<string, (...args: any) => any> = {
const key = keyByDataSource('CUBEJS_DB_POLL_TIMEOUT', dataSource);
const value = process.env[key];
if (value) {
return convertTimeStrToMs(value, key);
return convertTimeStrToSeconds(value, key);
} else {
return null;
}
},

/**
* Query timeout. Currently used in BigQuery, Dremio, Postgres, Snowflake
* Query timeout. Currently used in BigQuery, ClickHouse, Dremio, Postgres, Snowflake
* and Athena drivers and the orchestrator (queues, pre-aggs). For the
* orchestrator this variable did not split by the datasource.
*
Expand All @@ -546,7 +546,7 @@ const variables: Record<string, (...args: any) => any> = {
} = {}) => {
const key = keyByDataSource('CUBEJS_DB_QUERY_TIMEOUT', dataSource);
const value = process.env[key] || '10m';
return convertTimeStrToMs(value, key);
return convertTimeStrToSeconds(value, key);
},

/**
Expand Down
18 changes: 9 additions & 9 deletions packages/cubejs-backend-shared/test/env.test.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import { getEnv, convertTimeStrToMs } from '../src/env';
import { getEnv, convertTimeStrToSeconds } from '../src/env';

test('convertTimeStrToMs', () => {
expect(convertTimeStrToMs('1', 'VARIABLE_ENV')).toBe(1);
expect(convertTimeStrToMs('1s', 'VARIABLE_ENV')).toBe(1);
expect(convertTimeStrToMs('5s', 'VARIABLE_ENV')).toBe(5);
expect(convertTimeStrToMs('1m', 'VARIABLE_ENV')).toBe(1 * 60);
expect(convertTimeStrToMs('10m', 'VARIABLE_ENV')).toBe(10 * 60);
expect(convertTimeStrToMs('1h', 'VARIABLE_ENV')).toBe(60 * 60);
expect(convertTimeStrToMs('2h', 'VARIABLE_ENV')).toBe(2 * 60 * 60);
expect(convertTimeStrToSeconds('1', 'VARIABLE_ENV')).toBe(1);
expect(convertTimeStrToSeconds('1s', 'VARIABLE_ENV')).toBe(1);
expect(convertTimeStrToSeconds('5s', 'VARIABLE_ENV')).toBe(5);
expect(convertTimeStrToSeconds('1m', 'VARIABLE_ENV')).toBe(1 * 60);
expect(convertTimeStrToSeconds('10m', 'VARIABLE_ENV')).toBe(10 * 60);
expect(convertTimeStrToSeconds('1h', 'VARIABLE_ENV')).toBe(60 * 60);
expect(convertTimeStrToSeconds('2h', 'VARIABLE_ENV')).toBe(2 * 60 * 60);
});

test('convertTimeStrToMs(exception)', () => {
expect(() => convertTimeStrToMs('', 'VARIABLE_ENV')).toThrowError(
expect(() => convertTimeStrToSeconds('', 'VARIABLE_ENV')).toThrowError(
`Value "" is not valid for VARIABLE_ENV. Must be a number in seconds or duration string (1s, 1m, 1h).`
);
});
Expand Down
9 changes: 7 additions & 2 deletions packages/cubejs-base-driver/src/BaseDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -614,9 +614,14 @@ export abstract class BaseDriver implements DriverInterface {
return [];
}

public createTable(quotedTableName: string, columns: TableColumn[]) {
/**
 * Runs the given raw SQL text through `this.query` and discards the result.
 * This is only for use in tests.
 *
 * @param query - SQL text passed to `this.query` unchanged (presumably a full
 *   `CREATE TABLE` statement, going by the method name — confirm with callers).
 * @returns A promise that resolves once `this.query` has completed.
 */
public async createTableRaw(query: string): Promise<void> {
await this.query(query);
}

public async createTable(quotedTableName: string, columns: TableColumn[]): Promise<void> {
const createTableSql = this.createTableSql(quotedTableName, columns);
return this.query(createTableSql, []).catch(e => {
await this.query(createTableSql, []).catch(e => {
e.message = `Error during create table: ${createTableSql}: ${e.message}`;
throw e;
});
Expand Down
3 changes: 1 addition & 2 deletions packages/cubejs-clickhouse-driver/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@
"integration:clickhouse": "jest dist/test"
},
"dependencies": {
"@cubejs-backend/apla-clickhouse": "^1.7",
"@clickhouse/client": "^1.7.0",
"@cubejs-backend/base-driver": "1.1.4",
"@cubejs-backend/shared": "1.1.4",
"generic-pool": "^3.6.0",
"moment": "^2.24.0",
"sqlstring": "^2.3.1",
"uuid": "^8.3.2"
Expand Down
Loading

0 comments on commit e25e65f

Please sign in to comment.