Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Remote Segment Store] Failure handling #3906

Closed
Tracked by #5671
sachinpkale opened this issue Jul 14, 2022 · 4 comments
Closed
Tracked by #5671

[Remote Segment Store] Failure handling #3906

sachinpkale opened this issue Jul 14, 2022 · 4 comments
Labels
distributed framework enhancement Enhancement or improvement to existing feature or request

Comments

@sachinpkale
Copy link
Member

sachinpkale commented Jul 14, 2022

This doc is WIP and I will be adding more details around the steps of proposed solution.

Goal

Identify failure scenarios that can occur while uploading or restoring data to/from the remote segment store and propose a solution to handle these failures. In order to provide durability guarantees, we need to ensure persistence and integrity of data in the remote store.

Invariant

Following is the invariant that we track for remote store feature (remote translog + remote segment store)

At any given time, Remote Segment Store and Remote Translog will contain all the indexed data (Data Completeness) and with the data present in these stores, a Lucene index can be successfully built (Data Integrity).

Scope

In this doc, we will discuss failure scenarios that can occur while working with remote segment store but these scenarios may not be just limited to remote segment store. For some of the scenarios, we assume a specific working of remote translog as well as segment replication. Failure scenarios specific to remote translog or segment replication are not discussed.

Existing Flow

Flow_2

This flow works fine if following holds true (let’s call them Happy Flow Assumptions):

  • Remote store is in sync with primary
  • Segment upload/download to/from the remote store happens without corruption.
  • Replicas are in sync with primary
  • Exactly one primary for a given shard is uploading segments to remote store

Failure Handling

Any deviation from happy flow assumptions mentioned above can create completeness or integrity issues of the data stored in the remote segment store. A deviation does not mean it will always create an issue but could lead to an issue under certain circumstances. We have listed down these failure scenarios in the section: Appendix: Failure Scenarios

Failure Buckets

Following are the deviations from the happy flow assumptions. The corresponding failure scenarios are explained in the later section. Even though replicas do not directly come into picture while interacting with remote segment store, in case of fail-over, when replica is promoted, the state of the replica at that time can create issues.

  • Remote store is lagging behind primary by N segments
  • Remote store is ahead of primary by M segments
  • Uploaded segment file is corrupted
  • While restoring the data, downloaded segment file is corrupted
  • Replicas are lagging behind primary by N segments
  • Replicas are ahead of primary by M segments
  • More than one primary at the same time for a given shard
  • No primary for a given shard

Potential Approaches

Based on above failure scenarios, data in remote segment store will either be missing (remote store is lagging and primary goes down) or corrupted (uploaded segment file is corrupted/existing segment file is overwritten by another primary).
In order to keep the invariant, the solution to write and read data from segment store needs to consider following things:

Handling Missing Data

Here, remote segment store needs to work with remote translog to make sure that indexed data is always present in the remote store.

  • Remote segment store will keep checkpoint of last successful uploaded commit, say C1.
  • Remote translog will not purge write operations post C1.
  • For redundancy purpose, it will not be just last commit but last N commits. Both remote stores will soft delete the stale data from last but Nth commit till the latest commit.

Handling Corrupt Data

Data corruption indicates that by using data in the remote segment store, a Lucene index can’t be built. It can happen due to various reasons:

  1. Segment files are missing which are part of the latest checkpoint
    1. Potential Solution: Always upload checkpoint file after corresponding segment files are uploaded.
  2. Segment files are corrupted while uploading
    1. Potential Solution: Checksum validation (during upload as well as periodic)
  3. Two primary nodes (created due to n/w partition) uploaded files at the same time overwriting segment files uploaded by other node, leaving the end state corrupted.
    1. Potential Solution: Use primary term to differentiate between segments uploaded from different primaries. This can be achieved by adding the primary term in the remote directory path or by adding primary term prefix to the segment file name. This assumes that each primary has unique primary term.

Recommended Approach

High Level Idea

  • Make sure a segment file is not overwritten once uploaded, this can be achieved by adding a unique prefix (or suffix or both) to segment file name.
  • Keep a mapping file per commit which will keep info of original segment file name to uploaded name (and vice versa).

Filename Conventions

  • Upload
    • Add <primary_term>_<checkpoint generation>_<UUID> suffix (or prefix) to segment filename before uploading.
  • Download
    • Remove <primary_term>_<checkpoint generation>_<UUID> suffix before downloading the segment to local filesystem.
  • Resolving conflicts
    • Conflicts need to be resolved for mapping file only as it is used to initialize upload/download.
    • Conflicts are uploaded in following order:
      • Primary Term - Higher primary term wins.
      • Generation - Each checkpoint (refresh/commit) has corresponding generation number associated with it. It is in Radix 36. Higher generation wins.
        • For refresh checkpoint, we may need to create refresh specific generation as well, need to check further.
      • UUID is not used to resolve conflict.
        • If resolving conflict comes down to UUID, we have following 2 options:
          • Pick winner based on lexicographic sorting of UUID. This is just random but consistent logic to resolve the conflicts.
          • Do not pick winner. Return both the files. It is caller's responsibility to choose one over another.

Algorithm Steps

On refresh
  • If shard was not primary and is primary now,
    • Fetch mapping file corresponding to last checkpoint and populate in-memory map (filesUploadedToRemoteStore) with the segment file details (original name, uploaded name, checksum etc)
  • Read latest checkpoint from the local directory
  • Get a list of new files to be uploaded, this is basically newFilesToBeUploaded = checkpoint files - filesUploadedToRemoteStore.
    • While taking a diff, original file name and checksum is checked.
  • Upload each file in newFilesToBeUploaded and update filesUploadedToRemoteStore accordingly.
  • If each file is uploaded successfully, upload checkpoint file.
  • If checkpoint file is uploaded successfully, create and upload mapping file. Each mapping file should contain list of all the segment files for that shard.
On restore
  • Read latest mapping file and create map of uploaded segment files to original segment files.
    • If there are multiple mapping files with same primary term and generation, pick one randomly.
    • We can optimize this logic further by picking mapping file with max sequence number for the corresponding checkpoint.
  • Download all the segment files as per corresponding original name.
  • Replay remote translog based on the sequence number in latest checkpoint.
On failover
  • If replica to be promoted is lagging behind remote store, download the diff and make new primary consistent with the remote store in terms of segments.
    • This is not required, if remote translog keeps track of commit checkpoint of each replica and only purges data till oldest commit checkpoint.
Periodic Jobs
  • One job will run every X minutes and will delete stale (merged away) segments files from remote segment store. The job will make sure to keep last N commit checkpoints and corresponding segment files.

<ToDo: Add details on how the recommended approach takes care of all the failure scenarios mentioned in this issue>

Appendix: Failure Scenarios

Following list will not be extensive list of all the failure scenarios but provides details of type of failures that can occur. Each scenario can be broken down further to get more failure scenarios.

  1. Remote Store is lagging behind primary
    1. Remote store is lagging, no replica, primary goes down - Just restoring data from remote segment store will result in data loss.
    2. Remote store is lagging, 1 replica, primary goes down - Assuming replica was in sync with primary, replica gets promoted and before uploading segments to remote store, it goes down. Just restoring data from remote segment store will result in data loss.
  2. Remote Store is ahead of primary
    1. Remote store is in sync with primary, replica is lagging, primary goes down, remote store is ahead of new primary - new primary creates new segments with the same names that are present in the remote store (but different content) and after overwriting some of the segments in the remote store, new primary goes down. Restoring from remote segment store throws CorruptIndexException
    2. Remote store is ahead of primary (stale segments are not deleted yet), primary goes down, no replica
    3. Remote store is ahead of primary (stale segments are not deleted yet), primary goes down, remote store is ahead of replica
  3. Uploaded segment file is corrupted
    1. A checkpoint contains 3 segment files: s1, s2 and s3 along with checkpoint file c1. Out of there four files, s1, s3 and c1 are uploaded successfully. s2 also got uploaded but its checksum is now different than the chesksum stored in its footer.
  4. While restoring the data, downloaded segment file is corrupted
    1. While restoring a checkpoint, 1 segment file was downloaded where file’s checksum does not match with the one stored in its footer. Lucene throws CorruptIndexException.
  5. Replicas are lagging behind primary
    1. Primary goes down, replica becomes new primary
    2. Primary goes down, during the fail-over process, replica goes down
  6. Replicas are ahead of primary
    1. Primary and 2 replicas, 1 replica lagging, another replica in sync, primary goes down, 1st replica becomes new primary, 2nd replica is ahead of primary
  7. More than one primary at the same time
    1. Primary and 1 replica, primary is in the process of uploading segments and failover happens (primary is still up), replica is promoted and starts uploading segments to the remote store, segments with the same name will get overwritten resulting in CorruptIndexException on restore.
    2. Primary and replica are in sync, primary is partitioned away from cluster, replica is promoted, until old primary is kicked out of the cluster, both nodes are uploading segments to the cluster
    3. Primary partitioned, first replica promoted partitioned and second replica is promoted. All 3 are uploading segments before original primary and original first replica are kicked out of the cluster
  8. No primary for a given shard
@sachinpkale sachinpkale added enhancement Enhancement or improvement to existing feature or request untriaged labels Jul 14, 2022
@sachinpkale sachinpkale changed the title [Remote Segment Store] Failure handling scenarios [Remote Segment Store] Failure handling Jul 14, 2022
@Bukhtawar
Copy link
Collaborator

Two primary nodes (created due to n/w partition) uploaded files at the same time overwriting segment files uploaded by other node, leaving the end state corrupted.
Potential Solution: Use primary term to differentiate between segments uploaded from different primaries. This can be achieved by adding the primary term in the remote directory path or by adding primary term prefix to the segment file name. This assumes that each primary has unique primary term.

We need to potentially handle cases during a peer recovery where the peer primary source(relocating) and the new primary target(initialising) can have the same primary term. We would need to guarantee they don't concurrently upload to the same remote store location thereby causing conflicts/corruptions. Uploads to remote store from both nodes should be strictly serialised.
Not sure if this might be a legit case, where the relocating source node gets partitioned off and is trying to upload we need to ensure all in-flight uploads finish before the new primary target can take over with publication of checkpoints and subsequently uploads.

To make sure segrep is handling these potential cases I opened #3923

@sachinpkale
Copy link
Member Author

@Bukhtawar Thanks for pointing out peer recovery use-case. Just by having primary term as disambiguator may not work. Basically, the idea is to make sure a segment file will not be overwritten in the remote segment store. We also need to have some mechanism which will allow to compare files based on primary term and generation in order to resolve conflicts.

This is what I am thinking. Prefix (or suffix) to a segment file name stored in remote segment store:

<PrimaryTerm>_<NodeId>

NodeId will just be used as disambiguator and will not play any role in resolving conflicts.

@Bukhtawar
Copy link
Collaborator

This is what I am thinking. Prefix (or suffix) to a segment file name stored in remote segment store:
_
NodeId will just be used as disambiguator and will not play any role in resolving conflicts.

Thanks Sachin, I'm on the fence for using just the NodeId as disambiguator since it really depends on the implementation details f.e. we could have two processes share it or realistically two threads on the same JVM and still cause a conflict

Having said that we can always have unique segment names(using UUID preferably) on the same primary term to avoid conflicting file name and the segments_N and/or metadata file refer to it

@sachinpkale
Copy link
Member Author

@Bukhtawar Using UUID makes sense and avoids all the corner cases (known and unknown). Will use UUID.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
distributed framework enhancement Enhancement or improvement to existing feature or request
Projects
None yet
Development

No branches or pull requests

4 participants