Skip to content

Commit

Permalink
add ttl option to DriftClient.walk method
Browse files Browse the repository at this point in the history
  • Loading branch information
atimin committed Dec 14, 2023
1 parent b4525d7 commit c924943
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 4 deletions.
6 changes: 4 additions & 2 deletions pkg/drift_client/drift_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ def walk(
topic: str,
start: Union[float, datetime, str],
stop: Union[float, datetime, str],
**kwargs,
) -> Iterator[DriftDataPackage]:
"""Walks through history data for selected topic
Expand All @@ -227,7 +228,8 @@ def walk(
Format: ISO string, datetime or float timestamp
stop: End of request timeframe,
Format: ISO string, datetime or float timestamp
KwArgs:
ttl: Time to live for the query only for ReductStore
Returns:
Iterator with DriftDataPackage
Raises:
Expand All @@ -247,7 +249,7 @@ def walk(
else:
start = _convert_type(start)
stop = _convert_type(stop)
for package in self._blob_storage.walk(topic, start, stop):
for package in self._blob_storage.walk(topic, start, stop, **kwargs):
yield DriftDataPackage(package)

def subscribe_data(self, topic: str, handler: Callable[[DriftDataPackage], None]):
Expand Down
7 changes: 5 additions & 2 deletions pkg/drift_client/reduct_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,20 +70,23 @@ def fetch_data(self, path: str) -> Optional[bytes]:
except ReductError as err:
raise DriftClientError(f"Could not read item at {path}") from err

def walk(self, entry: str, start: int, stop: int) -> Iterator[bytes]:
def walk(self, entry: str, start: int, stop: int, **kwargs) -> Iterator[bytes]:
"""
Walk through the records of an entry between start and stop.
Args:
entry: entry name
start: start timestamp UNIX in seconds
stop: stop timestamp UNIX in seconds
Keyword Args:
ttl: time to live for the query
Raises:
DriftClientError: if failed to fetch data
"""

bucket: Bucket = self._run(self._client.get_bucket(self._bucket))

ait = bucket.query(entry, start * 1000_000, stop * 1000_000, ttl=60)
ttl = kwargs.get("ttl", 60)
ait = bucket.query(entry, start * 1000_000, stop * 1000_000, ttl=ttl)

async def get_next():
try:
Expand Down

0 comments on commit c924943

Please sign in to comment.