
MessageTimeToLiveChecker clogs the log stream with commands #11762

Closed
korthout opened this issue Feb 20, 2023 · 10 comments · Fixed by #11856 or #11948

@korthout
Member

Describe the bug

The MessageTimeToLiveChecker appends Message:Expire commands to the log for each message that has surpassed its TTL. If there are many messages in the state with an expired TTL, it writes many commands to the log in a single batch. This means the engine cannot process anything else until it has processed all the commands in that batch, which can lead to latency spikes; see #11591.

To Reproduce

  • continuously send a large number of messages with a TTL of a few minutes
  • once those minutes have passed, notice that publishing further messages suffers from increased latency

Expected behavior

The MessageTimeToLiveChecker should limit the number of message expiration commands it appends in a single batch.

Since there can still be more messages with an expired TTL in the state (which is already being iterated), the checker should simply continue after appending the batch by starting to append to a new batch. Once the checker runs out of messages to expire, it can be rescheduled to run after the fixed interval that already exists today.

@korthout korthout added the kind/bug, scope/broker, area/performance, severity/high, and component/engine labels Feb 20, 2023
@korthout korthout self-assigned this Feb 20, 2023
@ChrisKujawa
Member

Maybe we can find a solution that we can also apply to the other cases, like job timeouts, timer triggers, multi-instance activation, etc.

I'm not sure what your idea for solving this was, but I was wondering whether we could introduce a new command that contains the count or the keys as a list. For example, instead of writing 1k message delete commands, we write one command with a list of all the keys we want to delete.
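
For illustration, such a batched command value could look roughly like the following sketch; the class and method names are hypothetical, not an existing Zeebe record type:

import java.util.ArrayList;
import java.util.List;

// Hypothetical value for a single batched expire command: instead of
// appending 1k individual message delete commands, the checker would
// append one command carrying the keys of all messages to expire.
public final class MessageBatchExpireRecord {
  private final List<Long> messageKeys = new ArrayList<>();

  public MessageBatchExpireRecord addMessageKey(final long key) {
    messageKeys.add(key);
    return this;
  }

  public List<Long> getMessageKeys() {
    return messageKeys;
  }
}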

@korthout
Member Author

Thanks @Zelldon, those are interesting points.

Maybe we can find a solution that we can also apply to the other cases, like job timeouts, timer triggers, multi-instance activation, etc.

Absolutely, we should keep this in mind. I'm not sure how much of this issue will result in changes that are reusable for those concepts; it may be more relevant to #11761 or other issues coming out of #11591.

I'm not sure what your idea for solving this was, but I was wondering whether we could introduce a new command that contains the count or the keys as a list. For example, instead of writing 1k message delete commands, we write one command with a list of all the keys we want to delete.

We will tackle this idea in a separate issue. It's already tracked in #11591, but no issue has been created for it yet.

@korthout
Member Author

Since there can still be more messages with an expired TTL in the state (which is already being iterated), the checker should simply continue after appending the batch by starting to append to a new batch. Once the checker runs out of messages to expire, it can be rescheduled to run after the fixed interval that already exists today.

@romansmirnov it appears that this is not possible without significantly altering the interface between the Stream Processor and the Engine.

The reason is that the checker is just a scheduled Task. When executed, a task receives a TaskResultBuilder, which it can use to construct the record batch by appending commands to it. The Stream Processor then takes that record batch and appends it to the log. There is currently no way for the checker to provide a smaller record batch and then continue appending to a new one.

I suggest we simply limit the number of commands appended to the record batch, i.e. the task result. Note that this means that after the task has completed, the checker is rescheduled in 60 seconds. We can make this interval configurable in a separate issue.
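
For reference, the contract described above looks roughly like this; the signatures are simplified assumptions for illustration, not the exact Zeebe interfaces:

// Simplified sketch of the scheduled-task contract: a task receives a
// TaskResultBuilder, appends commands, and returns the built result, which
// the Stream Processor then appends to the log as a single record batch.
interface Intent {}      // e.g. a message intent such as EXPIRE
interface RecordValue {} // e.g. the stored message record

interface TaskResult {}  // the finished record batch

interface TaskResultBuilder {
  // appends a command to the batch under construction; a limiting builder
  // could return false once the configured command limit is reached
  boolean appendCommandRecord(long key, Intent intent, RecordValue value);

  TaskResult build();
}

interface Task {
  TaskResult execute(TaskResultBuilder taskResultBuilder);
}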

@romansmirnov Is that okay with you?

@deepthidevaki
Contributor

I suggest we simply limit the number of commands appended to the record batch, i.e. the task result. Note that this means that after the task has completed, the checker is rescheduled in 60 seconds. We can make this interval configurable in a separate issue.

One thing to consider here is that this should not result in more expired messages accumulating in the state. The use case we are considering had around 3k messages produced per minute, so 3k messages must also be deleted per minute; otherwise the state will grow unbounded.

@ChrisKujawa
Member

@deepthidevaki probably a good test/benchmark to verify after

@romansmirnov
Member

romansmirnov commented Feb 21, 2023

I agree with @deepthidevaki; we should make sure that this does not result in more expired messages accumulating in the state. Only limiting the number of commands to append and making the interval configurable does not prevent this from happening.

I believe there are different approaches to achieving the desired behavior, i.e., submitting batches with a limited number of commands while still continuing to collect the next one. Some potential approaches:

  1. Slice the batch of commands in the ProcessingScheduleService: After appending commands for all expired messages, the ProcessingScheduleService slices the batch of commands into smaller batches and tries to write them to the log stream one by one. To write them, the scheduling service schedules itself for the next small batch (i.e., freeing up the actor in between), which introduces some delay between the batches. For this, the MessageTimeToLiveChecker would need to "tell" the scheduling service that it may slice the batch of commands.
  2. The MessageTimeToLiveChecker schedules itself after collecting a small batch of commands, until all expired messages are collected, for example (pseudo-code):
public TaskResult execute(final TaskResultBuilder taskResultBuilder) {
  final MutableInteger counter = new MutableInteger();
  final long boundary = ActorClock.currentTimeMillis();
  messageState.visitMessagesWithDeadlineBefore(
      boundary,
      expiredMessageKey -> {
        if (counter.incrementAndGet() <= 10) {
          writeDeleteMessageCommand(expiredMessageKey, taskResultBuilder);
          return true; // continue with the next expired message
        } else {
          // keep the last expiredMessageKey and boundary for the next run
          scheduleService.runDelayed(Duration.ofMillis(5), this);
          return false; // stop the iteration
        }
      });
  return taskResultBuilder.build();
}

When the checker is invoked again after self-scheduling, it still references the last received expired message key (and the boundary), so it can continue iterating from that key and collect the next 10 messages. When there are no expired messages left, it schedules the checker with the 60s delay (i.e., the configurable interval).
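
To make the continuation concrete, the checker could keep some resume state between runs, for example (the field names are illustrative, not existing Zeebe API):

// Illustrative resume state kept by the checker between self-scheduled runs.
final class ExpirationSweepState {
  // key of the last handled expired message; -1 means start a fresh sweep
  long lastExpiredMessageKey = -1;
  // deadline boundary fixed at the start of a sweep and reused on continuation
  long boundary;
}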

The second approach would require changes on both sides of the abstraction to make it work: in the checker, in zb-db (i.e., the iterator getting a start position), and maybe in the scheduling service.

@deepthidevaki & @korthout, what do you think?

@korthout
Member Author

korthout commented Feb 21, 2023

I much prefer the second option:

  • the first option adds a responsibility to the ProcessingScheduleService that has nothing to do with scheduling; it only concerns how to deal with the result of the task. This solution smells a bit. The second option is cleaner.
  • the second option allows ZPA to keep control over the scheduling in the engine. This allows us to move to dynamic scheduling at some point in the future. By dynamic scheduling, I mean that the interval is not statically configured; instead, the checker is rescheduled to the deadline of the first message that will expire, and rescheduled again when a new message is published with an earlier deadline. We have something similar for timer due dates.
  • the second option would enable more powerful column family iterations by allowing the iterator to seek to a specific prefix before starting the iteration. This may be helpful for other state iterations in the future, but I currently have no use cases.

If we were to add an option to start the Iterator at a specific start position (e.g. seek to a prefix), then I see two options:

  1. Add a seek(prefix) method to TransactionalColumnFamily that moves the Iterator to a specific prefix. In a consecutive call to forEach, whileTrue, or whileEqualPrefix, the Iterator is reused to continue from this position. This adds flexibility for the caller but requires the Iterator's state to be managed in the TransactionalColumnFamily. Currently, a new Iterator is created for each of those operations.
  2. Overload each iteration operation (forEach, whileTrue, and whileEqualPrefix) to allow passing a prefix, as sketched below. This way, the TransactionalColumnFamily can perform an additional seek to the prefix before starting its iteration.
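
As a rough sketch of the second option (the exact signature would be ZDP's decision; the types here are simplified stand-ins for the existing zeebe-db interfaces):

// Simplified stand-ins for the existing zeebe-db types.
interface DbKey {}
interface DbValue {}
interface KeyValuePairVisitor<K, V> {
  boolean visit(K key, V value); // return false to stop the iteration
}

interface ColumnFamily<KeyType extends DbKey, ValueType extends DbValue> {

  // existing iteration: starts at the beginning of the column family
  void whileTrue(KeyValuePairVisitor<KeyType, ValueType> visitor);

  // proposed overload: seeks to startAt first and iterates from there,
  // allowing the MessageTimeToLiveChecker to resume where it stopped
  void whileTrue(KeyType startAt, KeyValuePairVisitor<KeyType, ValueType> visitor);
}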

Since ZDP owns the zeebe-db module, I cannot make this decision alone. Please let me know what you think is best. @romansmirnov @deepthidevaki @Zelldon

EDIT: Closing this happened by misclick 😅

@korthout korthout reopened this Feb 21, 2023
@ChrisKujawa
Member

Hey @korthout, I guess it makes sense to add something like an overload of the method; it shouldn't be that hard. We already use seek internally in these methods anyway, due to our column family usage (we have only one column family, but all keys are prefixed with the enum ordinal).

As you described, a separate seek doesn't really make sense, since we would need to keep the iterator or the prefix in memory, and we wouldn't know when to throw it away again.

@megglos this is something we would need to provide. @korthout can you create a separate issue for this?

@deepthidevaki
Contributor

The second option is cleaner in my opinion as well.
It might be tricky to keep the iterator open: iterators can hog resources in RocksDB, so we have to ensure they are not leaked and are closed or reset from time to time. I therefore think overloading the methods to pass a starting position would be easier to implement. The downside is that it has to seek again to the given starting position, but that is an acceptable overhead, because we already seek whenever the checker is re-scheduled. Combined with moving this to another actor, the overhead would be negligible.

@romansmirnov
Member

@korthout, thanks for your feedback! I agree with what you, @deepthidevaki, and @Zelldon wrote about preferring the second option. Beyond that, I don't have much to add. Thanks for all your input!

ghost pushed a commit that referenced this issue Feb 23, 2023
11785: feat: start db iteration at a specified key r=oleschoenburg a=oleschoenburg

Adds an additional method to ColumnFamily, whileTrue with a startAt key.

relates to #11762

Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
Co-authored-by: Ole Schönburg <ole.schoenburg@gmail.com>
ghost pushed a commit that referenced this issue Mar 1, 2023
11782: [Backport stable/8.1] feat: start db iteration at a specified key r=oleschoenburg a=oleschoenburg

Adds an additional method to `ColumnFamily`, `whileTrue` with a `startAt` key.

relates to #11762

Co-authored-by: Christopher Zell <zelldon91@googlemail.com>
Co-authored-by: Ole Schönburg <ole.schoenburg@gmail.com>
@korthout korthout linked a pull request Mar 3, 2023 that will close this issue
ghost pushed a commit that referenced this issue Mar 6, 2023
11856: [stable/8.1] Limit expire message commands in result r=korthout a=korthout

## Description

This adds a virtual limit to the number of EXPIRE Message commands that the MessageTimeToLiveChecker adds to a result when executed, to prevent it from clogging the log stream with too many of these commands.

I suggest reviewing this pull request per commit.

## Related issues

closes #11762 



Co-authored-by: Nico Korthout <nico.korthout@camunda.com>
Co-authored-by: Nico Korthout <korthout@users.noreply.github.com>
@ghost ghost closed this as completed in 8bd8399 Mar 7, 2023
@megglos megglos added the version:8.1.9 label Mar 13, 2023
@npepinpe npepinpe added the version:8.2.0 label Apr 5, 2023