Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

xautoclaim #1529

Merged
merged 5 commits into from
Aug 5, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,12 @@ def parse_xclaim(response, **options):
return parse_stream_list(response)


def parse_xautoclaim(response, **options):
if options.get('parse_justid', False):
return response[1]
return parse_stream_list(response[1])


def parse_xinfo_stream(response):
data = pairs_to_dict(response, decode_keys=True)
first = data['first-entry']
Expand Down Expand Up @@ -684,6 +690,7 @@ class Redis:
'SSCAN': parse_scan,
'TIME': lambda x: (int(x[0]), int(x[1])),
'XCLAIM': parse_xclaim,
'XAUTOCLAIM': parse_xautoclaim,
'XGROUP CREATE': bool_ok,
'XGROUP DELCONSUMER': int,
'XGROUP DESTROY': bool,
Expand Down Expand Up @@ -2578,6 +2585,46 @@ def xadd(self, name, fields, id='*', maxlen=None, approximate=True,
pieces.extend(pair)
return self.execute_command('XADD', name, *pieces)

def xautoclaim(self, name, groupname, consumername, min_idle_time,
start_id=0, count=None, justid=False):
"""
Transfers ownership of pending stream entries that match the specified
criteria. Conceptually, equivalent to calling XPENDING and then XCLAIM,
but provides a more straightforward way to deal with message delivery
failures via SCAN-like semantics.
name: name of the stream.
groupname: name of the consumer group.
consumername: name of a consumer that claims the message.
min_idle_time: filter messages that were idle less than this amount of
milliseconds.
start_id: filter messages with equal or greater ID.
count: optional integer, upper limit of the number of entries that the
command attempts to claim. Set to 100 by default.
justid: optional boolean, false by default. Return just an array of IDs
of messages successfully claimed, without returning the actual message
"""
try:
if int(min_idle_time) < 0:
raise DataError("XAUTOCLAIM min_idle_time must be a non"
"negative integer")
except TypeError:
pass

kwargs = {}
pieces = [name, groupname, consumername, min_idle_time, start_id]

try:
if int(count) < 0:
raise DataError("XPENDING count must be a integer >= 0")
pieces.extend([b'COUNT', count])
except TypeError:
pass
if justid:
pieces.append(b'JUSTID')
kwargs['parse_justid'] = True

return self.execute_command('XAUTOCLAIM', *pieces, **kwargs)

def xclaim(self, name, groupname, consumername, min_idle_time, message_ids,
idle=None, time=None, retrycount=None, force=False,
justid=False):
Expand Down
48 changes: 47 additions & 1 deletion tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -2300,13 +2300,59 @@ def test_xadd_nomkstream(self, r):
r.xadd(stream, {'some': 'other'}, nomkstream=True)
assert r.xlen(stream) == 3

@skip_if_server_version_lt('6.2.0')
def test_xautoclaim(self, r):
stream = 'stream'
group = 'group'
consumer1 = 'consumer1'
consumer2 = 'consumer2'

message_id1 = r.xadd(stream, {'john': 'wick'})
message_id2 = r.xadd(stream, {'johny': 'deff'})
message = get_stream_message(r, stream, message_id1)
r.xgroup_create(stream, group, 0)

# trying to claim a message that isn't already pending doesn't
# do anything
response = r.xautoclaim(stream, group, consumer2, min_idle_time=0)
assert response == []

# read the group as consumer1 to initially claim the messages
r.xreadgroup(group, consumer1, streams={stream: '>'})

# claim one message as consumer2
response = r.xautoclaim(stream, group, consumer2,
min_idle_time=0, count=1)
assert response == [message]

# reclaim the messages as consumer1, but use the justid argument
# which only returns message ids
assert r.xautoclaim(stream, group, consumer1, min_idle_time=0,
start_id=0, justid=True) == \
[message_id1, message_id2]
assert r.xautoclaim(stream, group, consumer1, min_idle_time=0,
start_id=message_id2, justid=True) == \
[message_id2]

@skip_if_server_version_lt('6.2.0')
def test_xautoclaim_negative(self, r):
stream = 'stream'
group = 'group'
consumer = 'consumer'
with pytest.raises(redis.DataError):
r.xautoclaim(stream, group, consumer, min_idle_time=-1)
with pytest.raises(ValueError):
r.xautoclaim(stream, group, consumer, min_idle_time="wrong")
with pytest.raises(redis.DataError):
r.xautoclaim(stream, group, consumer, min_idle_time=0,
count=-1)

@skip_if_server_version_lt('5.0.0')
def test_xclaim(self, r):
stream = 'stream'
group = 'group'
consumer1 = 'consumer1'
consumer2 = 'consumer2'

message_id = r.xadd(stream, {'john': 'wick'})
message = get_stream_message(r, stream, message_id)
r.xgroup_create(stream, group, 0)
Expand Down