Skip to content

Commit

Permalink
[card-server] cli command to expose a card server to view realtime up…
Browse files Browse the repository at this point in the history
…dates

- Modified card datastore to accommodate retrieval of runtime data updates
- Added a card viewer html file
- Created a simple HTTP based card server that will help showcase the realtime cards from querying the server
- Card datastore's read and write path retrieval methods are now explicitly given the suffix they retrieve from. We do this because the suffix determines whether we are extracting a card or a data update
- Added a private method in the `Card` (user-interface) to get the data related to a card.
  • Loading branch information
valayDave committed Sep 26, 2023
1 parent c2ed928 commit cdd6fb1
Show file tree
Hide file tree
Showing 5 changed files with 521 additions and 22 deletions.
93 changes: 92 additions & 1 deletion metaflow/plugins/cards/card_cli.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
from metaflow.client import Task
from metaflow import JSONType, namespace
from metaflow.exception import CommandException
from metaflow.util import resolve_identity
from metaflow.exception import (
CommandException,
MetaflowNotFound,
MetaflowNamespaceMismatch,
)
import webbrowser
import re
from metaflow._vendor import click
Expand Down Expand Up @@ -776,3 +781,89 @@ def list(
show_list_as_json=as_json,
file=file,
)


@card.command(help="Run local card viewer server")
@click.option(
    "--run-id",
    default=None,
    show_default=True,
    type=str,
    help="Run ID of the flow",
)
@click.option(
    "--port",
    default=8324,
    show_default=True,
    type=int,
    help="Port on which Metaflow card server will run",
)
@click.option(
    "--namespace",
    "user_namespace",
    default=None,
    show_default=True,
    type=str,
    help="Namespace of the flow",
)
@click.option(
    "--max-cards",
    default=30,
    show_default=True,
    type=int,
    help="Maximum number of cards to be shown at any time by the server",
)
@click.pass_context
def server(ctx, run_id, port, user_namespace, max_cards):
    """CLI entry point for the local card viewer server.

    Resolves the run to display (see `_get_run_object`), bundles it with the
    flow datastore and the `--max-cards` limit into a `CardServerOptions`,
    and hands everything to `create_card_server` together with the port.
    """
    # Imported lazily, inside the command, exactly as the original did.
    from .card_server import CardServerOptions, create_card_server

    if user_namespace is None:
        # No --namespace supplied: use whatever `resolve_identity()` returns.
        user_namespace = resolve_identity()

    run_object = _get_run_object(ctx.obj, run_id, user_namespace)
    server_options = CardServerOptions(
        run_object=run_object,
        only_running=False,
        follow_resumed=False,
        flow_datastore=ctx.obj.flow_datastore,
        max_cards=max_cards,
    )
    create_card_server(server_options, port, ctx.obj)


def _get_run_object(obj, run_id, user_namespace):
    """Resolve the `Run` object the card server should work with.

    Args:
        obj: CLI context object; this function reads `obj.flow.name` and
            reports progress through `obj.echo`.
        run_id (str or None): Bare run id (no flow name, no "/"). When None,
            the latest run of the flow in `user_namespace` is used.
        user_namespace (str): Namespace to search when `run_id` is None. It
            is also quoted in the namespace-mismatch error messages.

    Returns:
        The `Run` built from the pathspec "<flow_name>/<run_id>".

    Raises:
        CommandException: if the flow or run cannot be found, the lookup
            fails with a namespace mismatch, the flow has no latest run, or
            `run_id` is not a bare run id.
    """
    from metaflow import Flow, Run

    flow_name = obj.flow.name
    try:
        if run_id is not None:
            # An explicit run id is looked up with the namespace set to None
            # instead of the user's namespace.
            namespace(None)
        else:
            obj.echo(
                "Searching for runs in namespace: %s" % user_namespace,
                fg="blue",
                bold=False,
            )
            namespace(user_namespace)
        flow = Flow(pathspec=flow_name)
    except MetaflowNotFound:
        raise CommandException("No run found for *%s*." % flow_name)
    except MetaflowNamespaceMismatch:
        raise CommandException(
            "No run found for *%s* in namespace *%s*. You can switch the namespace using --namespace"
            % (flow_name, user_namespace)
        )

    if run_id is None:
        latest_run = flow.latest_run
        if latest_run is None:
            # The flow exists but has no run to show; fail with a CLI error
            # instead of an AttributeError on `None.pathspec`.
            raise CommandException("No run found for *%s*." % flow_name)
        run_id = latest_run.pathspec
    else:
        # Validate user input with a real exception: `assert` statements are
        # stripped when Python runs with -O.
        if "/" in run_id:
            raise CommandException("run_id should be of the form <runid>")
        run_id = "/".join([flow_name, run_id])

    try:
        run = Run(run_id)
    except MetaflowNotFound:
        raise CommandException("No run found for runid: *%s*." % run_id)
    except MetaflowNamespaceMismatch:
        raise CommandException(
            "No run found for runid: *%s* in namespace *%s*. You can switch the namespace using --namespace"
            % (run_id, user_namespace)
        )
    obj.echo("Using run-id %s" % run_id, fg="blue", bold=False)
    return run
14 changes: 12 additions & 2 deletions metaflow/plugins/cards/card_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from metaflow.datastore import FlowDataStore
from metaflow.metaflow_config import CARD_SUFFIX
from .card_resolver import resolve_paths_from_task, resumed_info
from .card_datastore import CardDatastore
from .card_datastore import CardDatastore, CardNameSuffix
from .exception import (
UnresolvableDatastoreException,
IncorrectArguementException,
Expand Down Expand Up @@ -57,6 +57,15 @@ def __init__(
# Tempfile to open stuff in browser
self._temp_file = None

def _get_data(self) -> Optional[dict]:
# currently an internal method to retrieve a card's data.
data_paths = self._card_ds.extract_data_paths(
card_type=self.type, card_hash=self.hash, card_id=self._card_id
)
if len(data_paths) == 0:
return None
return self._card_ds.get_card_data(data_paths[0])

def get(self) -> str:
"""
Retrieves the HTML contents of the card from the
Expand Down Expand Up @@ -172,7 +181,7 @@ def _get_card(self, index):
if index >= self._high:
raise IndexError
path = self._card_paths[index]
card_info = self._card_ds.card_info_from_path(path)
card_info = self._card_ds.card_info_from_path(path, suffix=CardNameSuffix.CARD)
# todo : find card creation date and put it in client.
return Card(
self._card_ds,
Expand Down Expand Up @@ -252,6 +261,7 @@ def get_cards(
# Exception that the task argument should be of form `Task` or `str`
raise IncorrectArguementException(_TYPE(task))

origin_taskpathspec = None
if follow_resumed:
origin_taskpathspec = resumed_info(task)
if origin_taskpathspec:
Expand Down
112 changes: 93 additions & 19 deletions metaflow/plugins/cards/card_datastore.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,16 @@
CardInfo = namedtuple("CardInfo", ["type", "hash", "id", "filename"])


class CardNameSuffix:
    """File-name suffixes (the part after the final "." separator) for card artifacts."""

    # Rendered card files, e.g. TYPE-HASH.html or TYPE-ID-HASH.html.
    CARD = "html"
    # JSON data files written by `save_data`.
    DATA = "data.json"


class CardPathSuffix:
    """Path suffixes handed to the datastore's path builders to choose what is addressed."""

    # Suffix used when reading or writing rendered cards.
    CARD = "cards"
    # Suffix used when reading or writing card data (see `save_data`).
    DATA = "runtime"


def path_spec_resolver(pathspec):
splits = pathspec.split("/")
splits.extend([None] * (4 - len(splits)))
Expand Down Expand Up @@ -86,18 +96,24 @@ def __init__(self, flow_datastore, pathspec=None):
self._run_id = run_id
self._step_name = step_name
self._pathspec = pathspec
self._temp_card_save_path = self._get_write_path(base_pth=TEMP_DIR_NAME)
self._temp_card_save_path = self._get_write_path(
base_pth=TEMP_DIR_NAME, suffix=CardPathSuffix.CARD
)

@classmethod
def get_card_location(cls, base_path, card_name, uuid, card_id=None, suffix="html"):
def get_card_location(
cls, base_path, card_name, uuid, card_id=None, suffix=CardNameSuffix.CARD
):
chash = uuid
if card_id is None:
card_file_name = "%s-%s.%s" % (card_name, chash, suffix)
else:
card_file_name = "%s-%s-%s.%s" % (card_name, card_id, chash, suffix)
return os.path.join(base_path, card_file_name)

def _make_path(self, base_pth, pathspec=None, with_steps=False, suffix="cards"):
def _make_path(
self, base_pth, pathspec=None, with_steps=False, suffix=CardPathSuffix.CARD
):
sysroot = base_pth
if pathspec is not None:
# since most cards are at a task level there will always be 4 non-none values returned
Expand Down Expand Up @@ -138,16 +154,27 @@ def _make_path(self, base_pth, pathspec=None, with_steps=False, suffix="cards"):
pth_arr.pop(0)
return os.path.join(*pth_arr)

def _get_write_path(self, base_pth="", suffix="cards"):
def _get_write_path(self, base_pth="", suffix=CardPathSuffix.CARD):
return self._make_path(
base_pth, pathspec=self._pathspec, with_steps=True, suffix=suffix
)

def _get_read_path(self, base_pth="", with_steps=False):
return self._make_path(base_pth, pathspec=self._pathspec, with_steps=with_steps)
def _get_read_path(self, base_pth="", with_steps=False, suffix=CardPathSuffix.CARD):
# Data paths will always be under the path with steps
if suffix == CardPathSuffix.DATA:
return self._make_path(
base_pth=base_pth,
pathspec=self._pathspec,
with_steps=True,
suffix=suffix,
)

return self._make_path(
base_pth, pathspec=self._pathspec, with_steps=with_steps, suffix=suffix
)

@staticmethod
def card_info_from_path(path):
def card_info_from_path(path, suffix=CardNameSuffix.CARD):
"""
Args:
path (str): The path to the card
Expand All @@ -163,8 +190,8 @@ def card_info_from_path(path):

if len(file_split) not in [2, 3]:
raise Exception(
"Invalid card file name %s. Card file names should be of form TYPE-HASH.html or TYPE-ID-HASH.html"
% card_file_name
"Invalid file name %s. Card/Data file names should be of form TYPE-HASH.%s or TYPE-ID-HASH.%s"
% (card_file_name, suffix, suffix)
)
card_type, card_hash, card_id = None, None, None

Expand All @@ -173,17 +200,17 @@ def card_info_from_path(path):
else:
card_type, card_id, card_hash = file_split

card_hash = card_hash.split(".html")[0]
card_hash = card_hash.split("." + suffix)[0]
return CardInfo(card_type, card_hash, card_id, card_file_name)

def save_data(self, uuid, card_type, json_data, card_id=None):
card_file_name = card_type
loc = self.get_card_location(
self._get_write_path(suffix="runtime"),
self._get_write_path(suffix=CardPathSuffix.DATA),
card_file_name,
uuid,
card_id=card_id,
suffix="data.json",
suffix=CardNameSuffix.DATA,
)
self._backend.save_bytes(
[(loc, BytesIO(json.dumps(json_data).encode("utf-8")))], overwrite=True
Expand All @@ -209,7 +236,11 @@ def save_card(self, uuid, card_type, card_html, card_id=None, overwrite=True):
# It will also easily end up breaking the metaflow-ui (which maybe using a client from an older version).
# Hence, we are writing cards to both paths so that we can introduce breaking changes later in the future.
card_path_with_steps = self.get_card_location(
self._get_write_path(), card_file_name, uuid, card_id=card_id
self._get_write_path(suffix=CardPathSuffix.CARD),
card_file_name,
uuid,
card_id=card_id,
suffix=CardNameSuffix.CARD,
)
if SKIP_CARD_DUALWRITE:
self._backend.save_bytes(
Expand All @@ -218,28 +249,31 @@ def save_card(self, uuid, card_type, card_html, card_id=None, overwrite=True):
)
else:
card_path_without_steps = self.get_card_location(
self._get_read_path(with_steps=False),
self._get_read_path(with_steps=False, suffix=CardPathSuffix.CARD),
card_file_name,
uuid,
card_id=card_id,
suffix=CardNameSuffix.CARD,
)
for cp in [card_path_with_steps, card_path_without_steps]:
self._backend.save_bytes(
[(cp, BytesIO(bytes(card_html, "utf-8")))], overwrite=overwrite
)

return self.card_info_from_path(card_path_with_steps)
return self.card_info_from_path(
card_path_with_steps, suffix=CardNameSuffix.CARD
)

def _list_card_paths(self, card_type=None, card_hash=None, card_id=None):
# Check for new cards first
card_paths = []
card_paths_with_steps = self._backend.list_content(
[self._get_read_path(with_steps=True)]
[self._get_read_path(with_steps=True, suffix=CardPathSuffix.CARD)]
)

if len(card_paths_with_steps) == 0:
card_paths_without_steps = self._backend.list_content(
[self._get_read_path(with_steps=False)]
[self._get_read_path(with_steps=False, suffix=CardPathSuffix.CARD)]
)
if len(card_paths_without_steps) == 0:
# If there are no files found on the Path then raise an error of
Expand All @@ -256,7 +290,7 @@ def _list_card_paths(self, card_type=None, card_hash=None, card_id=None):
cards_found = []
for task_card_path in card_paths:
card_path = task_card_path.path
card_info = self.card_info_from_path(card_path)
card_info = self.card_info_from_path(card_path, suffix=CardNameSuffix.CARD)
if card_type is not None and card_info.type != card_type:
continue
elif card_hash is not None:
Expand All @@ -270,11 +304,35 @@ def _list_card_paths(self, card_type=None, card_hash=None, card_id=None):

return cards_found

def _list_card_data(self, card_type=None, card_hash=None, card_id=None):
    """List the data-file paths under this datastore's data read path.

    Filters are applied in priority order: a `card_type` mismatch always
    rejects; otherwise, if `card_hash` is given, the entry's hash must start
    with it (and `card_id` is then not consulted); otherwise `card_id`, if
    given, must match exactly. Only entries flagged as files are returned.

    Returns:
        list: paths of the matching data files.
    """

    def _passes_filters(info):
        if card_type is not None and info.type != card_type:
            return False
        if card_hash is not None:
            # A hash filter takes precedence over the id filter.
            return info.hash.startswith(card_hash)
        if card_id is not None and info.id != card_id:
            return False
        return True

    listing = self._backend.list_content(
        [self._get_read_path(suffix=CardPathSuffix.DATA)]
    )

    matched = []
    for entry in listing:
        entry_path = entry.path
        # Every listed path is parsed before any other check.
        info = self.card_info_from_path(entry_path, suffix=CardNameSuffix.DATA)
        if _passes_filters(info) and entry.is_file:
            matched.append(entry_path)
    return matched

def create_full_path(self, card_path):
return os.path.join(self._backend.datastore_root, card_path)

def get_card_names(self, card_paths):
return [self.card_info_from_path(path) for path in card_paths]
return [
self.card_info_from_path(path, suffix=CardNameSuffix.CARD)
for path in card_paths
]

def get_card_html(self, path):
with self._backend.load_bytes([path]) as get_results:
Expand All @@ -283,6 +341,13 @@ def get_card_html(self, path):
with open(path, "r") as f:
return f.read()

def get_card_data(self, path):
    """Load the JSON data stored at `path` and return it parsed.

    Args:
        path (str): Datastore path of the data file, as returned by
            `extract_data_paths`.

    Returns:
        The decoded JSON value, or None when the backend yields no local
        file for `path`.
    """
    with self._backend.load_bytes([path]) as get_results:
        # Each result is a 3-tuple; only the middle element (presumably the
        # local file path) is used here.
        for _, local_path, _ in get_results:
            if local_path is None:
                continue
            # `save_data` writes the payload UTF-8 encoded, so decode it the
            # same way instead of relying on the locale's default encoding.
            with open(local_path, "r", encoding="utf-8") as f:
                return json.load(f)
    return None

def cache_locally(self, path, save_path=None):
"""
Saves the data present in the `path` the `metaflow_card_cache` directory or to the `save_path`.
Expand All @@ -308,6 +373,15 @@ def cache_locally(self, path, save_path=None):
shutil.copy(path, main_path)
return main_path

def extract_data_paths(self, card_type=None, card_hash=None, card_id=None):
    """Return the paths of stored card data files matching the given filters.

    Thin public wrapper around `_list_card_data`; all three filters are
    forwarded unchanged.
    """
    # `card_hash` is the card's unique identifier; it is no longer the
    # actual hash.
    filters = dict(card_type=card_type, card_hash=card_hash, card_id=card_id)
    return self._list_card_data(**filters)

def extract_card_paths(self, card_type=None, card_hash=None, card_id=None):
return self._list_card_paths(
card_type=card_type, card_hash=card_hash, card_id=card_id
Expand Down
Loading

0 comments on commit cdd6fb1

Please sign in to comment.