Skip to content

Commit 6fb207f

Browse files
Wh1ispervladvildanov
authored andcommitted
Improve parsing of XINFO STREAM response (#3282)
Make the parsing of XINFO STREAM response more resilient, by handling the case when first and last entries are None after XTRIM. Improve the parsing of consumers related info.
1 parent 6f04bde commit 6fb207f

File tree

3 files changed

+48
-4
lines changed

3 files changed

+48
-4
lines changed

redis/_parsers/helpers.py

+8-3
Original file line numberDiff line numberDiff line change
@@ -275,17 +275,22 @@ def parse_xinfo_stream(response, **options):
275275
data = {str_if_bytes(k): v for k, v in response.items()}
276276
if not options.get("full", False):
277277
first = data.get("first-entry")
278-
if first is not None:
278+
if first is not None and first[0] is not None:
279279
data["first-entry"] = (first[0], pairs_to_dict(first[1]))
280280
last = data["last-entry"]
281-
if last is not None:
281+
if last is not None and last[0] is not None:
282282
data["last-entry"] = (last[0], pairs_to_dict(last[1]))
283283
else:
284284
data["entries"] = {_id: pairs_to_dict(entry) for _id, entry in data["entries"]}
285-
if isinstance(data["groups"][0], list):
285+
if len(data["groups"]) > 0 and isinstance(data["groups"][0], list):
286286
data["groups"] = [
287287
pairs_to_dict(group, decode_keys=True) for group in data["groups"]
288288
]
289+
for g in data["groups"]:
290+
if g["consumers"] and g["consumers"][0] is not None:
291+
g["consumers"] = [
292+
pairs_to_dict(c, decode_keys=True) for c in g["consumers"]
293+
]
289294
else:
290295
data["groups"] = [
291296
{str_if_bytes(k): v for k, v in group.items()}

tests/test_asyncio/test_commands.py

+25
Original file line numberDiff line numberDiff line change
@@ -2919,6 +2919,31 @@ async def test_xinfo_stream(self, r: redis.Redis):
29192919
assert info["first-entry"] == await get_stream_message(r, stream, m1)
29202920
assert info["last-entry"] == await get_stream_message(r, stream, m2)
29212921

2922+
await r.xtrim(stream, 0)
2923+
info = await r.xinfo_stream(stream)
2924+
assert info["length"] == 0
2925+
assert info["first-entry"] is None
2926+
assert info["last-entry"] is None
2927+
2928+
@skip_if_server_version_lt("6.0.0")
2929+
async def test_xinfo_stream_full(self, r: redis.Redis):
2930+
stream = "stream"
2931+
group = "group"
2932+
2933+
await r.xadd(stream, {"foo": "bar"})
2934+
info = await r.xinfo_stream(stream, full=True)
2935+
assert info["length"] == 1
2936+
assert len(info["groups"]) == 0
2937+
2938+
await r.xgroup_create(stream, group, 0)
2939+
info = await r.xinfo_stream(stream, full=True)
2940+
assert info["length"] == 1
2941+
2942+
await r.xreadgroup(group, "consumer", streams={stream: ">"})
2943+
info = await r.xinfo_stream(stream, full=True)
2944+
consumer = info["groups"][0]["consumers"][0]
2945+
assert isinstance(consumer, dict)
2946+
29222947
@skip_if_server_version_lt("5.0.0")
29232948
async def test_xlen(self, r: redis.Redis):
29242949
stream = "stream"

tests/test_commands.py

+15-1
Original file line numberDiff line numberDiff line change
@@ -4452,14 +4452,23 @@ def test_xinfo_stream(self, r):
44524452
assert info["entries-added"] == 2
44534453
assert info["recorded-first-entry-id"] == m1
44544454

4455+
r.xtrim(stream, 0)
4456+
info = r.xinfo_stream(stream)
4457+
assert info["length"] == 0
4458+
assert info["first-entry"] is None
4459+
assert info["last-entry"] is None
4460+
44554461
@skip_if_server_version_lt("6.0.0")
44564462
def test_xinfo_stream_full(self, r):
44574463
stream = "stream"
44584464
group = "group"
44594465
m1 = r.xadd(stream, {"foo": "bar"})
4460-
r.xgroup_create(stream, group, 0)
44614466
info = r.xinfo_stream(stream, full=True)
4467+
assert info["length"] == 1
4468+
assert len(info["groups"]) == 0
44624469

4470+
r.xgroup_create(stream, group, 0)
4471+
info = r.xinfo_stream(stream, full=True)
44634472
assert info["length"] == 1
44644473
assert_resp_response_in(
44654474
r,
@@ -4469,6 +4478,11 @@ def test_xinfo_stream_full(self, r):
44694478
)
44704479
assert len(info["groups"]) == 1
44714480

4481+
r.xreadgroup(group, "consumer", streams={stream: ">"})
4482+
info = r.xinfo_stream(stream, full=True)
4483+
consumer = info["groups"][0]["consumers"][0]
4484+
assert isinstance(consumer, dict)
4485+
44724486
@skip_if_server_version_lt("5.0.0")
44734487
def test_xlen(self, r):
44744488
stream = "stream"

0 commit comments

Comments
 (0)