Skip to content

Commit

Permalink
legacy: Handle triggering promotion after a server has caught up
Browse files Browse the repository at this point in the history
Signed-off-by: Free Ekanayaka <free@ekanayaka.io>
  • Loading branch information
freeekanayaka committed Dec 23, 2023
1 parent 5e99a5f commit 7f17457
Show file tree
Hide file tree
Showing 4 changed files with 125 additions and 104 deletions.
31 changes: 9 additions & 22 deletions src/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -189,28 +189,22 @@ int raft_barrier(struct raft *r, struct raft_barrier *req, raft_barrier_cb cb)
return rv;
}

static int clientChangeConfiguration(
struct raft *r,
struct raft_change *req,
const struct raft_configuration *configuration)
int ClientChangeConfiguration(struct raft *r,
const struct raft_configuration *configuration)
{
raft_index index;
struct raft_entry entry;
struct raft_event event;
int rv;

(void)req;

/* Index of the entry being appended. */
index = logLastIndex(r->log) + 1;
assert(r->state == RAFT_LEADER);

entry.type = RAFT_CHANGE;
entry.term = r->current_term;

/* Encode the configuration. */
rv = configurationEncode(configuration, &entry.buf);
if (rv != 0) {
goto err;
return rv;
}

event.time = r->io->time(r->io);
Expand All @@ -220,17 +214,10 @@ static int clientChangeConfiguration(

rv = LegacyForwardToRaftIo(r, &event);
if (rv != 0) {
goto err_after_log_append;
return rv;
}

return 0;

err_after_log_append:
logTruncate(r->log, index);

err:
assert(rv != 0);
return rv;
}

int raft_add(struct raft *r,
Expand Down Expand Up @@ -264,7 +251,7 @@ int raft_add(struct raft *r,
req->cb = cb;
req->catch_up_id = 0;

rv = clientChangeConfiguration(r, req, &configuration);
rv = ClientChangeConfiguration(r, &configuration);
if (rv != 0) {
goto err_after_configuration_copy;
}
Expand Down Expand Up @@ -390,9 +377,9 @@ int raft_assign(struct raft *r,
int old_role = r->configuration.servers[server_index].role;
r->configuration.servers[server_index].role = role;

rv = clientChangeConfiguration(r, req, &r->configuration);
rv = ClientChangeConfiguration(r, &r->configuration);
if (rv != 0) {
tracef("clientChangeConfiguration failed %d", rv);
tracef("ClientChangeConfiguration failed %d", rv);
r->configuration.servers[server_index].role = old_role;
return rv;
}
Expand Down Expand Up @@ -455,7 +442,7 @@ int raft_remove(struct raft *r,
req->cb = cb;
req->catch_up_id = 0;

rv = clientChangeConfiguration(r, req, &configuration);
rv = ClientChangeConfiguration(r, &configuration);
if (rv != 0) {
goto err_after_configuration_copy;
}
Expand Down
4 changes: 4 additions & 0 deletions src/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,8 @@ void ClientCatchUp(struct raft *r, raft_id server_id);
/* Start transferring leadership to the given server. */
int ClientTransfer(struct raft *r, raft_id server_id);

/* Submit a new RAFT_CHANGE entry with the given new configuration. */
int ClientChangeConfiguration(struct raft *r,
const struct raft_configuration *configuration);

#endif /* CLIENT_H_ */
135 changes: 111 additions & 24 deletions src/legacy.c
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include "legacy.h"
#include "assert.h"
#include "client.h"
#include "configuration.h"
#include "err.h"
#include "log.h"
Expand Down Expand Up @@ -570,7 +571,10 @@ void LegacyFireCompletedRequests(struct raft *r)

/* Check whether a raft_change request has been completed, and put it in the
* completed requests queue if so. */
static void legacyCheckChangeRequest(struct raft *r)
static void legacyCheckChangeRequest(struct raft *r,
struct raft_entry *entry,
struct raft_event **events,
unsigned *n_events)
{
struct raft_change *change;
int status;
Expand All @@ -580,41 +584,86 @@ static void legacyCheckChangeRequest(struct raft *r)
return;
}

if (r->legacy.change->catch_up_id != 0) {
/* A raft_catch_up() call can fail only if the server is not the
* leader or if the given ID is invalid. If the server was not the
* leader then r->legacy.change would be NULL, and we know that the
* ID is valid, otherwise the request couldn't have been submitted.
*/
rv = raft_catch_up(r, r->legacy.change->catch_up_id, &status);
assert(rv == 0);
if (r->legacy.change->catch_up_id == 0) {
return;
}

if (status == RAFT_CATCH_UP_ABORTED) {
change = r->legacy.change;
r->legacy.change = NULL;
if (change != NULL && change->cb != NULL) {
change->type = RAFT_CHANGE;
change->status = RAFT_NOCONNECTION;
QUEUE_PUSH(&r->legacy.requests, &change->queue);
}
change = r->legacy.change;

/* A raft_catch_up() call can fail only if the server is not the
* leader or if the given ID is invalid. If the server was not the
* leader then r->legacy.change would be NULL, and we know that the
* ID is valid, otherwise the request couldn't have been submitted.
*/
rv = raft_catch_up(r, r->legacy.change->catch_up_id, &status);
assert(rv == 0);

if (status == RAFT_CATCH_UP_ABORTED) {
r->legacy.change = NULL;
if (change->cb != NULL) {
change->type = RAFT_CHANGE;
change->status = RAFT_NOCONNECTION;
QUEUE_PUSH(&r->legacy.requests, &change->queue);
}
}

if (status == RAFT_CATCH_UP_FINISHED) {
struct raft_configuration configuration;
struct raft_server *server;
struct raft_event *event;
unsigned i;

i = configurationIndexOf(&r->configuration, change->catch_up_id);
assert(i < r->configuration.n);

server = &r->configuration.servers[i];
assert(server->role != RAFT_VOTER);

change->catch_up_id = 0;

/* Update our current configuration. */
rv = configurationCopy(&r->configuration, &configuration);
assert(rv == 0);

configuration.servers[i].role = RAFT_VOTER;

entry->type = RAFT_CHANGE;
entry->term = r->current_term;

/* Encode the configuration. */
rv = configurationEncode(&configuration, &entry->buf);
assert(rv == 0);

*n_events += 1;
*events = raft_realloc(*events, *n_events * sizeof **events);
assert(*events != NULL);

event = &(*events)[*n_events - 1];
event->time = r->io->time(r->io);
event->type = RAFT_SUBMIT;
event->submit.entries = entry;
event->submit.n = 1;

configurationClose(&configuration);
}
}

int LegacyForwardToRaftIo(struct raft *r, struct raft_event *event)
/* Handle a single event, possibly adding more events. */
static int legacyHandleEvent(struct raft *r,
struct raft_entry *entry,
struct raft_event **events,
unsigned *n_events,
unsigned i)
{
struct raft_event *event;
struct raft_update update;
unsigned j;
queue *head;
struct request *req;
bool has_pending_no_space_failure = false;

int rv;

if (r->io == NULL) {
/* No legacy raft_io implementation, just do nothing. */
return 0;
}
event = &(*events)[i];

rv = raft_step(r, event, &update);
if (rv != 0) {
Expand All @@ -634,7 +683,7 @@ int LegacyForwardToRaftIo(struct raft *r, struct raft_event *event)
}

/* Check whether a raft_change request has been completed. */
legacyCheckChangeRequest(r);
legacyCheckChangeRequest(r, entry, events, n_events);

/* Check if there's a client request in the completion queue which has
* failed due to a RAFT_NOSPACE error. In that case we will not call the
Expand Down Expand Up @@ -703,7 +752,45 @@ int LegacyForwardToRaftIo(struct raft *r, struct raft_event *event)

return 0;

err:
return rv;
}

int LegacyForwardToRaftIo(struct raft *r, struct raft_event *event)
{
struct raft_event *events;
unsigned n_events;
unsigned i;
struct raft_entry entry; /* Used for actual promotion of RAFT_CHANGE reqs */
int rv;

if (r->io == NULL) {
/* No legacy raft_io implementation, just do nothing. */
return 0;
}

/* Initially the set of events contains only the event passed as argument,
* but might grow if some further events get generated by the handling
* code. */
events = raft_malloc(sizeof *events);
if (events == NULL) {
return RAFT_NOMEM;
}
events[0] = *event;
n_events = 1;

for (i = 0; i < n_events; i++) {
rv = legacyHandleEvent(r, &entry, &events, &n_events, i);
if (rv != 0) {
goto err;
}
}

raft_free(events);
return 0;

err:
assert(rv != 0);
raft_free(events);
return rv;
}
59 changes: 1 addition & 58 deletions src/replication.c
Original file line number Diff line number Diff line change
Expand Up @@ -596,60 +596,6 @@ int replicationTrigger(struct raft *r, raft_index index)
* This function changes the local configuration marking the server being
* promoted as actually voting, appends the a RAFT_CHANGE entry with the new
* configuration to the local log and triggers its replication. */
static int triggerActualPromotion(struct raft *r)
{
raft_index index;
raft_term term = r->current_term;
size_t server_index;
struct raft_server *server;
int old_role;
int rv;

assert(r->state == RAFT_LEADER);
assert(r->leader_state.promotee_id != 0);

server_index =
configurationIndexOf(&r->configuration, r->leader_state.promotee_id);
assert(server_index < r->configuration.n);

server = &r->configuration.servers[server_index];

assert(server->role != RAFT_VOTER);

/* Update our current configuration. */
old_role = server->role;
server->role = RAFT_VOTER;

/* Index of the entry being appended. */
index = logLastIndex(r->log) + 1;

/* Encode the new configuration and append it to the log. */
rv = logAppendConfiguration(r->log, term, &r->configuration);
if (rv != 0) {
goto err;
}

/* Start writing the new log entry to disk and send it to the followers. */
rv = replicationTrigger(r, index);
if (rv != 0) {
goto err_after_log_append;
}

r->leader_state.promotee_id = 0;
r->configuration_uncommitted_index = logLastIndex(r->log);

return 0;

err_after_log_append:
logTruncate(r->log, index);

err:
server->role = old_role;

assert(rv != 0);
return rv;
}

int replicationUpdate(struct raft *r,
const struct raft_server *server,
const struct raft_append_entries_result *result)
Expand Down Expand Up @@ -729,10 +675,7 @@ int replicationUpdate(struct raft *r,
if (is_being_promoted) {
bool is_up_to_date = membershipUpdateCatchUpRound(r);
if (is_up_to_date) {
rv = triggerActualPromotion(r);
if (rv != 0) {
return rv;
}
r->leader_state.promotee_id = 0;
}
}

Expand Down

0 comments on commit 7f17457

Please sign in to comment.