Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: extends CacheLayerInfo interface and Profile interface #310

Merged
merged 4 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion packages/core/src/lib/cache-layer/cacheLayerLoader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,14 @@ export class CacheLayerLoader implements ICacheLayerLoader {
templateName: string,
cache: CacheLayerInfo
): Promise<void> {
const { cacheTableName, sql, profile, indexes, folderSubpath } = cache;
const {
cacheTableName,
sql,
profile,
indexes,
folderSubpath,
options: cacheOptions,
} = cache;
const type = this.options.type!;
const dataSource = this.dataSourceFactory(profile);

Expand Down Expand Up @@ -82,6 +89,7 @@ export class CacheLayerLoader implements ICacheLayerLoader {
directory,
profileName: profile,
type,
options: cacheOptions,
});
} else {
this.logger.debug(
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/models/artifact.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ export class CacheLayerInfo {
indexes?: Record<string, string>;
// cache folder subpath
folderSubpath?: string;
// options pass to the data source
options?: any;
}

export class APISchema {
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/models/extensions/dataSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ export interface ExportOptions {
directory: string;
// The profile name to select to export data
profileName: string;
// data source options
options?: any;
// export file format type
type: CacheLayerStoreFormatType | string;
}
Expand Down
2 changes: 2 additions & 0 deletions packages/core/src/models/profile.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,6 @@ export interface Profile<C = Record<string, any>> {
cache?: C;
/** What users have access to this profile */
allow: ProfileAllowConstraints;
/** Properties that can be used when involking the dataSource method */
properties?: Record<string, any>;
}
33 changes: 21 additions & 12 deletions packages/extension-driver-canner/src/lib/cannerAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,23 @@ export class CannerAdapter {
// When querying Canner enterprise, the Canner enterprise will save the query result as parquet files,
// and store them in S3. This method will return the S3 urls of the query result.
// For more Canner API ref: https://docs.cannerdata.com/reference/restful
public async createAsyncQueryResultUrls(sql: string): Promise<string[]> {
public async createAsyncQueryResultUrls(
sql: string,
headers?: Record<string, string>
): Promise<string[]> {
this.logger.debug(`Create async request to Canner.`);
let data = await this.getWorkspaceRequestData('post', '/v2/async-queries', {
data: {
sql,
timeout: 600,
noLimit: true,
let data = await this.getWorkspaceRequestData(
'post',
'/v2/async-queries',
{
data: {
sql,
timeout: 600,
noLimit: true,
},
},
});
headers
);

const { id: requestId } = data;
this.logger.debug(`Wait Async request to finished.`);
Expand All @@ -60,14 +68,13 @@ export class CannerAdapter {
private async getWorkspaceRequestData(
method: string,
urlPath: string,
options?: Record<string, any>
options?: Record<string, any>,
headers?: Record<string, string>
) {
await this.prepare();
try {
const response = await axios({
headers: {
Authorization: `Token ${this.PAT}`,
},
headers: { ...headers, Authorization: `Token ${this.PAT}` },
params: {
workspaceSqlName: this.workspaceSqlName,
},
Expand All @@ -78,7 +85,9 @@ export class CannerAdapter {
return response.data;
} catch (error: any) {
const message = error.response
? `response status: ${error.response.status}, response data: ${error.response.data}`
? `response status: ${
error.response.status
}, response data: ${JSON.stringify(error.response.data)}`
: `remote server does not response. request ${error.toJSON()}}`;
throw new InternalError(
`Failed to get workspace request "${urlPath}" data, ${message}`
Expand Down
29 changes: 25 additions & 4 deletions packages/extension-driver-canner/src/lib/cannerDataSource.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export class CannerDataSource extends DataSource<any, PGOptions> {
private logger = this.getLogger();
protected poolMapping = new Map<
string,
{ pool: Pool; options?: PGOptions }
{ pool: Pool; options?: PGOptions; properties?: Record<string, any> }
>();
protected UserPool = new Map<string, Pool>();

Expand All @@ -52,6 +52,7 @@ export class CannerDataSource extends DataSource<any, PGOptions> {
this.poolMapping.set(profile.name, {
pool,
options: profile.connection,
properties: profile.properties,
});
this.logger.debug(`Profile ${profile.name} initialized`);
}
Expand All @@ -61,6 +62,7 @@ export class CannerDataSource extends DataSource<any, PGOptions> {
sql,
directory,
profileName,
options: cannerOptions,
}: ExportOptions): Promise<void> {
if (!this.poolMapping.has(profileName)) {
throw new InternalError(`Profile instance ${profileName} not found`);
Expand All @@ -69,12 +71,16 @@ export class CannerDataSource extends DataSource<any, PGOptions> {
if (!fs.existsSync(directory)) {
throw new InternalError(`Directory ${directory} not found`);
}
const { options: connection } = this.poolMapping.get(profileName)!;

const { options: connection, properties } =
this.poolMapping.get(profileName)!;
const cannerAdapter = new CannerAdapter(connection);
try {
this.logger.debug('Send the async query to the Canner Enterprise');
const presignedUrls = await cannerAdapter.createAsyncQueryResultUrls(sql);
const header = this.getCannerRequestHeader(properties, cannerOptions);
const presignedUrls = await cannerAdapter.createAsyncQueryResultUrls(
sql,
header
);
this.logger.debug(
'Start fetching the query result parquet files from URLs'
);
Expand All @@ -85,6 +91,21 @@ export class CannerDataSource extends DataSource<any, PGOptions> {
throw error;
}
}
private getCannerRequestHeader(
properties?: Record<string, any>,
cannerOptions?: any
) {
const header: Record<string, string> = {};
const userId = cannerOptions?.userId;
const rootUserId = properties?.['rootUserId'];
if (userId && rootUserId) {
header[
'x-trino-session'
] = `root_user_id=${rootUserId}, canner_user_id=${userId}`;
this.logger.debug(`Impersonate used: ${userId}`);
}
return header;
}

private async downloadFiles(urls: string[], directory: string) {
await Promise.all(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ it('Data source should export successfully', async () => {
sql: 'select 1',
directory,
profileName: 'profile1',
options: {},
} as ExportOptions)
).resolves.not.toThrow();
expect(fs.readdirSync(directory).length).toBe(1);
Expand Down Expand Up @@ -86,6 +87,7 @@ it('Data source should throw error when fail to export data', async () => {
sql: 'select 1',
directory,
profileName: 'profile1',
options: {},
} as ExportOptions)
).rejects.toThrow();
expect(fs.readdirSync(directory).length).toBe(0);
Expand All @@ -105,6 +107,7 @@ it('Data source should throw error when given directory is not exist', async ()
sql: 'select 1',
directory: directory,
profileName: 'profile1',
options: {},
} as ExportOptions)
).rejects.toThrow();
}, 100000);
Expand All @@ -121,6 +124,7 @@ it('Data source should throw error when given profile name is not exist', async
sql: 'select 1',
directory,
profileName: 'profile not exist',
options: {},
} as ExportOptions)
).rejects.toThrow();
}, 100000);
Expand Down
1 change: 1 addition & 0 deletions packages/extension-driver-canner/test/cannerServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ export class CannerServer {
database: process.env['CANNER_WORKSPACE_SQL_NAME'],
} as PGOptions,
allow: '*',
properties: {},
};
}
}
2 changes: 2 additions & 0 deletions packages/extension-store-canner/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@ export PROFILE_CANNER_DRIVER_PASSWORD=<password>
export PROFILE_CANNER_DRIVER_HOST=<host>
# Canner enterprise driver port, the default is 7432
export PROFILE_CANNER_DRIVER_PORT=<port>
# Canner enterprise root user id
export PROFILE_CANNER_DRIVER_ROOT_USER_ID=<userId>
```

### Connect Canner Enterprise used storage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ export class CannerProfileReader extends ProfileReader {

// generate profiles from the indicator files of each workspaces
const { user, password, host, port, max } = this.envConfig.profile;
const { rootUserId } = this.envConfig.properties;
if (!user || !password || !host)
throw new ConfigurationError(
'Canner profile reader needs username, password, host properties.'
Expand All @@ -67,6 +68,9 @@ export class CannerProfileReader extends ProfileReader {
max,
},
allow: '*',
properties: {
rootUserId,
},
} as Profile<Record<string, any>>;
this.logger.debug(`created "${profile.name}".`);
return profile;
Expand Down
8 changes: 8 additions & 0 deletions packages/extension-store-canner/src/lib/config.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
export interface CannerStoreConfig {
storage: StorageServiceOptions;
properties: CannnerDriverProfileProperties;
profile: CannerDriverProfileOptions;
}

export interface CannnerDriverProfileProperties {
rootUserId?: string;
}

export interface CannerDriverProfileOptions {
// user to connect to canner enterprise. Default is canner
user?: string;
Expand Down Expand Up @@ -64,6 +69,9 @@ export const getEnvConfig = (): CannerStoreConfig => {
max:
Number(process.env['PROFILE_CANNER_DRIVER_CONNECTION_POOL_MAX']) || 10,
},
properties: {
rootUserId: process.env['PROFILE_CANNER_DRIVER_ROOT_USER_ID'],
},
storage: {
provider: process.env['STORAGE_PROVIDER'],
// MINIO Provider options
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ describe('Test CannerProfileReader', () => {

sinon.default.stub(configModule, 'getEnvConfig').returns({
storage: sinon.stubInterface<configModule.StorageServiceOptions>(),
properties: {},
profile: {
host,
password,
Expand All @@ -119,6 +120,7 @@ describe('Test CannerProfileReader', () => {
user: 'canner',
password: 'secret-password',
port: 7432,
max: 10,
};
const expected = [
{
Expand All @@ -128,6 +130,9 @@ describe('Test CannerProfileReader', () => {
...connectionInfo,
database: fakeWorkspaces.ws1.sqlName,
},
properties: {
rootUserId: 'fakeRootUserId',
},
allow: '*',
},
{
Expand All @@ -137,6 +142,9 @@ describe('Test CannerProfileReader', () => {
...connectionInfo,
database: fakeWorkspaces.ws2.sqlName,
},
properties: {
rootUserId: 'fakeRootUserId',
},
allow: '*',
},
] as Profile<Record<string, any>>[];
Expand Down Expand Up @@ -164,6 +172,9 @@ describe('Test CannerProfileReader', () => {

sinon.default.stub(configModule, 'getEnvConfig').returns({
storage: sinon.stubInterface<configModule.StorageServiceOptions>(),
properties: {
rootUserId: 'fakeRootUserId',
},
profile: {
...connectionInfo,
},
Expand Down