Skip to content

Commit

Permalink
Fix installing deps when using profile_mapping & `ExecutionMode.LOC…
Browse files Browse the repository at this point in the history
…AL` (#659)

Extends the local operator when running `dbt deps` with the provides
profile flags.

This makes the logic consistent between DAG parsing and task running as
referenced below

https://github.com/astronomer/astronomer-cosmos/blob/8e2d5908ce89aa98813af6dfd112239e124bd69a/cosmos/dbt/graph.py#L247-L266

Closes: #658
  • Loading branch information
joppevos authored Nov 10, 2023
1 parent a322c5f commit 6e41471
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 11 deletions.
26 changes: 16 additions & 10 deletions cosmos/operators/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,17 +206,11 @@ def run_command(
tmp_project_dir,
)

# if we need to install deps, do so
if self.install_deps:
self.run_subprocess(
command=[self.dbt_executable_path, "deps"],
env=env,
output_encoding=self.output_encoding,
cwd=tmp_project_dir,
)
with self.profile_config.ensure_profile() as (profile_path, env_vars):
with self.profile_config.ensure_profile() as profile_values:
(profile_path, env_vars) = profile_values
env.update(env_vars)
full_cmd = cmd + [

flags = [
"--profiles-dir",
str(profile_path.parent),
"--profile",
Expand All @@ -225,6 +219,18 @@ def run_command(
self.profile_config.target_name,
]

if self.install_deps:
deps_command = [self.dbt_executable_path, "deps"]
deps_command.extend(flags)
self.run_subprocess(
command=deps_command,
env=env,
output_encoding=self.output_encoding,
cwd=tmp_project_dir,
)

full_cmd = cmd + flags

logger.info("Trying to run the command:\n %s\nFrom %s", full_cmd, tmp_project_dir)
logger.info("Using environment variables keys: %s", env.keys())
result = self.run_subprocess(
Expand Down
30 changes: 30 additions & 0 deletions tests/operators/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -458,3 +458,33 @@ def test_dbt_docs_gcs_local_operator():
call(filename="fake-dir/target/file2", bucket_name="fake-bucket", object_name="fake-folder/file2"),
]
mock_hook.upload.assert_has_calls(expected_upload_calls)


@patch("cosmos.operators.local.DbtLocalBaseOperator.store_compiled_sql")
@patch("cosmos.operators.local.DbtLocalBaseOperator.exception_handling")
@patch("cosmos.config.ProfileConfig.ensure_profile")
@patch("cosmos.operators.local.DbtLocalBaseOperator.run_subprocess")
def test_operator_execute_deps_parameters(
mock_build_and_run_cmd, mock_ensure_profile, mock_exception_handling, mock_store_compiled_sql
):
expected_call_kwargs = [
"/usr/local/bin/dbt",
"deps",
"--profiles-dir",
"/path/to",
"--profile",
"default",
"--target",
"dev",
]
task = DbtRunLocalOperator(
profile_config=real_profile_config,
task_id="my-task",
project_dir=DBT_PROJ_DIR,
install_deps=True,
emit_datasets=False,
dbt_executable_path="/usr/local/bin/dbt",
)
mock_ensure_profile.return_value.__enter__.return_value = (Path("/path/to/profile"), {"ENV_VAR": "value"})
task.execute(context={"task_instance": MagicMock()})
assert mock_build_and_run_cmd.call_args_list[0].kwargs["command"] == expected_call_kwargs
2 changes: 1 addition & 1 deletion tests/operators/test_virtualenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def test_run_command(
dbt_cmd = run_command_args[2]
assert python_cmd[0][0][0].endswith("/bin/python")
assert python_cmd[0][-1][-1] == "from importlib.metadata import version; print(version('dbt-core'))"
assert dbt_deps[0][0][-1] == "deps"
assert dbt_deps[0][0][1] == "deps"
assert dbt_deps[0][0][0].endswith("/bin/dbt")
assert dbt_deps[0][0][0] == dbt_cmd[0][0][0]
assert dbt_cmd[0][0][1] == "do-something"
Expand Down

0 comments on commit 6e41471

Please sign in to comment.