feat: Add dataset addition to eligible CLI tools #293

Merged
merged 18 commits on Apr 11, 2022
69 changes: 0 additions & 69 deletions pyluna-common/luna/common/cli/post_to_dataset.py

This file was deleted.

6 changes: 3 additions & 3 deletions pyluna-common/luna/common/stats.py
@@ -14,16 +14,16 @@ def compute_stats_1d(vec, fx_name_prefix, n_percentiles=4):
dict: summary statistics
"""
n, _, sm, sv, ss, sk = scipy.stats.describe(vec)
ln_params = scipy.stats.lognorm.fit(vec, floc=0)
# ln_params = scipy.stats.lognorm.fit(vec, floc=0)

hist_features = {
f'{fx_name_prefix}_nobs': n,
f'{fx_name_prefix}_mean': sm,
f'{fx_name_prefix}_variance': sv,
f'{fx_name_prefix}_skewness': ss,
f'{fx_name_prefix}_kurtosis': sk,
f'{fx_name_prefix}_lognorm_fit_p0': ln_params[0],
f'{fx_name_prefix}_lognorm_fit_p2': ln_params[2]
# f'{fx_name_prefix}_lognorm_fit_p0': ln_params[0],
# f'{fx_name_prefix}_lognorm_fit_p2': ln_params[2]
}

percentiles = np.linspace(0, 100, n_percentiles + 1)
151 changes: 123 additions & 28 deletions pyluna-common/luna/common/utils.py
@@ -10,9 +10,16 @@
from luna.common.CodeTimer import CodeTimer
import itertools

import shutil

import pandas as pd
from pathlib import Path

import requests
from functools import partial
import urllib


logger = logging.getLogger(__name__)

# Distinct types that are actually the same (effectively)
@@ -241,7 +248,8 @@ def validate_params(given_params: dict, params_list: List[tuple]):
Returns:
dict: Validated and cast keyword argument dictionary
"""
logger = logging.getLogger(__name__)
logger.info("Validating params...")

d_params = {}
for param, dtype in params_list:
if given_params.get(param, None) is None:
Expand Down Expand Up @@ -273,9 +281,9 @@ def validate_params(given_params: dict, params_list: List[tuple]):
raise e

if param in MASK_KEYS:
logger.info(f"Param {param} set = *****")
logger.info(f" -> Set {param} ({dtype}) = *****")
else:
logger.info(f"Param {param} set = {d_params[param]}")
logger.info(f" -> Set {param} ({dtype}) = {d_params[param]}")

return d_params
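For orientation, a small hypothetical illustration of the (name, dtype) pairs that validate_params consumes; the parameter names and values below are invented, and the casting of string values to the declared types is inferred from the docstring above.

example_params_list = [("input_slide_image", str), ("num_cores", int), ("output_dir", str)]
example_given = {"input_slide_image": "slide.svs", "num_cores": "4", "output_dir": "/tmp/out"}

validated = validate_params(example_given, example_params_list)
# Expected to log lines such as " -> Set num_cores (<class 'int'>) = 4";
# values of parameters listed in MASK_KEYS are logged as ***** instead.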

@@ -292,12 +300,15 @@ def expand_inputs(given_params: dict):
d_params = {}
d_keys = {}

logger.info("Expanding inputs...")

for param, param_value in given_params.items():
if "input_" in param: # We want to treat input_ params a bit differently

# Some inputs may be given as a directory, with metadata about them stored at the provided directory path
expected_metadata = os.path.join(param_value, "metadata.yml")
print(expected_metadata)
logger.info(f"Attempting to read metadata at {expected_metadata}")

if os.path.isdir(param_value) and os.path.exists(
expected_metadata
): # Check for this metadata file
@@ -321,7 +332,7 @@
f"No matching output slot of type [{param.replace('input_', '')}] at given input directory"
)

logger.info(f"Expanded input {param_value} -> {expanded_input}")
logger.info(f"Expanded input:\n -> {param_value}\n -> {expanded_input}")
d_params[param] = expanded_input

# Query any keys:
@@ -344,7 +355,66 @@
return d_params, d_keys


from functools import partial

def get_dataset_url():
""" Retrieve a "dataset URL" from the environment, may look like http://localhost:6077 or file:///absolute/path/to/dataset/dir """
dataset_url = os.environ.get("DATASET_URL", None)

if dataset_url is None:
logger.warning("Requesting feature data be sent to dataset, however no dataset URL provided, please set env DATASET_URL!")
else:
logger.info(f"Found dataset URL = {dataset_url}")

return dataset_url



def post_to_dataset(input_feature_data, waystation_url, dataset_id, keys):
""" Interface feature data to a parquet dataset

Args:
input_feature_data (str): path to input data
waystation_url (str): URL of the dataset root (either a file:// path or a waystation http:// endpoint)
dataset_id (str): Dataset name/ID
keys (dict): corresponding segment keys
"""

logger.info(f"Adding {input_feature_data} to {dataset_id} via {waystation_url}")

segment_id = "-".join(
[v for _, v in sorted(keys.items())]
)

logger.info(f"SEGMENT_ID={segment_id}")

post_url = os.path.join(waystation_url, "datasets", dataset_id, "segments", segment_id)

parsed_url = urllib.parse.urlparse(post_url)

if 'http' in parsed_url.scheme:
# The cool way, using luna waystation

logger.info (f"Posting to: {post_url}")

res = requests.post(post_url, files={'segment_data': open (input_feature_data, 'rb')}, data={"segment_keys": json.dumps(keys)})

logger.info (f"{res}: {res.text}")

elif 'file' in parsed_url.scheme:
# The less cool way, just using file paths

segment_dir = Path(parsed_url.path)

logger.info(f"Writing to: {segment_dir}")

os.makedirs(segment_dir, exist_ok=True)

shutil.copy(input_feature_data, segment_dir.joinpath("data.parquet"))

else:
logger.warning("Unrecognized scheme: {parsed_url.scheme}, skipping!")



def cli_runner(
cli_kwargs: dict, cli_params: List[tuple], cli_function: Callable[..., dict], pass_keys: bool = False
@@ -355,69 +425,94 @@ def cli_runner(
cli_kwargs (dict): keyword arguments from the CLI call
cli_params (List[tuple]): param list, where each element is the parameter (name, type)
cli_function (Callable[..., dict]): cli_function entry point, should accept exactly the arguments given by cli_params
pass_keys (bool): if True, pass any found segment keys to the transform function as the 'keys' kwarg

Returns:
None

"""
logger.info(f"Running {cli_function} with {cli_kwargs}")
kwargs = {}
logger.info(f"Started CLI Runner wtih {cli_function}")
logger.debug(f"cli_kwargs={cli_kwargs}")
logger.debug(f"cli_params={cli_params}")
logger.debug(f"pass_keys={pass_keys}")

trm_kwargs = {}

# if "output_dir" not in cli_kwargs.keys():
# raise RuntimeError("CLI Runners assume an output directory")

# Get params from param file
if cli_kwargs.get("method_param_path"):
with open(cli_kwargs.get("method_param_path"), "r") as yaml_file:
yaml_kwargs = yaml.safe_load(yaml_file)
kwargs.update(yaml_kwargs) # Fill from json
trm_kwargs.update(yaml_kwargs) # Fill from YAML file

for key in list(cli_kwargs.keys()):
if cli_kwargs[key] is None:
del cli_kwargs[key]

# Override with CLI arguments
kwargs.update(cli_kwargs)
trm_kwargs.update(cli_kwargs)

kwargs = validate_params(kwargs, cli_params)
trm_kwargs = validate_params(trm_kwargs, cli_params)

if "output_dir" in kwargs:
output_dir = kwargs["output_dir"]
if "output_dir" in trm_kwargs:
output_dir = trm_kwargs["output_dir"]
os.makedirs(output_dir, exist_ok=True)

# Expand implied inputs
kwargs, keys = expand_inputs(kwargs)
trm_kwargs, keys = expand_inputs(trm_kwargs)

logger.info (f"Full segment key set: {keys}")

# Nice little log break
print(
"\n"
+ "-" * 35
+ f" Running transform::{cli_function.__name__} "
+ "-" * 35
+ "\n"
logger.info(
"-" * 60
+ f"\n Starting transform::{cli_function.__name__} \n"
+ "-" * 60
)

with CodeTimer(logger, name=f"transform::{cli_function.__name__}"):
if pass_keys: cli_function = partial (cli_function, keys=keys)

result = cli_function(**kwargs)
result = cli_function(**trm_kwargs)

# Nice little log break
logger.info(
"-" * 60
+ f"\n Done with transform, running post-transform functions... \n"
+ "-" * 60
)

kwargs.update(result)
trm_kwargs.update(result)

# filter out kwargs with sensitive data
for key in MASK_KEYS:
kwargs.pop(key, None)
trm_kwargs.pop(key, None)

# propagate keys
if kwargs.get('segment_keys', None):
kwargs['segment_keys'].update(keys)
if trm_kwargs.get('segment_keys', None):
trm_kwargs['segment_keys'].update(keys)
else:
kwargs['segment_keys'] = keys
trm_kwargs['segment_keys'] = keys

if "output_dir" in kwargs:
# Save metadata on disk
if "output_dir" in trm_kwargs:
with open(os.path.join(output_dir, "metadata.yml"), "w") as fp:
yaml.dump(kwargs, fp)
yaml.dump(trm_kwargs, fp)

# Save feature data in parquet if indicated:
if "dataset_id" in cli_kwargs and "feature_data" in trm_kwargs:
dataset_id = cli_kwargs.get("dataset_id")
feature_data = trm_kwargs.get("feature_data")

logger.info(f"Adding feature segment {feature_data} to {dataset_id}")

dataset_url = get_dataset_url()

if dataset_url is not None:
post_to_dataset( feature_data, dataset_url, dataset_id, keys=trm_kwargs['segment_keys'])


logger.info("Done.")

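To make the new flow concrete, here is a hedged sketch of driving cli_runner so that feature data gets posted; the transform function, parameter names, and dataset name are hypothetical. The only behavior relied on from this diff is that cli_kwargs carries dataset_id and the transform's returned properties carry feature_data.

import os
import pandas as pd

def extract_shape_features(slide_dir, output_dir):
    """Toy transform: writes a parquet file and reports it as 'feature_data'."""
    out_path = os.path.join(output_dir, "shape_features.parquet")
    pd.DataFrame({"area": [1.0, 2.0]}).to_parquet(out_path)
    return {"feature_data": out_path}

cli_runner(
    cli_kwargs={"slide_dir": "/tmp/in", "output_dir": "/tmp/out", "dataset_id": "MY_SHAPE_FEATURES"},
    cli_params=[("slide_dir", str), ("output_dir", str)],
    cli_function=extract_shape_features,
)
# Because 'dataset_id' is in cli_kwargs and the result contains 'feature_data', cli_runner
# calls get_dataset_url() and, if DATASET_URL is set, posts the parquet segment.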
11 changes: 7 additions & 4 deletions pyluna-pathology/luna/pathology/analysis/ml.py
@@ -104,17 +104,20 @@ def __getitem__(self, idx: int):
return self.preprocess(img), row.name


def post_transform_to_2d(input: torch.Tensor) -> np.array:
def post_transform_to_2d(input: np.array) -> np.array:
"""Convert input to a 2D numpy array on CPU

Args:
input (np.array or torch.Tensor): input of shape [B, *] where B is the batch dimension
"""
if isinstance(input, torch.Tensor):
input = input.cpu().numpy()

if not len(input.shape) == 2:
warnings.warn(f"Reshaping model output (was {input.shape}) to 2D")
return input.view(input.shape[0], -1).cpu().numpy()
else:
return input.cpu().numpy()
input = np.reshape(input, (input.shape[0], -1))

return input
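As a quick illustrative check of the reworked helper (assuming the torch, numpy, and warnings imports that ml.py already has), both a torch tensor and a plain numpy array should come back as 2D numpy arrays:

import numpy as np
import torch

batch = torch.randn(8, 64, 4, 4)      # e.g. a model output of shape [B, C, H, W]
flat = post_transform_to_2d(batch)    # emits a reshape warning, returns shape (8, 1024)
assert isinstance(flat, np.ndarray) and flat.shape == (8, 1024)

already_2d = np.zeros((8, 128), dtype=np.float32)
assert post_transform_to_2d(already_2d).shape == (8, 128)    # passed through unchanged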


class BaseTorchTileDataset(Dataset):
@@ -73,7 +73,7 @@ def extract_kfunction(input_cell_objects, tile_size, intensity_label, tile_strid
Returns:
dict: metadata about function call
"""
df = pd.read_csv(input_cell_objects)
df = pd.read_parquet(input_cell_objects)

l_address = []
l_k_function = []
@@ -116,8 +116,8 @@ def extract_kfunction(input_cell_objects, tile_size, intensity_label, tile_strid
logger.info("Generated k-function feature data:")
logger.info (df_stats)

output_tile_header = os.path.join(output_dir, Path(input_cell_objects).stem + '_kfunction_supertiles.csv')
df_stats.to_csv(output_tile_header)
output_tile_header = os.path.join(output_dir, Path(input_cell_objects).stem + '_kfunction_supertiles.parquet')
df_stats.to_parquet(output_tile_header)

properties = {
'slide_tiles': output_tile_header,
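Since the supertile features now land in parquet rather than CSV, downstream consumers would read them back with pandas; a small sketch with an invented output path:

import pandas as pd

# Hypothetical output following the new naming convention (<stem>_kfunction_supertiles.parquet)
df_supertiles = pd.read_parquet("/tmp/out/my_cells_kfunction_supertiles.parquet")
print(df_supertiles.head())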