Issue with "Compiled dbt Preview" option - Not able to correctly compile dbt's - {{ this }} #938

Open
1 task
dbtCldDev opened this issue Feb 28, 2024 · 1 comment
Assignees
Labels
bug Something isn't working sweep

Comments

dbtCldDev commented Feb 28, 2024

Expected behavior

When "Compiled dbt Preview" is clicked, the compiled code for {{ this }} should give the correct object name.
Example
Assume

  1. database - demo_db
  2. schema - demo_schema
  3. Model name is - my_second_dbt_model
  4. Model code
{{ this }}

The compiled code should be:

demo_db.demo_schema.my_second_dbt_model
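
To make the expected result concrete, here is a minimal sketch of how the three values listed above combine into the compiled name. The `RelationParts` type and `renderRelation` function are hypothetical names made up for this illustration. They are not part of dbt or dbt Power User, and real adapters may additionally quote or change the case of each part.

```typescript
// Hypothetical helper, for illustration only: joins the parts of a relation
// name with dots, matching the way the expected output is written above.
interface RelationParts {
  database: string;
  schema: string;
  identifier: string; // the model name
}

function renderRelation({ database, schema, identifier }: RelationParts): string {
  return [database, schema, identifier].join(".");
}

const expected = renderRelation({
  database: "demo_db",
  schema: "demo_schema",
  identifier: "my_second_dbt_model",
});
console.log(expected); // demo_db.demo_schema.my_second_dbt_model
```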

Actual behavior

When "Compiled dbt Preview" is clicked, the compiled code for {{ this }} contains a random value in place of the object name.
Example - taking the same example as above, the actual compiled code is:

demo_db.demo_schema.t_d1a663caa68b4ebc8580e7bf4ed1d62b
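
The report does not state the cause. The generated identifier is `t_` followed by 32 hexadecimal characters, which looks like a throwaway name. One possible reading, offered purely as an assumption, is that the preview compiles the editor contents as an ad hoc query instead of as the named model, so {{ this }} resolves to a temporary node. The sketch below only simulates that difference. Every name in it is hypothetical and none of it is dbt or dbt Power User code.

```typescript
// All names below are hypothetical and exist only for this illustration.
interface Relation {
  database: string;
  schema: string;
  identifier: string;
}

// Builds a string of `length` random hexadecimal characters.
function randomHex(length: number): string {
  let out = "";
  for (let i = 0; i < length; i++) {
    out += Math.floor(Math.random() * 16).toString(16);
  }
  return out;
}

// Compiling a known model: the relation keeps the model's own name.
function relationForModel(modelName: string): Relation {
  return { database: "demo_db", schema: "demo_schema", identifier: modelName };
}

// Compiling ad hoc SQL (assumed behaviour): the relation gets a throwaway name.
function relationForAdHocQuery(): Relation {
  return {
    database: "demo_db",
    schema: "demo_schema",
    identifier: "t_" + randomHex(32),
  };
}

// Replaces every `{{ this }}` in the SQL with the dotted relation name.
function renderThis(sql: string, relation: Relation): string {
  const name = [relation.database, relation.schema, relation.identifier].join(".");
  return sql.replace(/\{\{\s*this\s*\}\}/g, () => name);
}

const modelSql = "{{ this }}";
const asModel = renderThis(modelSql, relationForModel("my_second_dbt_model"));
const asAdHoc = renderThis(modelSql, relationForAdHocQuery());
console.log(asModel); // the name the reporter expects
console.log(asAdHoc); // a different random name on every run
```

If this reading is right, the preview would need to compile against the node for the open model file so that {{ this }} picks up the model's own name.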

Steps To Reproduce

  1. Create a model and add the following code
{{ this }}
  2. Click on "Compiled dbt Preview"

Log output/Screenshots

image

Operating System

Windows 10

dbt version

1.7.8

dbt Adapter

dbt-snowflake

dbt Power User version

0.34.3

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
dbtCldDev added the bug label Feb 28, 2024
mdesmet self-assigned this Mar 13, 2024
Contributor

sweep-ai bot commented May 23, 2024

🚀 Here's the PR! #1161

Step 1: 🔎 Searching

Here are the code search results. I'm now analyzing these search results to write the PR.

Relevant files:

import {
CancellationToken,
Diagnostic,
DiagnosticCollection,
Disposable,
languages,
Range,
RelativePattern,
Uri,
window,
workspace,
} from "vscode";
import {
extendErrorWithSupportLinks,
getFirstWorkspacePath,
getProjectRelativePath,
provideSingleton,
setupWatcherHandler,
} from "../utils";
import {
Catalog,
CompilationResult,
DBColumn,
DBTNode,
DBTCommand,
DBTCommandExecutionInfrastructure,
DBTDetection,
DBTProjectDetection,
DBTProjectIntegration,
ExecuteSQLResult,
PythonDBTCommandExecutionStrategy,
QueryExecution,
SourceNode,
Node,
ExecuteSQLError,
HealthcheckArgs,
} from "./dbtIntegration";
import { PythonEnvironment } from "../manifest/pythonEnvironment";
import { CommandProcessExecutionFactory } from "../commandProcessExecution";
import { PythonBridge, PythonException } from "python-bridge";
import * as path from "path";
import { DBTProject } from "../manifest/dbtProject";
import { existsSync, readFileSync } from "fs";
import { parse } from "yaml";
import { TelemetryService } from "../telemetry";
import {
AltimateRequest,
NotFoundError,
ValidateSqlParseErrorResponse,
} from "../altimate";
import { DBTProjectContainer } from "../manifest/dbtProjectContainer";
import { ManifestPathType } from "../constants";
import { DBTTerminal } from "./dbtTerminal";
import { ValidationProvider } from "../validation_provider";
import { DeferToProdService } from "../services/deferToProdService";
const DEFAULT_QUERY_TEMPLATE = "select * from ({query}) as query limit {limit}";
// TODO: we should really get these from the manifest directly
interface ResolveReferenceNodeResult {
database: string;
schema: string;
alias: string;
}
interface ResolveReferenceSourceResult {
database: string;
schema: string;
alias: string;
resource_type: string;
identifier: string;
}
interface DeferConfig {
deferToProduction: boolean;
favorState: boolean;
manifestPathForDeferral: string;
manifestPathType?: ManifestPathType;
dbtCoreIntegrationId?: number;
}
type InsightType = "Modelling" | "Test" | "structure";
interface Insight {
name: string;
type: InsightType;
message: string;
recommendation: string;
reason_to_flag: string;
metadata: {
model?: string;
model_unique_id?: string;
model_type?: string;
convention?: string | null;
};
}
type Severity = "ERROR" | "WARNING";
interface ModelInsight {
insight: Insight;
severity: Severity;
unique_id: string;
package_name: string;
path: string;
original_file_path: string;
}
export interface ProjectHealthcheck {
model_insights: Record<string, ModelInsight[]>;
// package_insights: any;
}
@provideSingleton(DBTCoreDetection)
export class DBTCoreDetection implements DBTDetection {
constructor(
private pythonEnvironment: PythonEnvironment,
private commandProcessExecutionFactory: CommandProcessExecutionFactory,
) {}
async detectDBT(): Promise<boolean> {
try {
const checkDBTInstalledProcess =
this.commandProcessExecutionFactory.createCommandProcessExecution({
command: this.pythonEnvironment.pythonPath,
args: ["-c", "import dbt"],
cwd: getFirstWorkspacePath(),
envVars: this.pythonEnvironment.environmentVariables,
});
const { stderr } = await checkDBTInstalledProcess.complete();
if (stderr) {
throw new Error(stderr);
}
return true;
} catch (error) {
return false;
}
}
}
@provideSingleton(DBTCoreProjectDetection)
export class DBTCoreProjectDetection
implements DBTProjectDetection, Disposable
{
constructor(
private executionInfrastructure: DBTCommandExecutionInfrastructure,
private dbtTerminal: DBTTerminal,
) {}
private getPackageInstallPathFallback(
projectDirectory: Uri,
packageInstallPath: string,
): string {
const dbtProjectFile = path.join(
projectDirectory.fsPath,
"dbt_project.yml",
);
if (existsSync(dbtProjectFile)) {
const dbtProjectConfig: any = parse(readFileSync(dbtProjectFile, "utf8"));
const packagesInstallPath = dbtProjectConfig["packages-install-path"];
if (packagesInstallPath) {
if (path.isAbsolute(packagesInstallPath)) {
return packagesInstallPath;
} else {
return path.join(projectDirectory.fsPath, packagesInstallPath);
}
}
}
return packageInstallPath;
}
async discoverProjects(projectDirectories: Uri[]): Promise<Uri[]> {
let packagesInstallPaths = projectDirectories.map((projectDirectory) =>
path.join(projectDirectory.fsPath, "dbt_packages"),
);
let python: PythonBridge | undefined;
try {
python = this.executionInfrastructure.createPythonBridge(
getFirstWorkspacePath(),
);
await python.ex`from dbt_core_integration import *`;
const packagesInstallPathsFromPython = await python.lock<string[]>(
(python) =>
python`to_dict(find_package_paths(${projectDirectories.map(
(projectDirectory) => projectDirectory.fsPath,
)}))`,
);
packagesInstallPaths = packagesInstallPaths.map(
(packageInstallPath, index) => {
const packageInstallPathFromPython =
packagesInstallPathsFromPython[index];
if (packageInstallPathFromPython) {
return Uri.file(packageInstallPathFromPython).fsPath;
}
return packageInstallPath;
},
);
} catch (error) {
this.dbtTerminal.debug(
"dbtCoreIntegration:discoverProjects",
"An error occurred while finding package paths: " + error,
);
// Fallback to reading yaml files
packagesInstallPaths = projectDirectories.map((projectDirectory, idx) =>
this.getPackageInstallPathFallback(
projectDirectory,
packagesInstallPaths[idx],
),
);
} finally {
if (python) {
this.executionInfrastructure.closePythonBridge(python);
}
}
const filteredProjectFiles = projectDirectories.filter((uri) => {
return !packagesInstallPaths.some((packageInstallPath) => {
return uri.fsPath.startsWith(packageInstallPath!);
});
});
if (filteredProjectFiles.length > 20) {
window.showWarningMessage(
`dbt Power User detected ${filteredProjectFiles.length} projects in your work space, this will negatively affect performance.`,
);
}
return filteredProjectFiles;
}
async dispose() {}
}
@provideSingleton(DBTCoreProjectIntegration)
export class DBTCoreProjectIntegration
implements DBTProjectIntegration, Disposable
{
static DBT_PROFILES_FILE = "profiles.yml";
private profilesDir?: string;
private targetPath?: string;
private adapterType?: string;
private version?: number[];
private packagesInstallPath?: string;
private modelPaths?: string[];
private seedPaths?: string[];
private macroPaths?: string[];
private python: PythonBridge;
private disposables: Disposable[] = [];
private readonly rebuildManifestDiagnostics =
languages.createDiagnosticCollection("dbt");
private readonly pythonBridgeDiagnostics =
languages.createDiagnosticCollection("dbt");
private static QUEUE_ALL = "all";
constructor(
private executionInfrastructure: DBTCommandExecutionInfrastructure,
private pythonEnvironment: PythonEnvironment,
private telemetry: TelemetryService,
private pythonDBTCommandExecutionStrategy: PythonDBTCommandExecutionStrategy,
private dbtProjectContainer: DBTProjectContainer,
private altimateRequest: AltimateRequest,
private dbtTerminal: DBTTerminal,
private validationProvider: ValidationProvider,
private deferToProdService: DeferToProdService,
private projectRoot: Uri,
private projectConfigDiagnostics: DiagnosticCollection,
) {
this.dbtTerminal.debug(
"DBTCoreProjectIntegration",
`Registering dbt core project at ${this.projectRoot}`,
);
this.python = this.executionInfrastructure.createPythonBridge(
this.projectRoot.fsPath,
);
this.executionInfrastructure.createQueue(
DBTCoreProjectIntegration.QUEUE_ALL,
);
this.disposables.push(
this.pythonEnvironment.onPythonEnvironmentChanged(() => {
this.python = this.executionInfrastructure.createPythonBridge(
this.projectRoot.fsPath,
);
}),
this.rebuildManifestDiagnostics,
this.pythonBridgeDiagnostics,
);
}
// Remove trailing backslashes if they exist; a trailing backslash
// would escape the closing quote when the string is passed to Python
private removeTrailingSlashes(input: string | undefined) {
return input?.replace(/\\+$/, "");
}
private getLimitQuery(queryTemplate: string, query: string, limit: number) {
return queryTemplate
.replace("{query}", () => query)
.replace("{limit}", () => limit.toString());
}
private async getQuery(
query: string,
limit: number,
): Promise<{ queryTemplate: string; limitQuery: string }> {
try {
const dbtVersion = await this.version;
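// dbt supports the limit macro from v1.5 onwards; compare major and minor
// together so that a future major version (e.g. 2.0) is not rejected
if (
dbtVersion &&
(dbtVersion[0] > 1 || (dbtVersion[0] === 1 && dbtVersion[1] >= 5))
) {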
const args = { sql: query, limit };
const queryTemplateFromMacro = await this.python?.lock(
(python) =>
python!`to_dict(project.execute_macro('get_limit_subquery_sql', ${args}))`,
);
this.dbtTerminal.debug(
"DBTCoreProjectIntegration",
"Using query template from macro",
queryTemplateFromMacro,
);
return {
queryTemplate: queryTemplateFromMacro,
limitQuery: queryTemplateFromMacro,
};
}
} catch (err) {
console.error("Error while getting get_limit_subquery_sql macro", err);
this.telemetry.sendTelemetryError(
"executeMacroGetLimitSubquerySQLError",
err,
{ adapter: this.adapterType || "unknown" },
);
}
const queryTemplate = workspace
.getConfiguration("dbt")
.get<string>("queryTemplate");
if (queryTemplate && queryTemplate !== DEFAULT_QUERY_TEMPLATE) {
console.log("Using user provided query template", queryTemplate);
const limitQuery = this.getLimitQuery(queryTemplate, query, limit);
return { queryTemplate, limitQuery };
}
return {
queryTemplate: DEFAULT_QUERY_TEMPLATE,
limitQuery: this.getLimitQuery(DEFAULT_QUERY_TEMPLATE, query, limit),
};
}
async refreshProjectConfig(): Promise<void> {
await this.createPythonDbtProject(this.python);
await this.python.ex`project.init_project()`;
this.targetPath = await this.findTargetPath();
this.modelPaths = await this.findModelPaths();
this.seedPaths = await this.findSeedPaths();
this.macroPaths = await this.findMacroPaths();
this.packagesInstallPath = await this.findPackagesInstallPath();
this.version = await this.findVersion();
this.adapterType = await this.findAdapterType();
}
async executeSQL(query: string, limit: number): Promise<QueryExecution> {
this.throwBridgeErrorIfAvailable();
const { limitQuery } = await this.getQuery(query, limit);
const queryThread = this.executionInfrastructure.createPythonBridge(
this.projectRoot.fsPath,
);
await this.createPythonDbtProject(queryThread);
await queryThread.ex`project.init_project()`;
return new QueryExecution(
async () => {
queryThread.kill(2);
},
async () => {
// compile query
const compiledQuery = await this.unsafeCompileQuery(limitQuery);
// execute query
let result: ExecuteSQLResult;
try {
result = await queryThread!.lock<ExecuteSQLResult>(
(python) => python`to_dict(project.execute_sql(${compiledQuery}))`,
);
const { manifestPathType } =
this.deferToProdService.getDeferConfigByProjectRoot(
this.projectRoot.fsPath,
);
if (manifestPathType === ManifestPathType.REMOTE) {
this.altimateRequest.sendDeferToProdEvent(ManifestPathType.REMOTE);
}
} catch (err) {
const message = `Error while executing sql: ${compiledQuery}`;
this.dbtTerminal.error("dbtCore:executeSQL", message, err);
if (err instanceof PythonException) {
throw new ExecuteSQLError(err.exception.message, compiledQuery!);
}
throw new ExecuteSQLError((err as Error).message, compiledQuery!);
}
return { ...result, compiled_stmt: compiledQuery };
},
);
}
private async createPythonDbtProject(bridge: PythonBridge) {
await bridge.ex`from dbt_core_integration import *`;
const targetPath = this.removeTrailingSlashes(
await bridge.lock(
(python) => python`target_path(${this.projectRoot.fsPath})`,
),
);
const { deferToProduction, manifestPath, favorState } =
await this.getDeferConfig();
await bridge.ex`project = DbtProject(project_dir=${this.projectRoot.fsPath}, profiles_dir=${this.profilesDir}, target_path=${targetPath}, defer_to_prod=${deferToProduction}, manifest_path=${manifestPath}, favor_state=${favorState}) if 'project' not in locals() else project`;
}
async initializeProject(): Promise<void> {
try {
await this.python
.ex`from dbt_core_integration import default_profiles_dir`;
await this.python.ex`from dbt_healthcheck import *`;
this.profilesDir = this.removeTrailingSlashes(
await this.python.lock(
(python) => python`default_profiles_dir(${this.projectRoot.fsPath})`,
),
);
if (this.profilesDir) {
const dbtProfileWatcher = workspace.createFileSystemWatcher(
new RelativePattern(
this.profilesDir,
DBTCoreProjectIntegration.DBT_PROFILES_FILE,
),
);
this.disposables.push(
dbtProfileWatcher,
// when the project config changes we need to re-init the dbt project
...setupWatcherHandler(dbtProfileWatcher, () =>
this.rebuildManifest(),
),
);
}
await this.createPythonDbtProject(this.python);
this.pythonBridgeDiagnostics.clear();
} catch (exc: any) {
if (exc instanceof PythonException) {
// python errors can be about anything, so we just associate the error with the project file
// with a fixed range
if (exc.message.includes("No module named 'dbt'")) {
// Let's not create an error for each project if dbt is not detected
// This is already displayed in the status bar
return;
}
let errorMessage =
"An error occurred while initializing the dbt project: " +
exc.exception.message;
if (exc.exception.type.module === "dbt.exceptions") {
// TODO: we can provide solutions per type of dbt exception
errorMessage =
"An error occurred while initializing the dbt project, dbt found the following issue: " +
exc.exception.message;
}
this.pythonBridgeDiagnostics.set(
Uri.joinPath(this.projectRoot, DBTProject.DBT_PROJECT_FILE),
[new Diagnostic(new Range(0, 0, 999, 999), errorMessage)],
);
this.telemetry.sendTelemetryError("pythonBridgeInitPythonError", exc);
} else {
window.showErrorMessage(
extendErrorWithSupportLinks(
"An unexpected error occurred while initializing the dbt project at " +
this.projectRoot +
": " +
exc +
".",
),
);
this.telemetry.sendTelemetryError("pythonBridgeInitError", exc);
}
}
}
getTargetPath(): string | undefined {
return this.targetPath;
}
getModelPaths(): string[] | undefined {
return this.modelPaths;
}
getSeedPaths(): string[] | undefined {
return this.seedPaths;
}
getMacroPaths(): string[] | undefined {
return this.macroPaths;
}
getPackageInstallPath(): string | undefined {
return this.packagesInstallPath;
}
getAdapterType(): string | undefined {
return this.adapterType;
}
getVersion(): number[] | undefined {
return this.version;
}
async findAdapterType(): Promise<string | undefined> {
return this.python.lock<string>(
(python) => python`project.config.credentials.type`,
);
}
getPythonBridgeStatus(): boolean {
return this.python.connected;
}
getAllDiagnostic(): Diagnostic[] {
const projectURI = Uri.joinPath(
this.projectRoot,
DBTProject.DBT_PROJECT_FILE,
);
return [
...(this.pythonBridgeDiagnostics.get(projectURI) || []),
...(this.projectConfigDiagnostics.get(projectURI) || []),
...(this.rebuildManifestDiagnostics.get(projectURI) || []),
];
}
async rebuildManifest(): Promise<void> {
const errors = this.projectConfigDiagnostics.get(
Uri.joinPath(this.projectRoot, DBTProject.DBT_PROJECT_FILE),
);
if (errors !== undefined && errors.length > 0) {
// No point in trying to rebuild the manifest if the config is not valid
return;
}
try {
await this.python.lock(
(python) => python`to_dict(project.safe_parse_project())`,
);
this.rebuildManifestDiagnostics.clear();
} catch (exc) {
if (exc instanceof PythonException) {
// dbt errors can be about anything, so we just associate the error with the project file
// with a fixed range
this.rebuildManifestDiagnostics.set(
Uri.joinPath(this.projectRoot, DBTProject.DBT_PROJECT_FILE),
[
new Diagnostic(
new Range(0, 0, 999, 999),
"There is a problem in your dbt project. Compilation failed: " +
exc.exception.message,
),
],
);
this.telemetry.sendTelemetryEvent(
"pythonBridgeCannotParseProjectUserError",
{
error: exc.exception.message,
adapter: this.getAdapterType() || "unknown", // TODO: this should be moved to dbtProject
},
);
return;
}
// if we get here, it is not a dbt error but an extension error.
this.telemetry.sendTelemetryError(
"pythonBridgeCannotParseProjectUnknownError",
exc,
{
adapter: this.adapterType || "unknown", // TODO: this should be moved to dbtProject
},
);
window.showErrorMessage(
extendErrorWithSupportLinks(
"An error occurred while rebuilding the dbt manifest: " + exc + ".",
),
);
}
}
async runModel(command: DBTCommand) {
this.addCommandToQueue(
await this.addDeferParams(this.dbtCoreCommand(command)),
);
}
async buildModel(command: DBTCommand) {
this.addCommandToQueue(
await this.addDeferParams(this.dbtCoreCommand(command)),
);
}
async buildProject(command: DBTCommand) {
this.addCommandToQueue(
await this.addDeferParams(this.dbtCoreCommand(command)),
);
}
async runTest(command: DBTCommand) {
this.addCommandToQueue(
await this.addDeferParams(this.dbtCoreCommand(command)),
);
}
async runModelTest(command: DBTCommand) {
this.addCommandToQueue(
await this.addDeferParams(this.dbtCoreCommand(command)),
);
}
async compileModel(command: DBTCommand) {
this.addCommandToQueue(
await this.addDeferParams(this.dbtCoreCommand(command)),
);
}
async generateDocs(command: DBTCommand) {
this.addCommandToQueue(this.dbtCoreCommand(command));
}
async executeCommandImmediately(command: DBTCommand) {
return await this.dbtCoreCommand(command).execute();
}
async deps(command: DBTCommand) {
const { stdout, stderr } = await this.dbtCoreCommand(command).execute();
if (stderr) {
throw new Error(stderr);
}
return stdout;
}
async debug(command: DBTCommand) {
const { stdout, stderr } = await this.dbtCoreCommand(command).execute();
if (stderr) {
throw new Error(stderr);
}
return stdout;
}
private addCommandToQueue(command: DBTCommand) {
const isInstalled =
this.dbtProjectContainer.showErrorIfDbtOrPythonNotInstalled();
if (!isInstalled) {
return;
}
this.executionInfrastructure.addCommandToQueue(
DBTCoreProjectIntegration.QUEUE_ALL,
command,
);
}
private async getDeferManifestPath(
manifestPathType: ManifestPathType | undefined,
manifestPathForDeferral: string,
dbtCoreIntegrationId: number | undefined,
): Promise<string> {
if (!manifestPathType) {
const configNotPresent = new Error(
"Please configure defer to production functionality by specifying manifest path in Actions panel before using it.",
);
throw configNotPresent;
}
if (manifestPathType === ManifestPathType.LOCAL) {
if (!manifestPathForDeferral) {
const configNotPresent = new Error(
"manifestPathForDeferral config is not present, use the actions panel to set the Defer to production configuration.",
);
this.dbtTerminal.error(
"manifestPathForDeferral",
"manifestPathForDeferral is not present",
configNotPresent,
);
throw configNotPresent;
}
return manifestPathForDeferral;
}
if (manifestPathType === ManifestPathType.REMOTE) {
try {
this.validationProvider.throwIfNotAuthenticated();
} catch (err) {
throw new Error(
"Defer to production is currently enabled with 'DataPilot dbt integration' mode. It requires a valid Altimate AI API key and instance name in the settings. In order to run dbt commands, please either switch to Local Path mode or disable the feature or add an API key / instance name.",
);
}
this.dbtTerminal.debug(
"remoteManifest",
`fetching artifact url for dbtCoreIntegrationId: ${dbtCoreIntegrationId}`,
);
try {
const response = await this.altimateRequest.fetchArtifactUrl(
"manifest",
dbtCoreIntegrationId!,
);
const manifestPath = await this.altimateRequest.downloadFileLocally(
response.url,
this.projectRoot,
);
console.log(`Set remote manifest path: ${manifestPath}`);
return manifestPath;
} catch (error) {
if (error instanceof NotFoundError) {
const manifestNotFoundError = new Error(
"Unable to download remote manifest file. Did you upload your manifest using the Altimate DataPilot CLI?",
);
this.dbtTerminal.error(
"remoteManifestError",
"Unable to download remote manifest file.",
manifestNotFoundError,
);
throw manifestNotFoundError;
}
throw error;
}
}
throw new Error(`Invalid manifestPathType: ${manifestPathType}`);
}
private async getDeferParams(): Promise<string[]> {
const deferConfig = this.deferToProdService.getDeferConfigByProjectRoot(
this.projectRoot.fsPath,
);
const {
deferToProduction,
manifestPathForDeferral,
favorState,
manifestPathType,
dbtCoreIntegrationId,
} = deferConfig;
if (!deferToProduction) {
this.dbtTerminal.debug("deferToProd", "defer to prod not enabled");
return [];
}
const manifestPath = await this.getDeferManifestPath(
manifestPathType,
manifestPathForDeferral,
dbtCoreIntegrationId,
);
const args = ["--defer", "--state", manifestPath];
if (favorState) {
args.push("--favor-state");
}
this.dbtTerminal.debug(
"deferToProd",
`executing dbt command with defer params ${manifestPathType} mode`,
true,
args,
);
if (manifestPathType === ManifestPathType.REMOTE) {
this.altimateRequest.sendDeferToProdEvent(ManifestPathType.REMOTE);
}
return args;
}
private async addDeferParams(command: DBTCommand) {
const deferParams = await this.getDeferParams();
deferParams.forEach((param) => command.addArgument(param));
return command;
}
private dbtCoreCommand(command: DBTCommand) {
command.addArgument("--project-dir");
command.addArgument(this.projectRoot.fsPath);
if (this.profilesDir) {
command.addArgument("--profiles-dir");
command.addArgument(this.profilesDir);
}
command.setExecutionStrategy(this.pythonDBTCommandExecutionStrategy);
return command;
}
// internal commands
async unsafeCompileNode(modelName: string): Promise<string> {
this.throwBridgeErrorIfAvailable();
const output = await this.python?.lock<CompilationResult>(
(python) =>
python!`to_dict(project.compile_node(project.get_ref_node(${modelName})))`,
);
return output.compiled_sql;
}
async unsafeCompileQuery(query: string): Promise<string> {
this.throwBridgeErrorIfAvailable();
const output = await this.python?.lock<CompilationResult>(
(python) => python!`to_dict(project.compile_sql(${query}))`,
);
return output.compiled_sql;
}
async validateSql(query: string, dialect: string, models: any) {
this.throwBridgeErrorIfAvailable();
const result = await this.python?.lock<ValidateSqlParseErrorResponse>(
(python) =>
python!`to_dict(validate_sql(${query}, ${dialect}, ${models}))`,
);
return result;
}
async validateSQLDryRun(query: string) {
this.throwBridgeErrorIfAvailable();
const result = await this.python?.lock<{ bytes_processed: string }>(
(python) => python!`to_dict(project.validate_sql_dry_run(${query}))`,
);
return result;
}
async getColumnsOfModel(modelName: string) {
this.throwBridgeErrorIfAvailable();
// Get database and schema
const node = (await this.python?.lock(
(python) => python!`to_dict(project.get_ref_node(${modelName}))`,
)) as ResolveReferenceNodeResult;
// Get columns
if (!node) {
return [];
}
// TODO: fix this type
return this.getColumsOfRelation(
node.database,
node.schema,
node.alias || modelName,
);
}
async getColumnsOfSource(sourceName: string, tableName: string) {
this.throwBridgeErrorIfAvailable();
// Get database and schema
const node = (await this.python?.lock(
(python) =>
python!`to_dict(project.get_source_node(${sourceName}, ${tableName}))`,
)) as ResolveReferenceSourceResult;
// Get columns
if (!node) {
return [];
}
return this.getColumsOfRelation(
node.database,
node.schema,
node.identifier,
);
}
private async getColumsOfRelation(
database: string | undefined,
schema: string | undefined,
objectName: string,
): Promise<DBColumn[]> {
this.throwBridgeErrorIfAvailable();
return this.python?.lock<DBColumn[]>(
(python) =>
python!`to_dict(project.get_columns_in_relation(project.create_relation(${database}, ${schema}, ${objectName})))`,
);
}
async getBulkSchema(
nodes: DBTNode[],
cancellationToken: CancellationToken,
): Promise<Record<string, DBColumn[]>> {
const result: Record<string, DBColumn[]> = {};
for (const n of nodes) {
if (cancellationToken.isCancellationRequested) {
break;
}
if (n.resource_type === DBTProject.RESOURCE_TYPE_SOURCE) {
const source = n as SourceNode;
result[n.unique_id] = await this.getColumnsOfSource(
source.name,
source.table,
);
} else {
const model = n as Node;
result[n.unique_id] = await this.getColumnsOfModel(model.name);
}
}
return result;
}
async getCatalog(): Promise<Catalog> {
this.throwBridgeErrorIfAvailable();
return await this.python?.lock<Catalog>(
(python) => python!`to_dict(project.get_catalog())`,
);
}
// get dbt config
private async findModelPaths(): Promise<string[]> {
return (
await this.python.lock<string[]>(
(python) => python`to_dict(project.config.model_paths)`,
)
).map((modelPath: string) => {
if (!path.isAbsolute(modelPath)) {
return path.join(this.projectRoot.fsPath, modelPath);
}
return modelPath;
});
}
private async findSeedPaths(): Promise<string[]> {
return (
await this.python.lock<string[]>(
(python) => python`to_dict(project.config.seed_paths)`,
)
).map((seedPath: string) => {
if (!path.isAbsolute(seedPath)) {
return path.join(this.projectRoot.fsPath, seedPath);
}
return seedPath;
});
}
getDebounceForRebuildManifest() {
return 2000;
}
private async findMacroPaths(): Promise<string[]> {
return (
await this.python.lock<string[]>(
(python) => python`to_dict(project.config.macro_paths)`,
)
).map((macroPath: string) => {
if (!path.isAbsolute(macroPath)) {
return path.join(this.projectRoot.fsPath, macroPath);
}
return macroPath;
});
}
private async findTargetPath(): Promise<string> {
let targetPath = await this.python.lock(
(python) => python`to_dict(project.config.target_path)`,
);
if (!path.isAbsolute(targetPath)) {
targetPath = path.join(this.projectRoot.fsPath, targetPath);
}
return targetPath;
}
private async findPackagesInstallPath(): Promise<string> {
let packageInstallPath = await this.python.lock(
(python) => python`to_dict(project.config.packages_install_path)`,
);
if (!path.isAbsolute(packageInstallPath)) {
packageInstallPath = path.join(
this.projectRoot.fsPath,
packageInstallPath,
);
}
return packageInstallPath;
}
private async findVersion(): Promise<number[]> {
return this.python?.lock<number[]>(
(python) => python!`to_dict(project.get_dbt_version())`,
);
}
private throwBridgeErrorIfAvailable() {
const allDiagnostics: DiagnosticCollection[] = [
this.pythonBridgeDiagnostics,
this.projectConfigDiagnostics,
this.rebuildManifestDiagnostics,
];
for (const diagnosticCollection of allDiagnostics) {
for (const [_, diagnostics] of diagnosticCollection) {
if (diagnostics.length > 0) {
const firstError = diagnostics[0];
throw new Error(firstError.message);
}
}
}
}
findPackageVersion(packageName: string) {
if (!this.packagesInstallPath) {
throw new Error("Missing packages install path");
}
if (!packageName) {
throw new Error("Invalid package name");
}
const dbtProjectYmlFilePath = path.join(
this.packagesInstallPath,
packageName,
"dbt_project.yml",
);
if (!existsSync(dbtProjectYmlFilePath)) {
throw new Error("Package not installed");
}
const fileContents = readFileSync(dbtProjectYmlFilePath, {
encoding: "utf-8",
});
if (!fileContents) {
throw new Error(`${packageName} has empty dbt_project.yml`);
}
const parsedConfig = parse(fileContents, {
strict: false,
uniqueKeys: false,
maxAliasCount: -1,
});
if (!parsedConfig?.version) {
throw new Error(`Missing version in ${dbtProjectYmlFilePath}`);
}
return parsedConfig.version;
}
async dispose() {
try {
await this.executionInfrastructure.closePythonBridge(this.python);
} catch (error) {} // We don't care about errors here.
this.rebuildManifestDiagnostics.clear();
this.pythonBridgeDiagnostics.clear();
while (this.disposables.length) {
const x = this.disposables.pop();
if (x) {
x.dispose();
}
}
}
async performDatapilotHealthcheck({
manifestPath,
catalogPath,
config,
configPath,
}: HealthcheckArgs): Promise<ProjectHealthcheck> {
this.throwBridgeErrorIfAvailable();
const healthCheckThread = this.executionInfrastructure.createPythonBridge(
this.projectRoot.fsPath,
);
await this.createPythonDbtProject(healthCheckThread);
await healthCheckThread.ex`from dbt_healthcheck import *`;
const result = await healthCheckThread.lock<ProjectHealthcheck>(
(python) =>
python!`to_dict(project_healthcheck(${manifestPath}, ${catalogPath}, ${configPath}, ${config}))`,
);
return result;
}
private async getDeferConfig() {
try {
const root = getProjectRelativePath(this.projectRoot);
const currentConfig: Record<string, DeferConfig> =
this.deferToProdService.getDeferConfigByWorkspace();
const {
deferToProduction,
manifestPathForDeferral,
favorState,
manifestPathType,
dbtCoreIntegrationId,
} = currentConfig[root];
const manifestFolder = await this.getDeferManifestPath(
manifestPathType,
manifestPathForDeferral,
dbtCoreIntegrationId,
);
const manifestPath = path.join(manifestFolder, DBTProject.MANIFEST_FILE);
return { deferToProduction, manifestPath, favorState };
} catch (error) {
this.dbtTerminal.debug(
"dbtCoreIntegration:getDeferConfig",
"An error occurred while getting defer config: " +
(error as Error).message,
);
}
return { deferToProduction: false, manifestPath: null, favorState: false };
}
async applyDeferConfig(): Promise<void> {
const { deferToProduction, manifestPath, favorState } =
await this.getDeferConfig();
await this.python?.lock<void>(
(python) =>
python!`project.set_defer_config(${deferToProduction}, ${manifestPath}, ${favorState})`,
);
await this.rebuildManifest();
}
throwDiagnosticsErrorIfAvailable(): void {
this.throwBridgeErrorIfAvailable();
}

import { basename } from "path";
import { AltimateRequest, ModelNode } from "../altimate";
import { ColumnMetaData, NodeMetaData, SourceTable } from "../domain";
import { DBTProjectContainer } from "../manifest/dbtProjectContainer";
import {
ManifestCacheChangedEvent,
ManifestCacheProjectAddedEvent,
} from "../manifest/event/manifestCacheChangedEvent";
import { TelemetryService } from "../telemetry";
import { extendErrorWithSupportLinks, provideSingleton } from "../utils";
import {
CancellationToken,
DiagnosticCollection,
ProgressLocation,
Uri,
ViewColumn,
window,
} from "vscode";
import { DBTProject } from "../manifest/dbtProject";
import {
commands,
Diagnostic,
DiagnosticSeverity,
languages,
Position,
Range,
workspace,
} from "vscode";
import { SqlPreviewContentProvider } from "../content_provider/sqlPreviewContentProvider";
import { PythonException } from "python-bridge";
import { DBTTerminal } from "../dbt_client/dbtTerminal";
@provideSingleton(ValidateSql)
export class ValidateSql {
private eventMap: Map<string, ManifestCacheProjectAddedEvent> = new Map();
private diagnosticsCollection: DiagnosticCollection;
constructor(
private dbtProjectContainer: DBTProjectContainer,
private telemetry: TelemetryService,
private altimate: AltimateRequest,
private dbtTerminal: DBTTerminal,
) {
dbtProjectContainer.onManifestChanged((event) =>
this.onManifestCacheChanged(event),
);
this.diagnosticsCollection = languages.createDiagnosticCollection();
}
private async onManifestCacheChanged(event: ManifestCacheChangedEvent) {
event.added?.forEach((added) => {
this.eventMap.set(added.project.projectRoot.fsPath, added);
});
event.removed?.forEach((removed) => {
this.eventMap.delete(removed.projectRoot.fsPath);
});
}
private showError(exc: unknown) {
if (exc instanceof PythonException) {
window.showErrorMessage(
extendErrorWithSupportLinks(
`An error occurred while trying to compile your model: ` +
exc.exception.message +
".",
),
);
this.telemetry.sendTelemetryError(
"validateSQLCompileNodePythonError",
exc,
);
this.dbtTerminal.error(
"validateSQLError",
"Error encountered while compiling/retrieving schema for model",
exc,
);
return;
}
this.telemetry.sendTelemetryError(
"validateSQLCompileNodeUnknownError",
exc,
);
// Unknown error
window.showErrorMessage(
extendErrorWithSupportLinks(
"Could not validate SQL: " + (exc as Error).message,
),
);
}
async validateSql() {
this.telemetry.sendTelemetryEvent("validateSql");
if (!window.activeTextEditor) {
return;
}
const activedoc = window.activeTextEditor;
const currentFilePath = activedoc.document.uri;
const project = this.dbtProjectContainer.findDBTProject(currentFilePath);
if (!project) {
await window.showErrorMessage("Unable to build project");
return;
}
const modelName = basename(currentFilePath.fsPath, ".sql");
const event = this.getEvent();
if (!event) {
return;
}
const { graphMetaMap, nodeMetaMap } = event;
const node = nodeMetaMap.get(modelName);
if (!node) {
return;
}
const parentNodes = graphMetaMap.parents.get(node.uniqueId)?.nodes;
if (!parentNodes) {
return;
}
const parentModels: ModelNode[] = [];
let relationsWithoutColumns: string[] = [];
let compiledQuery: string | undefined;
let cancellationToken: CancellationToken | undefined;
await window.withProgress(
{
location: ProgressLocation.Notification,
title: "Validating SQL",
cancellable: true,
},
async (_, token) => {
try {
cancellationToken = token;
const fileContentBytes = await workspace.fs.readFile(currentFilePath);
if (cancellationToken.isCancellationRequested) {
return;
}
try {
compiledQuery = await project.unsafeCompileQuery(
fileContentBytes.toString(),
);
} catch (error) {
window.showErrorMessage(
extendErrorWithSupportLinks(
"Unable to compile query for model " +
node.name +
" : " +
error,
),
);
return;
}
if (cancellationToken.isCancellationRequested) {
return;
}
const modelsToFetch = DBTProject.getNonEphemeralParents(event, [
node.uniqueId,
]);
const {
mappedNode,
relationsWithoutColumns: _relationsWithoutColumns,
} = await project.getNodesWithDBColumns(
event,
modelsToFetch,
cancellationToken,
);
parentModels.push(...modelsToFetch.map((n) => mappedNode[n]));
relationsWithoutColumns = _relationsWithoutColumns;
} catch (exc) {
this.showError(exc);
}
},
);
if (cancellationToken?.isCancellationRequested) {
return;
}
if (!compiledQuery) {
return;
}
if (relationsWithoutColumns.length !== 0) {
window.showErrorMessage(
extendErrorWithSupportLinks(
"Failed to fetch columns for " +
relationsWithoutColumns.join(", ") +
". Probably the dbt models are not yet materialized.",
),
);
}
const request = {
sql: compiledQuery,
dialect: project.getAdapterType(),
models: parentModels,
};
const response = await this.getProject()?.validateSql(request);
const activeUri = window.activeTextEditor?.document.uri;
if (!activeUri) {
// the editor was closed while validation was running
return;
}
if (activeUri.scheme === SqlPreviewContentProvider.SCHEME) {
// current focus on compiled sql document
return;
}
const compileSQLUri = activeUri.with({
scheme: SqlPreviewContentProvider.SCHEME,
});
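// Compare URIs by string value: `with()` returns a new Uri object,
// so an identity check (===) would never match.
const isOpen = window.visibleTextEditors.some(
(item) => item.document.uri.toString() === compileSQLUri.toString(),
);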
if (!response || !response?.error_type) {
const tabGroup = window.tabGroups.all.find(
(tabGroup) =>
(tabGroup.activeTab?.input as { uri: Uri })?.uri.toString() ===
compileSQLUri.toString(),
);
if (tabGroup) {
await window.tabGroups.close(tabGroup);
}
window.showInformationMessage("SQL is valid.");
this.diagnosticsCollection.set(compileSQLUri, []);
return;
}
if (response.error_type === "sql_unknown_error") {
window.showErrorMessage("Unable to validate SQL.");
this.telemetry.sendTelemetryError(
"validateSQLError",
response.errors[0].description,
);
this.diagnosticsCollection.set(compileSQLUri, []);
return;
}
if (
response.error_type === "sql_parse_error" ||
(response.errors.length > 0 && response.errors[0].start_position)
) {
if (!isOpen) {
const doc = await workspace.openTextDocument(compileSQLUri);
await window.showTextDocument(doc, ViewColumn.Beside, true);
await languages.setTextDocumentLanguage(doc, "sql");
}
}
commands.executeCommand("workbench.action.problems.focus");
const diagnostics = response?.errors?.map(
({ description, start_position, end_position }) => {
let startPos = new Position(0, 1);
let endPos = new Position(0, 1);
if (start_position) {
startPos = new Position(start_position[0], start_position[1]);
}
if (end_position) {
endPos = new Position(end_position[0], end_position[1]);
}
return new Diagnostic(
new Range(startPos, endPos),
description,
DiagnosticSeverity.Error,
);
},
);
this.diagnosticsCollection.set(compileSQLUri, diagnostics);
}
private getProject() {
const currentFilePath = window.activeTextEditor?.document.uri;
if (!currentFilePath) {
return;
}
return this.dbtProjectContainer.findDBTProject(currentFilePath);
}
private getEvent(): ManifestCacheProjectAddedEvent | undefined {
if (window.activeTextEditor === undefined || this.eventMap === undefined) {
return;
}
const currentFilePath = window.activeTextEditor.document.uri;
const projectRootpath =
this.dbtProjectContainer.getProjectRootpath(currentFilePath);
if (projectRootpath === undefined) {
return;
}
const event = this.eventMap.get(projectRootpath.fsPath);
if (event === undefined) {
return;
}
return event;
}
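The last step of `validateSql` above turns each error in the response into a diagnostic, falling back to a fixed position when the response omits one. A minimal sketch of that mapping without the `vscode` dependency follows. The `ValidationError`, `PlainRange`, and `toRanges` names are hypothetical and used only for illustration; the real code builds `Position`, `Range`, and `Diagnostic` objects.

```typescript
// Hypothetical stand-ins for vscode.Position / vscode.Range (illustration only).
type LineCol = [number, number];

interface ValidationError {
  description: string;
  start_position?: LineCol;
  end_position?: LineCol;
}

interface PlainRange {
  start: LineCol;
  end: LineCol;
  message: string;
}

// Same fallback the snippet above uses when a position is missing: line 0, column 1.
const FALLBACK: LineCol = [0, 1];

function toRanges(errors: ValidationError[] | undefined): PlainRange[] {
  // An absent error list yields no diagnostics rather than throwing.
  return (errors ?? []).map(
    ({ description, start_position, end_position }) => ({
      start: start_position ?? FALLBACK,
      end: end_position ?? FALLBACK,
      message: description,
    }),
  );
}
```

Keeping the mapping as a pure function like this would make the fallback behaviour testable without opening an editor.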

import {
workspace,
Uri,
languages,
Disposable,
Range,
window,
CancellationTokenSource,
Diagnostic,
DiagnosticCollection,
DiagnosticSeverity,
CancellationToken,
} from "vscode";
import { provideSingleton } from "../utils";
import {
Catalog,
DBColumn,
DBTNode,
DBTCommand,
DBTCommandExecutionInfrastructure,
DBTCommandExecutionStrategy,
DBTCommandFactory,
DBTDetection,
DBTProjectDetection,
DBTProjectIntegration,
QueryExecution,
HealthcheckArgs,
} from "./dbtIntegration";
import { CommandProcessExecutionFactory } from "../commandProcessExecution";
import { PythonBridge } from "python-bridge";
import { join, dirname } from "path";
import { AltimateRequest, ValidateSqlParseErrorResponse } from "../altimate";
import path = require("path");
import { DBTProject } from "../manifest/dbtProject";
import { TelemetryService } from "../telemetry";
import { DBTTerminal } from "./dbtTerminal";
import { PythonEnvironment } from "../manifest/pythonEnvironment";
import { existsSync } from "fs";
import { ValidationProvider } from "../validation_provider";
import { DeferToProdService } from "../services/deferToProdService";
import { ProjectHealthcheck } from "./dbtCoreIntegration";
import semver = require("semver");
function getDBTPath(
pythonEnvironment: PythonEnvironment,
terminal: DBTTerminal,
): string {
if (pythonEnvironment.pythonPath) {
const allowedDbtPaths = ["dbt", "dbt.exe"];
const dbtPath = allowedDbtPaths.find((path) =>
existsSync(join(dirname(pythonEnvironment.pythonPath), path)),
);
if (dbtPath) {
const dbtPythonPath = join(
dirname(pythonEnvironment.pythonPath),
dbtPath,
);
terminal.debug("Found dbt path in Python bin directory:", dbtPythonPath);
return dbtPythonPath;
}
}
terminal.debug("Using default dbt path:", "dbt");
return "dbt";
}
@provideSingleton(DBTCloudDetection)
export class DBTCloudDetection implements DBTDetection {
constructor(
private commandProcessExecutionFactory: CommandProcessExecutionFactory,
private pythonEnvironment: PythonEnvironment,
private terminal: DBTTerminal,
) {}
async detectDBT(): Promise<boolean> {
const dbtPath = getDBTPath(this.pythonEnvironment, this.terminal);
try {
this.terminal.debug("DBTCLIDetection", "Detecting dbt cloud cli");
const checkDBTInstalledProcess =
this.commandProcessExecutionFactory.createCommandProcessExecution({
command: dbtPath,
args: ["--version"],
cwd: this.getFirstWorkspacePath(),
});
const { stdout, stderr } = await checkDBTInstalledProcess.complete();
if (stderr) {
throw new Error(stderr);
}
if (stdout.includes("dbt Cloud CLI")) {
const regex = /dbt Cloud CLI - (\d*\.\d*\.\d*)/gm;
const matches = regex.exec(stdout);
if (matches?.length === 2) {
const minVersion = "0.37.6";
if (semver.lt(matches[1], minVersion)) {
window.showErrorMessage(
`This version of dbt Cloud is not supported. Please update to dbt Cloud CLI version ${minVersion} or higher`,
);
this.terminal.debug(
"DBTCLIDetectionFailed",
"dbt cloud cli was found but version is not supported. Detection command returned : " +
stdout,
);
return true;
}
}
this.terminal.debug("DBTCLIDetectionSuccess", "dbt cloud cli detected");
return true;
} else {
this.terminal.debug(
"DBTCLIDetectionFailed",
"dbt cloud cli was not found. Detection command returned : " +
stdout,
);
}
} catch (error) {
this.terminal.warn(
"DBTCLIDetectionError",
"Detection failed with error : " + (error as Error).message,
);
}
this.terminal.debug(
"DBTCLIDetectionFailed",
"dbt cloud cli was not found. Detection command returning false",
);
return false;
}
private getFirstWorkspacePath(): string {
// If we are executing python via a wrapper like Meltano,
// we need to execute it from a (any) project directory
// By default, Command execution is in an ext dir context
const folders = workspace.workspaceFolders;
if (folders) {
return folders[0].uri.fsPath;
} else {
// TODO: this shouldn't happen but we should make sure this is valid fallback
return Uri.file("./").fsPath;
}
}
}
@provideSingleton(DBTCloudProjectDetection)
export class DBTCloudProjectDetection
implements DBTProjectDetection, Disposable
{
constructor(private altimate: AltimateRequest) {}
async discoverProjects(projectDirectories: Uri[]): Promise<Uri[]> {
this.altimate.handlePreviewFeatures();
const packagesInstallPaths = projectDirectories.map((projectDirectory) =>
path.join(projectDirectory.fsPath, "dbt_packages"),
);
const filteredProjectFiles = projectDirectories.filter((uri) => {
return !packagesInstallPaths.some((packageInstallPath) => {
return uri.fsPath.startsWith(packageInstallPath!);
});
});
if (filteredProjectFiles.length > 20) {
window.showWarningMessage(
`dbt Power User detected ${filteredProjectFiles.length} projects in your workspace; this will negatively affect performance.`,
);
}
return filteredProjectFiles;
}
async dispose() {}
}
@provideSingleton(DBTCloudProjectIntegration)
export class DBTCloudProjectIntegration
implements DBTProjectIntegration, Disposable
{
private static QUEUE_ALL = "all";
private targetPath?: string;
private adapterType: string = "unknown";
private packagesInstallPath?: string;
private modelPaths?: string[];
private seedPaths?: string[];
private macroPaths?: string[];
private python: PythonBridge;
private dbtPath: string = "dbt";
private disposables: Disposable[] = [];
private readonly rebuildManifestDiagnostics =
languages.createDiagnosticCollection("dbt");
private readonly pythonBridgeDiagnostics =
languages.createDiagnosticCollection("dbt");
private rebuildManifestCancellationTokenSource:
| CancellationTokenSource
| undefined;
private pathsInitalized = false;
constructor(
private executionInfrastructure: DBTCommandExecutionInfrastructure,
private dbtCommandFactory: DBTCommandFactory,
private cliDBTCommandExecutionStrategyFactory: (
path: Uri,
dbtPath: string,
) => DBTCommandExecutionStrategy,
private telemetry: TelemetryService,
private pythonEnvironment: PythonEnvironment,
private terminal: DBTTerminal,
private validationProvider: ValidationProvider,
private deferToProdService: DeferToProdService,
private projectRoot: Uri,
) {
this.terminal.debug(
"DBTCloudProjectIntegration",
`Registering dbt cloud project at ${this.projectRoot}`,
);
this.python = this.executionInfrastructure.createPythonBridge(
this.projectRoot.fsPath,
);
this.executionInfrastructure.createQueue(
DBTCloudProjectIntegration.QUEUE_ALL,
);
this.disposables.push(
this.pythonEnvironment.onPythonEnvironmentChanged(() => {
this.python = this.executionInfrastructure.createPythonBridge(
this.projectRoot.fsPath,
);
this.initializeProject();
}),
this.rebuildManifestDiagnostics,
this.pythonBridgeDiagnostics,
);
}
async refreshProjectConfig(): Promise<void> {
if (!this.pathsInitalized) {
// The first time, block until the paths are initialized
await this.initializePaths();
this.pathsInitalized = true;
} else {
this.initializePaths();
}
this.findAdapterType();
}
async executeSQL(query: string, limit: number): Promise<QueryExecution> {
this.throwIfNotAuthenticated();
this.throwBridgeErrorIfAvailable();
const showCommand = this.dbtCloudCommand(
new DBTCommand("Running sql...", [
"show",
"--inline",
query,
"--limit",
limit.toString(),
"--output",
"json",
"--log-format",
"json",
]),
);
const cancellationTokenSource = new CancellationTokenSource();
showCommand.setToken(cancellationTokenSource.token);
return new QueryExecution(
async () => {
cancellationTokenSource.cancel();
},
async () => {
const { stdout, stderr } = await showCommand.execute(
cancellationTokenSource.token,
);
const exception = this.processJSONErrors(stderr);
if (exception) {
throw exception;
}
const previewLine = stdout
.trim()
.split("\n")
.map((line) => JSON.parse(line.trim()))
.filter((line) => line.data.hasOwnProperty("preview"));
const preview = JSON.parse(previewLine[0].data.preview);
return {
table: {
column_names: preview.length > 0 ? Object.keys(preview[0]) : [],
column_types:
preview.length > 0
? Object.keys(preview[0]).map((obj: any) => "string")
: [],
rows: preview.map((obj: any) => Object.values(obj)),
},
compiled_sql: "",
raw_sql: query,
};
},
);
}
async initializeProject(): Promise<void> {
try {
await this.python
.ex`from dbt_cloud_integration import validate_sql, to_dict`;
await this.python.ex`from dbt_healthcheck import *`;
} catch (error) {
this.terminal.error(
"dbtCloudIntegration",
"Could not initialize Python environment",
error,
);
window.showErrorMessage(
"Error occurred while initializing Python environment: " + error,
);
}
this.dbtPath = getDBTPath(this.pythonEnvironment, this.terminal);
}
getTargetPath(): string | undefined {
return this.targetPath;
}
getModelPaths(): string[] | undefined {
return this.modelPaths;
}
getSeedPaths(): string[] | undefined {
return this.seedPaths;
}
getMacroPaths(): string[] | undefined {
return this.macroPaths;
}
getPackageInstallPath(): string | undefined {
return this.packagesInstallPath;
}
getAdapterType(): string {
return this.adapterType;
}
getVersion(): number[] {
// TODO: get version
return [0, 0, 0];
}
getPythonBridgeStatus(): boolean {
return this.python.connected;
}
getAllDiagnostic(): Diagnostic[] {
return [
...(this.pythonBridgeDiagnostics.get(this.projectRoot) || []),
...(this.rebuildManifestDiagnostics.get(this.projectRoot) || []),
];
}
async rebuildManifest(): Promise<void> {
if (this.rebuildManifestCancellationTokenSource) {
this.rebuildManifestCancellationTokenSource.cancel();
this.rebuildManifestCancellationTokenSource = undefined;
}
try {
const command = this.dbtCloudCommand(
this.dbtCommandFactory.createParseCommand(),
);
command.addArgument("--log-format");
command.addArgument("json");
this.rebuildManifestCancellationTokenSource =
new CancellationTokenSource();
command.setToken(this.rebuildManifestCancellationTokenSource.token);
const { stderr } = await command.execute();
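const errorsAndWarnings = stderr
.trim()
.split("\n")
// skip blank lines: JSON.parse("") throws, which happens when stderr is empty
.filter((line) => line.trim().length > 0)
.map((line) => JSON.parse(line.trim()));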
const errors = errorsAndWarnings
.filter(
(line) => line.info.level === "error" || line.info.level === "fatal",
)
.map((line) => line.info.msg);
const warnings = errorsAndWarnings
.filter((line) => line.info.level === "warning")
.map((line) => line.info.msg);
this.rebuildManifestDiagnostics.clear();
// Set all diagnostics in one call: DiagnosticCollection.set replaces
// any diagnostics previously stored for the same URI.
this.rebuildManifestDiagnostics.set(
Uri.joinPath(this.projectRoot, DBTProject.DBT_PROJECT_FILE),
[
...errors.map(
(error) =>
new Diagnostic(
new Range(0, 0, 999, 999),
error,
DiagnosticSeverity.Error,
),
),
...warnings.map(
(warning) =>
new Diagnostic(
new Range(0, 0, 999, 999),
warning,
DiagnosticSeverity.Warning,
),
),
],
);
if (stderr) {
this.telemetry.sendTelemetryEvent(
"dbtCloudCannotParseProjectUserError",
{
error: stderr,
adapter: this.getAdapterType() || "unknown",
},
);
}
} catch (error) {
this.telemetry.sendTelemetryError(
"dbtCloudCannotParseProjectUnknownError",
error,
{
adapter: this.getAdapterType() || "unknown",
},
);
}
}
async runModel(command: DBTCommand) {
this.addCommandToQueue(
DBTCloudProjectIntegration.QUEUE_ALL,
await this.addDeferParams(this.dbtCloudCommand(command)),
);
}
async buildModel(command: DBTCommand) {
this.addCommandToQueue(
DBTCloudProjectIntegration.QUEUE_ALL,
await this.addDeferParams(this.dbtCloudCommand(command)),
);
}
async buildProject(command: DBTCommand) {
this.addCommandToQueue(
DBTCloudProjectIntegration.QUEUE_ALL,
await this.addDeferParams(this.dbtCloudCommand(command)),
);
}
async runTest(command: DBTCommand) {
this.addCommandToQueue(
DBTCloudProjectIntegration.QUEUE_ALL,
await this.addDeferParams(this.dbtCloudCommand(command)),
);
}
async runModelTest(command: DBTCommand) {
this.addCommandToQueue(
DBTCloudProjectIntegration.QUEUE_ALL,
await this.addDeferParams(this.dbtCloudCommand(command)),
);
}
async compileModel(command: DBTCommand) {
this.addCommandToQueue(
DBTCloudProjectIntegration.QUEUE_ALL,
await this.addDeferParams(this.dbtCloudCommand(command)),
);
}
async generateDocs(command: DBTCommand) {
this.addCommandToQueue(
DBTCloudProjectIntegration.QUEUE_ALL,
this.dbtCloudCommand(command),
);
}
async executeCommandImmediately(command: DBTCommand) {
return await this.dbtCloudCommand(command).execute();
}
async deps(command: DBTCommand): Promise<string> {
throw new Error("dbt deps is not supported in dbt cloud");
}
async debug(command: DBTCommand): Promise<string> {
command.args = ["environment", "show"];
const { stdout, stderr } = await this.dbtCloudCommand(command).execute();
if (stderr) {
throw new Error(stderr);
}
return stdout;
}
private async getDeferParams(): Promise<string[]> {
this.throwIfNotAuthenticated();
const deferConfig = this.deferToProdService.getDeferConfigByProjectRoot(
this.projectRoot.fsPath,
);
const { deferToProduction } = deferConfig;
// pass --no-defer unless defer to production is explicitly enabled
if (!deferToProduction) {
this.terminal.debug("Defer to Prod", "defer to prod not enabled");
return ["--no-defer"];
}
return [];
}
private async addDeferParams(command: DBTCommand) {
const deferParams = await this.getDeferParams();
deferParams.forEach((param) => command.addArgument(param));
return command;
}
private dbtCloudCommand(command: DBTCommand) {
command.setExecutionStrategy(
this.cliDBTCommandExecutionStrategyFactory(
this.projectRoot,
this.dbtPath,
),
);
command.addArgument("--source");
command.addArgument("dbt-power-user");
return command;
}
private addCommandToQueue(queueName: string, command: DBTCommand) {
try {
this.throwIfNotAuthenticated();
this.executionInfrastructure.addCommandToQueue(queueName, command);
} catch (e) {
window.showErrorMessage((e as Error).message);
}
}
// internal commands
async unsafeCompileNode(modelName: string): Promise<string> {
this.throwIfNotAuthenticated();
this.throwBridgeErrorIfAvailable();
const compileQueryCommand = this.dbtCloudCommand(
new DBTCommand("Compiling model...", [
"compile",
"--model",
modelName,
"--output",
"json",
"--log-format",
"json",
]),
);
const { stdout, stderr } = await compileQueryCommand.execute();
const compiledLine = stdout
.trim()
.split("\n")
.map((line) => JSON.parse(line.trim()))
.filter((line) => line.data.hasOwnProperty("compiled"));
const exception = this.processJSONErrors(stderr);
if (exception) {
throw exception;
}
return compiledLine[0].data.compiled;
}
async unsafeCompileQuery(query: string): Promise<string> {
this.throwIfNotAuthenticated();
this.throwBridgeErrorIfAvailable();
const compileQueryCommand = this.dbtCloudCommand(
new DBTCommand("Compiling sql...", [
"compile",
"--inline",
query,
"--output",
"json",
"--log-format",
"json",
]),
);
const { stdout, stderr } = await compileQueryCommand.execute();
const compiledLine = stdout
.trim()
.split("\n")
.map((line) => JSON.parse(line.trim()))
.filter((line) => line.data.hasOwnProperty("compiled"));
const exception = this.processJSONErrors(stderr);
if (exception) {
throw exception;
}
return compiledLine[0].data.compiled;
}
async validateSql(
query: string,
dialect: string,
models: any,
): Promise<ValidateSqlParseErrorResponse> {
this.throwIfNotAuthenticated();
this.throwBridgeErrorIfAvailable();
const result = await this.python?.lock<ValidateSqlParseErrorResponse>(
(python) =>
python!`to_dict(validate_sql(${query}, ${dialect}, ${models}))`,
);
return result;
}
async validateSQLDryRun(query: string): Promise<{ bytes_processed: string }> {
this.throwIfNotAuthenticated();
this.throwBridgeErrorIfAvailable();
const validateSqlCommand = this.dbtCloudCommand(
new DBTCommand("Estimating BigQuery cost...", [
"compile",
"--inline",
`{{ validate_sql('${query}') }}`,
"--output",
"json",
"--log-format",
"json",
]),
);
const { stdout, stderr } = await validateSqlCommand.execute();
const compiledLine = stdout
.trim()
.split("\n")
.map((line) => JSON.parse(line.trim()))
.filter((line) => line.data.hasOwnProperty("compiled"));
const exception = this.processJSONErrors(stderr);
if (exception) {
throw exception;
}
return JSON.parse(compiledLine[0].data.compiled);
}
async getColumnsOfSource(
sourceName: string,
tableName: string,
): Promise<DBColumn[]> {
this.throwIfNotAuthenticated();
this.throwBridgeErrorIfAvailable();
const compileQueryCommand = this.dbtCloudCommand(
new DBTCommand("Getting columns of source...", [
"compile",
"--inline",
`{% set output = [] %}{% for result in adapter.get_columns_in_relation(source('${sourceName}', '${tableName}')) %} {% do output.append({"column": result.name, "dtype": result.dtype}) %} {% endfor %} {{ tojson(output) }}`,
"--output",
"json",
"--log-format",
"json",
]),
);
const { stdout, stderr } = await compileQueryCommand.execute();
const compiledLine = stdout
.trim()
.split("\n")
.map((line) => JSON.parse(line.trim()))
.filter((line) => line.data.hasOwnProperty("compiled"));
const exception = this.processJSONErrors(stderr);
if (exception) {
throw exception;
}
return JSON.parse(compiledLine[0].data.compiled);
}
async getColumnsOfModel(modelName: string): Promise<DBColumn[]> {
this.throwIfNotAuthenticated();
this.throwBridgeErrorIfAvailable();
const compileQueryCommand = this.dbtCloudCommand(
new DBTCommand("Getting columns of model...", [
"compile",
"--inline",
`{% set output = [] %}{% for result in adapter.get_columns_in_relation(ref('${modelName}')) %} {% do output.append({"column": result.name, "dtype": result.dtype}) %} {% endfor %} {{ tojson(output) }}`,
"--output",
"json",
"--log-format",
"json",
]),
);
const { stdout, stderr } = await compileQueryCommand.execute();
const compiledLine = stdout
.trim()
.split("\n")
.map((line) => JSON.parse(line.trim()))
.filter((line) => line.data.hasOwnProperty("compiled"));
const exception = this.processJSONErrors(stderr);
if (exception) {
throw exception;
}
return JSON.parse(compiledLine[0].data.compiled);
}
async getBulkSchema(
nodes: DBTNode[],
cancellationToken: CancellationToken,
): Promise<Record<string, DBColumn[]>> {
this.throwIfNotAuthenticated();
this.throwBridgeErrorIfAvailable();
const bulkModelQuery = `
{% set result = {} %}
{% for n in ${JSON.stringify(nodes)} %}
{% set columns = adapter.get_columns_in_relation(ref(n["name"])) %}
{% set new_columns = [] %}
{% for column in columns %}
{% do new_columns.append({"column": column.name, "dtype": column.dtype}) %}
{% endfor %}
{% do result.update({n["unique_id"]:new_columns}) %}
{% endfor %}
{% for n in graph.sources.values() %}
{% set columns = adapter.get_columns_in_relation(source(n["source_name"], n["identifier"])) %}
{% set new_columns = [] %}
{% for column in columns %}
{% do new_columns.append({"column": column.name, "dtype": column.dtype}) %}
{% endfor %}
{% do result.update({n["unique_id"]:new_columns}) %}
{% endfor %}
{{ tojson(result) }}`;
console.log(bulkModelQuery);
const compileQueryCommand = this.dbtCloudCommand(
new DBTCommand("Getting catalog...", [
"compile",
"--inline",
bulkModelQuery.trim().split("\n").join(""),
"--output",
"json",
"--log-format",
"json",
]),
);
const { stdout, stderr } =
await compileQueryCommand.execute(cancellationToken);
const compiledLine = stdout
.trim()
.split("\n")
.map((line) => JSON.parse(line.trim()))
.filter((line) => line.data.hasOwnProperty("compiled"));
const exception = this.processJSONErrors(stderr);
if (exception) {
throw exception;
}
return JSON.parse(compiledLine[0].data.compiled);
}
async getCatalog(): Promise<Catalog> {
this.throwIfNotAuthenticated();
this.throwBridgeErrorIfAvailable();
const bulkModelQuery = `
{% set result = [] %}
{% for n in graph.nodes.values() %}
{% if n.resource_type == "test" or
n.resource_type == "analysis" or
n.resource_type == "sql_operation" or
n.config.materialized == "ephemeral" %}
{% continue %}
{% endif %}
{% set columns = adapter.get_columns_in_relation(ref(n["name"])) %}
{% for column in columns %}
{% do result.append({
"table_database": n.database,
"table_schema": n.schema,
"table_name": n.name,
"column_name": column.name,
"column_type": column.dtype,
}) %}
{% endfor %}
{% endfor %}
{% for n in graph.sources.values() %}
{% set columns = adapter.get_columns_in_relation(source(n["source_name"], n["identifier"])) %}
{% for column in columns %}
{% do result.append({
"table_database": n.database,
"table_schema": n.schema,
"table_name": n.name,
"column_name": column.name,
"column_type": column.dtype,
}) %}
{% endfor %}
{% endfor %}
{{ tojson(result) }}`;
const compileQueryCommand = this.dbtCloudCommand(
new DBTCommand("Getting catalog...", [
"compile",
"--inline",
bulkModelQuery.trim().split("\n").join(""),
"--output",
"json",
"--log-format",
"json",
]),
);
const { stdout, stderr } = await compileQueryCommand.execute();
const compiledLine = stdout
.trim()
.split("\n")
.map((line) => JSON.parse(line.trim()))
.filter((line) => line.data.hasOwnProperty("compiled"));
const exception = this.processJSONErrors(stderr);
if (exception) {
throw exception;
}
const result: Catalog = JSON.parse(compiledLine[0].data.compiled);
return result;
}
getDebounceForRebuildManifest() {
return 500;
}
// get dbt config
private async initializePaths() {
const packagePathsCommand = this.dbtCloudCommand(
new DBTCommand("Getting paths...", [
"environment",
"show",
"--project-paths",
]),
);
try {
const { stdout, stderr } = await packagePathsCommand.execute();
if (stderr) {
this.terminal.warn(
"DbtCloudIntegrationInitializePathsStdError",
"packaging paths command returns warning, ignoring",
true,
stderr,
);
}
const lookupEntries = (lookupString: string) => {
const regexString = `${lookupString}\\s*\\[(.*)\\]`;
const regexp = new RegExp(regexString, "gm");
const matches = regexp.exec(stdout);
if (matches?.length === 2) {
return matches[1].split(",").map((m) => m.slice(1, -1));
}
throw new Error(`Could not find any entries for ${lookupString}`);
};
this.targetPath = join(this.projectRoot.fsPath, "target");
this.modelPaths = lookupEntries("Model paths").map((p) =>
join(this.projectRoot.fsPath, p),
);
this.seedPaths = lookupEntries("Seed paths").map((p) =>
join(this.projectRoot.fsPath, p),
);
this.macroPaths = lookupEntries("Macro paths").map((p) =>
join(this.projectRoot.fsPath, p),
);
this.packagesInstallPath = join(this.projectRoot.fsPath, "dbt_packages");
} catch (error) {
this.terminal.warn(
"DbtCloudIntegrationInitializePathsExceptionError",
"initializing paths throws error, falling back to default paths",
true,
error,
);
this.targetPath = join(this.projectRoot.fsPath, "target");
this.modelPaths = [join(this.projectRoot.fsPath, "models")];
this.seedPaths = [join(this.projectRoot.fsPath, "seeds")];
this.macroPaths = [join(this.projectRoot.fsPath, "macros")];
this.packagesInstallPath = join(this.projectRoot.fsPath, "dbt_packages");
}
}
private async findAdapterType() {
const adapterTypeCommand = this.dbtCloudCommand(
new DBTCommand("Getting adapter type...", [
"compile",
"--inline",
"{{ adapter.type() }}",
"--output",
"json",
"--log-format",
"json",
]),
);
try {
const { stdout, stderr } = await adapterTypeCommand.execute();
if (stderr) {
this.terminal.warn(
"DbtCloudIntegrationAdapterDetectionStdError",
"adapter type returns stderr, ignoring",
true,
stderr,
);
}
const compiledLine = stdout
.trim()
.split("\n")
.map((line) => JSON.parse(line.trim()))
.filter((line) => line.data.hasOwnProperty("compiled"));
this.adapterType = compiledLine[0].data.compiled;
this.terminal.debug(
"dbtCloudIntegration",
`Set adapter type to ${this.adapterType}`,
);
} catch (error) {
this.terminal.warn(
"DbtCloudIntegrationAdapterDetectionExceptionError",
"adapter type throws error, ignoring",
true,
error,
);
}
}
private processJSONErrors(jsonErrors: string) {
if (!jsonErrors) {
return;
}
try {
const errorLines: string[] = [];
// eslint-disable-next-line prefer-spread
errorLines.push.apply(
errorLines,
jsonErrors
.trim()
.split("\n")
.map((line) => JSON.parse(line.trim()))
.filter(
(line) =>
line.info.level === "error" || line.info.level === "fatal",
)
.map((line) => line.info.msg),
);
if (errorLines.length) {
return new Error(errorLines.join(", "));
}
} catch (error) {
// ideally we never come here, this is a bug in our code
return new Error("Could not process " + jsonErrors + ": " + error);
}
}
private throwIfNotAuthenticated() {
this.validationProvider.throwIfNotAuthenticated();
}
async dispose() {
try {
await this.executionInfrastructure.closePythonBridge(this.python);
} catch (error) {} // We don't care about errors here.
this.rebuildManifestDiagnostics.clear();
this.pythonBridgeDiagnostics.clear();
while (this.disposables.length) {
const x = this.disposables.pop();
if (x) {
x.dispose();
}
}
}
findPackageVersion(_packageName: string) {
return undefined;
}
private throwBridgeErrorIfAvailable() {
const allDiagnostics: DiagnosticCollection[] = [
this.pythonBridgeDiagnostics,
this.rebuildManifestDiagnostics,
];
for (const diagnosticCollection of allDiagnostics) {
for (const [_, diagnostics] of diagnosticCollection) {
if (diagnostics.length > 0) {
const firstError = diagnostics[0];
throw new Error(firstError.message);
}
}
}
}
async performDatapilotHealthcheck({
manifestPath,
catalogPath,
config,
configPath,
}: HealthcheckArgs): Promise<ProjectHealthcheck> {
this.throwBridgeErrorIfAvailable();
const result = await this.python?.lock<ProjectHealthcheck>(
(python) =>
python!`to_dict(project_healthcheck(${manifestPath}, ${catalogPath}, ${configPath}, ${config}))`,
);
return result;
}
async applyDeferConfig(): Promise<void> {}
throwDiagnosticsErrorIfAvailable(): void {
this.throwBridgeErrorIfAvailable();
}
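Most of the cloud integration methods above repeat one step: split stdout into lines, parse each line as JSON, and keep the line whose `data` object has a `compiled` key. A minimal sketch of that step as a standalone helper follows, assuming log lines shaped like `{"data": {"compiled": "..."}}` as the code above implies. The names `parseJsonLines` and `extractCompiled` are hypothetical, and skipping blank or non-JSON lines is an assumption of this sketch rather than the extension's current behaviour.

```typescript
// Hypothetical helpers for illustration; not part of dbt Power User.
interface DbtLogLine {
  data?: Record<string, unknown>;
  info?: { level?: string; msg?: string };
}

function parseJsonLines(output: string): DbtLogLine[] {
  const parsed: DbtLogLine[] = [];
  for (const rawLine of output.split("\n")) {
    const line = rawLine.trim();
    if (line.length === 0) {
      continue; // JSON.parse("") would throw
    }
    try {
      const value: unknown = JSON.parse(line);
      // Keep only JSON objects; `null` and scalars carry no `data` field.
      if (typeof value === "object" && value !== null) {
        parsed.push(value as DbtLogLine);
      }
    } catch {
      // Assumption: lines that are not valid JSON are noise and can be skipped.
    }
  }
  return parsed;
}

function extractCompiled(stdout: string): string | undefined {
  for (const line of parseJsonLines(stdout)) {
    const compiled = line.data?.compiled;
    if (typeof compiled === "string") {
      return compiled;
    }
  }
  // No compiled output found; the caller decides how to report this.
  return undefined;
}
```

Returning `undefined` instead of indexing `compiledLine[0]` lets the caller raise a descriptive error when the CLI prints no compiled line.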

import { existsSync, readFileSync, writeFileSync } from "fs";
import * as path from "path";
import { PythonException } from "python-bridge";
import {
CancellationToken,
commands,
Diagnostic,
DiagnosticCollection,
Disposable,
Event,
EventEmitter,
languages,
Range,
RelativePattern,
Uri,
ViewColumn,
window,
workspace,
} from "vscode";
import { parse, YAMLError } from "yaml";
import { DBTTerminal } from "../dbt_client/dbtTerminal";
import {
debounce,
extendErrorWithSupportLinks,
getColumnNameByCase,
setupWatcherHandler,
} from "../utils";
import { QueryResultPanel } from "../webview_provider/queryResultPanel";
import {
ManifestCacheChangedEvent,
RebuildManifestStatusChange,
ManifestCacheProjectAddedEvent,
} from "./event/manifestCacheChangedEvent";
import { ProjectConfigChangedEvent } from "./event/projectConfigChangedEvent";
import { DBTProjectLog, DBTProjectLogFactory } from "./modules/dbtProjectLog";
import {
SourceFileWatchers,
SourceFileWatchersFactory,
} from "./modules/sourceFileWatchers";
import { TargetWatchersFactory } from "./modules/targetWatchers";
import { PythonEnvironment } from "./pythonEnvironment";
import { TelemetryService } from "../telemetry";
import * as crypto from "crypto";
import {
DBTProjectIntegration,
DBTCommandFactory,
RunModelParams,
Catalog,
DBTNode,
DBColumn,
SourceNode,
HealthcheckArgs,
} from "../dbt_client/dbtIntegration";
import { DBTCoreProjectIntegration } from "../dbt_client/dbtCoreIntegration";
import { DBTCloudProjectIntegration } from "../dbt_client/dbtCloudIntegration";
import { AltimateRequest, NoCredentialsError } from "../altimate";
import { ValidationProvider } from "../validation_provider";
import { ModelNode } from "../altimate";
import { ColumnMetaData } from "../domain";
import { AltimateConfigProps } from "../webview_provider/insightsPanel";
interface FileNameTemplateMap {
[key: string]: string;
}
export class DBTProject implements Disposable {
static DBT_PROJECT_FILE = "dbt_project.yml";
static MANIFEST_FILE = "manifest.json";
static CATALOG_FILE = "catalog.json";
static RESOURCE_TYPE_MODEL = "model";
static RESOURCE_TYPE_MACRO = "macro";
static RESOURCE_TYPE_ANALYSIS = "analysis";
static RESOURCE_TYPE_SOURCE = "source";
static RESOURCE_TYPE_EXPOSURE = "exposure";
static RESOURCE_TYPE_SEED = "seed";
static RESOURCE_TYPE_SNAPSHOT = "snapshot";
static RESOURCE_TYPE_TEST = "test";
static RESOURCE_TYPE_METRIC = "semantic_model";
readonly projectRoot: Uri;
private projectConfig: any; // TODO: typing
private dbtProjectIntegration: DBTProjectIntegration;
private _onProjectConfigChanged =
new EventEmitter<ProjectConfigChangedEvent>();
public onProjectConfigChanged = this._onProjectConfigChanged.event;
private sourceFileWatchers: SourceFileWatchers;
public onSourceFileChanged: Event<void>;
private dbtProjectLog?: DBTProjectLog;
private disposables: Disposable[] = [this._onProjectConfigChanged];
private readonly projectConfigDiagnostics =
languages.createDiagnosticCollection("dbt");
public readonly projectHealth = languages.createDiagnosticCollection("dbt");
private _onRebuildManifestStatusChange =
new EventEmitter<RebuildManifestStatusChange>();
readonly onRebuildManifestStatusChange =
this._onRebuildManifestStatusChange.event;
constructor(
private PythonEnvironment: PythonEnvironment,
private sourceFileWatchersFactory: SourceFileWatchersFactory,
private dbtProjectLogFactory: DBTProjectLogFactory,
private targetWatchersFactory: TargetWatchersFactory,
private dbtCommandFactory: DBTCommandFactory,
private terminal: DBTTerminal,
private queryResultPanel: QueryResultPanel,
private telemetry: TelemetryService,
private dbtCoreIntegrationFactory: (
path: Uri,
projectConfigDiagnostics: DiagnosticCollection,
) => DBTCoreProjectIntegration,
private dbtCloudIntegrationFactory: (
path: Uri,
) => DBTCloudProjectIntegration,
private altimate: AltimateRequest,
private validationProvider: ValidationProvider,
path: Uri,
projectConfig: any,
_onManifestChanged: EventEmitter<ManifestCacheChangedEvent>,
) {
this.projectRoot = path;
this.projectConfig = projectConfig;
this.validationProvider.validateCredentialsSilently();
this.sourceFileWatchers =
this.sourceFileWatchersFactory.createSourceFileWatchers(
this.onProjectConfigChanged,
);
this.onSourceFileChanged = this.sourceFileWatchers.onSourceFileChanged;
const dbtIntegrationMode = workspace
.getConfiguration("dbt")
.get<string>("dbtIntegration", "core");
switch (dbtIntegrationMode) {
case "cloud":
this.dbtProjectIntegration = this.dbtCloudIntegrationFactory(
this.projectRoot,
);
break;
default:
this.dbtProjectIntegration = this.dbtCoreIntegrationFactory(
this.projectRoot,
this.projectConfigDiagnostics,
);
break;
}
this.disposables.push(
this.dbtProjectIntegration,
this.targetWatchersFactory.createTargetWatchers(
_onManifestChanged,
this.onProjectConfigChanged,
),
this.PythonEnvironment.onPythonEnvironmentChanged(() =>
this.onPythonEnvironmentChanged(),
),
this.sourceFileWatchers,
this.projectConfigDiagnostics,
);
this.terminal.debug(
"DbtProject",
`Created ${dbtIntegrationMode} dbt project ${this.getProjectName()} at ${
this.projectRoot
}`,
);
}
public getProjectName() {
return this.projectConfig.name;
}
getDBTProjectFilePath() {
return path.join(this.projectRoot.fsPath, DBTProject.DBT_PROJECT_FILE);
}
getTargetPath() {
return this.dbtProjectIntegration.getTargetPath();
}
getPackageInstallPath() {
return this.dbtProjectIntegration.getPackageInstallPath();
}
getModelPaths() {
return this.dbtProjectIntegration.getModelPaths();
}
getSeedPaths() {
return this.dbtProjectIntegration.getSeedPaths();
}
getMacroPaths() {
return this.dbtProjectIntegration.getMacroPaths();
}
getManifestPath() {
const targetPath = this.getTargetPath();
if (!targetPath) {
return;
}
return path.join(targetPath, DBTProject.MANIFEST_FILE);
}
getCatalogPath() {
const targetPath = this.getTargetPath();
if (!targetPath) {
return;
}
return path.join(targetPath, DBTProject.CATALOG_FILE);
}
getPythonBridgeStatus() {
return this.dbtProjectIntegration.getPythonBridgeStatus();
}
getAllDiagnostic(): Diagnostic[] {
return this.dbtProjectIntegration.getAllDiagnostic();
}
async performDatapilotHealthcheck(args: AltimateConfigProps) {
const manifestPath = this.getManifestPath();
if (!manifestPath) {
throw new Error(
`Unable to find manifest path for project ${this.getProjectName()}`,
);
}
const healthcheckArgs: HealthcheckArgs = { manifestPath };
if (args.configType === "Manual") {
healthcheckArgs.configPath = args.configPath;
} else {
if (args.configType === "Saas") {
healthcheckArgs.config = args.config;
}
if (
args.configType === "All" ||
args.config_schema.some((i) => i.files_required.includes("Catalog"))
) {
const docsGenerateCommand =
this.dbtCommandFactory.createDocsGenerateCommand();
docsGenerateCommand.focus = false;
docsGenerateCommand.logToTerminal = false;
docsGenerateCommand.showProgress = false;
await this.dbtProjectIntegration.executeCommandImmediately(
docsGenerateCommand,
);
healthcheckArgs.catalogPath = this.getCatalogPath();
if (!healthcheckArgs.catalogPath) {
throw new Error(
`Unable to find catalog path for project ${this.getProjectName()}`,
);
}
}
}
this.terminal.debug(
"performDatapilotHealthcheck",
"Performing healthcheck",
healthcheckArgs,
);
const projectHealthcheck =
await this.dbtProjectIntegration.performDatapilotHealthcheck(
healthcheckArgs,
);
// temp fix: ideally datapilot should return absolute path
for (const key in projectHealthcheck.model_insights) {
for (const item of projectHealthcheck.model_insights[key]) {
item.path = path.join(this.projectRoot.fsPath, item.original_file_path);
}
}
return projectHealthcheck;
}
async initialize() {
// ensure we watch all files and reflect changes
// This is purely vscode watchers, no need for the project to be fully initialized
const dbtProjectConfigWatcher = workspace.createFileSystemWatcher(
new RelativePattern(this.projectRoot, DBTProject.DBT_PROJECT_FILE),
);
setupWatcherHandler(dbtProjectConfigWatcher, async () => {
await this.refreshProjectConfig();
this.rebuildManifest();
});
await this.dbtProjectIntegration.initializeProject();
await this.refreshProjectConfig();
this.rebuildManifest();
this.dbtProjectLog = this.dbtProjectLogFactory.createDBTProjectLog(
this.onProjectConfigChanged,
);
// ensure all watchers are cleaned up
this.disposables.push(
this.dbtProjectLog,
dbtProjectConfigWatcher,
this.onSourceFileChanged(
debounce(async () => {
this.terminal.debug(
"DBTProject",
`SourceFileChanged event fired for "${this.getProjectName()}" at ${
this.projectRoot
}`,
);
await this.rebuildManifest();
}, this.dbtProjectIntegration.getDebounceForRebuildManifest()),
),
);
this.terminal.debug(
"DbtProject",
`Initialized dbt project ${this.getProjectName()} at ${this.projectRoot}`,
);
}
private async onPythonEnvironmentChanged() {
this.terminal.debug(
"DbtProject",
`Python environment for dbt project ${this.getProjectName()} at ${
this.projectRoot
} has changed`,
);
await this.initialize();
}
async refreshProjectConfig() {
this.terminal.debug(
"DBTProject",
`Going to refresh the project "${this.getProjectName()}" at ${
this.projectRoot
} configuration`,
);
try {
this.projectConfig = DBTProject.readAndParseProjectConfig(
this.projectRoot,
);
await this.dbtProjectIntegration.refreshProjectConfig();
this.projectConfigDiagnostics.clear();
} catch (error) {
if (error instanceof YAMLError) {
this.projectConfigDiagnostics.set(
Uri.joinPath(this.projectRoot, DBTProject.DBT_PROJECT_FILE),
[
new Diagnostic(
new Range(0, 0, 999, 999),
"dbt_project.yml is invalid : " + error.message,
),
],
);
} else if (error instanceof PythonException) {
this.projectConfigDiagnostics.set(
Uri.joinPath(this.projectRoot, DBTProject.DBT_PROJECT_FILE),
[
new Diagnostic(
new Range(0, 0, 999, 999),
"dbt configuration is invalid : " + error.exception.message,
),
],
);
}
this.terminal.debug(
"DBTProject",
`An error occurred while trying to refresh the project "${this.getProjectName()}" at ${
this.projectRoot
} configuration`,
error,
);
this.telemetry.sendTelemetryError("projectConfigRefreshError", error);
}
const event = new ProjectConfigChangedEvent(this);
this._onProjectConfigChanged.fire(event);
this.terminal.debug(
"DBTProject",
`firing ProjectConfigChanged event for the project "${this.getProjectName()}" at ${
this.projectRoot
} configuration`,
"targetPaths",
this.getTargetPath(),
"modelPaths",
this.getModelPaths(),
"seedPaths",
this.getSeedPaths(),
"macroPaths",
this.getMacroPaths(),
"packagesInstallPath",
this.getPackageInstallPath(),
"version",
this.getDBTVersion(),
"adapterType",
this.getAdapterType(),
);
}
getAdapterType() {
return this.dbtProjectIntegration.getAdapterType() || "unknown";
}
findPackageName(uri: Uri): string | undefined {
const documentPath = uri.path;
const pathSegments = documentPath
.split(this.projectRoot.path + "/") // literal match: the path may contain regex metacharacters
.join("")
.split("/");
const packagesInstallPath = this.getPackageInstallPath();
if (packagesInstallPath && uri.fsPath.startsWith(packagesInstallPath)) {
return pathSegments[1];
}
return undefined;
}
contains(uri: Uri) {
return (
uri.fsPath === this.projectRoot.fsPath ||
uri.fsPath.startsWith(this.projectRoot.fsPath + path.sep)
);
}
private async rebuildManifest() {
this.terminal.debug(
"DBTProject",
`Going to rebuild the manifest for "${this.getProjectName()}" at ${
this.projectRoot
}`,
);
this._onRebuildManifestStatusChange.fire({
project: this,
inProgress: true,
});
await this.dbtProjectIntegration.rebuildManifest();
this._onRebuildManifestStatusChange.fire({
project: this,
inProgress: false,
});
this.terminal.debug(
"DBTProject",
`Finished rebuilding the manifest for "${this.getProjectName()}" at ${
this.projectRoot
}`,
);
}
async runModel(runModelParams: RunModelParams) {
try {
const runModelCommand =
this.dbtCommandFactory.createRunModelCommand(runModelParams);
await this.dbtProjectIntegration.runModel(runModelCommand);
this.telemetry.sendTelemetryEvent("runModel");
} catch (error) {
this.handleNoCredentialsError(error);
}
}
async buildModel(runModelParams: RunModelParams) {
try {
const buildModelCommand =
this.dbtCommandFactory.createBuildModelCommand(runModelParams);
await this.dbtProjectIntegration.buildModel(buildModelCommand);
this.telemetry.sendTelemetryEvent("buildModel");
} catch (error) {
this.handleNoCredentialsError(error);
}
}
async buildProject() {
try {
const buildProjectCommand =
this.dbtCommandFactory.createBuildProjectCommand();
await this.dbtProjectIntegration.buildProject(buildProjectCommand);
this.telemetry.sendTelemetryEvent("buildProject");
} catch (error) {
this.handleNoCredentialsError(error);
}
}
async runTest(testName: string) {
try {
const testModelCommand =
this.dbtCommandFactory.createTestModelCommand(testName);
await this.dbtProjectIntegration.runTest(testModelCommand);
this.telemetry.sendTelemetryEvent("runTest");
} catch (error) {
this.handleNoCredentialsError(error);
}
}
async runModelTest(modelName: string) {
try {
const testModelCommand =
this.dbtCommandFactory.createTestModelCommand(modelName);
await this.dbtProjectIntegration.runModelTest(testModelCommand);
this.telemetry.sendTelemetryEvent("runModelTest");
} catch (error) {
this.handleNoCredentialsError(error);
}
}
private handleNoCredentialsError(error: unknown) {
if (error instanceof NoCredentialsError) {
this.altimate.handlePreviewFeatures();
return;
}
window.showErrorMessage((error as Error).message);
}
compileModel(runModelParams: RunModelParams) {
const compileModelCommand =
this.dbtCommandFactory.createCompileModelCommand(runModelParams);
this.dbtProjectIntegration.compileModel(compileModelCommand);
this.telemetry.sendTelemetryEvent("compileModel");
}
async generateDocsImmediately(args?: string[]) {
const docsGenerateCommand =
this.dbtCommandFactory.createDocsGenerateCommand();
args?.forEach((arg) => docsGenerateCommand.addArgument(arg));
docsGenerateCommand.focus = false;
docsGenerateCommand.logToTerminal = false;
await this.dbtProjectIntegration.executeCommandImmediately(
docsGenerateCommand,
);
this.telemetry.sendTelemetryEvent("generateDocsImmediately");
}
generateDocs() {
const docsGenerateCommand =
this.dbtCommandFactory.createDocsGenerateCommand();
this.dbtProjectIntegration.generateDocs(docsGenerateCommand);
this.telemetry.sendTelemetryEvent("generateDocs");
}
debug() {
const debugCommand = this.dbtCommandFactory.createDebugCommand();
this.telemetry.sendTelemetryEvent("debug");
return this.dbtProjectIntegration.debug(debugCommand);
}
installDeps() {
this.telemetry.sendTelemetryEvent("installDeps");
const installDepsCommand =
this.dbtCommandFactory.createInstallDepsCommand();
return this.dbtProjectIntegration.deps(installDepsCommand);
}
async compileNode(modelName: string): Promise<string | undefined> {
this.telemetry.sendTelemetryEvent("compileNode");
try {
return await this.dbtProjectIntegration.unsafeCompileNode(modelName);
} catch (exc: any) {
if (exc instanceof PythonException) {
window.showErrorMessage(
extendErrorWithSupportLinks(
`An error occurred while trying to compile your node ${modelName}: ` +
exc.exception.message +
".",
),
);
this.telemetry.sendTelemetryError("compileNodePythonError", exc);
return (
"Exception: " +
exc.exception.message +
"\n\n" +
"Detailed error information:\n" +
exc
);
}
this.telemetry.sendTelemetryError("compileNodeUnknownError", exc);
// Unknown error
window.showErrorMessage(
extendErrorWithSupportLinks(
"Could not compile model " +
modelName +
": " +
(exc as Error).message +
".",
),
);
return "Detailed error information:\n" + exc;
}
}
async unsafeCompileNode(modelName: string): Promise<string | undefined> {
this.telemetry.sendTelemetryEvent("unsafeCompileNode");
return await this.dbtProjectIntegration.unsafeCompileNode(modelName);
}
async validateSql(request: { sql: string; dialect: string; models: any[] }) {
try {
const { sql, dialect, models } = request;
// await here so a rejected promise is handled by the catch block below
return await this.dbtProjectIntegration.validateSql(sql, dialect, models);
} catch (exc) {
window.showErrorMessage(
extendErrorWithSupportLinks("Could not validate sql. " + exc),
);
this.telemetry.sendTelemetryError("validateSQLError", {
error: exc,
});
}
}
async validateSQLDryRun(query: string) {
try {
// await here so a rejected promise is handled by the catch block below
return await this.dbtProjectIntegration.validateSQLDryRun(query);
} catch (exc) {
const exception = exc as { exception?: { message?: string } };
window.showErrorMessage(
exception.exception?.message || "Could not validate sql with dry run.",
);
this.telemetry.sendTelemetryError("validateSQLDryRunError", {
error: exc,
});
}
}
getDBTVersion(): number[] | undefined {
// TODO: do this when config or python env changes and cache value
try {
return this.dbtProjectIntegration.getVersion();
} catch (exc) {
window.showErrorMessage(
extendErrorWithSupportLinks("Could not get dbt version. " + exc),
);
this.telemetry.sendTelemetryError("getDBTVersionError", { error: exc });
}
}
async compileQuery(query: string): Promise<string | undefined> {
this.telemetry.sendTelemetryEvent("compileQuery");
try {
return await this.dbtProjectIntegration.unsafeCompileQuery(query);
} catch (exc: any) {
if (exc instanceof PythonException) {
window.showErrorMessage(
extendErrorWithSupportLinks(
"An error occurred while trying to compile your query: " +
exc.exception.message +
".",
),
);
this.telemetry.sendTelemetryError("compileQueryPythonError", exc);
return undefined;
}
this.telemetry.sendTelemetryError("compileQueryUnknownError", exc);
// Unknown error
window.showErrorMessage(
extendErrorWithSupportLinks(
"Could not compile query: " + (exc as Error).message,
),
);
return undefined;
}
}
showCompiledSql(modelPath: Uri) {
this.findModelInTargetfolder(modelPath, "compiled");
}
showRunSQL(modelPath: Uri) {
this.findModelInTargetfolder(modelPath, "run");
}
createYMLContent(
columnsInRelation: { [key: string]: string }[],
modelName: string,
): string {
let yamlString = "version: 2\n\nmodels:\n";
yamlString += `  - name: ${modelName}\n    description: ""\n    columns:\n`;
for (const item of columnsInRelation) {
yamlString += `      - name: ${item.column}\n        description: ""\n`;
}
return yamlString;
}
async unsafeCompileQuery(query: string) {
return this.dbtProjectIntegration.unsafeCompileQuery(query);
}
async getColumnsOfModel(modelName: string) {
return this.dbtProjectIntegration.getColumnsOfModel(modelName);
}
async getColumnsOfSource(sourceName: string, tableName: string) {
return this.dbtProjectIntegration.getColumnsOfSource(sourceName, tableName);
}
async getColumnValues(model: string, column: string) {
this.terminal.debug(
"getColumnValues",
"finding distinct values for column",
true,
{ model, column },
);
const query = `select ${column} from {{ ref('${model}')}} group by ${column}`;
const queryExecution = await this.dbtProjectIntegration.executeSQL(
query,
100, // executeSQL requires a limit; the number of distinct values is usually below 100
);
const result = await queryExecution.executeQuery();
return result.table.rows.flat();
}
async getBulkSchema(req: DBTNode[], cancellationToken: CancellationToken) {
return this.dbtProjectIntegration.getBulkSchema(req, cancellationToken);
}
async getCatalog(): Promise<Catalog> {
try {
// await here so a rejected promise is handled by the catch block below
return await this.dbtProjectIntegration.getCatalog();
} catch (exc: any) {
if (exc instanceof PythonException) {
this.telemetry.sendTelemetryError("catalogPythonError", exc, {
adapter: this.getAdapterType(),
});
window.showErrorMessage(
"Some of the scans could not run as connectivity to database for the project " +
this.getProjectName() +
" is not available. ",
);
return [];
}
// Unknown error
this.telemetry.sendTelemetryError("catalogUnknownError", exc, {
adapter: this.getAdapterType(),
});
window.showErrorMessage(
"Some of the scans could not run as connectivity to database for the project " +
this.getProjectName() +
" is not available. ",
);
return [];
}
}
async generateSchemaYML(modelPath: Uri, modelName: string) {
try {
// Create filePath based on model location
const currentDir = path.dirname(modelPath.fsPath);
const location = path.join(currentDir, modelName + "_schema.yml");
if (!existsSync(location)) {
this.telemetry.sendTelemetryEvent("generateSchemaYML", {
adapter: this.getAdapterType(),
});
const columnsInRelation = await this.getColumnsOfModel(modelName);
// Generate yml file content
const fileContents = this.createYMLContent(
columnsInRelation,
modelName,
);
writeFileSync(location, fileContents);
const doc = await workspace.openTextDocument(Uri.file(location));
window.showTextDocument(doc);
} else {
window.showErrorMessage(
`A file called ${modelName}_schema.yml already exists in ${currentDir}. To generate the schema yml again, rename or delete the existing file.`,
);
}
} catch (exc: any) {
if (exc instanceof PythonException) {
this.telemetry.sendTelemetryError("generateSchemaYMLPythonError", exc, {
adapter: this.getAdapterType(),
});
window.showErrorMessage(
extendErrorWithSupportLinks(
"An error occurred while trying to generate the schema yml: " +
exc.exception.message +
".",
),
);
// Already reported as a Python error; do not also report it as unknown.
return;
}
// Unknown error
this.telemetry.sendTelemetryError("generateSchemaYMLUnknownError", exc, {
adapter: this.getAdapterType(),
});
window.showErrorMessage(
extendErrorWithSupportLinks(
"Could not generate schema yaml: " + (exc as Error).message,
),
);
}
}
async generateModel(
sourceName: string,
tableName: string,
sourcePath: string,
) {
try {
const prefix = workspace
.getConfiguration("dbt")
.get<string>("prefixGenerateModel", "base");
// Map setting to fileName
const fileNameTemplateMap: FileNameTemplateMap = {
"{prefix}_{sourceName}_{tableName}": `${prefix}_${sourceName}_${tableName}`,
"{prefix}_{sourceName}__{tableName}": `${prefix}_${sourceName}__${tableName}`,
"{prefix}_{tableName}": `${prefix}_${tableName}`,
"{tableName}": `${tableName}`,
};
// Default filename template
let fileName = `${prefix}_${sourceName}_${tableName}`;
const fileNameTemplate = workspace
.getConfiguration("dbt")
.get<string>(
"fileNameTemplateGenerateModel",
"{prefix}_{sourceName}_{tableName}",
);
this.telemetry.sendTelemetryEvent("generateModel", {
prefix: prefix,
filenametemplate: fileNameTemplate,
adapter: this.getAdapterType(),
});
// Parse setting to fileName
if (fileNameTemplate in fileNameTemplateMap) {
fileName = fileNameTemplateMap[fileNameTemplate];
}
// Create filePath based on source.yml location
const location = path.join(sourcePath, fileName + ".sql");
if (!existsSync(location)) {
const columnsInRelation = await this.getColumnsOfSource(
sourceName,
tableName,
);
this.terminal.debug(
"dbtProject:generateModel",
`Generating columns for source ${sourceName} and table ${tableName}`,
columnsInRelation,
);
const fileContents = `with source as (
select * from {{ source('${sourceName}', '${tableName}') }}
),
renamed as (
select
${columnsInRelation
.map((column) => `{{ adapter.quote("${column.column}") }}`)
.join(",\n ")}
from source
)
select * from renamed
`;
writeFileSync(location, fileContents);
const doc = await workspace.openTextDocument(Uri.file(location));
window.showTextDocument(doc);
} else {
window.showErrorMessage(
`A model called ${fileName} already exists in ${sourcePath}. To generate the model again, rename or delete the existing model.`,
);
}
} catch (exc: any) {
if (exc instanceof PythonException) {
this.telemetry.sendTelemetryError("generateModelPythonError", exc, {
adapter: this.getAdapterType(),
});
window.showErrorMessage(
"An error occurred while trying to generate the model: " +
exc.exception.message,
);
// Already reported as a Python error; do not also report it as unknown.
return;
}
// Unknown error
this.telemetry.sendTelemetryError("generateModelUnknownError", exc, {
adapter: this.getAdapterType(),
});
window.showErrorMessage(
extendErrorWithSupportLinks(
"An error occurred while trying to generate the model: " + exc + ".",
),
);
}
}
async executeSQL(query: string) {
const limit = workspace
.getConfiguration("dbt")
.get<number>("queryLimit", 500);
if (limit <= 0) {
window.showErrorMessage("Please enter a positive number for query limit");
return;
}
this.telemetry.sendTelemetryEvent("executeSQL", {
adapter: this.getAdapterType(),
limit: limit.toString(),
});
this.terminal.debug("executeSQL", query, {
adapter: this.getAdapterType(),
limit: limit.toString(),
});
// TODO: this should generate an event instead of directly going to the panel
this.queryResultPanel.executeQuery(
query,
this.dbtProjectIntegration.executeSQL(query, limit),
);
}
async dispose() {
while (this.disposables.length) {
const x = this.disposables.pop();
if (x) {
x.dispose();
}
}
}
static readAndParseProjectConfig(projectRoot: Uri) {
const dbtProjectConfigLocation = path.join(
projectRoot.fsPath,
DBTProject.DBT_PROJECT_FILE,
);
const dbtProjectYamlFile = readFileSync(dbtProjectConfigLocation, "utf8");
return parse(dbtProjectYamlFile, {
strict: false,
uniqueKeys: false,
maxAliasCount: -1,
});
}
static hashProjectRoot(projectRoot: string) {
return crypto.createHash("md5").update(projectRoot).digest("hex");
}
private async findModelInTargetfolder(modelPath: Uri, type: string) {
const targetPath = this.getTargetPath();
if (!targetPath) {
return;
}
const baseName = path.basename(modelPath.fsPath);
const targetModels = await workspace.findFiles(
new RelativePattern(targetPath, `${type}/**/${baseName}`),
);
if (targetModels.length > 0) {
commands.executeCommand("vscode.open", targetModels[0], {
preview: false,
preserveFocus: true,
viewColumn: ViewColumn.Beside,
});
}
}
static isResourceNode(resource_type: string): boolean {
return (
resource_type === DBTProject.RESOURCE_TYPE_MODEL ||
resource_type === DBTProject.RESOURCE_TYPE_SEED ||
resource_type === DBTProject.RESOURCE_TYPE_ANALYSIS ||
resource_type === DBTProject.RESOURCE_TYPE_SNAPSHOT
);
}
static isResourceHasDbColumns(resource_type: string): boolean {
return (
resource_type === DBTProject.RESOURCE_TYPE_MODEL ||
resource_type === DBTProject.RESOURCE_TYPE_SEED ||
resource_type === DBTProject.RESOURCE_TYPE_SNAPSHOT
);
}
static getNonEphemeralParents(
event: ManifestCacheProjectAddedEvent,
keys: string[],
): string[] {
const { nodeMetaMap, graphMetaMap } = event;
const { parents } = graphMetaMap;
const parentSet = new Set<string>();
const queue = [...keys]; // copy so shift() does not empty the caller's array
const visited: Record<string, boolean> = {};
while (queue.length > 0) {
const curr = queue.shift()!;
if (visited[curr]) {
continue;
}
visited[curr] = true;
const parent = parents.get(curr);
if (!parent) {
continue;
}
for (const n of parent.nodes) {
const splits = n.key.split(".");
const resource_type = splits[0];
if (resource_type !== DBTProject.RESOURCE_TYPE_MODEL) {
parentSet.add(n.key);
continue;
}
if (nodeMetaMap.get(splits[2])?.config.materialized === "ephemeral") {
queue.push(n.key);
} else {
parentSet.add(n.key);
}
}
}
return Array.from(parentSet);
}
mergeColumnsFromDB(
node: Pick<ModelNode, "columns">,
columnsFromDB: DBColumn[],
) {
if (!columnsFromDB || columnsFromDB.length === 0) {
return false;
}
if (columnsFromDB.length > 100) {
// Flagging events where more than 100 columns are fetched from db to get a sense of how many of these happen
this.telemetry.sendTelemetryEvent("excessiveColumnsFetchedFromDB");
}
const columnsFromManifest: Record<string, ColumnMetaData> = {};
Object.entries(node.columns).forEach(([k, v]) => {
columnsFromManifest[getColumnNameByCase(k, this.getAdapterType())] = v;
});
for (const c of columnsFromDB) {
const columnNameFromDB = getColumnNameByCase(
c.column,
this.getAdapterType(),
);
const existing_column = columnsFromManifest[columnNameFromDB];
if (existing_column) {
existing_column.data_type = (
existing_column.data_type || c.dtype
)?.toLowerCase();
continue;
}
node.columns[columnNameFromDB] = {
name: columnNameFromDB,
data_type: c.dtype?.toLowerCase(),
description: "",
};
}
if (Object.keys(node.columns).length > columnsFromDB.length) {
// Flag cases where the db returned fewer columns than the manifest lists
this.telemetry.sendTelemetryEvent("possibleStaleSchema");
}
return true;
}
public findPackageVersion(packageName: string) {
const version = this.dbtProjectIntegration.findPackageVersion(packageName);
this.terminal.debug(
"dbtProject:findPackageVersion",
`found ${packageName} version: ${version}`,
);
return version;
}
async getNodesWithDBColumns(
event: ManifestCacheProjectAddedEvent,
modelsToFetch: string[],
cancellationToken: CancellationToken,
) {
const { nodeMetaMap, sourceMetaMap } = event;
const mappedNode: Record<string, ModelNode> = {};
const bulkSchemaRequest: DBTNode[] = [];
const relationsWithoutColumns: string[] = [];
for (const key of modelsToFetch) {
const splits = key.split(".");
const resource_type = splits[0];
if (resource_type === DBTProject.RESOURCE_TYPE_SOURCE) {
const source = sourceMetaMap.get(splits[2]);
const tableName = splits[3];
if (!source) {
continue;
}
const table = source?.tables.find((t) => t.name === tableName);
if (!table) {
continue;
}
bulkSchemaRequest.push({
unique_id: key,
name: source.name,
resource_type,
table: table.name,
} as SourceNode);
const node = {
database: source.database,
schema: source.schema,
name: table.name,
alias: table.identifier,
uniqueId: key,
columns: table.columns,
};
mappedNode[key] = node;
} else if (DBTProject.isResourceNode(resource_type)) {
const node = nodeMetaMap.get(splits[2]);
if (!node) {
continue;
}
if (DBTProject.isResourceHasDbColumns(resource_type)) {
bulkSchemaRequest.push({
unique_id: key,
name: node.name,
resource_type,
});
}
mappedNode[key] = node;
}
}
const bulkSchemaResponse = await this.getBulkSchema(
bulkSchemaRequest,
cancellationToken,
);
for (const key of modelsToFetch) {
const node = mappedNode[key];
if (!node) {
continue;
}
const dbColumnAdded = this.mergeColumnsFromDB(
node,
bulkSchemaResponse[key],
);
if (!dbColumnAdded) {
relationsWithoutColumns.push(key);
}
}
return { mappedNode, relationsWithoutColumns };
}
async applyDeferConfig(): Promise<void> {
await this.dbtProjectIntegration.applyDeferConfig();
}
throwDiagnosticsErrorIfAvailable() {
this.dbtProjectIntegration.throwDiagnosticsErrorIfAvailable();
}
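
`getNonEphemeralParents` above walks the parent graph breadth-first and looks through ephemeral models, since those have no relation of their own in the warehouse. Below is a minimal sketch of that traversal, assuming simplified `Map` shapes in place of the extension's manifest cache. `ParentMap`, `MaterializationMap`, and `nonEphemeralParents` are hypothetical names used only for illustration.

```typescript
// Hypothetical, simplified shapes; the real extension reads these from its manifest cache.
type ParentMap = Map<string, string[]>; // node key -> parent node keys
type MaterializationMap = Map<string, string>; // model name -> materialization

function nonEphemeralParents(
  parents: ParentMap,
  materializations: MaterializationMap,
  keys: string[],
): string[] {
  const result = new Set<string>();
  const queue = [...keys]; // copy so the caller's array is not consumed
  const visited = new Set<string>();
  while (queue.length > 0) {
    const current = queue.shift()!;
    if (visited.has(current)) {
      continue;
    }
    visited.add(current);
    for (const parentKey of parents.get(current) ?? []) {
      // Keys are assumed to look like "<resource_type>.<package>.<name>".
      const parts = parentKey.split(".");
      const resourceType = parts[0];
      const name = parts[2] ?? "";
      const isEphemeralModel =
        resourceType === "model" && materializations.get(name) === "ephemeral";
      if (isEphemeralModel) {
        queue.push(parentKey); // look through the ephemeral model to its parents
      } else {
        result.add(parentKey);
      }
    }
  }
  return Array.from(result);
}
```

The `visited` set keeps the loop from revisiting a node that is reachable through more than one ephemeral model.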

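Several `DBTProject` methods above wrap an integration call in `try`/`catch`. Inside an `async` function, the `catch` clause only sees a rejection if the promise is awaited within the `try` block. The sketch below shows the difference. `failingCall`, `withoutAwait`, and `withAwait` are hypothetical names, and the failing call is a stand-in rather than the real integration.

```typescript
// Hypothetical stand-in for an integration call that rejects.
async function failingCall(): Promise<string> {
  throw new Error("bridge unavailable");
}

// Without `await`, the pending promise is returned before it settles,
// so the catch clause never runs and the caller sees the rejection.
async function withoutAwait(): Promise<string> {
  try {
    return failingCall();
  } catch {
    return "handled";
  }
}

// With `await`, the rejection is raised inside the try block and is caught.
async function withAwait(): Promise<string> {
  try {
    return await failingCall();
  } catch {
    return "handled";
  }
}
```

This is why an error message shown from a `catch` block may never appear when the wrapped call is returned without `await`.
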
import { inject } from "inversify";
import { basename } from "path";
import {
commands,
Disposable,
EventEmitter,
ExtensionContext,
Uri,
window,
workspace,
WorkspaceFolder,
} from "vscode";
import { DBTClient } from "../dbt_client";
import { EnvironmentVariables, RunModelType } from "../domain";
import { provideSingleton } from "../utils";
import { DBTProject } from "./dbtProject";
import { DBTWorkspaceFolder } from "./dbtWorkspaceFolder";
import {
ManifestCacheChangedEvent,
RebuildManifestCombinedStatusChange,
} from "./event/manifestCacheChangedEvent";
import { DBTTerminal } from "../dbt_client/dbtTerminal";
import { AltimateConfigProps } from "../webview_provider/insightsPanel";
import { AltimateDatapilot } from "../dbt_client/datapilot";
import { AltimateRequest } from "../altimate";
enum PromptAnswer {
YES = "Yes",
IGNORE = "Ignore",
}
export interface ProjectRegisteredUnregisteredEvent {
root: Uri;
name: string;
registered: boolean;
}
export interface DBTProjectsInitializationEvent {}
@provideSingleton(DBTProjectContainer)
export class DBTProjectContainer implements Disposable {
public onDBTInstallationVerification =
this.dbtClient.onDBTInstallationVerification;
private _onDBTProjectsInitializationEvent =
new EventEmitter<DBTProjectsInitializationEvent>();
public readonly onDBTProjectsInitialization =
this._onDBTProjectsInitializationEvent.event;
private dbtWorkspaceFolders: DBTWorkspaceFolder[] = [];
private _onManifestChanged = new EventEmitter<ManifestCacheChangedEvent>();
private _onProjectRegisteredUnregistered =
new EventEmitter<ProjectRegisteredUnregisteredEvent>();
public readonly onManifestChanged = this._onManifestChanged.event;
private disposables: Disposable[] = [
this._onManifestChanged,
this._onProjectRegisteredUnregistered,
];
private context?: ExtensionContext;
private projects: Map<Uri, string> = new Map<Uri, string>();
private _onRebuildManifestStatusChange =
new EventEmitter<RebuildManifestCombinedStatusChange>();
readonly onRebuildManifestStatusChange =
this._onRebuildManifestStatusChange.event;
private rebuildManifestStatusChangeMap = new Map<string, boolean>();
constructor(
private dbtClient: DBTClient,
@inject("Factory<DBTWorkspaceFolder>")
private dbtWorkspaceFolderFactory: (
workspaceFolder: WorkspaceFolder,
_onManifestChanged: EventEmitter<ManifestCacheChangedEvent>,
_onProjectRegisteredUnregistered: EventEmitter<ProjectRegisteredUnregisteredEvent>,
pythonPath?: string,
envVars?: EnvironmentVariables,
) => DBTWorkspaceFolder,
private dbtTerminal: DBTTerminal,
private altimateDatapilot: AltimateDatapilot,
private altimate: AltimateRequest,
) {
this.disposables.push(
workspace.onDidChangeWorkspaceFolders(async (event) => {
const { added, removed } = event;
await Promise.all(
added.map(
async (folder) => await this.registerWorkspaceFolder(folder),
),
);
removed.forEach((removedWorkspaceFolder) =>
this.unregisterWorkspaceFolder(removedWorkspaceFolder),
);
}),
);
this._onProjectRegisteredUnregistered.event((event) => {
if (event.registered) {
this.projects.set(event.root, event.name);
} else {
this.projects.delete(event.root);
}
const projects = Array.from(this.projects.entries());
commands.executeCommand(
"setContext",
"dbtPowerUser.projectCount",
projects.length,
);
if (projects.length === 1) {
this.setToWorkspaceState("dbtPowerUser.projectSelected", {
label: projects[0][1],
description: projects[0][0].fsPath,
uri: projects[0][0],
});
// For some reason we can't use dbtPowerUser.projectSelected to control the steps
commands.executeCommand(
"setContext",
"dbtPowerUser.walkthroughProjectSelected",
true,
);
} else {
// reset the experience so the user can reselect another project when running again
this.setToWorkspaceState("dbtPowerUser.projectSelected", null);
commands.executeCommand(
"setContext",
"dbtPowerUser.walkthroughProjectSelected",
false,
);
}
});
}
setContext(context: ExtensionContext) {
this.context = context;
}
showErrorIfDbtOrPythonNotInstalled() {
return this.dbtClient.showErrorIfDbtOrPythonNotInstalled();
}
showErrorIfDbtIsNotInstalled() {
return this.dbtClient.showErrorIfDbtIsNotInstalled();
}
async initializeDBTProjects(): Promise<void> {
const folders = workspace.workspaceFolders;
if (folders === undefined) {
return;
}
await Promise.all(
folders.map((folder) => this.registerWorkspaceFolder(folder)),
);
this._onDBTProjectsInitializationEvent.fire({});
}
async showWalkthrough() {
const answer = await window.showInformationMessage(
`Thanks for installing dbt Power User. Do you need help setting up the extension?`,
PromptAnswer.YES,
PromptAnswer.IGNORE,
);
commands.executeCommand(
"setContext",
"dbtPowerUser.showSetupWalkthrough",
false,
);
if (answer === PromptAnswer.YES) {
commands.executeCommand("dbtPowerUser.openSetupWalkthrough");
}
this.setToGlobalState("showSetupWalkthrough", false);
}
async initializeWalkthrough() {
// show setup walkthrough if needed
const isWalkthroughDisabledFromSettings = workspace
.getConfiguration("dbt")
.get("hideWalkthrough", false);
const showSetupWalkthrough = this.getFromGlobalState(
"showSetupWalkthrough",
);
if (
!isWalkthroughDisabledFromSettings &&
(showSetupWalkthrough === undefined || showSetupWalkthrough === true)
) {
this.dbtTerminal.debug(
"dbtProjectContainer:setupWalkthroughDisplayed",
"showing SetupWalkthrough: value of showSetupWalkthrough is " +
showSetupWalkthrough,
);
this.showWalkthrough();
}
const allProjects = await this.getProjects();
this.dbtTerminal.debug(
"dbtProjectContainer:initializeWalkthrough",
"getProjects",
allProjects,
);
commands.executeCommand(
"setContext",
"dbtPowerUser.projectCount",
allProjects.length,
);
const existingAssociations = workspace
.getConfiguration("files")
.get<any>("associations", {});
this.dbtTerminal.debug(
"dbtProjectContainer:fileAssociationsCheck",
"already existing fileAssociations",
existingAssociations,
);
let showFileAssociationsStep = false;
Object.entries({
"*.sql": ["jinja-sql", "sql"],
"*.yml": ["jinja-yaml", "yaml"],
}).forEach(([key, value]) => {
if (existingAssociations[key] === undefined) {
showFileAssociationsStep ||= true;
}
showFileAssociationsStep ||= !value.includes(existingAssociations[key]);
});
commands.executeCommand(
"setContext",
"dbtPowerUser.showFileAssociationStep",
showFileAssociationsStep,
);
}
get extensionUri() {
return this.context!.extensionUri;
}
get extensionVersion() {
return this.context!.extension.packageJSON.version;
}
setToWorkspaceState(key: string, value: any) {
this.context!.workspaceState.update(key, value);
}
getFromWorkspaceState(key: string): any {
return this.context!.workspaceState.get(key);
}
setToGlobalState(key: string, value: any) {
this.context!.globalState.update(key, value);
}
getFromGlobalState(key: string): any {
return this.context!.globalState.get(key);
}
get extensionId(): string {
return this.context?.extension.id.toString() || "";
}
// TODO: bypasses events and could be inconsistent
getPackageName = (uri: Uri): string | undefined => {
return this.findDBTProject(uri)?.findPackageName(uri);
};
// TODO: bypasses events and could be inconsistent
getProjectRootpath = (uri: Uri): Uri | undefined => {
return this.findDBTProject(uri)?.projectRoot;
};
async detectDBT(): Promise<void> {
await this.dbtClient.detectDBT();
}
async initialize() {
this.getProjects().forEach((project) => project.initialize());
}
executeSQL(uri: Uri, query: string): void {
this.findDBTProject(uri)?.executeSQL(query);
}
runModel(modelPath: Uri, type?: RunModelType) {
this.findDBTProject(modelPath)?.runModel(
this.createModelParams(modelPath, type),
);
}
buildModel(modelPath: Uri, type?: RunModelType) {
this.findDBTProject(modelPath)?.buildModel(
this.createModelParams(modelPath, type),
);
}
buildProject(modelPath: Uri, type?: RunModelType) {
this.findDBTProject(modelPath)?.buildProject();
}
runTest(modelPath: Uri, testName: string) {
this.findDBTProject(modelPath)?.runTest(testName);
}
runModelTest(modelPath: Uri, modelName: string) {
this.findDBTProject(modelPath)?.runModelTest(modelName);
}
compileModel(modelPath: Uri, type?: RunModelType) {
this.findDBTProject(modelPath)?.compileModel(
this.createModelParams(modelPath, type),
);
}
generateDocs(modelPath: Uri) {
this.findDBTProject(modelPath)?.generateDocs();
}
compileQuery(modelPath: Uri, query: string) {
this.findDBTProject(modelPath)?.compileQuery(query);
}
showRunSQL(modelPath: Uri) {
this.findDBTProject(modelPath)?.showRunSQL(modelPath);
}
showCompiledSQL(modelPath: Uri) {
this.findDBTProject(modelPath)?.showCompiledSql(modelPath);
}
generateSchemaYML(modelPath: Uri, modelName: string) {
this.findDBTProject(modelPath)?.generateSchemaYML(modelPath, modelName);
}
findDBTProject(uri: Uri): DBTProject | undefined {
return this.findDBTWorkspaceFolder(uri)?.findDBTProject(uri);
}
getProjects(): DBTProject[] {
return this.dbtWorkspaceFolders.flatMap((workspaceFolder) =>
workspaceFolder.getProjects(),
);
}
getAdapters(): string[] {
return Array.from(
new Set<string>(
this.dbtWorkspaceFolders.flatMap((workspaceFolder) =>
workspaceFolder.getAdapters(),
),
),
);
}
getPythonEnvironment() {
return this.dbtClient.getPythonEnvironment();
}
dispose() {
this.dbtWorkspaceFolders.forEach((workspaceFolder) =>
workspaceFolder.dispose(),
);
while (this.disposables.length) {
const x = this.disposables.pop();
if (x) {
x.dispose();
}
}
}
private createModelParams(modelPath: Uri, type?: RunModelType) {
const modelName = basename(modelPath.fsPath, ".sql");
const plusOperatorLeft =
type === RunModelType.RUN_PARENTS ||
type === RunModelType.BUILD_PARENTS ||
type === RunModelType.BUILD_CHILDREN_PARENTS
? "+"
: "";
const plusOperatorRight =
type === RunModelType.RUN_CHILDREN ||
type === RunModelType.BUILD_CHILDREN ||
type === RunModelType.BUILD_CHILDREN_PARENTS
? "+"
: "";
return { plusOperatorLeft, modelName, plusOperatorRight };
}
private async registerWorkspaceFolder(
workspaceFolder: WorkspaceFolder,
): Promise<void> {
const dbtProjectWorkspaceFolder = this.dbtWorkspaceFolderFactory(
workspaceFolder,
this._onManifestChanged,
this._onProjectRegisteredUnregistered,
);
this.disposables.push(
dbtProjectWorkspaceFolder.onRebuildManifestStatusChange((e) => {
this.rebuildManifestStatusChangeMap.set(
e.project.projectRoot.fsPath,
e.inProgress,
);
const inProgressProjects: DBTProject[] = Array.from(
this.rebuildManifestStatusChangeMap.entries(),
)
.filter(([_, inProgress]) => inProgress)
.map(([root, _]) => root)
.map((root) => this.findDBTProject(Uri.file(root)))
.filter((project) => project !== undefined) as DBTProject[];
this._onRebuildManifestStatusChange.fire({
projects: inProgressProjects,
inProgress: inProgressProjects.length > 0,
});
}),
);
this.dbtWorkspaceFolders.push(dbtProjectWorkspaceFolder);
this.dbtTerminal.debug(
"dbtProjectContainer:registerWorkspaceFolder",
"dbtWorkspaceFolders",
this.dbtWorkspaceFolders,
);
await dbtProjectWorkspaceFolder.discoverProjects();
}
private unregisterWorkspaceFolder(workspaceFolder: WorkspaceFolder): void {
const folderToDelete = this.findDBTWorkspaceFolder(workspaceFolder.uri);
if (folderToDelete === undefined) {
return;
}
this.dbtWorkspaceFolders.splice(
this.dbtWorkspaceFolders.indexOf(folderToDelete),
1, // remove only this folder; without a delete count, splice drops every later entry too
);
folderToDelete.dispose();
}
private findDBTWorkspaceFolder(uri: Uri): DBTWorkspaceFolder | undefined {
return this.dbtWorkspaceFolders.find((folder) => folder.contains(uri));
}
async checkIfAltimateDatapilotInstalled() {
const datapilotVersion =
await this.altimateDatapilot.checkIfAltimateDatapilotInstalled();
const { altimate_datapilot_version } =
await this.altimate.getDatapilotVersion(this.extensionVersion);
return datapilotVersion === altimate_datapilot_version;
}
async installAltimateDatapilot() {
const { altimate_datapilot_version } =
await this.altimate.getDatapilotVersion(this.extensionVersion);
await this.altimateDatapilot.installAltimateDatapilot(
altimate_datapilot_version,
);
}
executeAltimateDatapilotHealthcheck(args: AltimateConfigProps) {
const project = this.getProjects().find(
(p) => p.projectRoot.fsPath.toString() === args.projectRoot,
);
if (!project) {
throw new Error(`Unable to find project ${args.projectRoot}`);
}
return project.performDatapilotHealthcheck(args);
}
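
As a rough illustration of how `createModelParams` maps a run type onto dbt's `+` graph operators, the sketch below rebuilds the selector string on its own. The enum is an assumed stand-in for the extension's `RunModelType` (only the members referenced in the snippet above), and `buildSelector` is a hypothetical helper name, not part of the codebase.

```typescript
// Assumed stand-in for the extension's RunModelType; only the members
// referenced by createModelParams are listed.
enum RunModelType {
  RUN_PARENTS,
  RUN_CHILDREN,
  BUILD_PARENTS,
  BUILD_CHILDREN,
  BUILD_CHILDREN_PARENTS,
}

// Hypothetical helper: joins the optional "+" prefix and suffix with the
// model name, the same way the command factories build the --select value.
function buildSelector(modelName: string, type?: RunModelType): string {
  const left =
    type === RunModelType.RUN_PARENTS ||
    type === RunModelType.BUILD_PARENTS ||
    type === RunModelType.BUILD_CHILDREN_PARENTS
      ? "+"
      : "";
  const right =
    type === RunModelType.RUN_CHILDREN ||
    type === RunModelType.BUILD_CHILDREN ||
    type === RunModelType.BUILD_CHILDREN_PARENTS
      ? "+"
      : "";
  return `${left}${modelName}${right}`;
}

console.log(buildSelector("my_second_dbt_model"));
console.log(buildSelector("my_second_dbt_model", RunModelType.RUN_PARENTS));
console.log(
  buildSelector("my_second_dbt_model", RunModelType.BUILD_CHILDREN_PARENTS),
);
```

With no type the selector is just the model name; `RUN_PARENTS` adds a leading `+`, and `BUILD_CHILDREN_PARENTS` wraps the name in `+` on both sides.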

import {
CancellationToken,
Diagnostic,
Disposable,
ProgressLocation,
Uri,
window,
workspace,
} from "vscode";
import {
extendErrorWithSupportLinks,
getFirstWorkspacePath,
provideSingleton,
} from "../utils";
import { PythonBridge, pythonBridge } from "python-bridge";
import { provide } from "inversify-binding-decorators";
import {
CommandProcessExecution,
CommandProcessExecutionFactory,
CommandProcessResult,
} from "../commandProcessExecution";
import { PythonEnvironment } from "../manifest/pythonEnvironment";
import { existsSync } from "fs";
import { TelemetryService } from "../telemetry";
import { DBTTerminal } from "./dbtTerminal";
import {
AltimateRequest,
NoCredentialsError,
ValidateSqlParseErrorResponse,
} from "../altimate";
import { ProjectHealthcheck } from "./dbtCoreIntegration";
interface DBTCommandExecution {
command: (token?: CancellationToken) => Promise<void>;
statusMessage: string;
showProgress?: boolean;
focus?: boolean;
token?: CancellationToken;
}
export interface DBTCommandExecutionStrategy {
execute(
command: DBTCommand,
token?: CancellationToken,
): Promise<CommandProcessResult>;
}
@provideSingleton(CLIDBTCommandExecutionStrategy)
export class CLIDBTCommandExecutionStrategy
implements DBTCommandExecutionStrategy
{
constructor(
protected commandProcessExecutionFactory: CommandProcessExecutionFactory,
protected pythonEnvironment: PythonEnvironment,
protected terminal: DBTTerminal,
protected telemetry: TelemetryService,
protected cwd: Uri,
protected dbtPath: string,
) {}
async execute(
command: DBTCommand,
token?: CancellationToken,
): Promise<CommandProcessResult> {
const commandExecution = this.executeCommand(command, token);
const executionPromise = command.logToTerminal
? (await commandExecution).completeWithTerminalOutput()
: (await commandExecution).complete();
return executionPromise;
}
protected async executeCommand(
command: DBTCommand,
token?: CancellationToken,
): Promise<CommandProcessExecution> {
if (command.logToTerminal && command.focus) {
await this.terminal.show(true);
}
this.telemetry.sendTelemetryEvent("dbtCommand", {
command: command.getCommandAsString(),
});
if (command.logToTerminal) {
this.terminal.log(
`> Executing task: ${command.getCommandAsString()}\n\r`,
);
}
const { args } = command!;
if (
!this.pythonEnvironment.pythonPath ||
!this.pythonEnvironment.environmentVariables
) {
throw Error(
"Could not launch command as python environment is not available",
);
}
const tokens: CancellationToken[] = [];
if (token !== undefined) {
tokens.push(token);
}
if (command.token !== undefined) {
tokens.push(command.token);
}
return this.commandProcessExecutionFactory.createCommandProcessExecution({
command: this.dbtPath,
args,
tokens,
cwd: this.cwd.fsPath,
envVars: this.pythonEnvironment.environmentVariables,
});
}
}
@provideSingleton(PythonDBTCommandExecutionStrategy)
export class PythonDBTCommandExecutionStrategy
implements DBTCommandExecutionStrategy
{
constructor(
private commandProcessExecutionFactory: CommandProcessExecutionFactory,
private pythonEnvironment: PythonEnvironment,
private terminal: DBTTerminal,
private telemetry: TelemetryService,
) {}
async execute(
command: DBTCommand,
token?: CancellationToken,
): Promise<CommandProcessResult> {
return (
await this.executeCommand(command, token)
).completeWithTerminalOutput();
}
private async executeCommand(
command: DBTCommand,
token?: CancellationToken,
): Promise<CommandProcessExecution> {
this.terminal.log(`> Executing task: ${command.getCommandAsString()}\n\r`);
this.telemetry.sendTelemetryEvent("dbtCommand", {
command: command.getCommandAsString(),
});
if (command.focus) {
await this.terminal.show(true);
}
const { args } = command!;
if (
!this.pythonEnvironment.pythonPath ||
!this.pythonEnvironment.environmentVariables
) {
throw Error(
"Could not launch command as python environment is not available",
);
}
const tokens: CancellationToken[] = [];
if (token !== undefined) {
tokens.push(token);
}
if (command.token !== undefined) {
tokens.push(command.token);
}
return this.commandProcessExecutionFactory.createCommandProcessExecution({
command: this.pythonEnvironment.pythonPath,
args: ["-c", this.dbtCommand(args)],
tokens,
cwd: getFirstWorkspacePath(),
envVars: this.pythonEnvironment.environmentVariables,
});
}
private dbtCommand(args: string[]): string {
args = args.map((arg) => `r'${arg}'`);
const dbtCustomRunnerImport = workspace
.getConfiguration("dbt")
.get<string>(
"dbtCustomRunnerImport",
"from dbt.cli.main import dbtRunner",
);
return `has_dbt_runner = True
try:
${dbtCustomRunnerImport}
except:
has_dbt_runner = False
if has_dbt_runner:
dbt_cli = dbtRunner()
dbt_cli.invoke([${args}])
else:
import dbt.main
dbt.main.main([${args}])`;
}
}
export class DBTCommand {
constructor(
public statusMessage: string,
public args: string[],
public focus: boolean = false,
public showProgress: boolean = false,
public logToTerminal: boolean = false,
public executionStrategy?: DBTCommandExecutionStrategy,
public token?: CancellationToken,
) {}
addArgument(arg: string) {
this.args.push(arg);
}
getCommandAsString() {
return "dbt " + this.args.join(" ");
}
setExecutionStrategy(executionStrategy: DBTCommandExecutionStrategy) {
this.executionStrategy = executionStrategy;
}
execute(token?: CancellationToken) {
if (this.executionStrategy === undefined) {
throw new Error("Execution strategy is required to run dbt commands");
}
return this.executionStrategy.execute(this, token);
}
setToken(token: CancellationToken) {
this.token = token;
}
}
export interface RunModelParams {
plusOperatorLeft: string;
modelName: string;
plusOperatorRight: string;
}
export interface ExecuteSQLResult {
table: {
column_names: string[];
column_types: string[];
rows: any[][];
};
raw_sql: string;
compiled_sql: string;
}
export class ExecuteSQLError extends Error {
compiled_sql: string;
constructor(message: string, compiled_sql: string) {
super(message);
this.compiled_sql = compiled_sql;
}
}
export interface CompilationResult {
compiled_sql: string;
}
// TODO: standardize error handling
export class DBTIntegrationError extends Error {}
export class DBTIntegrationUnknownError extends Error {}
export interface DBTDetection {
detectDBT(): Promise<boolean>;
}
export interface DBTInstallion {
installDBT(): Promise<void>;
}
export interface HealthcheckArgs {
manifestPath: string;
catalogPath?: string;
config?: any;
configPath?: string;
}
export interface DBTProjectDetection extends Disposable {
discoverProjects(projectConfigFiles: Uri[]): Promise<Uri[]>;
}
export class QueryExecution {
constructor(
private cancelFunc: () => Promise<void>,
private queryResult: () => Promise<ExecuteSQLResult>,
) {}
cancel(): Promise<void> {
return this.cancelFunc();
}
executeQuery(): Promise<ExecuteSQLResult> {
return this.queryResult();
}
}
export type DBColumn = { column: string; dtype: string };
export type Node = {
unique_id: string;
name: string;
resource_type: string;
};
export type SourceNode = {
unique_id: string;
name: string;
resource_type: "source";
table: string;
};
export type DBTNode = Node | SourceNode;
type CatalogItem = {
table_database: string;
table_schema: string;
table_name: string;
column_name: string;
column_type: string;
};
export type Catalog = CatalogItem[];
export interface DBTProjectIntegration extends Disposable {
// initialize execution infrastructure
initializeProject(): Promise<void>;
// called when project configuration is changed
refreshProjectConfig(): Promise<void>;
// retrieve dbt configs
getTargetPath(): string | undefined;
getModelPaths(): string[] | undefined;
getSeedPaths(): string[] | undefined;
getMacroPaths(): string[] | undefined;
getPackageInstallPath(): string | undefined;
getAdapterType(): string | undefined;
getVersion(): number[] | undefined;
// parse manifest
rebuildManifest(): Promise<void>;
// execute queries
executeSQL(query: string, limit: number): Promise<QueryExecution>;
// dbt commands
runModel(command: DBTCommand): Promise<void>;
buildModel(command: DBTCommand): Promise<void>;
buildProject(command: DBTCommand): Promise<void>;
runTest(command: DBTCommand): Promise<void>;
runModelTest(command: DBTCommand): Promise<void>;
compileModel(command: DBTCommand): Promise<void>;
generateDocs(command: DBTCommand): Promise<void>;
executeCommandImmediately(command: DBTCommand): Promise<CommandProcessResult>;
deps(command: DBTCommand): Promise<string>;
debug(command: DBTCommand): Promise<string>;
// altimate commands
unsafeCompileNode(modelName: string): Promise<string>;
unsafeCompileQuery(query: string): Promise<string>;
validateSql(
query: string,
dialect: string,
models: any, // TODO: type this
): Promise<ValidateSqlParseErrorResponse>;
validateSQLDryRun(query: string): Promise<{
bytes_processed: string; // TODO: create type
}>;
getColumnsOfSource(
sourceName: string,
tableName: string,
): Promise<DBColumn[]>;
getColumnsOfModel(modelName: string): Promise<DBColumn[]>;
getCatalog(): Promise<Catalog>;
getDebounceForRebuildManifest(): number;
getBulkSchema(
nodes: DBTNode[],
cancellationToken: CancellationToken,
): Promise<Record<string, DBColumn[]>>;
findPackageVersion(packageName: string): string | undefined;
performDatapilotHealthcheck(
args: HealthcheckArgs,
): Promise<ProjectHealthcheck>;
applyDeferConfig(): Promise<void>;
getAllDiagnostic(): Diagnostic[];
throwDiagnosticsErrorIfAvailable(): void;
getPythonBridgeStatus(): boolean;
}
@provide(DBTCommandExecutionInfrastructure)
export class DBTCommandExecutionInfrastructure {
private queues: Map<string, DBTCommandExecution[]> = new Map<
string,
DBTCommandExecution[]
>();
private queueStates: Map<string, boolean> = new Map<string, boolean>();
constructor(
private pythonEnvironment: PythonEnvironment,
private telemetry: TelemetryService,
private altimate: AltimateRequest,
private terminal: DBTTerminal,
) {}
createPythonBridge(cwd: string): PythonBridge {
let pythonPath = this.pythonEnvironment.pythonPath;
const envVars = this.pythonEnvironment.environmentVariables;
if (pythonPath.endsWith("python.exe")) {
// replace python.exe with pythonw.exe if path exists
const pythonwPath = pythonPath.replace("python.exe", "pythonw.exe");
if (existsSync(pythonwPath)) {
this.terminal.debug(
"DBTCommandExecutionInfrastructure",
`Changing python path to ${pythonwPath}`,
);
pythonPath = pythonwPath;
}
}
this.terminal.debug(
"DBTCommandExecutionInfrastructure",
"Starting python bridge",
{
pythonPath,
cwd,
},
);
return pythonBridge({
python: pythonPath,
cwd: cwd,
env: {
...envVars,
PYTHONPATH: __dirname,
},
detached: true,
});
}
async closePythonBridge(bridge: PythonBridge) {
this.terminal.debug("dbtIntegration", `Closing python bridge`);
try {
await bridge.disconnect();
await bridge.end();
} catch (_) {}
}
createQueue(queueName: string) {
this.queues.set(queueName, []);
}
async addCommandToQueue(queueName: string, command: DBTCommand) {
this.queues.get(queueName)!.push({
command: async (token) => {
await command.execute(token);
},
statusMessage: command.statusMessage,
focus: command.focus,
token: command.token,
showProgress: command.showProgress,
});
this.pickCommandToRun(queueName);
}
private async pickCommandToRun(queueName: string): Promise<void> {
const queue = this.queues.get(queueName)!;
const running = this.queueStates.get(queueName);
if (!running && queue.length > 0) {
this.queueStates.set(queueName, true);
const { command, statusMessage, focus, showProgress } = queue.shift()!;
const commandExecution = async (token?: CancellationToken) => {
try {
await command(token);
} catch (error) {
if (error instanceof NoCredentialsError) {
this.altimate.handlePreviewFeatures();
return;
}
window.showErrorMessage(
extendErrorWithSupportLinks(
`Could not run command '${statusMessage}': ` + error + ".",
),
);
this.telemetry.sendTelemetryError("queueRunCommandError", error, {
command: statusMessage,
});
}
};
if (showProgress) {
await window.withProgress(
{
location: focus
? ProgressLocation.Notification
: ProgressLocation.Window,
cancellable: true,
title: statusMessage,
},
async (_, token) => {
await commandExecution(token);
},
);
} else {
await commandExecution();
}
this.queueStates.set(queueName, false);
this.pickCommandToRun(queueName);
}
}
async runCommand(command: DBTCommand) {
const commandExecution: DBTCommandExecution = {
command: async (token) => {
await command.execute(token);
},
statusMessage: command.statusMessage,
focus: command.focus,
};
await window.withProgress(
{
location: commandExecution.focus
? ProgressLocation.Notification
: ProgressLocation.Window,
cancellable: true,
title: commandExecution.statusMessage,
},
async (_, token) => {
try {
return await commandExecution.command(token);
} catch (error) {
window.showErrorMessage(
extendErrorWithSupportLinks(
`Could not run command '${commandExecution.statusMessage}': ` +
(error as Error).message +
".",
),
);
this.telemetry.sendTelemetryError("runCommandError", error, {
command: commandExecution.statusMessage,
});
}
},
);
}
}
@provideSingleton(DBTCommandFactory)
export class DBTCommandFactory {
createVersionCommand(): DBTCommand {
return new DBTCommand("Detecting dbt version...", ["--version"]);
}
createParseCommand(): DBTCommand {
return new DBTCommand("Parsing dbt project...", ["parse"]);
}
createRunModelCommand(params: RunModelParams): DBTCommand {
const { plusOperatorLeft, modelName, plusOperatorRight } = params;
const buildModelCommandAdditionalParams = workspace
.getConfiguration("dbt")
.get<string[]>("runModelCommandAdditionalParams", []);
return new DBTCommand(
"Running dbt model...",
[
"run",
"--select",
`${plusOperatorLeft}${modelName}${plusOperatorRight}`,
...buildModelCommandAdditionalParams,
],
true,
true,
true,
);
}
createBuildModelCommand(params: RunModelParams): DBTCommand {
const { plusOperatorLeft, modelName, plusOperatorRight } = params;
const buildModelCommandAdditionalParams = workspace
.getConfiguration("dbt")
.get<string[]>("buildModelCommandAdditionalParams", []);
return new DBTCommand(
"Building dbt model...",
[
"build",
"--select",
`${plusOperatorLeft}${modelName}${plusOperatorRight}`,
...buildModelCommandAdditionalParams,
],
true,
true,
true,
);
}
createBuildProjectCommand(): DBTCommand {
return new DBTCommand(
"Building dbt project...",
["build"],
true,
true,
true,
);
}
createTestModelCommand(testName: string): DBTCommand {
const testModelCommandAdditionalParams = workspace
.getConfiguration("dbt")
.get<string[]>("testModelCommandAdditionalParams", []);
return new DBTCommand(
"Testing dbt model...",
["test", "--select", testName, ...testModelCommandAdditionalParams],
true,
true,
true,
);
}
createCompileModelCommand(params: RunModelParams): DBTCommand {
const { plusOperatorLeft, modelName, plusOperatorRight } = params;
return new DBTCommand(
"Compiling dbt models...",
[
"compile",
"--select",
`${plusOperatorLeft}${modelName}${plusOperatorRight}`,
],
true,
true,
true,
);
}
createDocsGenerateCommand(): DBTCommand {
return new DBTCommand(
"Generating dbt Docs...",
["docs", "generate"],
true,
true,
true,
);
}
createInstallDepsCommand(): DBTCommand {
return new DBTCommand("Installing packages...", ["deps"], true, true, true);
}
createDebugCommand(): DBTCommand {
return new DBTCommand("Debugging...", ["debug"], true, true, true);
}
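
The `dbtCommand` method in the snippet above turns the dbt arguments into a Python list literal by wrapping each one in a raw string and interpolating the array into a template string. A minimal sketch of that step follows, assuming nothing beyond standard JavaScript string behaviour; `toPythonArgList` is a hypothetical name used only for illustration.

```typescript
// Hypothetical helper isolating the argument-quoting step of dbtCommand.
function toPythonArgList(args: string[]): string {
  // Wrap each argument in a Python raw-string literal.
  const quoted = args.map((arg) => `r'${arg}'`);
  // Interpolating an array into a template string calls Array.prototype.toString,
  // which joins the elements with commas.
  return `[${quoted}]`;
}

console.log(toPythonArgList(["run", "--select", "+my_second_dbt_model"]));
// An argument that itself contains a single quote would terminate the
// raw string early, so this quoting only suits arguments without quotes.
```

The comma join is implicit here, which is why the original code can write `[${args}]` without calling `join` itself.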

import { Disposable, EventEmitter, Terminal, window } from "vscode";
import { provideSingleton, stripANSI } from "../utils";
import { TelemetryService } from "../telemetry";
import { PythonException } from "python-bridge";
@provideSingleton(DBTTerminal)
export class DBTTerminal {
private disposables: Disposable[] = [];
private terminal?: Terminal;
private readonly writeEmitter = new EventEmitter<string>();
private outputChannel = window.createOutputChannel(`Log - dbt`, {
log: true,
});
constructor(private telemetry: TelemetryService) {}
async show(status: boolean) {
if (status) {
await this.requireTerminal();
this.terminal!.show(!status);
}
}
logNewLine() {
this.log("\r\n");
}
logLine(line: string) {
this.log(line);
this.logNewLine();
}
logHorizontalRule() {
this.logLine(
"--------------------------------------------------------------------------",
);
}
logBlock(block: string[]) {
this.logHorizontalRule();
for (const line of block) {
this.logLine(line);
}
this.logHorizontalRule();
}
logBlockWithHeader(header: string[], block: string[]) {
this.logHorizontalRule();
for (const line of header) {
this.logLine(line);
}
this.logHorizontalRule();
for (const line of block) {
this.logLine(line);
}
this.logHorizontalRule();
}
log(message: string, ...args: any[]) {
this.outputChannel.info(stripANSI(message), args);
console.log(stripANSI(message), args);
if (this.terminal !== undefined) {
this.writeEmitter.fire(message);
}
}
trace(message: string) {
this.outputChannel?.appendLine(stripANSI(message));
console.log(message);
}
debug(name: string, message: string, ...args: any[]) {
this.outputChannel?.debug(`${name}:${stripANSI(message)}`, args);
console.debug(message, args);
}
info(
name: string,
message: string,
sendTelemetry: boolean = true,
...args: any[]
) {
this.outputChannel?.info(`${name}:${stripANSI(message)}`, args);
console.info(`${name}:${message}`, args);
if (sendTelemetry) {
this.telemetry.sendTelemetryEvent(name, { message, level: "info" });
}
}
warn(
name: string,
message: string,
sendTelemetry: boolean = true,
...args: any[]
) {
this.outputChannel?.warn(`${name}:${stripANSI(message)}`, args);
console.warn(`${name}:${message}`, args);
if (sendTelemetry) {
this.telemetry.sendTelemetryEvent(name, { message, level: "warn" });
}
}
error(
name: string,
message: string,
e: PythonException | Error | unknown,
sendTelemetry = true,
...args: any[]
) {
if (e instanceof PythonException) {
message += `:${e.exception.message}`;
} else if (e instanceof Error) {
message += `:${e.message}`;
} else {
message += `:${e}`;
}
this.outputChannel?.error(`${name}:${stripANSI(message)}`, args);
console.error(`${name}:${message}`, args);
if (sendTelemetry) {
this.telemetry.sendTelemetryError(name, e, { message });
}
}
dispose() {
while (this.disposables.length) {
const x = this.disposables.pop();
x?.dispose();
}
}
private async requireTerminal() {
if (this.terminal === undefined) {
this.terminal = window.createTerminal({
name: "Tasks - dbt",
pty: {
onDidWrite: this.writeEmitter.event,
open: () => this.writeEmitter.fire(""),
close: () => {
this.terminal?.dispose();
this.terminal = undefined;
},
},
});
await new Promise((resolve) => setTimeout(resolve, 100));
}
}

Step 2: ⌨️ Coding

src/dbt_client/dbtCoreIntegration.ts

Update the `unsafeCompileQuery` method to properly resolve the `{{ this }}` expression to the fully qualified model name.
--- 
+++ 
@@ -1,7 +1,19 @@
   async unsafeCompileQuery(query: string): Promise<string> {
     this.throwBridgeErrorIfAvailable();
     const output = await this.python?.lock<CompilationResult>(
-      (python) => python!`to_dict(project.compile_sql(${query}))`,
+      (python) => python!`
+        import re
+
+        def resolve_this(sql, model):
+          if '{{ this }}' in sql:
+            return re.sub(r'{{\s*this\s*}}', f"{model.database}.{model.schema}.{model.alias}", sql)
+          return sql
+
+        compiled = project.compile_sql(${query})
+        compiled_sql = resolve_this(compiled.compiled_sql, project.get_ref_node(compiled.node.name))
+
+        to_dict({'compiled_sql': compiled_sql})
+      `,
     );
     return output.compiled_sql;
   }
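
For comparison, here is a minimal TypeScript sketch of the substitution the diff performs in Python. It is an illustration only, not what the PR ships: `RelationParts` and `resolveThis` are hypothetical names, and whether the replacement belongs before or after `compile_sql` is left open. One difference is deliberate: the diff guards with `'{{ this }}' in sql`, which matches only the single-space spelling, while its regex accepts any whitespace; the sketch relies on the regex alone.

```typescript
// Hypothetical shape; the real manifest node type in the extension may differ.
interface RelationParts {
  database: string;
  schema: string;
  alias: string;
}

// Replace every `{{ this }}` occurrence (with any inner whitespace) by the
// fully qualified relation name. A replacer function is used so that any "$"
// in the names is not treated as a replacement pattern.
function resolveThis(sql: string, relation: RelationParts): string {
  const qualified = `${relation.database}.${relation.schema}.${relation.alias}`;
  return sql.replace(/\{\{\s*this\s*\}\}/g, () => qualified);
}

const resolved = resolveThis("select * from {{ this }}", {
  database: "demo_db",
  schema: "demo_schema",
  alias: "my_second_dbt_model",
});
console.log(resolved);
```

With the values from the issue description, the output references `demo_db.demo_schema.my_second_dbt_model`, the name the reporter expected in place of the temporary `t_...` identifier.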

Step 3: 🔄️ Validating

Your changes have been successfully made to the branch sweep/issue_with_compiled_dbt_preview_option. I have validated these changes using a syntax checker and a linter.


Tip

To recreate the pull request, edit the issue title or description.

This is an automated message generated by Sweep AI.
