Skip to content

Commit

Permalink
v1: Handle RAFT_START event in raft_step()
Browse files Browse the repository at this point in the history
Signed-off-by: Free Ekanayaka <free@ekanayaka.io>
  • Loading branch information
freeekanayaka committed Dec 22, 2023
1 parent a728b3e commit e0ccec8
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 76 deletions.
92 changes: 91 additions & 1 deletion src/raft.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "configuration.h"
#include "convert.h"
#include "election.h"
#include "entry.h"
#include "err.h"
#include "flags.h"
#include "heap.h"
Expand All @@ -18,6 +19,7 @@
#include "queue.h"
#include "recv.h"
#include "replication.h"
#include "restore.h"
#include "tick.h"
#include "tracing.h"

Expand Down Expand Up @@ -171,6 +173,89 @@ void raft_seed(struct raft *r, unsigned random)
r->random = random;
}

/* Become leader right away, without waiting for the election timeout, when
 * this server is the only voter in the current configuration.
 *
 * Returns 0 if no self-election is attempted (we are not in the configuration,
 * we are not a voter, or there is more than one voter) or if the conversion
 * succeeds; otherwise returns the error code from convertToCandidate(). */
static int maybeSelfElect(struct raft *r)
{
    const struct raft_server *self = configurationGet(&r->configuration, r->id);
    int rc;

    /* We are not part of the configuration at all. */
    if (self == NULL) {
        return 0;
    }

    /* Only voters are eligible for election. */
    if (self->role != RAFT_VOTER) {
        return 0;
    }

    /* Other voters exist, so a regular election is required. */
    if (configurationVoterCount(&r->configuration) > 1) {
        return 0;
    }

    /* The candidate conversion is expected to detect that we are the sole
     * voter and promote us straight to leader; the assertion below checks
     * exactly that. */
    rc = convertToCandidate(r, false /* disrupt leader */);
    if (rc != 0) {
        return rc;
    }
    assert(r->state == RAFT_LEADER);

    return 0;
}

/* Process a RAFT_START event.
 *
 * Records the loaded term and vote on @r, restores the snapshot described by
 * @metadata (if not NULL), restores the @n_entries log @entries beginning at
 * @start_index, then converts to follower and possibly self-elects.
 *
 * If RestoreSnapshot() or RestoreEntries() fails, @entries is released with
 * entryBatchesDestroy() and the failing error code is returned. Otherwise the
 * result of maybeSelfElect() is returned (0 on success). */
static int stepStart(struct raft *r,
                     raft_term term,
                     raft_id voted_for,
                     struct raft_snapshot_metadata *metadata,
                     raft_index start_index,
                     struct raft_entry *entries,
                     unsigned n_entries)
{
    /* Index and term of the restored snapshot; both stay 0 when there is no
     * snapshot. */
    raft_index base_index = 0;
    raft_term base_term = 0;
    int rc;

    r->current_term = term;
    r->voted_for = voted_for;

    if (metadata == NULL) {
        if (n_entries > 0) {
            /* With no snapshot and a non-empty on-disk log, the log must
             * begin at index 1 with a configuration entry. */
            assert(start_index == 1);
            assert(entries[0].type == RAFT_CHANGE);

            /* Small optimization: the first entry is required to be the same
             * on all servers, so it can be considered committed and applied
             * immediately. */
            r->commit_index = 1;
            r->last_applied = 1;
        }
    } else {
        /* Capture the snapshot coordinates before handing the metadata to
         * RestoreSnapshot(). */
        base_index = metadata->index;
        base_term = metadata->term;
        rc = RestoreSnapshot(r, metadata);
        if (rc != 0) {
            goto err_destroy_entries;
        }
    }

    /* Append the entries to the log, possibly restoring the last
     * configuration. */
    tracef("restore %u entries starting at %llu", n_entries, start_index);

    rc = RestoreEntries(r, base_index, base_term, start_index, entries,
                        n_entries);
    if (rc != 0) {
        goto err_destroy_entries;
    }

    /* Servers start out as followers. */
    convertToFollower(r);

    /* If we are the only voting server it is safe to become leader right
     * away. Otherwise we are either joining the cluster or configured as a
     * non-voter, and we remain a follower. */
    return maybeSelfElect(r);

err_destroy_entries:
    entryBatchesDestroy(entries, n_entries);
    return rc;
}

/* Handle the completion of a send message operation. */
static int stepSent(struct raft *r, struct raft_message *message, int status)
{
Expand Down Expand Up @@ -218,6 +303,11 @@ int raft_step(struct raft *r,
r->now = event->time;

switch (event->type) {
case RAFT_START:
rv = stepStart(r, event->start.term, event->start.voted_for,
event->start.metadata, event->start.start_index,
event->start.entries, event->start.n_entries);
break;
case RAFT_PERSISTED_ENTRIES:
rv = replicationPersistEntriesDone(
r, event->persisted_entries.index,
Expand Down Expand Up @@ -257,7 +347,7 @@ int raft_step(struct raft *r,
rv = ClientTransfer(r, event->transfer.server_id);
break;
default:
rv = 0;
rv = RAFT_INVALID;
break;
}

Expand Down
100 changes: 25 additions & 75 deletions src/restore.c
Original file line number Diff line number Diff line change
Expand Up @@ -143,34 +143,12 @@ int RestoreSnapshot(struct raft *r, struct raft_snapshot_metadata *metadata)
return 0;
}

/* When this server is the sole voting member of the configuration, elect it
 * immediately instead of waiting for the election timeout to fire.
 *
 * Returns 0 when nothing needs to be done or when the server became leader,
 * or the error code reported by convertToCandidate(). */
static int maybeSelfElect(struct raft *r)
{
    const struct raft_server *me;
    bool sole_voter;
    int status;

    me = configurationGet(&r->configuration, r->id);

    /* We qualify only if we are in the configuration, we are a voter, and no
     * other voter exists. The checks short-circuit in that order. */
    sole_voter = me != NULL && me->role == RAFT_VOTER &&
                 configurationVoterCount(&r->configuration) <= 1;
    if (!sole_voter) {
        return 0;
    }

    /* Converting to candidate is expected to notice that we are the only
     * voter and convert to leader automatically, as asserted below. */
    status = convertToCandidate(r, false /* disrupt leader */);
    if (status != 0) {
        return status;
    }
    assert(r->state == RAFT_LEADER);

    return 0;
}

int raft_start(struct raft *r)
{
struct raft_snapshot *snapshot;
struct raft_snapshot_metadata metadata;
raft_index snapshot_index = 0;
raft_term snapshot_term = 0;
raft_term term;
raft_id voted_for;
raft_index start_index;
struct raft_entry *entries;
size_t n_entries;
Expand All @@ -187,15 +165,15 @@ int raft_start(struct raft *r)
assert(r->last_stored == 0);

tracef("starting");
rv = r->io->load(r->io, &r->current_term, &r->voted_for, &snapshot,
&start_index, &entries, &n_entries);
rv = r->io->load(r->io, &term, &voted_for, &snapshot, &start_index,
&entries, &n_entries);
if (rv != 0) {
ErrMsgTransfer(r->io->errmsg, r->errmsg, "io");
return rv;
}
assert(start_index >= 1);
tracef("current_term:%llu voted_for:%llu start_index:%llu n_entries:%zu",
r->current_term, r->voted_for, start_index, n_entries);
term, voted_for, start_index, n_entries);

/* If we have a snapshot, let's restore it. */
if (snapshot != NULL) {
Expand All @@ -212,61 +190,24 @@ int raft_start(struct raft *r)
entryBatchesDestroy(entries, n_entries);
return rv;
}
}

event.time = r->now;
event.type = RAFT_START;
event.start.term = term;
event.start.voted_for = voted_for;
event.start.metadata = NULL;
if (snapshot != NULL) {
metadata.index = snapshot->index;
metadata.term = snapshot->term;
metadata.configuration = snapshot->configuration;
metadata.configuration_index = snapshot->configuration_index;
rv = RestoreSnapshot(r, &metadata);
if (rv != 0) {
entryBatchesDestroy(entries, n_entries);
return rv;
}

raft_free(snapshot->bufs);
snapshot_index = snapshot->index;
snapshot_term = snapshot->term;
raft_free(snapshot);

} else if (n_entries > 0) {
/* If we don't have a snapshot and the on-disk log is not empty, then
* the first entry must be a configuration entry. */
assert(start_index == 1);
assert(entries[0].type == RAFT_CHANGE);

/* As a small optimization, bump the commit index to 1 since we require
* the first entry to be the same on all servers. */
r->commit_index = 1;
r->last_applied = 1;
}

/* Append the entries to the log, possibly restoring the last
* configuration. */
tracef("restore %zu entries starting at %llu", n_entries, start_index);
rv = RestoreEntries(r, snapshot_index, snapshot_term, start_index, entries,
(unsigned)n_entries);
if (rv != 0) {
entryBatchesDestroy(entries, n_entries);
return rv;
event.start.metadata = &metadata;
}
event.start.start_index = start_index;
event.start.entries = entries;
event.start.n_entries = (unsigned)n_entries;

/* By default we start as followers. */
convertToFollower(r);

/* If there's only one voting server, and that is us, it's safe to convert
* to leader right away. If that is not us, we're either joining the cluster
* or we're simply configured as non-voter, and we'll stay follower. */
rv = maybeSelfElect(r);
if (rv != 0) {
return rv;
}

/* Use a dummy event to trigger handling of possible RAFT_APPLY_COMMAND
* tasks.
*
* TODO: use the start event instead. */
event.type = 255;
event.time = r->now;
LegacyForwardToRaftIo(r, &event);

/* Start the I/O backend. The tickCb function is expected to fire every
Expand All @@ -275,6 +216,15 @@ int raft_start(struct raft *r)
rv = r->io->start(r->io, r->heartbeat_timeout, tickCb, recvCb);
if (rv != 0) {
tracef("io start failed %d", rv);
goto out;
}

out:
if (snapshot != NULL) {
raft_free(snapshot->bufs);
raft_free(snapshot);
}
if (rv != 0) {
return rv;
}

Expand Down

0 comments on commit e0ccec8

Please sign in to comment.