
feat: limit num of barriers between Meta -> Source #9980

Closed

fuyufjh opened this issue May 24, 2023 · 7 comments

fuyufjh (Member) commented May 24, 2023

Is your feature request related to a problem? Please describe.

Currently, the channel between the Meta server and the CN (i.e. SourceExecutor) is unbounded, while other channels between actors limit the number of in-flight barriers to 2 (#9427). As a result, when the system is overloaded, hundreds of barriers can get stuck in that Meta->Source channel and then be released downstream in an instant after some time, which may have unpredictable consequences for cluster performance.
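
To make the difference concrete, here is a minimal sketch using tokio-style channels. It is a stand-in model, not the actual RisingWave code path, and the `Barrier` struct is hypothetical: the actor-to-actor channels are bounded and back-pressure the sender, while the Meta -> Source channel is unbounded and lets barriers pile up.

```rust
// Stand-in model (not RisingWave code): bounded actor-to-actor barrier channels
// vs. the unbounded Meta -> SourceExecutor channel.
use tokio::sync::mpsc;

#[derive(Debug)]
struct Barrier {
    epoch: u64,
    is_checkpoint: bool,
}

#[tokio::main]
async fn main() {
    // Between actors: a bounded channel caps the number of in-flight barriers
    // (capacity 2 here), so a slow consumer back-pressures the producer.
    let (bounded_tx, mut bounded_rx) = mpsc::channel::<Barrier>(2);

    // Meta -> SourceExecutor today: an unbounded channel, so barriers pile up
    // without limit when the downstream is overloaded.
    let (unbounded_tx, mut unbounded_rx) = mpsc::unbounded_channel::<Barrier>();

    // `send` on the bounded channel awaits when the buffer is full...
    bounded_tx.send(Barrier { epoch: 1, is_checkpoint: false }).await.unwrap();
    // ...while `send` on the unbounded channel never waits, regardless of backlog.
    unbounded_tx.send(Barrier { epoch: 1, is_checkpoint: false }).unwrap();

    println!("{:?}", bounded_rx.recv().await);
    println!("{:?}", unbounded_rx.recv().await);
}
```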

Describe the solution you'd like

Proposed by @hzxa21

We can try to limit that channel's size and back-pressure Meta to "drop" new barriers if there is already 1 barrier pending in the Meta->Source channel.

I am not sure whether this can solve the fluctuation problem, but it should not have a negative effect.
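
A minimal sketch of the proposed behaviour, assuming the channel can be modeled as a bounded tokio mpsc channel of capacity 1; `try_inject` and `Barrier` are hypothetical names, not the real implementation:

```rust
use tokio::sync::mpsc::{self, error::TrySendError};

#[derive(Debug)]
struct Barrier {
    epoch: u64,
    is_checkpoint: bool,
}

/// Returns `true` if the barrier was handed to the source, `false` if it was
/// "dropped" because one barrier is already pending in the channel.
fn try_inject(tx: &mpsc::Sender<Barrier>, barrier: Barrier) -> bool {
    match tx.try_send(barrier) {
        Ok(()) => true,
        Err(TrySendError::Full(dropped)) => {
            eprintln!("channel full, dropping barrier for epoch {}", dropped.epoch);
            false
        }
        Err(TrySendError::Closed(_)) => panic!("source executor has exited"),
    }
}

fn main() {
    let (tx, _rx) = mpsc::channel::<Barrier>(1);
    assert!(try_inject(&tx, Barrier { epoch: 1, is_checkpoint: false }));
    // The first barrier has not been consumed yet, so the second one is dropped.
    assert!(!try_inject(&tx, Barrier { epoch: 2, is_checkpoint: false }));
}
```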

Describe alternatives you've considered

No response

Additional context

No response

github-actions bot added this to the release-0.20 milestone May 24, 2023
fuyufjh (Member, Author) commented May 24, 2023

@st1page has proposed to control checkpoints based on time (e.g. every 10 seconds) instead of barrier frequency (e.g. every 10 barriers) to mitigate the fluctuation problem. I think my proposal will achieve a similar effect.

For example, assume the Meta->CN channel currently has 1 barrier stuck in it, and Meta has already dropped 4 non-checkpoint barriers and 1 checkpoint barrier. In this case, the next barrier that Meta tries to emit should always be a checkpoint barrier, until one is successfully emitted; then it can go back to the normal frequency.
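
A rough sketch of that policy (the `BarrierScheduler` below is hypothetical, not Meta's real barrier manager): once any barrier has been dropped due to back-pressure, keep promoting the next barrier to a checkpoint barrier until one is actually emitted, then fall back to the configured checkpoint frequency.

```rust
#[derive(Default)]
struct BarrierScheduler {
    /// Non-checkpoint barriers emitted since the last checkpoint barrier.
    barriers_since_checkpoint: u32,
    /// How often a checkpoint barrier is emitted under normal conditions.
    checkpoint_frequency: u32,
    /// Set when at least one barrier was dropped because the channel was full.
    dropped_since_last_emit: bool,
}

impl BarrierScheduler {
    fn new(checkpoint_frequency: u32) -> Self {
        Self { checkpoint_frequency, ..Default::default() }
    }

    /// Should the next barrier Meta tries to emit be a checkpoint barrier?
    fn next_is_checkpoint(&self) -> bool {
        // After any drop, always try a checkpoint barrier so that dropped
        // checkpoints are not delayed indefinitely.
        self.dropped_since_last_emit
            || self.barriers_since_checkpoint + 1 >= self.checkpoint_frequency
    }

    /// Record that a barrier was successfully emitted into the channel.
    fn on_emitted(&mut self, was_checkpoint: bool) {
        self.dropped_since_last_emit = false;
        if was_checkpoint {
            self.barriers_since_checkpoint = 0;
        } else {
            self.barriers_since_checkpoint += 1;
        }
    }

    /// Record that a barrier was dropped because the channel was full.
    fn on_dropped(&mut self) {
        self.dropped_since_last_emit = true;
    }
}

fn main() {
    let mut sched = BarrierScheduler::new(10);
    // 4 non-checkpoint barriers and 1 checkpoint barrier were dropped:
    for _ in 0..5 {
        sched.on_dropped();
    }
    // The next barrier Meta tries to emit must be a checkpoint barrier...
    assert!(sched.next_is_checkpoint());
    // ...and once one is emitted successfully, the normal frequency resumes.
    sched.on_emitted(true);
    assert!(!sched.next_is_checkpoint());
}
```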

BugenZhao (Member) commented May 24, 2023

The challenge I see with this approach is how to "drop" the barrier. According to the above, once some source gets back-pressured, all sources in all compute nodes should reject this barrier atomically. Here are some possible ways to achieve this:

  • Use two-phase barrier injection. The first RPC tries to reserve (similar to try_reserve) a slot, and if that succeeds, send the second RPC to inject (sketched after this list).
  • Back-pressure based on the information on the meta side, since we know the maximum depth of the streaming graph and the current number of in-flight barriers.
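
A minimal sketch of the first option, assuming each compute node's barrier budget can be modeled as a semaphore; `ComputeNode` and `try_reserve` here are hypothetical names, not the real RPC definitions:

```rust
use std::sync::Arc;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};

/// Stand-in for one compute node's bounded barrier budget towards its sources.
struct ComputeNode {
    slots: Arc<Semaphore>,
}

impl ComputeNode {
    fn new(max_in_flight: usize) -> Self {
        Self { slots: Arc::new(Semaphore::new(max_in_flight)) }
    }

    /// Phase 1: try to reserve a slot without waiting (in the spirit of `try_reserve`).
    fn try_reserve(&self) -> Option<OwnedSemaphorePermit> {
        self.slots.clone().try_acquire_owned().ok()
    }
}

fn main() {
    let nodes = vec![ComputeNode::new(1), ComputeNode::new(1)];

    // Phase 1: reserve on every node; keep the permits alive until phase 2.
    let permits: Option<Vec<_>> = nodes.iter().map(|n| n.try_reserve()).collect();

    match permits {
        Some(permits) => {
            // Phase 2: every node accepted, so the barrier can be injected
            // atomically; the permits are released once injection completes.
            println!("injecting barrier on {} nodes", permits.len());
        }
        None => {
            // Some node is back-pressured: all nodes consistently skip this
            // barrier (already-acquired permits are released on drop).
            println!("dropping barrier: some node has no free slot");
        }
    }
}
```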

Another approach I find might be simpler to implement is to let the compute node block the inject_barrier RPC from the meta service until there are free slots in the bounded barrier receiver in the source. During the blocking, new barriers (whether manually issued or periodic) are also blocked, and we would have to adopt the idea of "checkpoint based on time" for this.
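
A sketch of that blocking alternative, assuming the compute-node side can be modeled as an async handler writing into a bounded channel; `BarrierInjectionService` and its `inject_barrier` method are hypothetical stand-ins for the real RPC handler:

```rust
use tokio::sync::mpsc;

#[derive(Debug)]
struct Barrier {
    epoch: u64,
}

struct BarrierInjectionService {
    /// Bounded channel towards the SourceExecutor; capacity = allowed in-flight barriers.
    to_source: mpsc::Sender<Barrier>,
}

impl BarrierInjectionService {
    /// Stand-in for handling the inject-barrier RPC: the future does not resolve
    /// until the bounded receiver in the source has a free slot, so the RPC
    /// itself back-pressures Meta.
    async fn inject_barrier(&self, barrier: Barrier) -> Result<(), &'static str> {
        self.to_source
            .send(barrier)
            .await
            .map_err(|_| "source executor has exited")
    }
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel::<Barrier>(1);
    let service = BarrierInjectionService { to_source: tx };

    // The first injection succeeds immediately; the second may have to wait
    // until the source (simulated by the spawned task) consumes a barrier.
    service.inject_barrier(Barrier { epoch: 1 }).await.unwrap();
    let consumer = tokio::spawn(async move {
        while let Some(b) = rx.recv().await {
            println!("source consumed barrier at epoch {}", b.epoch);
        }
    });
    service.inject_barrier(Barrier { epoch: 2 }).await.unwrap();

    drop(service); // close the channel so the consumer task can finish
    consumer.await.unwrap();
}
```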

fuyufjh (Member, Author) commented May 26, 2023

Suddenly I realized a problem... The Meta->Source channel we discussed here is shared among all streaming tasks. But in our case (barrier stuck for minutes #9723), actually only one job (Q16) is problematic, and the other jobs should not be affected by it. Specifically, if we stop injecting barriers for all jobs, the other jobs will see a longer barrier duration / more events between two barriers, which may become another issue.

Is it reasonable to block all of these jobs? cc. @hzxa21 @BugenZhao

hzxa21 (Collaborator) commented May 26, 2023

> Suddenly I realized a problem... The Meta->Source channel we discussed here is shared among all streaming tasks. But in our case (barrier stuck for minutes #9723), actually only one job (Q16) is problematic, and other jobs should not be affected by it, so is it reasonable to block all of these jobs? cc. @hzxa21 @BugenZhao

I guess this is unavoidable since we are doing global checkpointing. If we allow independent checkpointing for non-connected streaming graphs (proposed a long time ago), we can reduce the impact of a slow job. Are all nexmark jobs run on a shared table (materialized source)? If that is the case, the streaming graphs of different jobs are still connected.

fuyufjh (Member, Author) commented May 26, 2023

> Are all nexmark jobs run on a shared table (materialized source)? If that is the case, the streaming graphs of different jobs are still connected.

In this case, no, they are using a Kafka source.

But anyway, it seems hard to explain "source can have better isolation while table can't" to users.

BugenZhao (Member) commented

> it seems hard to explain "source can have better isolation while table can't" to users.

This reminds me of a long-resolved question of why we don't have "physical resources" for sources. 😂

fuyufjh (Member, Author) commented Jul 14, 2023

Can't solve the problem

fuyufjh closed this as not planned Jul 14, 2023