diff --git a/btrdb/stream.py b/btrdb/stream.py
index b777bf5..8d75849 100644
--- a/btrdb/stream.py
+++ b/btrdb/stream.py
@@ -23,7 +23,9 @@
 from collections.abc import Sequence
 from copy import deepcopy
 
+import pandas as pd
 import pyarrow as pa
+from dask import compute, delayed
 
 from btrdb.exceptions import (
     BTrDBError,
@@ -58,6 +60,7 @@
 
 _arrow_not_impl_str = "The BTrDB server you are using does not support {}."
 
+
 ##########################################################################
 ## Stream Classes
 ##########################################################################
@@ -1433,6 +1436,58 @@ def __repr__(self):
 ##########################################################################
 
 
+@delayed
+def get_metadata(stream):
+    """
+    Refresh a stream's metadata and return it as a flat record.
+
+    Decorated with dask's ``delayed``, so calling it only builds a lazy task;
+    the record is produced once that task is handed to ``compute``.
+
+    Parameters
+    ----------
+    stream
+        stream whose ``refresh_metadata()`` is called and whose ``_collection``,
+        ``_tags``, ``_annotations`` and ``_uuid`` attributes are read
+
+    Returns
+    -------
+    dict
+        keys "collection", "tags", "annotations", "stream" and "uuid", where
+        "stream" maps to the stream object itself
+    """
+    columns = ["collection", "tags", "annotations", "stream", "uuid"]
+    stream.refresh_metadata()
+    metadata = {
+        c: (getattr(stream, f"_{c}") if c != "stream" else stream) for c in columns
+    }
+    return metadata
+
+
+def _match_column(metadata, column, criterion):
+    """
+    Build a boolean mask of the `metadata` rows whose `column` value matches.
+
+    A compiled regex is applied with ``search`` so its own flags are honoured;
+    a plain string must equal the value, ignoring case.  Values that are not
+    strings never match, and neither does a column that is absent.
+    """
+    if column not in metadata.columns:
+        return pd.Series(False, index=metadata.index, dtype=bool)
+
+    is_regex = isinstance(criterion, RE_PATTERN)
+    wanted = None if is_regex else criterion.lower()
+
+    def matches(value):
+        if not isinstance(value, str):
+            return False
+        if is_regex:
+            return criterion.search(value) is not None
+        return value.lower() == wanted
+
+    return metadata[column].apply(matches).astype(bool)
+
+
 class StreamSetBase(Sequence):
     """
     A lighweight wrapper around a list of stream objects
@@ -1455,6 +1510,15 @@ def __init__(self, streams):
         self.width = None
         self.depth = 0
 
+        # create a DataFrame to store the metadata for filtering; the "tags" and
+        # "annotations" dicts are kept as columns for exact subset matching in
+        # filter(), and each tag key also becomes a column of its own
+        _columns = ["collection", "tags", "annotations", "stream", "uuid"]
+        _metadata = compute([get_metadata(s) for s in self._streams])[0]
+        # naming the columns keeps them present when there are no streams
+        _metadata = pd.DataFrame(_metadata, columns=_columns)
+        self._metadata = _metadata.join(pd.json_normalize(_metadata["tags"].tolist()))
+
     @property
     def allow_window(self):
         return not bool(self.pointwidth or (self.width and self.depth == 0))
@@ -1712,69 +1776,52 @@ def filter(
                 )
             )
 
+        # boolean mask over the rows of obj._metadata, narrowed by each filter
+        tf = ~obj._metadata["uuid"].isna()  # shouldn't contain any None
+
         # filter by collection
         if collection is not None:
             if isinstance(collection, RE_PATTERN):
-                obj._streams = [
-                    s
-                    for s in obj._streams
-                    for m in [collection.search(s.collection)]
-                    if m
-                ]
+                tf = tf & _match_column(obj._metadata, "collection", collection)
             elif isinstance(collection, str):
-                obj._streams = [
-                    s
-                    for s in obj._streams
-                    if s.collection.lower() == collection.lower()
-                ]
+                tf = tf & _match_column(obj._metadata, "collection", collection)
             else:
                 raise BTRDBTypeError("collection must be string or compiled regex")
 
         # filter by name
         if name is not None:
             if isinstance(name, RE_PATTERN):
-                obj._streams = [
-                    s for s in obj._streams for m in [name.search(s.name)] if m
-                ]
+                tf = tf & _match_column(obj._metadata, "name", name)
             elif isinstance(name, str):
-                obj._streams = [
-                    s for s in obj._streams if s.name.lower() == name.lower()
-                ]
+                tf = tf & _match_column(obj._metadata, "name", name)
             else:
                 raise BTRDBTypeError("name must be string or compiled regex")
 
         # filter by unit
         if unit is not None:
             if isinstance(unit, RE_PATTERN):
-                obj._streams = [
-                    s
-                    for s in obj._streams
-                    for m in [unit.search(s.tags()["unit"])]
-                    if m
-                ]
+                tf = tf & _match_column(obj._metadata, "unit", unit)
             elif isinstance(unit, str):
-                obj._streams = [
-                    s
-                    for s in obj._streams
-                    if s.tags().get("unit", "").lower() == unit.lower()
-                ]
+                tf = tf & _match_column(obj._metadata, "unit", unit)
             else:
                 raise BTRDBTypeError("unit must be string or compiled regex")
 
         # filter by tags
         if tags:
-            # filters if the subset of the tags matches the given tags
-            obj._streams = [s for s in obj._streams if tags.items() <= s.tags().items()]
+            # keep streams whose tags contain every requested key/value pair
+            tf = tf & obj._metadata["tags"].apply(
+                lambda t: tags.items() <= t.items()
+            ).astype(bool)
 
         # filter by annotations
         if annotations:
             # filters if the subset of the annotations matches the given annotations
-            obj._streams = [
-                s
-                for s in obj._streams
-                if annotations.items() <= s.annotations()[0].items()
-            ]
+            tf = tf & obj._metadata["annotations"].apply(
+                lambda a: annotations.items() <= a.items()
+            ).astype(bool)
 
+        obj._metadata = obj._metadata[tf]
+        obj._streams = obj._metadata["stream"].to_list()
         return obj
 
     def clone(self):
diff --git a/requirements.txt b/requirements.txt
index a921486..b5d0768 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -5,6 +5,7 @@
 #    pip-compile --output-file=requirements.txt --resolver=backtracking pyproject.toml
 #
 # This file was modified to remove version pins.
+dask
 certifi
     # via btrdb (pyproject.toml)
 grpcio