Skip to content

Commit

Permalink
Merge pull request #81 from turn1a/use-existing-neptune-custom-run-id
Browse files Browse the repository at this point in the history
 Use NEPTUNE_CUSTOM_RUN_ID env var for distributed async workflows
  • Loading branch information
AleksanderWWW authored Jun 6, 2024
2 parents 6ed4b10 + ebe0c22 commit 239b7af
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 12 deletions.
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
## 0.5.0

### Features
- Added support for the `NEPTUNE_CUSTOM_RUN_ID` environment variable ([#81](https://github.com/neptune-ai/kedro-neptune/pull/81))

## 0.4.0

### Features
Expand Down
24 changes: 18 additions & 6 deletions src/kedro_neptune/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@
# limitations under the License.
#

__all__ = ["NeptuneRunDataset", "NeptuneFileDataset", "neptune_hooks", "init", "__version__"]
__all__ = [
"NeptuneRunDataset",
"NeptuneFileDataset",
"neptune_hooks",
"init",
"__version__",
]

import hashlib
import json
Expand Down Expand Up @@ -53,13 +59,15 @@
try:
# neptune-client>=1.0.0 package structure
import neptune
from neptune.envs import CUSTOM_RUN_ID_ENV_NAME
from neptune.handler import Handler
from neptune.integrations.utils import join_paths
from neptune.types import File
from neptune.utils import stringify_unsupported
except ImportError:
# neptune-client=0.9.0+ package structure
import neptune.new as neptune
from neptune.new.envs import CUSTOM_RUN_ID_ENV_NAME
from neptune.new.handler import Handler
from neptune.new.integrations.utils import join_paths
from neptune.new.types import File
Expand Down Expand Up @@ -207,7 +215,10 @@ def init(
config_template = yaml.load(INITIAL_NEPTUNE_CONFIG)
config_template["neptune"]["project"] = project
config_template["neptune"]["base_namespace"] = base_namespace
config_template["neptune"]["upload_source_files"] = ["**/*.py", f"{settings.CONF_SOURCE}/{config}/*.yml"]
config_template["neptune"]["upload_source_files"] = [
"**/*.py",
f"{settings.CONF_SOURCE}/{config}/*.yml",
]
config_template["neptune"]["dependencies"] = dependencies

yaml.dump(config_template, config_file)
Expand Down Expand Up @@ -414,7 +425,8 @@ def log_data_catalog_metadata(namespace: Handler, catalog: DataCatalog):
def log_pipeline_metadata(namespace: Handler, pipeline: Pipeline):
namespace["structure"].upload(
File.from_content(
content=json.dumps(json.loads(pipeline.to_json()), indent=4, sort_keys=True), extension="json"
content=json.dumps(json.loads(pipeline.to_json()), indent=4, sort_keys=True),
extension="json",
)
)

Expand All @@ -434,12 +446,12 @@ def __init__(self):

@hook_impl
def after_catalog_created(self, catalog: DataCatalog) -> None:
self._run_id = hashlib.md5(str(time.time()).encode()).hexdigest()
self._run_id = os.getenv(CUSTOM_RUN_ID_ENV_NAME, hashlib.md5(str(time.time()).encode()).hexdigest())

config = get_neptune_config(settings)

if config.enabled:
os.environ["NEPTUNE_CUSTOM_RUN_ID"] = self._run_id
if config.enabled and not os.getenv(CUSTOM_RUN_ID_ENV_NAME):
os.environ[CUSTOM_RUN_ID_ENV_NAME] = self._run_id

catalog.add(dataset_name="neptune_run", dataset=NeptuneRunDataset())

Expand Down
11 changes: 11 additions & 0 deletions tests/kedro_neptune/test_standard.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from datetime import datetime

from tests.kedro_neptune.utils.kedro_utils import run_pipeline
from tests.kedro_neptune.utils.run_utils import assert_structure

Expand All @@ -25,3 +27,12 @@ def test_run():
def test_run_with_params():
run_pipeline(project="planets", session_params={"extra_params": {"travel_speed": 40000}})
assert_structure(travel_speed=40000)


def test_run_with_custom_run_id():
    """Run the pipeline with a preset custom run ID and check that the run reports it.

    ``run_pipeline`` exports the ID through the ``NEPTUNE_CUSTOM_RUN_ID``
    environment variable and does not unset it afterwards, so the previous
    value is restored here to keep the ID from leaking into tests that run
    later in the same process.
    """
    # Imported locally so the cleanup does not depend on the module's
    # top-level imports.
    import os

    env_name = "NEPTUNE_CUSTOM_RUN_ID"
    previous_value = os.environ.get(env_name)
    custom_run_id = str(datetime.now())
    try:
        run_pipeline(
            project="planets",
            custom_run_id=custom_run_id,
        )
        assert_structure(custom_run_id=custom_run_id)
    finally:
        # Restore the environment exactly as it was before the test.
        if previous_value is None:
            os.environ.pop(env_name, None)
        else:
            os.environ[env_name] = previous_value
12 changes: 11 additions & 1 deletion tests/kedro_neptune/utils/kedro_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#
__all__ = ["run_pipeline"]

import os
from pathlib import Path
from typing import (
Any,
Expand All @@ -26,12 +27,21 @@
from kedro.framework.startup import bootstrap_project


def run_pipeline(project: str, run_params: Dict[str, Any] = None, session_params: Dict[str, Any] = None):
def run_pipeline(
project: str,
run_params: Dict[str, Any] = None,
session_params: Dict[str, Any] = None,
custom_run_id: str = None,
):
if run_params is None:
run_params = {}

if session_params is None:
session_params = {}

if custom_run_id:
os.environ["NEPTUNE_CUSTOM_RUN_ID"] = custom_run_id

configure_project(project)

bootstrap_project(Path(".").resolve())
Expand Down
27 changes: 22 additions & 5 deletions tests/kedro_neptune/utils/run_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

# It may take some time to refresh cache
# @backoff.on_exception(backoff.expo, AssertionError, max_value=1, max_time=60)
def assert_structure(travel_speed: int = 10000):
def assert_structure(travel_speed: int = 10000, custom_run_id: Optional[str] = None):
time.sleep(30)
with restore_run() as run:
run.sync(wait=True)
Expand Down Expand Up @@ -90,28 +90,45 @@ def assert_structure(travel_speed: int = 10000):

# Nodes data
check_node_metadata(
run=run, node_namespace="kedro/nodes/distances", inputs=["planets"], outputs=["distances_to_planets"]
run=run,
node_namespace="kedro/nodes/distances",
inputs=["planets"],
outputs=["distances_to_planets"],
)
check_node_metadata(
run=run,
node_namespace="kedro/nodes/furthest",
inputs=["distances_to_planets"],
outputs=["furthest_planet_distance", "furthest_planet_name"],
)
check_node_metadata(run=run, node_namespace="kedro/nodes/judge_model", inputs=["neptune_run", "dataset"])
check_node_metadata(
run=run, node_namespace="kedro/nodes/prepare_dataset", inputs=["planets"], outputs=["dataset"]
run=run,
node_namespace="kedro/nodes/judge_model",
inputs=["neptune_run", "dataset"],
)
check_node_metadata(
run=run,
node_namespace="kedro/nodes/prepare_dataset",
inputs=["planets"],
outputs=["dataset"],
)
check_node_metadata(
run=run,
node_namespace="kedro/nodes/travel_time",
inputs=["furthest_planet_distance", "furthest_planet_name", "params:travel_speed"],
inputs=[
"furthest_planet_distance",
"furthest_planet_name",
"params:travel_speed",
],
outputs=["travel_hours"],
)
assert run.exists("kedro/nodes/travel_time/parameters")
assert run.exists("kedro/nodes/travel_time/parameters/travel_speed")
assert run["kedro/nodes/travel_time/parameters/travel_speed"].fetch() == travel_speed

if custom_run_id:
assert run["sys/custom_run_id"].fetch() == custom_run_id

# User defined data
assert run.exists("furthest_planet")
assert run.exists("furthest_planet/name")
Expand Down

0 comments on commit 239b7af

Please sign in to comment.