
Memory leak in consumers framework API? #546

Closed
albnnc opened this issue Jun 21, 2023 · 9 comments
albnnc commented Jun 21, 2023

My team was using js.pullSubscribe, but we found the new API quite comfortable to work with and decided to update our codebase. However, we were unable to migrate without trouble.

Consider the following code snippet:

import { connect } from "https://deno.land/x/nats/src/mod.ts";
import { delay } from "https://deno.land/std/async/delay.ts";

const natsUrl = Deno.env.get("NATS_URL") ?? "";
const nc = await connect({ servers: natsUrl });
const js = nc.jetstream();
const consumer = await js.consumers.get("ENTITY", "ENTITY_CONSUMER");
while (!nc.isClosed()) {
  const msg = await consumer.next();
  msg?.nak();
  // Remove this line to see the leak occurring faster.
  await delay(1_000);
}
Consumer Info (screenshot attached in the original issue)

Memory leaks dramatically, eventually causing the app to fail with a V8 out-of-memory error.

Is there a fundamental mistake in the code above? Thanks in advance.


aricart commented Jun 21, 2023

I can verify this is happening.


aricart commented Jun 22, 2023

@albnnc to unblock you, please use the consume API like this:

const consumer = await js.consumers.get("messages", "myconsumer");
const iter = await consumer.consume({max_messages: 1});
for await(const m of iter) {
  console.log(m?.info.redeliveryCount);
  m?.nak();
}


aricart commented Jun 22, 2023

The above does the same thing but doesn't trigger the leak: at any one point the consumer holds only a single message, which is effectively what you are doing with next() in the loop.


aricart commented Jun 22, 2023

@albnnc I have figured out where the issue is. While I prepare a release, the above suggestion will unblock you. Thank you for finding this.


albnnc commented Jun 22, 2023

@aricart, thanks for the response!

What I actually need is to pull a bunch of messages, limited to a certain maximum count, and process them while blocking the receipt of further messages. With consumer.next() I can simply collect messages and, when needed, process them without calling next again. With consumer.consume({ ... }) I think this is not possible. I think the following would do the trick:

const consumer = await js.consumers.get("ENTITY", "ENTITY_CONSUMER");
const batchSizeMax = 1_000;
let batch: JsMsg[] = [];
while (!nc.isClosed()) {
  const iter = await consumer.fetch({ max_messages: batchSizeMax });
  for await (const msg of iter) {
    batch.push(msg);
    if (!msg.info.pending) {
      break;
    }
  }
  if (batch.length) {
    console.log(`Processing batch of size ${batch.length}`);
    console.log("redeliveryCount", batch[0].info.redeliveryCount);
    batch.forEach((v) => v.nak());
    batch = [];
  } else {
    await delay(1_000);
  }
}

However, this code leaks too (though much more slowly). Will this be fixed in the upcoming PR?


aricart commented Jun 22, 2023

@albnnc the leak in fetch would also be fixed by the same PR, but I will make sure (running the tests right now).

If I understand correctly, what you want is to retrieve N messages that you can assign to workers and process concurrently. If that is the case, then fetch is what you want. See https://github.com/nats-io/nats.deno/blob/main/jetstream.md#fetching-batch-of-messages
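That worker pattern could be sketched like this; `fakeFetch`, `MsgLike`, and the payloads are illustrative stand-ins (not the nats.deno API) so the shape of the concurrency is visible without a live server:

```typescript
// Illustrative stand-in for a JetStream message; only the fields used below.
interface MsgLike {
  data: string;
  ack(): void;
}

// Stand-in for consumer.fetch({ max_messages: n }): yields up to n messages.
async function* fakeFetch(n: number): AsyncGenerator<MsgLike> {
  for (let i = 0; i < n; i++) {
    yield { data: `payload-${i}`, ack: () => {} };
  }
}

// Retrieve a batch and hand each message to a concurrent worker,
// acking as each piece of work completes.
async function processBatch(n: number): Promise<string[]> {
  const work: Promise<string>[] = [];
  for await (const m of fakeFetch(n)) {
    work.push(
      (async () => {
        const result = m.data.toUpperCase(); // placeholder for real work
        m.ack();
        return result;
      })(),
    );
  }
  // Promise.all preserves input order, so results line up with fetch order.
  return Promise.all(work);
}
```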


aricart commented Jun 22, 2023

@albnnc yes the PR fixes fetch as well.


albnnc commented Jun 22, 2023

> If I understand correctly, what you want is to retrieve N messages that you can assign to workers and process concurrently. If that is the case, then fetch is what you want. See https://github.com/nats-io/nats.deno/blob/main/jetstream.md#fetching-batch-of-messages

Yeah, I saw that part of the docs. The notable point is that I need to start handling messages immediately, even when fewer than max_messages messages are available. However, the example from the docs waits for the expires timer. That's why I check msg.info.pending and break out of the fetch loop when there is no next message.
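The early-break idea can be isolated like this; the mock iterator and the JsMsgLike shape are assumptions for illustration, mimicking how info.pending counts down as the server's backlog drains:

```typescript
// Minimal shape of the message info used by the early-break check.
interface JsMsgLike {
  seq: number;
  info: { pending: number };
}

// Collect messages from an async iterator, stopping as soon as the server
// reports no more pending messages, instead of waiting for an `expires` timer.
async function collectBatch(
  iter: AsyncIterable<JsMsgLike>,
  max: number,
): Promise<JsMsgLike[]> {
  const batch: JsMsgLike[] = [];
  for await (const msg of iter) {
    batch.push(msg);
    if (msg.info.pending === 0 || batch.length >= max) break;
  }
  return batch;
}

// Mock fetch: only three messages are available although more were requested.
async function* mockFetch(): AsyncGenerator<JsMsgLike> {
  yield { seq: 1, info: { pending: 2 } };
  yield { seq: 2, info: { pending: 1 } };
  yield { seq: 3, info: { pending: 0 } };
  // A real fetch would now idle until its `expires` timer fires;
  // the pending check above returns immediately instead.
}
```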

> @albnnc the leak in fetch would also be fixed by the same PR, but I will make sure (running the tests right now)

I can confirm this too. Will wait for release, thanks!


aricart commented Jun 22, 2023

@albnnc I have released new versions of all the clients fixing this issue; if you notice anything else, please holler.
Thank you for finding this.

@aricart aricart closed this as completed Jun 22, 2023

2 participants