Skip to content

Commit

Permalink
Allow filtering list items on read
Browse files (browse the repository at this point in the history)
  • Loading branch information
ruksi committed Jan 30, 2025
1 parent e4443b3 commit 244e5ab
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 14 deletions.
45 changes: 32 additions & 13 deletions minique/utils/redis_list.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from typing import Iterable, Optional
from collections import deque
from typing import Iterable, Optional, Callable, Deque

from redis import Redis

Expand All @@ -8,6 +9,7 @@ def read_list(
key: str,
*,
chunk_size: int = 4096,
filter_fn: Optional[Callable[[bytes], bool]] = None,
last_n: Optional[int] = None,
) -> Iterable[bytes]:
"""
Expand All @@ -18,7 +20,8 @@ def read_list(
:param redis_conn: Redis connection
:param key: Key
:param chunk_size: How many lines to read per request.
:param last_n: Attempt to only read the last N lines.
:param filter_fn: Only include lines that pass this filter
:param last_n: How many last lines to return, filtering is applied before limiting.
:return:
"""
# LLEN returns zero for a non-existent key,
Expand All @@ -31,17 +34,33 @@ def read_list(
if chunk_size <= 0:
chunk_size = 4096

if last_n and last_n > 0:
offset = max(0, list_len - last_n)
else:
offset = 0
# store the results in a deque, so we can efficiently
# append to the left, and then yield in the right order
# without reversing a list at the end
results: Deque[bytes] = deque()

while offset < list_len:
# Regarding that - 1 there, see this from https://redis.io/commands/lrange:
# > Note that if you have a list of numbers from 0 to 100, LRANGE list 0 10
# > will return 11 elements, that is, the rightmost item is included.
chunk = redis_conn.lrange(key, offset, offset + chunk_size - 1) or []
# Regarding that - 1 there, see this from https://redis.io/commands/lrange:
# > Note that if you have a list of numbers from 0 to 100, LRANGE list 0 10
# > will return 11 elements, that is, the rightmost item is included.
end_index = list_len - 1
remaining_needed = last_n if last_n and last_n > 0 else list_len

# read until we have enough items, or we run out of the list
while end_index >= 0 and remaining_needed > 0:
start_index = max(0, end_index - chunk_size + 1)

chunk = redis_conn.lrange(key, start_index, end_index)
if not chunk:
break
yield from chunk
offset += chunk_size

for item in reversed(chunk):
if not filter_fn or filter_fn(item):
results.appendleft(item)
remaining_needed -= 1
if remaining_needed == 0:
break

# move the reading window further back
end_index = start_index - 1

yield from results
43 changes: 42 additions & 1 deletion minique_tests/test_redis_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,49 @@
from minique.utils.redis_list import read_list


def test_redis_list(redis: Redis, random_queue_name: str):
def test_read_list(redis: Redis, random_queue_name: str):
    """Reading with default options returns every item in insertion order.

    The chunk size (7) does not evenly divide the list length (500), so the
    read is split over many requests and one of them is a partial chunk.

    :param redis: Redis connection fixture
    :param random_queue_name: Key to store the test list under
    """
    data = [str(x).encode() for x in range(500)]
    redis.rpush(random_queue_name, *data)
    try:
        assert list(read_list(redis, random_queue_name, chunk_size=7)) == data
    finally:
        # Clean up even when the assertion fails, so a failing run
        # does not leave the test list behind in Redis.
        redis.delete(random_queue_name)


def test_read_list_last_n(redis: Redis, random_queue_name: str):
    """`last_n` limits the result to the tail of the list, in list order.

    :param redis: Redis connection fixture
    :param random_queue_name: Key to store the test list under
    """
    data = [str(x).encode() for x in range(6)]
    redis.rpush(random_queue_name, *data)
    try:
        last_two = read_list(redis, random_queue_name, last_n=2)
        assert list(last_two) == [b"4", b"5"]

        last_four = read_list(redis, random_queue_name, last_n=4)
        assert list(last_four) == [b"2", b"3", b"4", b"5"]

        # A limit larger than the list simply yields the whole list.
        last_everything = read_list(redis, random_queue_name, last_n=999_999)
        assert list(last_everything) == [b"0", b"1", b"2", b"3", b"4", b"5"]
    finally:
        # Clean up even when an assertion fails, so a failing run
        # does not leave the test list behind in Redis.
        redis.delete(random_queue_name)


def test_read_list_filter(redis: Redis, random_queue_name: str):
    """`filter_fn` is applied before `last_n` limits the result.

    Both cases use a chunk size smaller than the number of items that must
    be scanned, so the matching items are collected across several chunks.

    :param redis: Redis connection fixture
    :param random_queue_name: Key to store the test list under
    """
    data = [str(x).encode() for x in range(101)]
    redis.rpush(random_queue_name, *data)
    try:
        # 89..99 all contain a "9", so the last four matches are
        # 100 plus the three items just below that run.
        last_four_without_nines = read_list(
            redis,
            random_queue_name,
            chunk_size=2,
            last_n=4,
            filter_fn=lambda line: b"9" not in line,
        )
        assert list(last_four_without_nines) == [b"86", b"87", b"88", b"100"]

        # Nothing between 20 and 99 starts with "1", so the matches
        # are 100 and the top of the 10..19 range.
        last_four_starting_with_one = read_list(
            redis,
            random_queue_name,
            chunk_size=5,
            last_n=4,
            filter_fn=lambda line: line.startswith(b"1"),
        )
        assert list(last_four_starting_with_one) == [b"17", b"18", b"19", b"100"]
    finally:
        # Clean up even when an assertion fails, so a failing run
        # does not leave the test list behind in Redis.
        redis.delete(random_queue_name)

0 comments on commit 244e5ab

Please sign in to comment.