Skip to content

Commit

Permalink
xautoclaim (#1529)
Browse files Browse the repository at this point in the history
  • Loading branch information
AvitalFineRedis authored Aug 5, 2021
1 parent 9c60670 commit ba30d02
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 1 deletion.
47 changes: 47 additions & 0 deletions redis/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,12 @@ def parse_xclaim(response, **options):
return parse_stream_list(response)


def parse_xautoclaim(response, **options):
if options.get('parse_justid', False):
return response[1]
return parse_stream_list(response[1])


def parse_xinfo_stream(response):
data = pairs_to_dict(response, decode_keys=True)
first = data['first-entry']
Expand Down Expand Up @@ -684,6 +690,7 @@ class Redis:
'SSCAN': parse_scan,
'TIME': lambda x: (int(x[0]), int(x[1])),
'XCLAIM': parse_xclaim,
'XAUTOCLAIM': parse_xautoclaim,
'XGROUP CREATE': bool_ok,
'XGROUP DELCONSUMER': int,
'XGROUP DESTROY': bool,
Expand Down Expand Up @@ -2601,6 +2608,46 @@ def xadd(self, name, fields, id='*', maxlen=None, approximate=True,
pieces.extend(pair)
return self.execute_command('XADD', name, *pieces)

def xautoclaim(self, name, groupname, consumername, min_idle_time,
start_id=0, count=None, justid=False):
"""
Transfers ownership of pending stream entries that match the specified
criteria. Conceptually, equivalent to calling XPENDING and then XCLAIM,
but provides a more straightforward way to deal with message delivery
failures via SCAN-like semantics.
name: name of the stream.
groupname: name of the consumer group.
consumername: name of a consumer that claims the message.
min_idle_time: filter messages that were idle less than this amount of
milliseconds.
start_id: filter messages with equal or greater ID.
count: optional integer, upper limit of the number of entries that the
command attempts to claim. Set to 100 by default.
justid: optional boolean, false by default. Return just an array of IDs
of messages successfully claimed, without returning the actual message
"""
try:
if int(min_idle_time) < 0:
raise DataError("XAUTOCLAIM min_idle_time must be a non"
"negative integer")
except TypeError:
pass

kwargs = {}
pieces = [name, groupname, consumername, min_idle_time, start_id]

try:
if int(count) < 0:
raise DataError("XPENDING count must be a integer >= 0")
pieces.extend([b'COUNT', count])
except TypeError:
pass
if justid:
pieces.append(b'JUSTID')
kwargs['parse_justid'] = True

return self.execute_command('XAUTOCLAIM', *pieces, **kwargs)

def xclaim(self, name, groupname, consumername, min_idle_time, message_ids,
idle=None, time=None, retrycount=None, force=False,
justid=False):
Expand Down
48 changes: 47 additions & 1 deletion tests/test_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -2374,13 +2374,59 @@ def test_xadd_nomkstream(self, r):
r.xadd(stream, {'some': 'other'}, nomkstream=True)
assert r.xlen(stream) == 3

@skip_if_server_version_lt('6.2.0')
def test_xautoclaim(self, r):
stream = 'stream'
group = 'group'
consumer1 = 'consumer1'
consumer2 = 'consumer2'

message_id1 = r.xadd(stream, {'john': 'wick'})
message_id2 = r.xadd(stream, {'johny': 'deff'})
message = get_stream_message(r, stream, message_id1)
r.xgroup_create(stream, group, 0)

# trying to claim a message that isn't already pending doesn't
# do anything
response = r.xautoclaim(stream, group, consumer2, min_idle_time=0)
assert response == []

# read the group as consumer1 to initially claim the messages
r.xreadgroup(group, consumer1, streams={stream: '>'})

# claim one message as consumer2
response = r.xautoclaim(stream, group, consumer2,
min_idle_time=0, count=1)
assert response == [message]

# reclaim the messages as consumer1, but use the justid argument
# which only returns message ids
assert r.xautoclaim(stream, group, consumer1, min_idle_time=0,
start_id=0, justid=True) == \
[message_id1, message_id2]
assert r.xautoclaim(stream, group, consumer1, min_idle_time=0,
start_id=message_id2, justid=True) == \
[message_id2]

@skip_if_server_version_lt('6.2.0')
def test_xautoclaim_negative(self, r):
stream = 'stream'
group = 'group'
consumer = 'consumer'
with pytest.raises(redis.DataError):
r.xautoclaim(stream, group, consumer, min_idle_time=-1)
with pytest.raises(ValueError):
r.xautoclaim(stream, group, consumer, min_idle_time="wrong")
with pytest.raises(redis.DataError):
r.xautoclaim(stream, group, consumer, min_idle_time=0,
count=-1)

@skip_if_server_version_lt('5.0.0')
def test_xclaim(self, r):
stream = 'stream'
group = 'group'
consumer1 = 'consumer1'
consumer2 = 'consumer2'

message_id = r.xadd(stream, {'john': 'wick'})
message = get_stream_message(r, stream, message_id)
r.xgroup_create(stream, group, 0)
Expand Down

0 comments on commit ba30d02

Please sign in to comment.