Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RFC: Suspend MV on Non-Recoverable Errors #54

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions rfcs/0054-suspend-mv-on-non-recoverable-errors.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
---
feature: Suspend MV on Non-Recoverable Errors
authors:
- "Patrick Huang"
start_date: "2023/2/24"
---
# RFC: Suspend MV on Non-Recoverable Errors

# Motivation

Currently we have two error handling mechanisms on MV:

1. Ignore and tolerate the compute error. Relevant issue: https://github.com/risingwavelabs/risingwave/issues/4625)
2. Report error to meta and trigger a full recovery.

However, when there are non-recoverable errors caused by malform records ingested by the user, the current error handling mechanisms are insufficient:

1. Even if these messages can be skip or fixed via filling in default values, correctness can be affected, especially when sinking to external systems, since the damage may hardly get reverted.
Copy link

@jon-chuang jon-chuang Feb 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the damage may hardly get reverted.

I'm not completely sure this is correct.

Could you provide some examples where the damage is permanent? One way is that their downstream components to RW are affected, and a lot of manual work is required to clean up the wrong data.

For RW itself, I think we do not need to eagerly suspend the MV, user can create a new MV on the source if they discover errors.

Ultimately, I think the behaviour should be determined by the user, whether they are willing to tolerate certain types of errors with NULL records in place, or if they want to suspend the job. The same goes for batch queries.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One way iss that their downstream components to RW are affected, and a lot of manual work is required to clean up the wrong data.

Exactly.

Ultimately, I think the behaviour should be determined by the user, whether they are willing to tolerate certain types of errors or if they want to suspend the job. The same goes for batch queries.

Totally agree. That was my intention too.

2. Recovery won’t help here. We may end up with re-consuming the malform records and trigger full recovery over and over again. All MVs in the cluster will be affected and no progress can be made.

This RFC proposes that we should suspend the affected MV when encountering non-recovery errors. Non-recoverable errors by definition cannot be fixed automatically so manual intervention and re-creating the MV are unavoidable. The goals of this RFC are:
Copy link

@jon-chuang jon-chuang Feb 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this may be too strict.

The user may still want the MV to run.

I believe that the user should be in charge of ensuring that their input source is persisted outside of RW (e.g. in msg queue like kafka). They should create a new MV that avoids the errors if they discover that they cannot tolerate the given errors.

Copy link

@jon-chuang jon-chuang Feb 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps we should expose the error tolerance to the user. However, we need to decide which mode is the default:

connector_error_suspends_mv: true,
connector_error_kills_batch_query: true,
compute_error_suspends_mv: false,
compute_error_kills_batch_query: true,
storage_error_suspends_mv: true,

There is some overlap with the batch query behaviour (whether to warn or throw error) @fuyufjh . We have not decided which is the best default behaviour for user (or if we should expose the error tolerance option for batch). Actually, I'm leaning towards not tolerating errors for batch.


- Reduce the radius of impact when non-recoverable errors happen. In other words, non-relevant MVs should not be affected.
- Improve observability when non-recoverable errors happen. In other words, MV progress and necessary states should be kept to help user/SRE identify the issue.

# Design

## Non-Recoverable Errors

- Errors caused by malform records ingested by the user source. Examples: record too large, record parsing error, record format/type mismatch.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

record too large

I know this is just an example, but for completeness, in what scenario can record too large occur?

I know we have encountered request too large for etcd request: risingwavelabs/risingwave#7728 but not sure of record too large issue of connectors. @tabVersion

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We recently encounter an issue when user ingests a malform row with >64KB pk. We didn't notice it until compactor stops working and we manually inspect the encoded SSTs in S3. User also didn't realize that they have such a malform row ingested before we explicitly told them there is something wrong with a specific table.

Ideally, we should warn the user or even stop ingesting the malform row once it appeared. Although we have a hotfix merged, we still need to figure out a way to do that instead of just workaround the issue by allowing >64KB keys to be stored.

Also, we doesn't seem to have enforce any length limitation on our PK or the VARCHAR type but this is a separate discussion.

- Errors caused by failure in sanity check. Examples: state table insert/update/delete sanity check fails.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we consider panic as a kind of Non-Recoverable Errors?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we consider panic as a kind of Non-Recoverable Errors?

+1

Copy link

@neverchanje neverchanje Mar 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Disagree. Suspending on panics would mean the system is basically non-HA. We should at least try for a few times.

Copy link
Member

@fuyufjh fuyufjh Mar 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After some extra thought I think this is bad, because sometimes a panic only means the program don't know how to continue but doesn't necessarily means it can't be recovered from last checkpoint.


## Suspend MV on Non-Recoverable Errors

Error MV and all its downstream MVs are suspended and removed from the streaming graph for checkpointing. Suspended MV and its internal states are still queryrable (at least for SRE). You can treat the suspended MV as a dropped but not GCed MV.

Detail steps on MV suspension:

1. Actor reports non-recoverable errors to meta node.
Copy link
Member

@BugenZhao BugenZhao Mar 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may be tricky as we need to avoid panicking (then aborting) for each type of non-recoverable error to make it work, or we have no chance to do the following stuff gracefully. One may argue that catch_unwind can be a fallback, but I worry that it'll corrupt some internal states and lead to chained errors.

For example, by using bytes::Buf we will panic on the deserialization of a corrupted encoding.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the catch_unwind is just to get the information about the error's place such as the actor_id and send them to meta. A check-point-based failover is still needed?

Copy link
Member

@BugenZhao BugenZhao Mar 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes if we have sufficient time to at least report the information. In this case, we must rely on approach #1 of suspending the MV. 🤔

2. Meta node identifies the MV of the failed actor as well as all its downstream MVs.
3. Meta node suspends these MVs by removing them from the streaming graph.
4. A suspended MV will be deleted manually or automatically TTLed.



### DISUCSSION: How to suspend the MV?

1. Trigger a recovery: all ongoing concurrent checkpoint fails and actors of the suspended MV won’t get rebuilt during recovery.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IIUC, we expect the users to drop the problematic materialized views to recovery from this. However, does this mean that the materialized view is "dangled" at this time?

2. No recovery triggered: remove the suspended MV from the streaming graph immediately or in the next checkpoint. (Is this possible? e.g. failure in downstream MVs may cause backpressure in upstream and can we preempt backpressure?)

Personally prefer a > b.

### DISCUSSION: What happen when user queries the suspended MV?

1. Ban user query and throw an error.
2. Allow users to query but remind them there is an error and the MV will not longer be updated.

Personally prefer a > b.
Copy link
Member

@fuyufjh fuyufjh Feb 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 looks a little bit better to me, but 1 seems to be the only feasible approache for us because we don't support a dangled MV i.e. an MV without streaming job attached.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 is doable since the data remains on storage and it is immutable. May need some refactoring on the meta side though (maybe we can just treat the dangled MV and its internal state tables as regular batch tables)


## Future possibilities

- We can store the error message, stack trace as well as other detailed information of the suspended MV in a user queryable system table.
-