diff --git a/src/raft.c b/src/raft.c index 883f3e14..a7f79151 100644 --- a/src/raft.c +++ b/src/raft.c @@ -9,6 +9,7 @@ #include "configuration.h" #include "convert.h" #include "election.h" +#include "entry.h" #include "err.h" #include "flags.h" #include "heap.h" @@ -18,6 +19,7 @@ #include "queue.h" #include "recv.h" #include "replication.h" +#include "restore.h" #include "tick.h" #include "tracing.h" @@ -171,6 +173,89 @@ void raft_seed(struct raft *r, unsigned random) r->random = random; } +/* If we're the only voting server in the configuration, automatically + * self-elect ourselves and convert to leader without waiting for the election + * timeout. */ +static int maybeSelfElect(struct raft *r) +{ + const struct raft_server *server; + int rv; + server = configurationGet(&r->configuration, r->id); + if (server == NULL || server->role != RAFT_VOTER || + configurationVoterCount(&r->configuration) > 1) { + return 0; + } + /* Converting to candidate will notice that we're the only voter and + * automatically convert to leader. */ + rv = convertToCandidate(r, false /* disrupt leader */); + if (rv != 0) { + return rv; + } + assert(r->state == RAFT_LEADER); + return 0; +} + +/* Handle a RAFT_START event. */ +static int stepStart(struct raft *r, + raft_term term, + raft_id voted_for, + struct raft_snapshot_metadata *metadata, + raft_index start_index, + struct raft_entry *entries, + unsigned n_entries) +{ + raft_index snapshot_index = 0; + raft_term snapshot_term = 0; + int rv; + + r->current_term = term; + r->voted_for = voted_for; + + if (metadata != NULL) { + snapshot_index = metadata->index; + snapshot_term = metadata->term; + rv = RestoreSnapshot(r, metadata); + if (rv != 0) { + entryBatchesDestroy(entries, n_entries); + return rv; + } + } else if (n_entries > 0) { + /* If we don't have a snapshot and the on-disk log is not empty, then + * the first entry must be a configuration entry. */ + assert(start_index == 1); + assert(entries[0].type == RAFT_CHANGE); + + /* As a small optimization, bump the commit index to 1 since we require + * the first entry to be the same on all servers. */ + r->commit_index = 1; + r->last_applied = 1; + } + + /* Append the entries to the log, possibly restoring the last + * configuration. */ + tracef("restore %u entries starting at %llu", n_entries, start_index); + + rv = RestoreEntries(r, snapshot_index, snapshot_term, start_index, entries, + n_entries); + if (rv != 0) { + entryBatchesDestroy(entries, n_entries); + return rv; + } + + /* By default we start as followers. */ + convertToFollower(r); + + /* If there's only one voting server, and that is us, it's safe to convert + * to leader right away. If that is not us, we're either joining the cluster + * or we're simply configured as non-voter, and we'll stay follower. */ + rv = maybeSelfElect(r); + if (rv != 0) { + return rv; + } + + return 0; +} + /* Handle the completion of a send message operation. */ static int stepSent(struct raft *r, struct raft_message *message, int status) { @@ -218,6 +303,11 @@ int raft_step(struct raft *r, r->now = event->time; switch (event->type) { + case RAFT_START: + rv = stepStart(r, event->start.term, event->start.voted_for, + event->start.metadata, event->start.start_index, + event->start.entries, event->start.n_entries); + break; case RAFT_PERSISTED_ENTRIES: rv = replicationPersistEntriesDone( r, event->persisted_entries.index, @@ -257,7 +347,7 @@ int raft_step(struct raft *r, rv = ClientTransfer(r, event->transfer.server_id); break; default: - rv = 0; + rv = RAFT_INVALID; break; } diff --git a/src/restore.c b/src/restore.c index 70578749..326b83cb 100644 --- a/src/restore.c +++ b/src/restore.c @@ -143,34 +143,12 @@ int RestoreSnapshot(struct raft *r, struct raft_snapshot_metadata *metadata) return 0; } -/* If we're the only voting server in the configuration, automatically - * self-elect ourselves and convert to leader without waiting for the election - * timeout. */ -static int maybeSelfElect(struct raft *r) -{ - const struct raft_server *server; - int rv; - server = configurationGet(&r->configuration, r->id); - if (server == NULL || server->role != RAFT_VOTER || - configurationVoterCount(&r->configuration) > 1) { - return 0; - } - /* Converting to candidate will notice that we're the only voter and - * automatically convert to leader. */ - rv = convertToCandidate(r, false /* disrupt leader */); - if (rv != 0) { - return rv; - } - assert(r->state == RAFT_LEADER); - return 0; -} - int raft_start(struct raft *r) { struct raft_snapshot *snapshot; struct raft_snapshot_metadata metadata; - raft_index snapshot_index = 0; - raft_term snapshot_term = 0; + raft_term term; + raft_id voted_for; raft_index start_index; struct raft_entry *entries; size_t n_entries; @@ -187,15 +165,15 @@ int raft_start(struct raft *r) assert(r->last_stored == 0); tracef("starting"); - rv = r->io->load(r->io, &r->current_term, &r->voted_for, &snapshot, - &start_index, &entries, &n_entries); + rv = r->io->load(r->io, &term, &voted_for, &snapshot, &start_index, + &entries, &n_entries); if (rv != 0) { ErrMsgTransfer(r->io->errmsg, r->errmsg, "io"); return rv; } assert(start_index >= 1); tracef("current_term:%llu voted_for:%llu start_index:%llu n_entries:%zu", - r->current_term, r->voted_for, start_index, n_entries); + term, voted_for, start_index, n_entries); /* If we have a snapshot, let's restore it. */ if (snapshot != NULL) { @@ -212,61 +190,24 @@ int raft_start(struct raft *r) entryBatchesDestroy(entries, n_entries); return rv; } + } + event.time = r->now; + event.type = RAFT_START; + event.start.term = term; + event.start.voted_for = voted_for; + event.start.metadata = NULL; + if (snapshot != NULL) { metadata.index = snapshot->index; metadata.term = snapshot->term; metadata.configuration = snapshot->configuration; metadata.configuration_index = snapshot->configuration_index; - rv = RestoreSnapshot(r, &metadata); - if (rv != 0) { - entryBatchesDestroy(entries, n_entries); - return rv; - } - - raft_free(snapshot->bufs); - snapshot_index = snapshot->index; - snapshot_term = snapshot->term; - raft_free(snapshot); - - } else if (n_entries > 0) { - /* If we don't have a snapshot and the on-disk log is not empty, then - * the first entry must be a configuration entry. */ - assert(start_index == 1); - assert(entries[0].type == RAFT_CHANGE); - - /* As a small optimization, bump the commit index to 1 since we require - * the first entry to be the same on all servers. */ - r->commit_index = 1; - r->last_applied = 1; - } - - /* Append the entries to the log, possibly restoring the last - * configuration. */ - tracef("restore %zu entries starting at %llu", n_entries, start_index); - rv = RestoreEntries(r, snapshot_index, snapshot_term, start_index, entries, - (unsigned)n_entries); - if (rv != 0) { - entryBatchesDestroy(entries, n_entries); - return rv; + event.start.metadata = &metadata; } + event.start.start_index = start_index; + event.start.entries = entries; + event.start.n_entries = (unsigned)n_entries; - /* By default we start as followers. */ - convertToFollower(r); - - /* If there's only one voting server, and that is us, it's safe to convert - * to leader right away. If that is not us, we're either joining the cluster - * or we're simply configured as non-voter, and we'll stay follower. */ - rv = maybeSelfElect(r); - if (rv != 0) { - return rv; - } - - /* Use a dummy event to trigger handling of possible RAFT_APPLY_COMMAND - * tasks. - * - * TODO: use the start event instead. */ - event.type = 255; - event.time = r->now; LegacyForwardToRaftIo(r, &event); /* Start the I/O backend. The tickCb function is expected to fire every @@ -275,6 +216,15 @@ int raft_start(struct raft *r) rv = r->io->start(r->io, r->heartbeat_timeout, tickCb, recvCb); if (rv != 0) { tracef("io start failed %d", rv); + goto out; + } + +out: + if (snapshot != NULL) { + raft_free(snapshot->bufs); + raft_free(snapshot); + } + if (rv != 0) { return rv; }