btrdb/stream.py (94 changes: 58 additions & 36 deletions)
@@ -23,7 +23,9 @@
 from collections.abc import Sequence
 from copy import deepcopy
 
+import pandas as pd
 import pyarrow as pa
+from dask import compute, delayed
 
 from btrdb.exceptions import (
     BTrDBError,
@@ -58,6 +60,7 @@
 
 _arrow_not_impl_str = "The BTrDB server you are using does not support {}."
 
+
 ##########################################################################
 ## Stream Classes
 ##########################################################################
@@ -1433,6 +1436,16 @@ def __repr__(self):
 ##########################################################################
 
 
+@delayed
+def get_metadata(stream):
+    columns = ["collection", "tags", "annotations", "stream", "uuid"]
+    stream.refresh_metadata()
+    metadata = {
+        c: (getattr(stream, f"_{c}") if c != "stream" else stream) for c in columns
+    }
+    return metadata
+
+
 class StreamSetBase(Sequence):
     """
     A lightweight wrapper around a list of stream objects
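Note on the hunk above: @delayed makes each get_metadata(stream) call lazy, and a single dask.compute(...) then runs the whole batch, so the per-stream refresh_metadata round trips overlap instead of running one at a time. A minimal standalone sketch of that pattern, assuming an already-connected list named streams (the fetch_metadata helper and the fields it returns are illustrative, not part of the btrdb API):

from dask import compute, delayed

@delayed
def fetch_metadata(stream):
    # Illustrative stand-in for the get_metadata helper above: one
    # network round trip per stream, deferred until compute() runs.
    stream.refresh_metadata()
    return {"uuid": stream.uuid, "stream": stream}

# compute() walks the list of Delayed objects and executes them on the
# default scheduler, overlapping the I/O-bound calls; it returns one
# tuple element per positional argument, hence the trailing [0].
records = compute([fetch_metadata(s) for s in streams])[0]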
@@ -1455,6 +1468,13 @@ def __init__(self, streams):
         self.width = None
         self.depth = 0
 
+        # create a DataFrame to store the metadata for filtering
+        _metadata = compute([get_metadata(s) for s in self._streams])[0]
+        _metadata = pd.DataFrame(_metadata)
+        self._metadata = _metadata.join(pd.json_normalize(_metadata["tags"])).drop(
+            columns=["tags"]
+        )
+
     @property
     def allow_window(self):
         return not bool(self.pointwidth or (self.width and self.depth == 0))
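The constructor above flattens each stream's nested tags dict into top-level DataFrame columns so filter can match on them directly (the annotations column is kept as raw dicts for the annotations filter). A small self-contained sketch of the json_normalize-plus-join step, using made-up records; real tag keys and collection names come from the BTrDB server:

import pandas as pd

records = [
    {"collection": "sensors/east", "tags": {"name": "L1MAG", "unit": "volts"}},
    {"collection": "sensors/west", "tags": {"name": "C1MAG", "unit": "amps"}},
]
df = pd.DataFrame(records)

# json_normalize expands each tags dict into its own columns (name,
# unit); join aligns on the shared RangeIndex, and the raw dict column
# is then dropped so every field is directly filterable.
flat = df.join(pd.json_normalize(df["tags"])).drop(columns=["tags"])
print(list(flat.columns))  # ['collection', 'name', 'unit']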
@@ -1712,69 +1732,71 @@ def filter(
             )
         )
 
+        tf = ~obj._metadata.uuid.isna()  # shouldn't contain any None
+
         # filter by collection
         if collection is not None:
             if isinstance(collection, RE_PATTERN):
-                obj._streams = [
-                    s
-                    for s in obj._streams
-                    for m in [collection.search(s.collection)]
-                    if m
-                ]
+                tf = tf & obj._metadata.collection.str.contains(
+                    collection.pattern, case=False, regex=True
+                )
             elif isinstance(collection, str):
-                obj._streams = [
-                    s
-                    for s in obj._streams
-                    if s.collection.lower() == collection.lower()
-                ]
+                tf = tf & obj._metadata.collection.str.contains(
+                    collection, case=False, regex=False
+                )
             else:
                 raise BTRDBTypeError("collection must be string or compiled regex")
 
         # filter by name
         if name is not None:
             if isinstance(name, RE_PATTERN):
-                obj._streams = [
-                    s for s in obj._streams for m in [name.search(s.name)] if m
-                ]
+                tf = tf & obj._metadata.name.str.contains(
+                    name.pattern, case=False, regex=True
+                )
             elif isinstance(name, str):
-                obj._streams = [
-                    s for s in obj._streams if s.name.lower() == name.lower()
-                ]
+                tf = tf & obj._metadata.name.str.contains(name, case=False, regex=False)
             else:
                 raise BTRDBTypeError("name must be string or compiled regex")
 
         # filter by unit
         if unit is not None:
             if isinstance(unit, RE_PATTERN):
-                obj._streams = [
-                    s
-                    for s in obj._streams
-                    for m in [unit.search(s.tags()["unit"])]
-                    if m
-                ]
+                tf = tf & obj._metadata.unit.str.contains(
+                    unit.pattern, case=False, regex=True
+                )
             elif isinstance(unit, str):
-                obj._streams = [
-                    s
-                    for s in obj._streams
-                    if s.tags().get("unit", "").lower() == unit.lower()
-                ]
+                tf = tf & obj._metadata.unit.str.contains(unit, case=False, regex=False)
             else:
                 raise BTRDBTypeError("unit must be string or compiled regex")
 
         # filter by tags
         if tags:
-            # filters if the subset of the tags matches the given tags
-            obj._streams = [s for s in obj._streams if tags.items() <= s.tags().items()]
+            tf = tf & obj._metadata.loc[
+                :, obj._metadata.columns.isin(tags.keys())
+            ].apply(
+                lambda x: x.str.contains(tags[x.name], case=False, regex=False)
+            ).all(
+                axis=1
+            )
 
         # filter by annotations
         if annotations:
+            _annotations = pd.json_normalize(obj._metadata["annotations"])
+            if not _annotations.columns.isin(annotations.keys()).any():
+                raise BTRDBValueError("annotations key not found")
+            obj._metadata = obj._metadata.join(
+                _annotations, rsuffix="_annotations"
+            ).drop(columns=["annotations"])
+
+            _columns = list(annotations.keys()) + list(
+                map(lambda s: "".join([s, "_annotations"]), annotations.keys())
+            )
-            # filters if the subset of the annotations matches the given annotations
-            obj._streams = [
-                s
-                for s in obj._streams
-                if annotations.items() <= s.annotations()[0].items()
-            ]
 
+            tf = tf & obj._metadata.loc[:, obj._metadata.columns.isin(_columns)].apply(
+                lambda x: x.str.contains(annotations[x.name], case=False, regex=False)
+            ).all(axis=1)
+
+        obj._metadata = obj._metadata[tf]
+        obj._streams = obj._metadata["stream"].to_list()
         return obj
 
     def clone(self):
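The rewritten filter never iterates over the streams themselves: it starts from an all-True boolean Series, ANDs in one vectorized mask per criterion, and applies the accumulated mask once at the end. The same accumulate-then-apply pattern in isolation, with toy data:

import pandas as pd

meta = pd.DataFrame(
    {
        "collection": ["sensors/east", "sensors/west", "sensors/east"],
        "unit": ["volts", "amps", "amps"],
    }
)

tf = ~meta["collection"].isna()  # all-True starting mask
# Each criterion narrows the mask; case=False mirrors the filter above.
tf = tf & meta["collection"].str.contains("east", case=False, regex=False)
tf = tf & meta["unit"].str.contains("amps", case=False, regex=False)

print(meta[tf])  # only the rows that satisfy every criterion

One semantic shift worth noting: str.contains does case-insensitive substring matching, so plain-string filters now match more loosely than the old exact lower-cased equality checks they replace.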
requirements.txt (1 change: 1 addition & 0 deletions)
@@ -5,6 +5,7 @@
 # pip-compile --output-file=requirements.txt --resolver=backtracking pyproject.toml
 #
 # This file was modified to remove version pins.
+dask
 certifi
     # via btrdb (pyproject.toml)
 grpcio