Skip to content

Commit

Permalink
legacy: Apply committed entries to the FSM
Browse files Browse the repository at this point in the history
The logic for applying committed entries to the FSM has now been moved to the
legacy compatibility layer.

Signed-off-by: Free Ekanayaka <free@ekanayaka.io>
  • Loading branch information
freeekanayaka committed Dec 23, 2023
1 parent 70b4561 commit 9c92ed2
Show file tree
Hide file tree
Showing 7 changed files with 177 additions and 151 deletions.
19 changes: 12 additions & 7 deletions include/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -564,13 +564,14 @@ enum {
RAFT_STOP,
RAFT_PERSISTED_ENTRIES, /* A batch of entries have been persisted. */
RAFT_PERSISTED_SNAPSHOT, /* A snapshot has been persisted. */
RAFT_SENT, /* A message has been sent (either successfully or not). */
RAFT_RECEIVE, /* A message has been received. */
RAFT_SNAPSHOT, /* A snapshot has been taken. */
RAFT_TIMEOUT, /* The timeout has expired. */
RAFT_SUBMIT, /* New entries have been submitted. */
RAFT_CATCH_UP, /* Start catching-up a server. */
RAFT_TRANSFER /* Submission of leadership trasfer request */
RAFT_SENT, /* A message has been sent (either successfully or not). */
RAFT_RECEIVE, /* A message has been received. */
RAFT_CONFIGURATION, /* A new committed configuration must be applied. */
RAFT_SNAPSHOT, /* A snapshot has been taken. */
RAFT_TIMEOUT, /* The timeout has expired. */
RAFT_SUBMIT, /* New entries have been submitted. */
RAFT_CATCH_UP, /* Start catching-up a server. */
RAFT_TRANSFER /* Submission of leadership trasfer request */
};

/**
Expand Down Expand Up @@ -619,6 +620,10 @@ struct raft_event
struct raft_message *message;
} receive;
struct
{
raft_index index;
} configuration;
struct
{
struct raft_snapshot_metadata metadata; /* Snapshot metadata */
unsigned trailing; /* Trailing entries kept */
Expand Down
6 changes: 2 additions & 4 deletions src/convert.c
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ int convertToLeader(struct raft *r)
r->last_stored, r->commit_index);
r->commit_index = r->last_stored;
r->update->flags |= RAFT_UPDATE_COMMIT_INDEX;
rv = replicationApply(r);
} else if (n_voters > 1) {
/* Raft Dissertation, paragraph 6.4:
* The Leader Completeness Property guarantees that a leader has all
Expand All @@ -189,12 +188,11 @@ int convertToLeader(struct raft *r)
"%d",
rv);
raft_free(entry.buf.base);
goto out;
return rv;
}
}

out:
return rv;
return 0;
}

void convertToUnavailable(struct raft *r)
Expand Down
1 change: 1 addition & 0 deletions src/fixture.c
Original file line number Diff line number Diff line change
Expand Up @@ -1729,6 +1729,7 @@ static bool hasAppliedIndex(struct raft_fixture *f, void *arg)
n++;
}
}

return n == f->n;
}

Expand Down
152 changes: 151 additions & 1 deletion src/legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,151 @@ static void legacyCheckChangeRequest(struct raft *r,
}
}

/* Get the request matching the given @index and @type, if any.
* The type check is skipped when @type == -1. */
static struct request *legacyGetRequest(struct raft *r,
const raft_index index,
int type)
{
queue *head;
struct request *req;

if (r->state != RAFT_LEADER) {
return NULL;
}
QUEUE_FOREACH (head, &r->legacy.pending) {
req = QUEUE_DATA(head, struct request, queue);
if (req->index == index) {
if (type != -1) {
assert(req->type == type);
}
QUEUE_REMOVE(&req->queue);
return req;
}
}
return NULL;
}

/* Apply a RAFT_COMMAND entry that has been committed. */
static int applyCommand(struct raft *r,
const raft_index index,
const struct raft_buffer *buf)
{
struct raft_apply *req;
void *result;
int rv;
rv = r->fsm->apply(r->fsm, buf, &result);
if (rv != 0) {
return rv;
}

r->last_applied = index;

req = (struct raft_apply *)legacyGetRequest(r, index, RAFT_COMMAND);
if (req != NULL && req->cb != NULL) {
req->status = 0;
req->result = result;
QUEUE_PUSH(&r->legacy.requests, &req->queue);
}
return 0;
}

/* Fire the callback of a barrier request whose entry has been committed. */
static void applyBarrier(struct raft *r, const raft_index index)
{
r->last_applied = index;

struct raft_barrier *req;
req = (struct raft_barrier *)legacyGetRequest(r, index, RAFT_BARRIER);
if (req != NULL && req->cb != NULL) {
req->status = 0;
QUEUE_PUSH(&r->legacy.requests, &req->queue);
}
}

/* Apply a RAFT_CHANGE entry that has been committed. */
static void applyChange(struct raft *r, const raft_index index)
{
struct raft_change *req;

assert(index > 0);

r->last_applied = index;

if (r->state == RAFT_LEADER) {
req = r->legacy.change;
r->legacy.change = NULL;

if (req != NULL && req->cb != NULL) {
/* XXX: set the type here, since it's not done in client.c */
req->type = RAFT_CHANGE;
req->status = 0;
QUEUE_PUSH(&r->legacy.requests, &req->queue);
}
}
}

static int legacyApply(struct raft *r,
struct raft_event **events,
unsigned *n_events)
{
raft_index index;
struct raft_event *event;
int rv = 0;

assert(r->state == RAFT_LEADER || r->state == RAFT_FOLLOWER);
assert(r->last_applied <= r->commit_index);

if (r->last_applied == r->commit_index) {
/* Nothing to do. */
return 0;
}

for (index = r->last_applied + 1; index <= r->commit_index; index++) {
const struct raft_entry *entry = logGet(r->log, index);
if (entry == NULL) {
/* This can happen while installing a snapshot */
tracef("replicationApply - ENTRY NULL");
return 0;
}

assert(entry->type == RAFT_COMMAND || entry->type == RAFT_BARRIER ||
entry->type == RAFT_CHANGE);

switch (entry->type) {
case RAFT_COMMAND:
rv = applyCommand(r, index, &entry->buf);
break;
case RAFT_BARRIER:
applyBarrier(r, index);
rv = 0;
break;
case RAFT_CHANGE:
applyChange(r, index);

*n_events += 1;
*events = raft_realloc(*events, *n_events * sizeof **events);
assert(*events != NULL);
event = &(*events)[*n_events - 1];
event->time = r->io->time(r->io);
event->type = RAFT_CONFIGURATION;
event->configuration.index = index;

rv = 0;
break;
default:
rv = 0; /* For coverity. This case can't be taken. */
break;
}

if (rv != 0) {
break;
}
}

return rv;
}

/* Handle a single event, possibly adding more events. */
static int legacyHandleEvent(struct raft *r,
struct raft_entry *entry,
Expand Down Expand Up @@ -768,7 +913,7 @@ static int legacyHandleEvent(struct raft *r,

/* If the new commit index matches the index of a snapshot we have just
* persisted, then restore the FSM state using its cached data. */
if (commit_index == r->legacy.snapshot_index) {
if (commit_index != 0 && commit_index == r->legacy.snapshot_index) {
/* From Figure 5.3:
*
* 8. Reset state machine using snapshot contents.
Expand All @@ -780,6 +925,11 @@ static int legacyHandleEvent(struct raft *r,
goto err;
}
}

rv = legacyApply(r, events, n_events);
if (rv != 0) {
goto err;
}
}

return 0;
Expand Down
4 changes: 4 additions & 0 deletions src/raft.c
Original file line number Diff line number Diff line change
Expand Up @@ -347,6 +347,10 @@ int raft_step(struct raft *r,
rv = stepReceive(r, event->receive.id, event->receive.address,
event->receive.message);
break;
case RAFT_CONFIGURATION:
rv = replicationApplyConfigurationChange(
r, event->configuration.index);
break;
case RAFT_SNAPSHOT:
rv = replicationSnapshot(r, &event->snapshot.metadata,
event->snapshot.trailing);
Expand Down
Loading

0 comments on commit 9c92ed2

Please sign in to comment.