Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Produce NPE when stream pending message is missing #3093

Closed
VictoryWangCN opened this issue Aug 2, 2022 · 2 comments · Fixed by #3094
Closed

Produce NPE when stream pending message is missing #3093

VictoryWangCN opened this issue Aug 2, 2022 · 2 comments · Fixed by #3094
Labels

Comments

@VictoryWangCN
Copy link
Contributor

Steps to reproduce:

127.0.0.1:6379> XADD test-1 MAXLEN 2 * test-field-1 test-message-1
"1659407518762-0"
127.0.0.1:6379> XADD test-1 MAXLEN 2 * test-field-2 test-message-2
"1659407526450-0"
127.0.0.1:6379> XGROUP CREATE test-1 test-group 0
OK
127.0.0.1:6379> XREADGROUP GROUP test-group consumer COUNT 1 STREAMS test-1 0
1) 1) "test-1"
   2) (empty array)
127.0.0.1:6379> XREADGROUP GROUP test-group consumer COUNT 1 STREAMS test-1 >
1) 1) "test-1"
   2) 1) 1) "1659407518762-0"
         2) 1) "test-field-1"
            2) "test-message-1"
127.0.0.1:6379> XADD test-1 MAXLEN 2 * test-field-3 test-message-3
"1659407548451-0"
127.0.0.1:6379> XREADGROUP GROUP test-group consumer COUNT 1 STREAMS test-1 0
1) 1) "test-1"
   2) 1) 1) "1659407518762-0"
         2) (nil)

use jedis, will produce NPE

jedis.xreadGroup(
      "test-group", "consumer", xReadGroupParams()
          .block((int) Duration.ofSeconds(1).toMillis())
          .count(32),
      ImmutableMap.of("test-1", new StreamEntryID(0, 0))
  );

NPE at

public static final Builder<List<StreamEntry>> STREAM_ENTRY_LIST = new Builder<List<StreamEntry>>() {
    @Override
    @SuppressWarnings("unchecked")
    public List<StreamEntry> build(Object data) {
      if (null == data) {
        return null;
      }
      List<ArrayList<Object>> objectList = (List<ArrayList<Object>>) data;

      List<StreamEntry> responses = new ArrayList<>(objectList.size() / 2);
      if (objectList.isEmpty()) {
        return responses;
      }

      for (ArrayList<Object> res : objectList) {
        if (res == null) {
          responses.add(null);
          continue;
        }
        String entryIdString = SafeEncoder.encode((byte[]) res.get(0));
        StreamEntryID entryID = new StreamEntryID(entryIdString);
        List<byte[]> hash = (List<byte[]>) res.get(1); // hash is null

        Iterator<byte[]> hashIterator = hash.iterator();
        Map<String, String> map = new HashMap<>(hash.size() / 2);
        while (hashIterator.hasNext()) {
          map.put(SafeEncoder.encode(hashIterator.next()), SafeEncoder.encode(hashIterator.next()));
        }
        responses.add(new StreamEntry(entryID, map));
      }

      return responses;
    }

    @Override
    public String toString() {
      return "List<StreamEntry>";
    }
  };

Expected behavior

xreadGroup should return StreamEntry with empty fields map.
then we can use StreamEntryID, mark ack for pending message (skip it)

Actual behavior

java.lang.NullPointerException: Cannot invoke "java.util.List.iterator()" because "hash" is null
at redis.clients.jedis.BuilderFactory$36.build(BuilderFactory.java:802)
at redis.clients.jedis.BuilderFactory$36.build(BuilderFactory.java:779)
at redis.clients.jedis.BuilderFactory$39.build(BuilderFactory.java:872)
at redis.clients.jedis.BuilderFactory$39.build(BuilderFactory.java:860)
at redis.clients.jedis.Jedis.xreadGroup(Jedis.java:4592)

Jedis version:

3.7.1

Java version:

17.0.3

@itamarhaber
Copy link
Member

Added clarification about design and expected behavior via redis/redis-doc#2067

@sazzad16
Copy link
Collaborator

sazzad16 commented Aug 4, 2022

Thanks a lot @itamarhaber 😃

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants