Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow filtering Redis list items on read #39

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 28 additions & 13 deletions minique/utils/redis_list.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Iterable, Optional
from typing import Iterable, Optional, Callable

from redis import Redis

Expand All @@ -8,6 +8,7 @@ def read_list(
key: str,
*,
chunk_size: int = 4096,
filter_fn: Optional[Callable[[bytes], bool]] = None,
last_n: Optional[int] = None,
) -> Iterable[bytes]:
"""
Expand All @@ -18,7 +19,8 @@ def read_list(
:param redis_conn: Redis connection
:param key: Key
:param chunk_size: How many lines to read per request.
:param last_n: Attempt to only read the last N lines.
:param filter_fn: Only include lines that pass this filter
:param last_n: How many last lines to return, filtering is applied before limiting.
:return:
"""
# LLEN returns zero for a non-existent key,
Expand All @@ -31,17 +33,30 @@ def read_list(
if chunk_size <= 0:
chunk_size = 4096

if last_n and last_n > 0:
offset = max(0, list_len - last_n)
else:
offset = 0
results = []

while offset < list_len:
# Regarding that - 1 there, see this from https://redis.io/commands/lrange:
# > Note that if you have a list of numbers from 0 to 100, LRANGE list 0 10
# > will return 11 elements, that is, the rightmost item is included.
chunk = redis_conn.lrange(key, offset, offset + chunk_size - 1) or []
# Regarding that - 1 there, see this from https://redis.io/commands/lrange:
# > Note that if you have a list of numbers from 0 to 100, LRANGE list 0 10
# > will return 11 elements, that is, the rightmost item is included.
end_index = list_len - 1
remaining_needed = last_n if last_n and last_n > 0 else list_len

# read until we have enough items, or we run out of the list
while end_index >= 0 and remaining_needed > 0:
start_index = max(0, end_index - chunk_size + 1)

chunk = redis_conn.lrange(key, start_index, end_index)
if not chunk:
break
yield from chunk
offset += chunk_size

for item in reversed(chunk):
if not filter_fn or filter_fn(item):
results.append(item)
remaining_needed -= 1
if remaining_needed == 0:
break

# move the reading window further back
end_index = start_index - 1

yield from reversed(results)
43 changes: 42 additions & 1 deletion minique_tests/test_redis_list.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,49 @@
from minique.utils.redis_list import read_list


def test_redis_list(redis: Redis, random_queue_name: str):
def test_read_list(redis: Redis, random_queue_name: str):
data = [str(x).encode() for x in range(500)]
redis.rpush(random_queue_name, *data)
assert list(read_list(redis, random_queue_name, chunk_size=7)) == data
redis.delete(random_queue_name)


def test_read_list_last_n(redis: Redis, random_queue_name: str):
data = [str(x).encode() for x in range(6)]
redis.rpush(random_queue_name, *data)

last_two = read_list(redis, random_queue_name, last_n=2)
assert list(last_two) == [b"4", b"5"]

last_four = read_list(redis, random_queue_name, last_n=4)
assert list(last_four) == [b"2", b"3", b"4", b"5"]

last_everything = read_list(redis, random_queue_name, last_n=999_999)
assert list(last_everything) == [b"0", b"1", b"2", b"3", b"4", b"5"]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

these are all spanning just one chunk, does it behave correctly if it requests multiple chunks? I have a hunch that it's not properly getting the last_n if it has to span multiple chunks, as while it iterates each chunk in reverse order, chunks iterate in forward order, so the last N of the first chunk are not the last N of the entire list

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can test!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

yes

Copy link
Member Author

@ruksi ruksi Jan 30, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lrange            = [3, 4]  [1, 2] (chunks read from the end of the list)
reversed(chunk)   = [4, 3]  [2, 1]
result            = [4, 3, 2, 1]   (assuming no filtering, append as long as last_n)
reversed(results) = [1, 2, 3, 4]

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh yeah, the start index calculation means it's starting from the end so the chunks are iterated back to front


redis.delete(random_queue_name)


def test_read_list_filter(redis: Redis, random_queue_name: str):
data = [str(x).encode() for x in range(101)]
redis.rpush(random_queue_name, *data)

last_four_without_nines = read_list(
redis,
random_queue_name,
chunk_size=2,
last_n=4,
filter_fn=lambda line: b"9" not in line,
)
assert list(last_four_without_nines) == [b"86", b"87", b"88", b"100"]

last_four_starting_with_one = read_list(
redis,
random_queue_name,
chunk_size=5,
last_n=4,
filter_fn=lambda line: line.startswith(b"1"),
)
assert list(last_four_starting_with_one) == [b"17", b"18", b"19", b"100"]

redis.delete(random_queue_name)