Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: Fix response streams in template #42

Merged
merged 4 commits into from
Sep 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions labs/playground1/sqls/artist/works.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
{% req artist %}
select count(*) as count from "artists" where ConstituentID = {{ context.params.id }}
{% endreq %}

{% if artist.value()[0].count == 0 %}
{% error "Artist not found" %}
{% endif %}

select
*
from "artworks"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { RawAPISchema } from '@vulcan-sql/build/schema-parser';
import { ResponseSampler } from '@vulcan-sql/build/schema-parser/middleware/responseSampler';
import { FieldDataType, TemplateEngine } from '@vulcan-sql/core';
import { Stream } from 'stream';
import { Readable } from 'stream';
import * as sinon from 'ts-sinon';

it('Should create response definition when example parameter is provided', async () => {
Expand All @@ -19,7 +19,7 @@ it('Should create response definition when example parameter is provided', async
{ name: 'id', type: 'string' },
{ name: 'age', type: 'number' },
],
getData: () => new Stream(),
getData: () => new Readable(),
});
const responseSampler = new ResponseSampler(stubTemplateEngine);
// Act
Expand All @@ -44,7 +44,7 @@ it('Should create response definition when example parameter is a empty object',
{ name: 'id', type: 'string' },
{ name: 'age', type: 'number' },
],
getData: () => new Stream(),
getData: () => new Readable(),
});
const responseSampler = new ResponseSampler(stubTemplateEngine);
// Act
Expand All @@ -68,7 +68,7 @@ it('Should not create response definition when example parameter is not provided
{ name: 'id', type: 'string' },
{ name: 'age', type: 'number' },
],
getData: () => new Stream(),
getData: () => new Readable(),
});
const responseSampler = new ResponseSampler(stubTemplateEngine);
// Act
Expand Down Expand Up @@ -97,7 +97,7 @@ it('Should append response definition when there are some existed definitions',
{ name: 'age', type: 'number' },
{ name: 'name', type: 'boolean' },
],
getData: () => new Stream(),
getData: () => new Readable(),
});
const responseSampler = new ResponseSampler(stubTemplateEngine);
// Act
Expand Down
3 changes: 2 additions & 1 deletion packages/core/src/lib/data-query/builder/dataQueryBuilder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import {
DataSource,
Pagination,
BindParameters,
DataResult,
} from '@vulcan-sql/core/models';
import * as uuid from 'uuid';

Expand Down Expand Up @@ -403,7 +404,7 @@ export interface IDataQueryBuilder {
take(size: number, move: number): IDataQueryBuilder;
// paginate
paginate(pagination: Pagination): void;
value(): Promise<object>;
value(): Promise<DataResult>;
clone(): IDataQueryBuilder;
}

Expand Down
4 changes: 2 additions & 2 deletions packages/core/src/lib/data-source/pg.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Stream } from 'stream';
import { Readable } from 'stream';
import {
DataResult,
DataSource,
Expand All @@ -18,7 +18,7 @@ export class PGDataSource extends DataSource {
return [];
},
getData: () => {
return new Stream();
return new Readable();
},
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,36 @@ import {
FilterBuilder,
VulcanInternalExtension,
} from '@vulcan-sql/core/models';
import { EXECUTE_FILTER_NAME } from './constants';
import { EXECUTE_COMMAND_NAME, EXECUTE_FILTER_NAME } from './constants';
import * as nunjucks from 'nunjucks';
import { ReplaceChildFunc, visitChildren } from '../../extension-utils';

@VulcanInternalExtension()
export class ExecutorBuilder extends FilterBuilder {
public filterName = EXECUTE_FILTER_NAME;

public override onVisit(node: nunjucks.nodes.Node) {
visitChildren(node, this.replaceExecuteFunction.bind(this));
}

private replaceExecuteFunction(
node: nunjucks.nodes.Node,
replace: ReplaceChildFunc
) {
if (
node instanceof nunjucks.nodes.FunCall &&
node.name instanceof nunjucks.nodes.LookupVal &&
node.name.val.value === EXECUTE_COMMAND_NAME
) {
const args = new nunjucks.nodes.NodeList(node.lineno, node.colno);
args.addChild(node.name.target);
const filter = new nunjucks.nodes.Filter(
node.lineno,
node.colno,
new nunjucks.nodes.Symbol(node.lineno, node.colno, EXECUTE_FILTER_NAME),
args
);
replace(filter);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious about replace, when we call replace what do we replace to ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a callback function to remove or replace the current node in order to modify the AST tree, which provided by visitChildren helper.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks a lot for replying to me

}
}
}
Original file line number Diff line number Diff line change
@@ -1,19 +1,31 @@
import { IDataQueryBuilder } from '@vulcan-sql/core/data-query';
import {
DataResult,
FilterRunner,
FilterRunnerTransformOptions,
VulcanInternalExtension,
} from '@vulcan-sql/core/models';
import { streamToArray } from '@vulcan-sql/core/utils';
import { EXECUTE_FILTER_NAME } from './constants';

const isDataResult = (response: any): response is DataResult => {
return response.getColumns && response.getData;
};

@VulcanInternalExtension()
export class ExecutorRunner extends FilterRunner {
public filterName = EXECUTE_FILTER_NAME;

public async transform({
value,
value: builder,
}: FilterRunnerTransformOptions): Promise<any> {
const builder: IDataQueryBuilder = value;
return builder.value();
const response = await builder.value();

// if input value is not a query builder, call the function .value and do nothing.
if (!isDataResult(response)) return response;

const { getData } = response;
const dataStream = getData();
const data = await streamToArray(dataStream);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we execute builder.value() and consume the stream to the array in the ExecutorRunner when play RuntimeExtension and it's the final result, then how could we convert the array result to json / csv format in the middleware?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This filter here doesn't affect the final query builder, the formators will receive an unchanged stream.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, thanks

return data;
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,5 @@
import { ReplaceChildFunc, visitChildren } from '../../extension-utils';
import * as nunjucks from 'nunjucks';
import {
EXECUTE_COMMAND_NAME,
EXECUTE_FILTER_NAME,
FINIAL_BUILDER_NAME,
METADATA_NAME,
REFERENCE_SEARCH_MAX_DEPTH,
} from './constants';
import { FINIAL_BUILDER_NAME, METADATA_NAME } from './constants';
import { TagBuilder, VulcanInternalExtension } from '@vulcan-sql/core/models';

interface DeclarationLocation {
Expand Down Expand Up @@ -100,15 +93,14 @@ export class ReqTagBuilder extends TagBuilder {
) {
this.checkBuilder(node);
this.checkMainBuilder(node);
} else {
visitChildren(node, this.replaceExecuteFunction.bind(this));
}
}

public override finish() {
if (!this.hasMainBuilder) {
this.wrapOutputWithBuilder();
}
this.reset();
}

public override getMetadata() {
Expand Down Expand Up @@ -157,53 +149,9 @@ export class ReqTagBuilder extends TagBuilder {
this.root.children = [builder];
}

private replaceExecuteFunction(
node: nunjucks.nodes.Node,
replace: ReplaceChildFunc
) {
if (
node instanceof nunjucks.nodes.FunCall &&
node.name instanceof nunjucks.nodes.LookupVal &&
node.name.val.value === EXECUTE_COMMAND_NAME
) {
let targetNode: typeof node.name.target | null = node.name.target;
let depth = 0;
while (targetNode) {
depth++;
if (depth > REFERENCE_SEARCH_MAX_DEPTH) {
throw new Error('Max depth reached');
}
if (targetNode instanceof nunjucks.nodes.LookupVal) {
targetNode = targetNode.target;
} else if (targetNode instanceof nunjucks.nodes.FunCall) {
targetNode = targetNode.name;
} else if (targetNode instanceof nunjucks.nodes.Symbol) {
break;
} else {
throw new Error(
`Unexpected node type: ${
(targetNode as nunjucks.nodes.Node)?.typename
}`
);
}
}

// If the target node is a variable from {% req xxx %}, replace it with execute filter
if (this.variableList.has((targetNode as nunjucks.nodes.Symbol).value)) {
const args = new nunjucks.nodes.NodeList(node.lineno, node.colno);
args.addChild(node.name.target);
const filter = new nunjucks.nodes.Filter(
node.lineno,
node.colno,
new nunjucks.nodes.Symbol(
node.lineno,
node.colno,
EXECUTE_FILTER_NAME
),
args
);
replace(filter);
}
}
private reset() {
this.variableList.clear();
this.root = undefined;
this.hasMainBuilder = false;
}
}
3 changes: 2 additions & 1 deletion packages/core/src/lib/template-engine/nunjucksCompiler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
TagRunner,
FilterBuilder,
FilterRunner,
DataResult,
} from '@vulcan-sql/core/models';

@injectable()
Expand Down Expand Up @@ -78,7 +79,7 @@ export class NunjucksCompiler implements Compiler {
templateName: string,
data: T,
pagination?: Pagination
): Promise<any> {
): Promise<DataResult> {
await this.initializeExtensions();
const builder = await this.renderAndGetMainBuilder(templateName, data);
if (pagination) builder.paginate(pagination);
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/lib/utils/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './normalizedStringValue';
export * from './logger';
export * from './module';
export * from './streams';
29 changes: 29 additions & 0 deletions packages/core/src/lib/utils/streams.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { Readable } from 'stream';

export const arrayToStream = (array: any[]) => {
let index = 0;
const stream = new Readable({
objectMode: true,
read() {
if (index >= array.length) {
this.push(null);
return;
}
this.push(array[index++]);
},
});
return stream;
};

export const streamToArray = (stream: Readable) => {
const rows: any[] = [];
return new Promise<any[]>((resolve, reject) => {
stream.on('data', (data) => {
rows.push(data);
});
stream.on('end', () => {
resolve(rows);
});
stream.on('error', (err) => reject(err));
});
};
4 changes: 2 additions & 2 deletions packages/core/src/models/extensions/dataSource.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { SQLClauseOperation } from '@vulcan-sql/core/data-query';
import { Pagination } from '@vulcan-sql/core/models';
import { TYPES } from '@vulcan-sql/core/types';
import { Stream } from 'stream';
import { Readable } from 'stream';
import { ExtensionBase } from './base';
import { VulcanExtension } from './decorators';

Expand All @@ -24,7 +24,7 @@ export type DataColumn = { name: string; type: string };

export interface DataResult {
getColumns: () => DataColumn[];
getData: () => Stream;
getData: () => Readable;
}

export interface ExecuteOptions {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { arrayToStream, streamToArray } from '@vulcan-sql/core';
import { createTestCompiler } from '../../testCompiler';

it('Extension should execute correct query and set/export the variable', async () => {
Expand All @@ -8,18 +9,22 @@ it('Extension should execute correct query and set/export the variable', async (
select count(*) as count from user where user.id = {{ params.userId }};
{% endreq %}
`);
builder.value.onFirstCall().resolves([{ count: 1 }]);
builder.value.onFirstCall().resolves({
getColumns: () => [],
getData: () => arrayToStream([{ count: 1 }]),
});
// Action
loader.setSource('test', compiledData);
const result = await compiler.execute('test', {
params: { userId: 'user-id' },
});
const resultData = await streamToArray(result.getData());
// Assert
expect(executor.createBuilder.firstCall.args[0]).toBe(
`select count(*) as count from user where user.id = $1;`
);
expect(executor.createBuilder.firstCall.args[1].get('$1')).toBe(`user-id`);
expect(result).toEqual([{ count: 1 }]);
expect(resultData).toEqual([{ count: 1 }]);
});

it('If argument is not a symbol, extension should throw', async () => {
Expand Down Expand Up @@ -122,3 +127,21 @@ it('Extension should throw an error if there are multiple builders using same na
`We can't declare multiple builder with same name. Duplicated name: user (declared at 1:7 and 2:7)`
);
});

it('Extension should reset after compiled each template', async () => {
// Arrange
const { compiler } = await createTestCompiler();
compiler.compile(
`
{% req user main %} select * from users; {% endreq %}
`
);
// Act, Arrange
await expect(
compiler.compile(
`
{% req user main %} select * from users; {% endreq %}
`
)
).resolves.not.toThrow();
});
Loading