From 9c92ed24777632ac1b70705f5653fadd25fdbb6b Mon Sep 17 00:00:00 2001 From: Free Ekanayaka Date: Sat, 23 Dec 2023 14:39:47 +0000 Subject: [PATCH] legacy: Apply committed entries to the FSM The logic for applying committed entries to the FSM has now been moved to the legacy compatibility layer. Signed-off-by: Free Ekanayaka --- include/raft.h | 19 +++--- src/convert.c | 6 +- src/fixture.c | 1 + src/legacy.c | 152 +++++++++++++++++++++++++++++++++++++++++++++- src/raft.c | 4 ++ src/replication.c | 138 ++--------------------------------------- src/replication.h | 8 +-- 7 files changed, 177 insertions(+), 151 deletions(-) diff --git a/include/raft.h b/include/raft.h index 8ec06d70..d3f79fc6 100644 --- a/include/raft.h +++ b/include/raft.h @@ -564,13 +564,14 @@ enum { RAFT_STOP, RAFT_PERSISTED_ENTRIES, /* A batch of entries have been persisted. */ RAFT_PERSISTED_SNAPSHOT, /* A snapshot has been persisted. */ - RAFT_SENT, /* A message has been sent (either successfully or not). */ - RAFT_RECEIVE, /* A message has been received. */ - RAFT_SNAPSHOT, /* A snapshot has been taken. */ - RAFT_TIMEOUT, /* The timeout has expired. */ - RAFT_SUBMIT, /* New entries have been submitted. */ - RAFT_CATCH_UP, /* Start catching-up a server. */ - RAFT_TRANSFER /* Submission of leadership trasfer request */ + RAFT_SENT, /* A message has been sent (either successfully or not). */ + RAFT_RECEIVE, /* A message has been received. */ + RAFT_CONFIGURATION, /* A new committed configuration must be applied. */ + RAFT_SNAPSHOT, /* A snapshot has been taken. */ + RAFT_TIMEOUT, /* The timeout has expired. */ + RAFT_SUBMIT, /* New entries have been submitted. */ + RAFT_CATCH_UP, /* Start catching-up a server. */ + RAFT_TRANSFER /* Submission of leadership trasfer request */ }; /** @@ -619,6 +620,10 @@ struct raft_event struct raft_message *message; } receive; struct + { + raft_index index; + } configuration; + struct { struct raft_snapshot_metadata metadata; /* Snapshot metadata */ unsigned trailing; /* Trailing entries kept */ diff --git a/src/convert.c b/src/convert.c index 371b1202..1de2509e 100644 --- a/src/convert.c +++ b/src/convert.c @@ -163,7 +163,6 @@ int convertToLeader(struct raft *r) r->last_stored, r->commit_index); r->commit_index = r->last_stored; r->update->flags |= RAFT_UPDATE_COMMIT_INDEX; - rv = replicationApply(r); } else if (n_voters > 1) { /* Raft Dissertation, paragraph 6.4: * The Leader Completeness Property guarantees that a leader has all @@ -189,12 +188,11 @@ int convertToLeader(struct raft *r) "%d", rv); raft_free(entry.buf.base); - goto out; + return rv; } } -out: - return rv; + return 0; } void convertToUnavailable(struct raft *r) diff --git a/src/fixture.c b/src/fixture.c index 8d190f8f..fa9d71fd 100644 --- a/src/fixture.c +++ b/src/fixture.c @@ -1729,6 +1729,7 @@ static bool hasAppliedIndex(struct raft_fixture *f, void *arg) n++; } } + return n == f->n; } diff --git a/src/legacy.c b/src/legacy.c index 4a8ae82c..488906b3 100644 --- a/src/legacy.c +++ b/src/legacy.c @@ -661,6 +661,151 @@ static void legacyCheckChangeRequest(struct raft *r, } } +/* Get the request matching the given @index and @type, if any. + * The type check is skipped when @type == -1. */ +static struct request *legacyGetRequest(struct raft *r, + const raft_index index, + int type) +{ + queue *head; + struct request *req; + + if (r->state != RAFT_LEADER) { + return NULL; + } + QUEUE_FOREACH (head, &r->legacy.pending) { + req = QUEUE_DATA(head, struct request, queue); + if (req->index == index) { + if (type != -1) { + assert(req->type == type); + } + QUEUE_REMOVE(&req->queue); + return req; + } + } + return NULL; +} + +/* Apply a RAFT_COMMAND entry that has been committed. */ +static int applyCommand(struct raft *r, + const raft_index index, + const struct raft_buffer *buf) +{ + struct raft_apply *req; + void *result; + int rv; + rv = r->fsm->apply(r->fsm, buf, &result); + if (rv != 0) { + return rv; + } + + r->last_applied = index; + + req = (struct raft_apply *)legacyGetRequest(r, index, RAFT_COMMAND); + if (req != NULL && req->cb != NULL) { + req->status = 0; + req->result = result; + QUEUE_PUSH(&r->legacy.requests, &req->queue); + } + return 0; +} + +/* Fire the callback of a barrier request whose entry has been committed. */ +static void applyBarrier(struct raft *r, const raft_index index) +{ + r->last_applied = index; + + struct raft_barrier *req; + req = (struct raft_barrier *)legacyGetRequest(r, index, RAFT_BARRIER); + if (req != NULL && req->cb != NULL) { + req->status = 0; + QUEUE_PUSH(&r->legacy.requests, &req->queue); + } +} + +/* Apply a RAFT_CHANGE entry that has been committed. */ +static void applyChange(struct raft *r, const raft_index index) +{ + struct raft_change *req; + + assert(index > 0); + + r->last_applied = index; + + if (r->state == RAFT_LEADER) { + req = r->legacy.change; + r->legacy.change = NULL; + + if (req != NULL && req->cb != NULL) { + /* XXX: set the type here, since it's not done in client.c */ + req->type = RAFT_CHANGE; + req->status = 0; + QUEUE_PUSH(&r->legacy.requests, &req->queue); + } + } +} + +static int legacyApply(struct raft *r, + struct raft_event **events, + unsigned *n_events) +{ + raft_index index; + struct raft_event *event; + int rv = 0; + + assert(r->state == RAFT_LEADER || r->state == RAFT_FOLLOWER); + assert(r->last_applied <= r->commit_index); + + if (r->last_applied == r->commit_index) { + /* Nothing to do. */ + return 0; + } + + for (index = r->last_applied + 1; index <= r->commit_index; index++) { + const struct raft_entry *entry = logGet(r->log, index); + if (entry == NULL) { + /* This can happen while installing a snapshot */ + tracef("replicationApply - ENTRY NULL"); + return 0; + } + + assert(entry->type == RAFT_COMMAND || entry->type == RAFT_BARRIER || + entry->type == RAFT_CHANGE); + + switch (entry->type) { + case RAFT_COMMAND: + rv = applyCommand(r, index, &entry->buf); + break; + case RAFT_BARRIER: + applyBarrier(r, index); + rv = 0; + break; + case RAFT_CHANGE: + applyChange(r, index); + + *n_events += 1; + *events = raft_realloc(*events, *n_events * sizeof **events); + assert(*events != NULL); + event = &(*events)[*n_events - 1]; + event->time = r->io->time(r->io); + event->type = RAFT_CONFIGURATION; + event->configuration.index = index; + + rv = 0; + break; + default: + rv = 0; /* For coverity. This case can't be taken. */ + break; + } + + if (rv != 0) { + break; + } + } + + return rv; +} + /* Handle a single event, possibly adding more events. */ static int legacyHandleEvent(struct raft *r, struct raft_entry *entry, @@ -768,7 +913,7 @@ static int legacyHandleEvent(struct raft *r, /* If the new commit index matches the index of a snapshot we have just * persisted, then restore the FSM state using its cached data. */ - if (commit_index == r->legacy.snapshot_index) { + if (commit_index != 0 && commit_index == r->legacy.snapshot_index) { /* From Figure 5.3: * * 8. Reset state machine using snapshot contents. @@ -780,6 +925,11 @@ static int legacyHandleEvent(struct raft *r, goto err; } } + + rv = legacyApply(r, events, n_events); + if (rv != 0) { + goto err; + } } return 0; diff --git a/src/raft.c b/src/raft.c index 74343f08..71d16baa 100644 --- a/src/raft.c +++ b/src/raft.c @@ -347,6 +347,10 @@ int raft_step(struct raft *r, rv = stepReceive(r, event->receive.id, event->receive.address, event->receive.message); break; + case RAFT_CONFIGURATION: + rv = replicationApplyConfigurationChange( + r, event->configuration.index); + break; case RAFT_SNAPSHOT: rv = replicationSnapshot(r, &event->snapshot.metadata, event->snapshot.trailing); diff --git a/src/replication.c b/src/replication.c index 58bce31c..bedc2563 100644 --- a/src/replication.c +++ b/src/replication.c @@ -391,7 +391,6 @@ static int leaderPersistEntriesDone(struct raft *r, int status) { size_t server_index; - int rv; assert(r->state == RAFT_LEADER); @@ -470,11 +469,6 @@ static int leaderPersistEntriesDone(struct raft *r, /* Check if we can commit some new entries. */ replicationQuorum(r, r->last_stored); - rv = replicationApply(r); - if (rv != 0) { - /* TODO: just log the error? */ - } - out: return 0; } @@ -693,11 +687,6 @@ int replicationUpdate(struct raft *r, /* Check if we can commit some new entries. */ replicationQuorum(r, last_index); - rv = replicationApply(r); - if (rv != 0) { - /* TODO: just log the error? */ - } - return 0; } @@ -802,12 +791,6 @@ static int followerPersistEntriesDone(struct raft *r, } } - /* Apply to the FSM any newly stored entry that is also committed. */ - rv = replicationApply(r); - if (rv != 0) { - goto out; - } - result.rejected = 0; respond: @@ -1005,25 +988,9 @@ int replicationAppend(struct raft *r, if (args->leader_commit > r->commit_index && r->last_stored >= r->commit_index) { r->commit_index = min(args->leader_commit, r->last_stored); + r->update->flags |= RAFT_UPDATE_COMMIT_INDEX; } - /* If this is an empty AppendEntries, there's nothing to write. However we - * still want to check if we can commit some entry. However, don't commit - * anything while a snapshot install is busy, r->last_stored will be 0 in - * that case. - * - * From Figure 3.1: - * - * AppendEntries RPC: Receiver implementation: If leaderCommit > - * commitIndex, set commitIndex = min(leaderCommit, index of last new - * entry). - */ - if (!replicationInstallSnapshotBusy(r)) { - rv = replicationApply(r); - if (rv != 0) { - return rv; - } - } if (n == 0) { return 0; } @@ -1200,48 +1167,8 @@ int replicationInstallSnapshot(struct raft *r, return 0; } -/* Apply a RAFT_COMMAND entry that has been committed. */ -static int applyCommand(struct raft *r, - const raft_index index, - const struct raft_buffer *buf) -{ - struct raft_apply *req; - void *result; - int rv; - rv = r->fsm->apply(r->fsm, buf, &result); - if (rv != 0) { - return rv; - } - - r->last_applied = index; - - req = (struct raft_apply *)getRequest(r, index, RAFT_COMMAND); - if (req != NULL && req->cb != NULL) { - req->status = 0; - req->result = result; - QUEUE_PUSH(&r->legacy.requests, &req->queue); - } - return 0; -} - -/* Fire the callback of a barrier request whose entry has been committed. */ -static void applyBarrier(struct raft *r, const raft_index index) -{ - r->last_applied = index; - - struct raft_barrier *req; - req = (struct raft_barrier *)getRequest(r, index, RAFT_BARRIER); - if (req != NULL && req->cb != NULL) { - req->status = 0; - QUEUE_PUSH(&r->legacy.requests, &req->queue); - } -} - -/* Apply a RAFT_CHANGE entry that has been committed. */ -static void applyChange(struct raft *r, const raft_index index) +int replicationApplyConfigurationChange(struct raft *r, raft_index index) { - struct raft_change *req; - assert(index > 0); /* If this is an uncommitted configuration that we had already applied when @@ -1254,12 +1181,9 @@ static void applyChange(struct raft *r, const raft_index index) } r->configuration_committed_index = index; - r->last_applied = index; if (r->state == RAFT_LEADER) { const struct raft_server *server; - req = r->legacy.change; - r->legacy.change = NULL; /* If we are leader but not part of this new configuration, step * down. @@ -1275,14 +1199,9 @@ static void applyChange(struct raft *r, const raft_index index) (void *)server); convertToFollower(r); } - - if (req != NULL && req->cb != NULL) { - /* XXX: set the type here, since it's not done in client.c */ - req->type = RAFT_CHANGE; - req->status = 0; - QUEUE_PUSH(&r->legacy.requests, &req->queue); - } } + + return 0; } int replicationSnapshot(struct raft *r, @@ -1318,55 +1237,6 @@ int replicationSnapshot(struct raft *r, return 0; } -int replicationApply(struct raft *r) -{ - raft_index index; - int rv = 0; - - assert(r->state == RAFT_LEADER || r->state == RAFT_FOLLOWER); - assert(r->last_applied <= r->commit_index); - - if (r->last_applied == r->commit_index) { - /* Nothing to do. */ - return 0; - } - - for (index = r->last_applied + 1; index <= r->commit_index; index++) { - const struct raft_entry *entry = logGet(r->log, index); - if (entry == NULL) { - /* This can happen while installing a snapshot */ - tracef("replicationApply - ENTRY NULL"); - return 0; - } - - assert(entry->type == RAFT_COMMAND || entry->type == RAFT_BARRIER || - entry->type == RAFT_CHANGE); - - switch (entry->type) { - case RAFT_COMMAND: - rv = applyCommand(r, index, &entry->buf); - break; - case RAFT_BARRIER: - applyBarrier(r, index); - rv = 0; - break; - case RAFT_CHANGE: - applyChange(r, index); - rv = 0; - break; - default: - rv = 0; /* For coverity. This case can't be taken. */ - break; - } - - if (rv != 0) { - break; - } - } - - return rv; -} - void replicationQuorum(struct raft *r, const raft_index index) { size_t votes = 0; diff --git a/src/replication.h b/src/replication.h index a688e16e..78e1e82c 100644 --- a/src/replication.h +++ b/src/replication.h @@ -79,11 +79,6 @@ int replicationInstallSnapshot(struct raft *r, /* Returns `true` if the raft instance is currently installing a snapshot */ bool replicationInstallSnapshotBusy(struct raft *r); -/* Apply any committed entry that was not applied yet. - * - * It must be called by leaders or followers. */ -int replicationApply(struct raft *r); - /* Check if a quorum has been reached for the given log index, and update the * commit index accordingly if so. * @@ -126,4 +121,7 @@ int replicationSnapshot(struct raft *r, struct raft_snapshot_metadata *metadata, unsigned trailing); +/* Apply a RAFT_CHANGE entry that has been committed. */ +int replicationApplyConfigurationChange(struct raft *r, raft_index index); + #endif /* REPLICATION_H_ */