Skip to content

Commit

Permalink
* Removed status and added running log
Browse files Browse the repository at this point in the history
* Added pipeline execution order
* Added stringify_unsupported
  • Loading branch information
SiddhantSadangi committed Jun 6, 2024
1 parent 098fd63 commit 34c7914
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 9 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
## [UNRELEASED] 0.5.0

### Features
- Added node status tracking ([#82](https://github.com/neptune-ai/kedro-neptune/pull/82))
- Added pipeline running log and execution order ([#82](https://github.com/neptune-ai/kedro-neptune/pull/82))

## 0.4.0

Expand Down
10 changes: 6 additions & 4 deletions src/kedro_neptune/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,8 @@ def log_pipeline_metadata(namespace: Handler, pipeline: Pipeline):
)
)

namespace["execution_order"] = pipeline.describe()


def log_run_params(namespace: Handler, run_params: Dict[str, Any]):
namespace["run_params"] = stringify_unsupported(run_params)
Expand Down Expand Up @@ -489,7 +491,7 @@ def before_node_run(self, node: Node, inputs: Dict[str, Any], catalog: DataCatal

run = catalog.load("neptune_run")

run["status/currently_running"] = node.short_name
run["log"].append(f"Running {node.short_name}")

current_namespace = run[f"nodes/{node.short_name}"]

Expand All @@ -500,7 +502,7 @@ def before_node_run(self, node: Node, inputs: Dict[str, Any], catalog: DataCatal

for input_name, input_value in inputs.items():
if input_name.startswith("params:"):
current_namespace[f'parameters/{input_name[len("params:"):]}'] = input_value
current_namespace[f'parameters/{input_name[len("params:"):]}'] = stringify_unsupported(input_value)

self._node_execution_timers[node.short_name] = time.time()

Expand All @@ -515,8 +517,7 @@ def after_node_run(self, node: Node, catalog: DataCatalog, outputs: Dict[str, An

run = catalog.load("neptune_run")

run["status/last_run"] = node.short_name
run["status/currently_running"] = "None"
run["log"].append(f"Finished {node.short_name}")

current_namespace = run[f"nodes/{node.short_name}"]
current_namespace["execution_time"] = execution_time
Expand All @@ -541,6 +542,7 @@ def after_pipeline_run(self, catalog: DataCatalog) -> None:

run = catalog.load("neptune_run")
log_data_catalog_metadata(namespace=run, catalog=catalog)
run["log"].append("Finished pipeline")
run.container.sync()


Expand Down
6 changes: 2 additions & 4 deletions tests/kedro_neptune/utils/run_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ def assert_structure(travel_speed: int = 10000):
assert run.exists("kedro/kedro_command")
assert run.exists("kedro/run_params")
assert run.exists("kedro/structure")
assert run.exists("kedro/execution_order")
assert run.exists("kedro/log")

# Data catalog
assert run.exists("kedro/catalog/datasets")
Expand Down Expand Up @@ -126,10 +128,6 @@ def assert_structure(travel_speed: int = 10000):
assert run.exists("kedro/nodes/travel_time/parameters/travel_speed")
assert run["kedro/nodes/travel_time/parameters/travel_speed"].fetch() == travel_speed

# Status
assert run["kedro/status/currently_running"].fetch() == "None"
assert run["kedro/status/last_run"].fetch() == "travel_time"

# User defined data
assert run.exists("furthest_planet")
assert run.exists("furthest_planet/name")
Expand Down

0 comments on commit 34c7914

Please sign in to comment.