Skip to content

Commit

Permalink
Merge branch 'main' into ss/added_node_status
Browse files Browse the repository at this point in the history
  • Loading branch information
AleksanderWWW authored Jun 6, 2024
2 parents 34c7914 + 239b7af commit b95dc83
Show file tree
Hide file tree
Showing 5 changed files with 32 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

### Features
- Added pipeline running log and execution order ([#82](https://github.com/neptune-ai/kedro-neptune/pull/82))
- Added support for the `NEPTUNE_CUSTOM_RUN_ID` environment variable ([#81](https://github.com/neptune-ai/kedro-neptune/pull/81))

## 0.4.0

Expand Down
8 changes: 5 additions & 3 deletions src/kedro_neptune/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,13 +59,15 @@
try:
# neptune-client>=1.0.0 package structure
import neptune
from neptune.envs import CUSTOM_RUN_ID_ENV_NAME
from neptune.handler import Handler
from neptune.integrations.utils import join_paths
from neptune.types import File
from neptune.utils import stringify_unsupported
except ImportError:
# neptune-client=0.9.0+ package structure
import neptune.new as neptune
from neptune.new.envs import CUSTOM_RUN_ID_ENV_NAME
from neptune.new.handler import Handler
from neptune.new.integrations.utils import join_paths
from neptune.new.types import File
Expand Down Expand Up @@ -446,12 +448,12 @@ def __init__(self):

@hook_impl
def after_catalog_created(self, catalog: DataCatalog) -> None:
self._run_id = hashlib.md5(str(time.time()).encode()).hexdigest()
self._run_id = os.getenv(CUSTOM_RUN_ID_ENV_NAME, hashlib.md5(str(time.time()).encode()).hexdigest())

config = get_neptune_config(settings)

if config.enabled:
os.environ["NEPTUNE_CUSTOM_RUN_ID"] = self._run_id
if config.enabled and not os.getenv(CUSTOM_RUN_ID_ENV_NAME):
os.environ[CUSTOM_RUN_ID_ENV_NAME] = self._run_id

catalog.add(dataset_name="neptune_run", dataset=NeptuneRunDataset())

Expand Down
11 changes: 11 additions & 0 deletions tests/kedro_neptune/test_standard.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
from datetime import datetime

from tests.kedro_neptune.utils.kedro_utils import run_pipeline
from tests.kedro_neptune.utils.run_utils import assert_structure

Expand All @@ -25,3 +27,12 @@ def test_run():
def test_run_with_params():
    """Run the "planets" pipeline with ``travel_speed`` overridden via extra params.

    The same value is then handed to ``assert_structure`` so the check is made
    against the overridden speed rather than that helper's default.
    """
    speed_override = 40000
    run_pipeline(
        project="planets",
        session_params={"extra_params": {"travel_speed": speed_override}},
    )
    assert_structure(travel_speed=speed_override)


def test_run_with_custom_run_id():
    """Run the "planets" pipeline with an explicit custom run id and verify it.

    The id is the string form of the current timestamp; it is passed to
    ``run_pipeline`` and then to ``assert_structure`` for comparison.
    """
    run_id = str(datetime.now())
    run_pipeline(project="planets", custom_run_id=run_id)
    assert_structure(custom_run_id=run_id)
12 changes: 11 additions & 1 deletion tests/kedro_neptune/utils/kedro_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#
__all__ = ["run_pipeline"]

import os
from pathlib import Path
from typing import (
Any,
Expand All @@ -26,12 +27,21 @@
from kedro.framework.startup import bootstrap_project


def run_pipeline(project: str, run_params: Dict[str, Any] = None, session_params: Dict[str, Any] = None):
def run_pipeline(
project: str,
run_params: Dict[str, Any] = None,
session_params: Dict[str, Any] = None,
custom_run_id: str = None,
):
if run_params is None:
run_params = {}

if session_params is None:
session_params = {}

if custom_run_id:
os.environ["NEPTUNE_CUSTOM_RUN_ID"] = custom_run_id

configure_project(project)

bootstrap_project(Path(".").resolve())
Expand Down
5 changes: 4 additions & 1 deletion tests/kedro_neptune/utils/run_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@

# It may take some time to refresh the cache
# @backoff.on_exception(backoff.expo, AssertionError, max_value=1, max_time=60)
def assert_structure(travel_speed: int = 10000):
def assert_structure(travel_speed: int = 10000, custom_run_id: Optional[str] = None):
time.sleep(30)
with restore_run() as run:
run.sync(wait=True)
Expand Down Expand Up @@ -128,6 +128,9 @@ def assert_structure(travel_speed: int = 10000):
assert run.exists("kedro/nodes/travel_time/parameters/travel_speed")
assert run["kedro/nodes/travel_time/parameters/travel_speed"].fetch() == travel_speed

if custom_run_id:
assert run["sys/custom_run_id"].fetch() == custom_run_id

# User defined data
assert run.exists("furthest_planet")
assert run.exists("furthest_planet/name")
Expand Down

0 comments on commit b95dc83

Please sign in to comment.