Load data functions move out of server
Signed-off-by: Jitendra Gundaniya <jitendra_gundaniya@mckinsey.com>
jitu5 committed Aug 23, 2024
1 parent df31f92 commit 80d35b6
Showing 4 changed files with 70 additions and 63 deletions.
2 changes: 1 addition & 1 deletion package/kedro_viz/launchers/cli.py
@@ -30,7 +30,7 @@
    _wait_for,
    viz_deploy_progress_timer,
)
from kedro_viz.server import load_and_populate_data
from kedro_viz.load_data import load_and_populate_data

try:
    from azure.core.exceptions import ServiceRequestError
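For callers such as the CLI deploy command, only the import path changes; the function keeps its name and signature. A minimal usage sketch (the call below is illustrative, assumes a Kedro project in the current working directory, and is not part of this diff):

from pathlib import Path

from kedro_viz.load_data import load_and_populate_data

# Load the Kedro project and populate the Kedro-Viz data repositories
# before building or deploying the API responses.
load_and_populate_data(Path.cwd(), include_hooks=False)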
67 changes: 67 additions & 0 deletions package/kedro_viz/load_data.py
@@ -0,0 +1,67 @@
"""Module to load data from Kedro project and populate Kedro Viz Repositories"""

from pathlib import Path
from typing import Any, Dict, Optional

from kedro.framework.session.store import BaseSessionStore
from kedro.io import DataCatalog
from kedro.pipeline import Pipeline

from kedro_viz.data_access import DataAccessManager, data_access_manager
from kedro_viz.database import make_db_session_factory
from kedro_viz.integrations.kedro import data_loader as kedro_data_loader
from kedro_viz.integrations.kedro.sqlite_store import SQLiteStore


def populate_data(
    data_access_manager: DataAccessManager,
    catalog: DataCatalog,
    pipelines: Dict[str, Pipeline],
    session_store: BaseSessionStore,
    stats_dict: Dict,
):  # pylint: disable=redefined-outer-name
    """Populate data repositories. Should be called once on application start
    if creating an api app from project.
    """

    if isinstance(session_store, SQLiteStore):
        session_store.sync()
        session_class = make_db_session_factory(session_store.location)
        data_access_manager.set_db_session(session_class)

    data_access_manager.add_catalog(catalog, pipelines)

    # add dataset stats before adding pipelines as the data nodes
    # need stats information and they are created during add_pipelines
    data_access_manager.add_dataset_stats(stats_dict)

    data_access_manager.add_pipelines(pipelines)


def load_and_populate_data(
    path: Path,
    env: Optional[str] = None,
    include_hooks: bool = False,
    package_name: Optional[str] = None,
    pipeline_name: Optional[str] = None,
    extra_params: Optional[Dict[str, Any]] = None,
):
    """Loads underlying Kedro project data and populates Kedro Viz Repositories"""

    # Loads data from underlying Kedro Project
    catalog, pipelines, session_store, stats_dict = kedro_data_loader.load_data(
        path,
        env,
        include_hooks,
        package_name,
        extra_params,
    )

    pipelines = (
        pipelines
        if pipeline_name is None
        else {pipeline_name: pipelines[pipeline_name]}
    )

    # Creates data repositories which are used by Kedro Viz Backend APIs
    populate_data(data_access_manager, catalog, pipelines, session_store, stats_dict)
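The new module keeps both entry points unchanged: populate_data wires an already loaded catalog, pipelines, session store and stats dictionary into the shared data_access_manager, while load_and_populate_data also performs the project load and the optional filtering to a single registered pipeline. A hedged usage sketch, where the project path, environment, pipeline name and parameter override are all illustrative values:

from pathlib import Path

from kedro_viz.load_data import load_and_populate_data

# Load one registered pipeline only and override a runtime parameter.
load_and_populate_data(
    Path("./my-kedro-project"),
    env="local",
    pipeline_name="data_processing",
    extra_params={"model_options.test_size": 0.3},
)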
62 changes: 1 addition & 61 deletions package/kedro_viz/server.py
@@ -7,77 +7,17 @@

import fsspec
import uvicorn
from kedro.framework.session.store import BaseSessionStore
from kedro.io import DataCatalog
from kedro.pipeline import Pipeline
from watchgod import RegExpWatcher, run_process

from kedro_viz.api import apps
from kedro_viz.api.rest.responses import save_api_responses_to_fs
from kedro_viz.constants import DEFAULT_HOST, DEFAULT_PORT
from kedro_viz.data_access import DataAccessManager, data_access_manager
from kedro_viz.database import make_db_session_factory
from kedro_viz.integrations.kedro import data_loader as kedro_data_loader
from kedro_viz.integrations.kedro.sqlite_store import SQLiteStore
from kedro_viz.launchers.utils import _check_viz_up, _wait_for
from kedro_viz.load_data import load_and_populate_data

DEV_PORT = 4142


def populate_data(
    data_access_manager: DataAccessManager,
    catalog: DataCatalog,
    pipelines: Dict[str, Pipeline],
    session_store: BaseSessionStore,
    stats_dict: Dict,
):  # pylint: disable=redefined-outer-name
    """Populate data repositories. Should be called once on application start
    if creating an api app from project.
    """

    if isinstance(session_store, SQLiteStore):
        session_store.sync()
        session_class = make_db_session_factory(session_store.location)
        data_access_manager.set_db_session(session_class)

    data_access_manager.add_catalog(catalog, pipelines)

    # add dataset stats before adding pipelines as the data nodes
    # need stats information and they are created during add_pipelines
    data_access_manager.add_dataset_stats(stats_dict)

    data_access_manager.add_pipelines(pipelines)


def load_and_populate_data(
    path: Path,
    env: Optional[str] = None,
    include_hooks: bool = False,
    package_name: Optional[str] = None,
    pipeline_name: Optional[str] = None,
    extra_params: Optional[Dict[str, Any]] = None,
):
    """Loads underlying Kedro project data and populates Kedro Viz Repositories"""

    # Loads data from underlying Kedro Project
    catalog, pipelines, session_store, stats_dict = kedro_data_loader.load_data(
        path,
        env,
        include_hooks,
        package_name,
        extra_params,
    )

    pipelines = (
        pipelines
        if pipeline_name is None
        else {pipeline_name: pipelines[pipeline_name]}
    )

    # Creates data repositories which are used by Kedro Viz Backend APIs
    populate_data(data_access_manager, catalog, pipelines, session_store, stats_dict)


def run_server(
    host: str = DEFAULT_HOST,
    port: int = DEFAULT_PORT,
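The remainder of run_server is collapsed in this view; the substance of the change is that server.py no longer owns the load-and-populate logic and simply imports it from kedro_viz.load_data. A rough sketch of the resulting split, using a hypothetical helper name and making no claim about the actual run_server body:

from pathlib import Path

from kedro_viz.load_data import load_and_populate_data


def _prepare_and_serve(project_path: Path) -> None:
    # Hypothetical illustration: delegate data loading to kedro_viz.load_data
    # first, then start the web application as before (serving code elided).
    load_and_populate_data(project_path)
    # ... uvicorn.run(...) continues here in the real run_server ...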
2 changes: 1 addition & 1 deletion package/tests/test_integrations/test_base_deployer.py
@@ -59,7 +59,7 @@ def test_deploy(self, mocker):
mocker.patch("fsspec.filesystem")
build = ConcreteBaseDeployer()

mocker.patch("kedro_viz.server.load_and_populate_data")
mocker.patch("kedro_viz.load_data.load_and_populate_data")
mocker.patch.object(build, "_upload_static_files")
mocker.patch.object(build, "_upload_api_responses")
mocker.patch.object(build, "_upload_deploy_viz_metadata_file")
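Because the function moved modules, the test's patch target simply follows it. A minimal pytest-mock sketch of the same idea, with a made-up test name (the mocker fixture comes from pytest-mock):

def test_deploy_does_not_load_project(mocker):
    # Patch the loader at its new location so no real Kedro project is
    # loaded while the deployer under test runs.
    mocked_loader = mocker.patch("kedro_viz.load_data.load_and_populate_data")
    # ... exercise the deployer here ...
    mocked_loader.assert_not_called()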
