Skip to content

Commit

Permalink
legacy: Hold persist snapshot until take snapshot completes
Browse files Browse the repository at this point in the history
Signed-off-by: Free Ekanayaka <free@ekanayaka.io>
  • Loading branch information
freeekanayaka committed Feb 3, 2024
1 parent f742c08 commit 6d24ea9
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 9 deletions.
60 changes: 55 additions & 5 deletions src/legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,12 @@ struct legacyPersistSnapshot
bool last;
};

static void legacyCancelPersistSnapshot(struct legacyPersistSnapshot *req)

Check warning on line 233 in src/legacy.c

View check run for this annotation

Codecov / codecov/patch

src/legacy.c#L233

Added line #L233 was not covered by tests
{
raft_free(req->chunk.base);
raft_configuration_close(&req->metadata.configuration);
}

Check warning on line 237 in src/legacy.c

View check run for this annotation

Codecov / codecov/patch

src/legacy.c#L235-L237

Added lines #L235 - L237 were not covered by tests

static void legacyPersistSnapshotCb(struct raft_io_snapshot_put *put,
int status)
{
Expand All @@ -256,13 +262,34 @@ static void legacyPersistSnapshotCb(struct raft_io_snapshot_put *put,
} else {
assert(r->legacy.closing);
assert(status == RAFT_CANCELED);
raft_free(req->chunk.base);
raft_configuration_close(&req->metadata.configuration);
legacyCancelPersistSnapshot(req);
}

raft_free(req);
}

static int legacyPersistSnapshotStart(struct legacyPersistSnapshot *req)
{
struct raft *r = req->r;
int rv;

logRestore(r->legacy.log, req->metadata.index, req->metadata.term);

rv = r->io->snapshot_put(r->io, 0, &req->put, &req->snapshot,
legacyPersistSnapshotCb);
if (rv != 0) {
goto err;

Check warning on line 281 in src/legacy.c

View check run for this annotation

Codecov / codecov/patch

src/legacy.c#L281

Added line #L281 was not covered by tests
}

return 0;

err:
raft_free(req);

Check warning on line 287 in src/legacy.c

View check run for this annotation

Codecov / codecov/patch

src/legacy.c#L286-L287

Added lines #L286 - L287 were not covered by tests
ErrMsgTransferf(r->io->errmsg, r->errmsg, "put snapshot at %llu",
req->metadata.index);
return rv;

Check warning on line 290 in src/legacy.c

View check run for this annotation

Codecov / codecov/patch

src/legacy.c#L290

Added line #L290 was not covered by tests
}

static int legacyHandleUpdateSnapshot(struct raft *r,
struct raft_snapshot_metadata *metadata,
size_t offset,
Expand All @@ -272,6 +299,8 @@ static int legacyHandleUpdateSnapshot(struct raft *r,
struct legacyPersistSnapshot *req;
int rv;

assert(r->legacy.snapshot_pending == NULL);

req = raft_malloc(sizeof *req);
if (req == NULL) {
return RAFT_NOMEM;
Expand All @@ -290,10 +319,14 @@ static int legacyHandleUpdateSnapshot(struct raft *r,
req->snapshot.bufs = &req->chunk;
req->snapshot.n_bufs = 1;

logRestore(r->legacy.log, req->metadata.index, req->metadata.term);
/* If we're taking a snapshot, put this install on hold until it's
* completed. */
if (r->legacy.snapshot_taking) {
r->legacy.snapshot_pending = req;
return 0;

Check warning on line 326 in src/legacy.c

View check run for this annotation

Codecov / codecov/patch

src/legacy.c#L325-L326

Added lines #L325 - L326 were not covered by tests
}

rv = r->io->snapshot_put(r->io, 0, &req->put, &req->snapshot,
legacyPersistSnapshotCb);
rv = legacyPersistSnapshotStart(req);
if (rv != 0) {
goto err;
}
Expand Down Expand Up @@ -433,6 +466,13 @@ static void takeSnapshotCb(struct raft_io_snapshot_put *put, int status)
if (r->legacy.closing) {
tracef("cancelling snapshot");
status = RAFT_CANCELED;

/* Also cancel any persist snapshot request. */
if (r->legacy.snapshot_pending != NULL) {
struct legacyPersistSnapshot *persist;
persist = r->legacy.snapshot_pending;

Check warning on line 473 in src/legacy.c

View check run for this annotation

Codecov / codecov/patch

src/legacy.c#L472-L473

Added lines #L472 - L473 were not covered by tests
legacyCancelPersistSnapshot(persist);
}
}

if (status != 0) {
Expand All @@ -449,6 +489,15 @@ static void takeSnapshotCb(struct raft_io_snapshot_put *put, int status)
event.snapshot.metadata = metadata;
event.snapshot.trailing = 0;
LegacyForwardToRaftIo(r, &event);

if (r->legacy.snapshot_pending != NULL) {
struct legacyPersistSnapshot *persist;
int rv;
r->legacy.snapshot_pending = NULL;
persist = r->legacy.snapshot_pending;

Check warning on line 497 in src/legacy.c

View check run for this annotation

Codecov / codecov/patch

src/legacy.c#L494-L497

Added lines #L494 - L497 were not covered by tests
rv = legacyPersistSnapshotStart(persist);
assert(rv == 0);

Check warning on line 499 in src/legacy.c

View check run for this annotation

Codecov / codecov/patch

src/legacy.c#L499

Added line #L499 was not covered by tests
}
}

static int putSnapshot(struct legacyTakeSnapshot *req)
Expand Down Expand Up @@ -512,6 +561,7 @@ static void legacyTakeSnapshot(struct raft *r)
assert(r->last_applied == r->commit_index);

assert(!r->snapshot.persisting);
assert(r->legacy.snapshot_pending == NULL);

tracef("take snapshot at %lld", r->commit_index);

Expand Down
13 changes: 9 additions & 4 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -933,10 +933,15 @@ int replicationInstallSnapshot(struct raft *r,
*rejected = args->last_index;
*async = false;

/* If we are taking a snapshot ourselves or installing a snapshot, ignore
* the request, the leader will eventually retry. TODO: we should do
* something smarter. */
if (r->legacy.snapshot_taking || r->snapshot.persisting) {
/* If we are installing a snapshot, ignore the request, the leader will
* eventually retry.
*
* Note that if we are taking a snapshot, the consuming code is supposed to
* wait until taking the snapshot completes before starting to persist this
* one.
*
* TODO: we should do something smarter. */
if (r->snapshot.persisting) {
*async = true;
infof("already taking or installing snapshot");
return RAFT_BUSY;
Expand Down

0 comments on commit 6d24ea9

Please sign in to comment.