Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(snowflake-driver): Improve support for NUMERIC in pre-aggregations #2811

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 12 additions & 15 deletions packages/cubejs-cubestore-driver/src/CubeStoreDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,8 @@ import { unlink } from 'fs-extra';
import tempy from 'tempy';
import csvWriter from 'csv-write-stream';
import {
BaseDriver,
DownloadTableCSVData,
DownloadTableData,
DownloadTableMemoryData,
StreamTableData,
BaseDriver, DownloadTableCSVData, DownloadTableMemoryData,
DriverInterface, IndexesSQL, StreamTableData, TableStructure,
} from '@cubejs-backend/query-orchestrator';
import { getEnv } from '@cubejs-backend/shared';
import { format as formatSql } from 'sqlstring';
Expand All @@ -24,12 +21,7 @@ const GenericTypeToCubeStore: Record<string, string> = {
text: 'varchar(255)'
};

type Column = {
type: string;
name: string;
};

export class CubeStoreDriver extends BaseDriver {
export class CubeStoreDriver extends BaseDriver implements DriverInterface {
protected readonly config: any;

protected readonly connection: WebSocketConnection;
Expand Down Expand Up @@ -89,7 +81,7 @@ export class CubeStoreDriver extends BaseDriver {
return super.toColumnValue(value, genericType);
}

public async uploadTableWithIndexes(table: string, columns: Column[], tableData: any, indexesSql: any) {
public async uploadTableWithIndexes(table: string, columns: TableStructure, tableData: any, indexesSql: IndexesSQL) {
const indexes =
indexesSql.map((s: any) => s.sql[0].replace(/^CREATE INDEX (.*?) ON (.*?) \((.*)$/, 'INDEX $1 ($3')).join(' ');

Expand All @@ -104,7 +96,12 @@ export class CubeStoreDriver extends BaseDriver {
}
}

private async importRows(table: string, columns: Column[], indexesSql: any, tableData: DownloadTableMemoryData) {
private async importRows(
table: string,
columns: TableStructure,
indexesSql: IndexesSQL,
tableData: DownloadTableMemoryData
) {
await this.createTable(table, columns);
try {
for (let i = 0; i < indexesSql.length; i++) {
Expand Down Expand Up @@ -134,7 +131,7 @@ export class CubeStoreDriver extends BaseDriver {
}
}

private async importCsvFile(tableData: DownloadTableCSVData, table: string, columns: Column[], indexes) {
private async importCsvFile(tableData: DownloadTableCSVData, table: string, columns: TableStructure, indexes) {
const files = Array.isArray(tableData.csvFile) ? tableData.csvFile : [tableData.csvFile];
const createTableSql = this.createTableSql(table, columns);

Expand All @@ -154,7 +151,7 @@ export class CubeStoreDriver extends BaseDriver {
});
}

private async importStream(columns: Column[], tableData: StreamTableData, table: string, indexes) {
private async importStream(columns: TableStructure, tableData: StreamTableData, table: string, indexes) {
const writer = csvWriter({ headers: columns.map(c => c.name) });
const gzipStream = createGzip();
const tempFile = tempy.file();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -338,7 +338,7 @@ export class BaseDriver {
[name, schema]
);

return columns.map(c => ({ name: c.column_name, type: this.toGenericType(c.data_type) }));
return columns.map(c => ({ name: c.column_name, type: this.toGenericTypeFromColumn(c) }));
}

createTable(quotedTableName, columns) {
Expand All @@ -354,6 +354,13 @@ export class BaseDriver {
return `CREATE TABLE ${quotedTableName} (${columns.join(', ')})`;
}

/**
* @param {object} column
*/
toGenericTypeFromColumn(column) {
return this.toGenericType(column.data_type);
}

/**
* @param {string} columnType
* @return {string}
Expand Down
40 changes: 33 additions & 7 deletions packages/cubejs-snowflake-driver/src/SnowflakeDriver.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,21 @@ const hydrators: HydrationConfiguration[] = [
}
];

type NumberColumn = {
NAME: string,
TYPE: 'NUMBER',
NUMERIC_PRECISION: string,
NUMERIC_SCALE: string,
};
type CommonColumn = {
NAME: string,
TYPE: string,
NUMERIC_PRECISION: null,
NUMERIC_SCALE: null,
};
type ColumnInfo = NumberColumn | CommonColumn;

const SnowflakeToGenericType: Record<string, GenericDataBaseType> = {
number: 'decimal',
timestamp_ntz: 'timestamp'
};

Expand Down Expand Up @@ -407,20 +420,33 @@ export class SnowflakeDriver extends BaseDriver implements DriverInterface {
return SnowflakeToGenericType[columnType.toLowerCase()] || super.toGenericType(columnType);
}

public toGenericTypeFromColumn(column: ColumnInfo) {
if (column.TYPE === 'NUMBER') {
// 2^64 = 18446744073709551616
if (column.NUMERIC_SCALE === '0' && parseInt(column.NUMERIC_PRECISION, 10) <= 19) {
return 'bigint';
}

return `DECIMAL(${column.NUMERIC_PRECISION}, ${column.NUMERIC_SCALE})`;
}

return SnowflakeToGenericType[column.TYPE.toLowerCase()] || super.toGenericType(column.TYPE);
}

public async tableColumnTypes(table: string) {
const [schema, name] = table.split('.');

const columns = await this.query<{ COLUMN_NAME: string, DATA_TYPE: string }[]>(
`SELECT columns.column_name,
columns.table_name,
columns.table_schema,
columns.data_type
const columns = await this.query<ColumnInfo[]>(
`SELECT columns.column_name as name,
columns.data_type as type,
columns.numeric_precision,
columns.numeric_scale
FROM information_schema.columns
WHERE table_name = ${this.param(0)} AND table_schema = ${this.param(1)}`,
[name.toUpperCase(), schema.toUpperCase()]
);

return columns.map(c => ({ name: c.COLUMN_NAME, type: this.toGenericType(c.DATA_TYPE) }));
return columns.map((c) => ({ name: c.NAME, type: this.toGenericTypeFromColumn(c) }));
}

public async getTablesQuery(schemaName: string) {
Expand Down