Skip to content

Commit

Permalink
raft.h: Add raft->now field tracking what the current time is (#51)
Browse files Browse the repository at this point in the history
Currently struct raft can get the current time by invoking the `time()`
method of `struct raft_io`.

This field will be used to decouple `struct raft` from `struct raft_io`,
so instead of asking `struct raft_io` what the current time is, `struct
raft_io` or equivalent drivers will have the responsibility of telling
struct raft what the current time is.
  • Loading branch information
freeekanayaka authored Sep 15, 2023
2 parents d3828ee + e426572 commit 9f4cd43
Show file tree
Hide file tree
Showing 11 changed files with 48 additions and 24 deletions.
26 changes: 24 additions & 2 deletions include/raft.h
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,22 @@ struct raft_transfer; /* Forward declaration */

struct raft_log;

/* Unused uint64_t slots that are reserved for v0.x extensions.*/
#define RAFT__RESERVED \
struct \
{ \
uint64_t reserved[32]; \
}

/* Extended struct raft fields added after the v0.x ABI freeze. */
#define RAFT__EXTENSIONS \
struct \
{ \
raft_time now; /* Current time, updated via raft_step() */ \
}

RAFT__ASSERT_COMPATIBILITY(RAFT__RESERVED, RAFT__EXTENSIONS);

/**
* Hold and drive the state of a single raft server in a cluster.
* When replacing reserved fields in the middle of this struct, you MUST use a
Expand Down Expand Up @@ -853,10 +869,16 @@ struct raft
unsigned max_catch_up_rounds;
unsigned max_catch_up_round_duration;

/* Future extensions */
uint64_t reserved[32];
/* Fields added after the v0.x ABI freeze, packed in the unused space. */
union {
RAFT__RESERVED;
RAFT__EXTENSIONS;
};
};

#undef RAFT__RESERVED
#undef RAFT__EXTENSIONS

RAFT_API int raft_init(struct raft *r,
struct raft_io *io,
struct raft_fsm *fsm,
Expand Down
4 changes: 3 additions & 1 deletion src/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,8 @@ int raft_assign(struct raft *r,
raft_index last_index;
int rv;

r->now = r->io->time(r->io);

tracef("raft_assign to id:%llu the role:%d", id, role);
if (role != RAFT_STANDBY && role != RAFT_VOTER && role != RAFT_SPARE) {
rv = RAFT_BADROLE;
Expand Down Expand Up @@ -300,7 +302,7 @@ int raft_assign(struct raft *r,
/* Initialize the first catch-up round. */
r->leader_state.round_number = 1;
r->leader_state.round_index = last_index;
r->leader_state.round_start = r->io->time(r->io);
r->leader_state.round_start = r->now;

/* Immediately initiate an AppendEntries request. */
rv = replicationProgress(r, server_index);
Expand Down
2 changes: 1 addition & 1 deletion src/convert.c
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ int convertToLeader(struct raft *r)
convertSetState(r, RAFT_LEADER);

/* Reset timers */
r->election_timer_start = r->io->time(r->io);
r->election_timer_start = r->now;

/* Reset apply requests queue */
QUEUE_INIT(&r->leader_state.requests);
Expand Down
6 changes: 3 additions & 3 deletions src/election.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ void electionResetTimer(struct raft *r)
assert(timeout >= r->election_timeout);
assert(timeout <= r->election_timeout * 2);
state->randomized_election_timeout = timeout;
r->election_timer_start = r->io->time(r->io);
r->election_timer_start = r->now;
}

bool electionTimerExpired(struct raft *r)
{
struct followerOrCandidateState *state = getFollowerOrCandidateState(r);
raft_time now = r->io->time(r->io);
raft_time now = r->now;
return now - r->election_timer_start >= state->randomized_election_timeout;
}

Expand Down Expand Up @@ -289,7 +289,7 @@ int electionVote(struct raft *r,
r->voted_for = args->candidate_id;

/* Reset the election timer. */
r->election_timer_start = r->io->time(r->io);
r->election_timer_start = r->now;
}

tracef("vote granted to %llu", args->candidate_id);
Expand Down
7 changes: 3 additions & 4 deletions src/membership.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ bool membershipUpdateCatchUpRound(struct raft *r)
unsigned server_index;
raft_index match_index;
raft_index last_index;
raft_time now = r->io->time(r->io);
raft_time round_duration;
bool is_up_to_date;
bool is_fast_enough;
Expand All @@ -110,7 +109,7 @@ bool membershipUpdateCatchUpRound(struct raft *r)
}

last_index = logLastIndex(r->log);
round_duration = now - r->leader_state.round_start;
round_duration = r->now - r->leader_state.round_start;

is_up_to_date = match_index == last_index;
is_fast_enough = round_duration < r->election_timeout;
Expand All @@ -133,7 +132,7 @@ bool membershipUpdateCatchUpRound(struct raft *r)
* new round. */
r->leader_state.round_number++;
r->leader_state.round_index = last_index;
r->leader_state.round_start = now;
r->leader_state.round_start = r->now;

return false;
}
Expand Down Expand Up @@ -205,7 +204,7 @@ void membershipLeadershipTransferInit(struct raft *r,
{
req->cb = cb;
req->id = id;
req->start = r->io->time(r->io);
req->start = r->now;
req->send.data = NULL;
r->transfer = req;
}
Expand Down
9 changes: 4 additions & 5 deletions src/progress.c
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,7 @@ bool progressIsUpToDate(struct raft *r, unsigned i)
bool progressShouldReplicate(struct raft *r, unsigned i)
{
struct raft_progress *p = &r->leader_state.progress[i];
raft_time now = r->io->time(r->io);
bool needs_heartbeat = now - p->last_send >= r->heartbeat_timeout;
bool needs_heartbeat = r->now - p->last_send >= r->heartbeat_timeout;
raft_index last_index = logLastIndex(r->log);
bool result = false;

Expand All @@ -121,7 +120,7 @@ bool progressShouldReplicate(struct raft *r, unsigned i)
switch (p->state) {
case PROGRESS__SNAPSHOT:
/* Snapshot timed out, move to PROBE */
if (now - p->snapshot_last_send >= r->install_snapshot_timeout) {
if (r->now - p->snapshot_last_send >= r->install_snapshot_timeout) {
tracef("snapshot timed out for index:%u", i);
result = true;
progressAbortSnapshot(r, i);
Expand Down Expand Up @@ -157,12 +156,12 @@ raft_index progressMatchIndex(struct raft *r, unsigned i)

void progressUpdateLastSend(struct raft *r, unsigned i)
{
r->leader_state.progress[i].last_send = r->io->time(r->io);
r->leader_state.progress[i].last_send = r->now;
}

void progressUpdateSnapshotLastSend(struct raft *r, unsigned i)
{
r->leader_state.progress[i].snapshot_last_send = r->io->time(r->io);
r->leader_state.progress[i].snapshot_last_send = r->now;
}

bool progressResetRecentRecv(struct raft *r, const unsigned i)
Expand Down
2 changes: 2 additions & 0 deletions src/raft.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,13 @@ int raft_init(struct raft *r,
r->pre_vote = false;
r->max_catch_up_rounds = DEFAULT_MAX_CATCH_UP_ROUNDS;
r->max_catch_up_round_duration = DEFAULT_MAX_CATCH_UP_ROUND_DURATION;
r->now = 0;
rv = r->io->init(r->io, r->id, r->address);
if (rv != 0) {
ErrMsgTransfer(r->io->errmsg, r->errmsg, "io");
goto err_after_address_alloc;
}
r->now = r->io->time(r->io);
return 0;

err_after_address_alloc:
Expand Down
1 change: 1 addition & 0 deletions src/recv.c
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ void recvCb(struct raft_io *io, struct raft_message *message)
{
struct raft *r = io->data;
int rv;
r->now = r->io->time(r->io);
if (r->state == RAFT_UNAVAILABLE) {
switch (message->type) {
case RAFT_IO_APPEND_ENTRIES:
Expand Down
2 changes: 1 addition & 1 deletion src/recv_append_entries.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ int recvAppendEntries(struct raft *r,
}

/* Reset the election timer. */
r->election_timer_start = r->io->time(r->io);
r->election_timer_start = r->now;

/* If we are installing a snapshot, ignore these entries. TODO: we should do
* something smarter, e.g. buffering the entries in the I/O backend, which
Expand Down
2 changes: 1 addition & 1 deletion src/recv_install_snapshot.c
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ int recvInstallSnapshot(struct raft *r,
if (rv != 0) {
return rv;
}
r->election_timer_start = r->io->time(r->io);
r->election_timer_start = r->now;

rv = replicationInstallSnapshot(r, args, &result->rejected, &async);
if (rv != 0) {
Expand Down
11 changes: 5 additions & 6 deletions src/tick.c
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,6 @@ static bool checkContactQuorum(struct raft *r)
/* Apply time-dependent rules for leaders (Figure 3.1). */
static int tickLeader(struct raft *r)
{
raft_time now = r->io->time(r->io);
assert(r->state == RAFT_LEADER);

/* Check if we still can reach a majority of servers.
Expand All @@ -120,13 +119,13 @@ static int tickLeader(struct raft *r)
* successful round of heartbeats to a majority of its cluster; this
* allows clients to retry their requests with another server.
*/
if (now - r->election_timer_start >= r->election_timeout) {
if (r->now - r->election_timer_start >= r->election_timeout) {
if (!checkContactQuorum(r)) {
tracef("unable to contact majority of cluster -> step down");
convertToFollower(r);
return 0;
}
r->election_timer_start = r->io->time(r->io);
r->election_timer_start = r->now;
}

/* Possibly send heartbeats.
Expand All @@ -153,7 +152,7 @@ static int tickLeader(struct raft *r)
if (r->leader_state.promotee_id != 0) {
raft_id id = r->leader_state.promotee_id;
unsigned server_index;
raft_time round_duration = now - r->leader_state.round_start;
raft_time round_duration = r->now - r->leader_state.round_start;
bool is_too_slow;
bool is_unresponsive;

Expand Down Expand Up @@ -224,6 +223,7 @@ void tickCb(struct raft_io *io)
struct raft *r;
int rv;
r = io->data;
r->now = r->io->time(r->io);
rv = tick(r);
if (rv != 0) {
convertToUnavailable(r);
Expand All @@ -233,8 +233,7 @@ void tickCb(struct raft_io *io)
/* For all states: if there is a leadership transfer request in progress,
* check if it's expired. */
if (r->transfer != NULL) {
raft_time now = r->io->time(r->io);
if (now - r->transfer->start >= r->election_timeout) {
if (r->now - r->transfer->start >= r->election_timeout) {
membershipLeadershipTransferClose(r);
}
}
Expand Down

0 comments on commit 9f4cd43

Please sign in to comment.