Skip to content

Commit 8c21911

Browse files
committed
feat(ext-dbt): load dbt table, view, incremantal, and ephemeral models
1 parent 3873abb commit 8c21911

16 files changed

+548
-41
lines changed

packages/core/src/index.ts

+1
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ export * from './lib/data-query';
66
export * from './lib/data-source';
77
export * from './models';
88
export * from './containers';
9+
export * from './options';

packages/core/src/lib/template-engine/extension-loader/models.ts

+3-1
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,9 @@ export abstract class TagBuilder extends CompileTimeExtension {
6464

6565
export abstract class TagRunner extends RuntimeExtension {
6666
abstract tags: string[];
67-
abstract run(options: TagRunnerOptions): Promise<string | void>;
67+
abstract run(
68+
options: TagRunnerOptions
69+
): Promise<string | nunjucks.runtime.SafeString | void>;
6870

6971
public __run(...originalArgs: any[]) {
7072
const context = originalArgs[0];

packages/extension-dbt/src/index.ts

+2-1
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
import { DBTTagBuilder } from './lib/dbtTagBuilder';
2+
import { DBTTagRunner } from './lib/dbtTagRunner';
23

3-
export default [DBTTagBuilder];
4+
export default [DBTTagBuilder, DBTTagRunner];
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,83 @@
1-
import { TagBuilder } from '@vulcan-sql/core';
1+
import {
2+
ITemplateEngineOptions,
3+
OnInit,
4+
TagBuilder,
5+
TYPES,
6+
} from '@vulcan-sql/core';
7+
import { inject, injectable } from 'inversify';
28
import * as nunjucks from 'nunjucks';
9+
import { promises as fs } from 'fs';
10+
import { chain } from 'lodash';
311

4-
export class DBTTagBuilder extends TagBuilder {
12+
@injectable()
13+
export class DBTTagBuilder extends TagBuilder implements OnInit {
514
public tags = ['dbt'];
15+
private modelFiles: string[] = [];
16+
private models = new Map<string, string>();
17+
18+
constructor(
19+
@inject(TYPES.TemplateEngineOptions) options: ITemplateEngineOptions
20+
) {
21+
super();
22+
this.modelFiles = options['dbt']?.modelFiles || [];
23+
}
24+
25+
public async onInit() {
26+
this.models.clear();
27+
for (const modelFile of this.modelFiles) {
28+
const content = JSON.parse(await fs.readFile(modelFile, 'utf-8'));
29+
chain(content.nodes || [])
30+
.toPairs()
31+
.filter((node) => node[0].startsWith('model'))
32+
.forEach((node) => this.loadModel(node[0], node[1]))
33+
.value();
34+
}
35+
}
636

737
public parse(
838
parser: nunjucks.parser.Parser,
939
nodes: typeof nunjucks.nodes,
1040
lexer: typeof nunjucks.lexer
1141
) {
12-
// {% dbt model-name %}
42+
// {% dbt "model-name" %}
1343
// consume dbt tag
1444
const dbtToken = parser.nextToken();
1545
const args = new nodes.NodeList(dbtToken.lineno, dbtToken.colno);
46+
const modelNameToken = parser.nextToken();
47+
if (modelNameToken.type !== lexer.TOKEN_STRING) {
48+
parser.fail(
49+
`Expect model name as string, but got ${modelNameToken.type}`,
50+
modelNameToken.lineno,
51+
modelNameToken.colno
52+
);
53+
}
54+
const end = parser.nextToken();
55+
if (end.type !== lexer.TOKEN_BLOCK_END) {
56+
parser.fail(
57+
`Expect block end %}, but got ${end.type}`,
58+
end.lineno,
59+
end.colno
60+
);
61+
}
62+
63+
const sql = this.models.get(modelNameToken.value);
64+
if (!sql) {
65+
parser.fail(
66+
`Model ${modelNameToken.value} is not found in modelFiles`,
67+
modelNameToken.lineno,
68+
modelNameToken.colno
69+
);
70+
}
71+
72+
const output = new nodes.Output(dbtToken.lineno, dbtToken.colno);
73+
output.addChild(
74+
new nodes.TemplateData(dbtToken.lineno, dbtToken.colno, sql)
75+
);
76+
return this.createAsyncExtensionNode(args, [output]);
77+
}
1678

17-
return this.createAsyncExtensionNode(args, []);
79+
private loadModel(name: string, node: any) {
80+
if (this.models.has(name)) throw Error(`Model name ${name} is unambiguous`);
81+
this.models.set(name, node.relation_name || `(${node.compiled_sql})`);
1882
}
1983
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import { TagRunner, TagRunnerOptions } from '@vulcan-sql/core';
2+
import { injectable } from 'inversify';
3+
import * as nunjucks from 'nunjucks';
4+
5+
@injectable()
6+
export class DBTTagRunner extends TagRunner {
7+
public tags = ['dbt'];
8+
9+
public async run({ contentArgs }: TagRunnerOptions) {
10+
const sql = await contentArgs[0]();
11+
return new nunjucks.runtime.SafeString(sql);
12+
}
13+
}
+235
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,235 @@
1+
import { getTestCompiler } from '@vulcan-sql/test-utility';
2+
import * as path from 'path';
3+
4+
it('Should replace with table name of dbt model with type table', async () => {
5+
// Arrange
6+
const { compileAndLoad, execute, getExecutedQueries } = await getTestCompiler(
7+
{
8+
extensionNames: [path.join(__dirname, '..', 'src')],
9+
config: {
10+
dbt: {
11+
modelFiles: [path.join(__dirname, 'test-artifact.json')],
12+
},
13+
},
14+
}
15+
);
16+
17+
// Act.
18+
await compileAndLoad(`select * from {% dbt "model.test.1_table" %}`);
19+
await execute({});
20+
21+
// Assert
22+
const queries = await getExecutedQueries();
23+
expect(queries[0]).toBe('select * from "postgres"."public"."1_table"');
24+
});
25+
26+
it('Should replace with table name of dbt model with type view', async () => {
27+
// Arrange
28+
const { compileAndLoad, execute, getExecutedQueries } = await getTestCompiler(
29+
{
30+
extensionNames: [path.join(__dirname, '..', 'src')],
31+
config: {
32+
dbt: {
33+
modelFiles: [path.join(__dirname, 'test-artifact.json')],
34+
},
35+
},
36+
}
37+
);
38+
39+
// Act.
40+
await compileAndLoad(`select * from {% dbt "model.test.2_view" %}`);
41+
await execute({});
42+
43+
// Assert
44+
const queries = await getExecutedQueries();
45+
expect(queries[0]).toBe('select * from "postgres"."public"."2_view"');
46+
});
47+
48+
it('Should replace with sub-query of dbt model with type ephemeral', async () => {
49+
// Arrange
50+
const { compileAndLoad, execute, getExecutedQueries } = await getTestCompiler(
51+
{
52+
extensionNames: [path.join(__dirname, '..', 'src')],
53+
config: {
54+
dbt: {
55+
modelFiles: [path.join(__dirname, 'test-artifact.json')],
56+
},
57+
},
58+
}
59+
);
60+
61+
// Act.
62+
await compileAndLoad(
63+
`select sub.* from {% dbt "model.test.3_ephemeral" %} as sub`
64+
);
65+
await execute({});
66+
67+
// Assert
68+
const queries = await getExecutedQueries();
69+
expect(queries[0]).toBe(`select sub.* from (
70+
select *
71+
from "postgres"."public"."1_table"
72+
where age <= 18) as sub`);
73+
});
74+
75+
it('Should replace with table name of dbt model with type incremental', async () => {
76+
// Arrange
77+
const { compileAndLoad, execute, getExecutedQueries } = await getTestCompiler(
78+
{
79+
extensionNames: [path.join(__dirname, '..', 'src')],
80+
config: {
81+
dbt: {
82+
modelFiles: [path.join(__dirname, 'test-artifact.json')],
83+
},
84+
},
85+
}
86+
);
87+
88+
// Act.
89+
await compileAndLoad(`select * from {% dbt "model.test.4_incremental" %}`);
90+
await execute({});
91+
92+
// Assert
93+
const queries = await getExecutedQueries();
94+
expect(queries[0]).toBe('select * from "postgres"."public"."4_incremental"');
95+
});
96+
97+
it('Should merge multiple artifacts', async () => {
98+
// Arrange
99+
const { compileAndLoad, execute, getExecutedQueries } = await getTestCompiler(
100+
{
101+
extensionNames: [path.join(__dirname, '..', 'src')],
102+
config: {
103+
dbt: {
104+
modelFiles: [
105+
path.join(__dirname, 'test-artifact.json'),
106+
path.join(__dirname, 'test-artifact-2.json'),
107+
],
108+
},
109+
},
110+
}
111+
);
112+
113+
// Act.
114+
await compileAndLoad(
115+
`{% dbt "model.test.4_incremental" %}{% dbt "model.test.5_model_from_artifact_2" %}`
116+
);
117+
await execute({});
118+
119+
// Assert
120+
const queries = await getExecutedQueries();
121+
expect(queries[0]).toBe(
122+
'"postgres"."public"."4_incremental""postgres"."public"."1_table"'
123+
);
124+
});
125+
126+
it('Should throw error when models are unambiguous', async () => {
127+
// Arrange
128+
const { compileAndLoad } = await getTestCompiler({
129+
extensionNames: [path.join(__dirname, '..', 'src')],
130+
config: {
131+
dbt: {
132+
modelFiles: [
133+
path.join(__dirname, 'test-artifact-2.json'),
134+
path.join(__dirname, 'test-artifact-2.json'),
135+
],
136+
},
137+
},
138+
});
139+
140+
// Act. Assert
141+
await expect(compileAndLoad(`Some sql`)).rejects.toThrow(
142+
`Model name model.test.5_model_from_artifact_2 is unambiguous`
143+
);
144+
});
145+
146+
it('Should throw error when model name not found', async () => {
147+
// Arrange
148+
const { compileAndLoad } = await getTestCompiler({
149+
extensionNames: [path.join(__dirname, '..', 'src')],
150+
config: {
151+
dbt: {
152+
modelFiles: [path.join(__dirname, 'test-artifact.json')],
153+
},
154+
},
155+
});
156+
157+
// Act. Assert
158+
await expect(compileAndLoad(`{% dbt "not.found.model" %}`)).rejects.toThrow(
159+
`Model not.found.model is not found in modelFiles`
160+
);
161+
});
162+
163+
it('Should throw error when argument type is not correct', async () => {
164+
// Arrange
165+
const { compileAndLoad } = await getTestCompiler({
166+
extensionNames: [path.join(__dirname, '..', 'src')],
167+
config: {
168+
dbt: {
169+
modelFiles: [path.join(__dirname, 'test-artifact.json')],
170+
},
171+
},
172+
});
173+
174+
// Act. Assert
175+
await expect(compileAndLoad(`{% dbt model.test.1_table %}`)).rejects.toThrow(
176+
`Expect model name as string, but got symbol`
177+
);
178+
});
179+
180+
it('Should throw error when there are too many arguments', async () => {
181+
// Arrange
182+
const { compileAndLoad } = await getTestCompiler({
183+
extensionNames: [path.join(__dirname, '..', 'src')],
184+
config: {
185+
dbt: {
186+
modelFiles: [path.join(__dirname, 'test-artifact.json')],
187+
},
188+
},
189+
});
190+
191+
// Act. Assert
192+
await expect(
193+
compileAndLoad(`{% dbt "model.test.1_table" extra arg %}`)
194+
).rejects.toThrow(`Expect block end %}, but got symbol`);
195+
});
196+
197+
it('Should not throw error even if there is no dbt config', async () => {
198+
// Arrange
199+
const { compileAndLoad, execute, getExecutedQueries } = await getTestCompiler(
200+
{
201+
extensionNames: [path.join(__dirname, '..', 'src')],
202+
config: {},
203+
}
204+
);
205+
206+
// Act.
207+
await compileAndLoad(`some query`);
208+
await execute({});
209+
210+
// Assert
211+
const queries = await getExecutedQueries();
212+
expect(queries[0]).toBe('some query');
213+
});
214+
215+
it('Should not throw error even if the artifact file is empty', async () => {
216+
// Arrange
217+
const { compileAndLoad, execute, getExecutedQueries } = await getTestCompiler(
218+
{
219+
extensionNames: [path.join(__dirname, '..', 'src')],
220+
config: {
221+
dbt: {
222+
modelFiles: [path.join(__dirname, 'empty-artifact.json')],
223+
},
224+
},
225+
}
226+
);
227+
228+
// Act.
229+
await compileAndLoad(`some query`);
230+
await execute({});
231+
232+
// Assert
233+
const queries = await getExecutedQueries();
234+
expect(queries[0]).toBe('some query');
235+
});
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
{{ config(materialized='table') }}
2+
3+
with source_data as (
4+
select 1 as id, 'Ivan' as name, 18 as age
5+
UNION
6+
select 2 as id, 'William' as name, 80 as age
7+
UNION
8+
select 3 as id, 'Eason' as name, 18 as age
9+
)
10+
11+
select * from source_data
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{{ config(materialized='view') }}
2+
3+
select *
4+
from {{ ref('1_table') }}
5+
where age <= 18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{{ config(materialized='ephemeral') }}
2+
3+
select *
4+
from {{ ref('1_table') }}
5+
where age <= 18
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
{{ config(materialized='incremental') }}
2+
3+
select *
4+
from {{ ref('3_ephemeral') }}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
{}

0 commit comments

Comments
 (0)