Skip to content

Commit

Permalink
legacy: Invoke raft_step() just once
Browse files Browse the repository at this point in the history
There is no longer any need to invoke raft_step() more than once by artificially
creating RAFT_DONE events for synchronous tasks.

Signed-off-by: Free Ekanayaka <free@ekanayaka.io>
  • Loading branch information
freeekanayaka committed Dec 22, 2023
1 parent de5e483 commit dcd47dd
Showing 1 changed file with 78 additions and 96 deletions.
174 changes: 78 additions & 96 deletions src/legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -493,131 +493,113 @@ void LegacyFireCompletedRequests(struct raft *r)

/* Run raft_step() for the given event and forward the resulting work to the
 * legacy raft_io backend attached to r.
 *
 * NOTE(review): this listing appears to be a rendered diff whose +/- markers
 * were lost, so removed and added lines are interleaved and several
 * statements show up twice; confirm against the real src/legacy.c before
 * relying on the exact statement order.
 *
 * Steps visible here: return 0 right away when r->io is NULL; call
 * raft_step(); invoke r->legacy.step_cb unless a RAFT_COMMAND request with
 * status RAFT_NOSPACE is queued; take a snapshot when
 * legacyShouldTakeSnapshot() says so; persist term and vote through
 * r->io->set_term()/set_vote() when the matching update flag is set; forward
 * entries, messages and tasks.
 *
 * Returns 0 on success, otherwise the non-zero code of the step that
 * failed. */
int LegacyForwardToRaftIo(struct raft *r, struct raft_event *event)
{
struct raft_event *events;
unsigned n_events;
unsigned i;
struct raft_update update;
raft_index commit_index;
raft_time timeout;
struct raft_task *tasks;
unsigned n_tasks;
unsigned j;
queue *head;
struct request *req;
bool has_pending_no_space_failure = false;

int rv;

if (r->io == NULL) {
/* No legacy raft_io implementation, just do nothing. */
return 0;
}

/* Initially the set of events contains only the event passed as argument,
* but might grow if some of the tasks get completed synchronously. */
events = raft_malloc(sizeof *events);
if (events == NULL) {
return RAFT_NOMEM;
rv =
raft_step(r, event, &update, &commit_index, &timeout, &tasks, &n_tasks);
if (rv != 0) {
goto err;
}
events[0] = *event;
n_events = 1;

for (i = 0; i < n_events; i++) {
struct raft_update update;
raft_index commit_index;
raft_time timeout;
struct raft_task *tasks;
unsigned n_tasks;
unsigned j;
queue *head;
struct request *req;
bool has_pending_no_space_failure = false;
/* Check if there's a client request in the completion queue which has
* failed due to a RAFT_NOSPACE error. In that case we will not call the
* step_cb just yet, because otherwise cowsql/dqlite would notice that
* the leader has stepped down and immediately close all connections,
* without a chance of properly returning the error to the client. */
QUEUE_FOREACH (head, &r->legacy.requests) {
req = QUEUE_DATA(head, struct request, queue);
if (req->type == RAFT_COMMAND) {
if (((struct raft_apply *)req)->status == RAFT_NOSPACE) {
has_pending_no_space_failure = true;
break;
}
}
}

event = &events[i];
if (!has_pending_no_space_failure && r->legacy.step_cb != NULL) {
r->legacy.step_cb(r);
}

rv = raft_step(r, event, &update, &commit_index, &timeout, &tasks,
&n_tasks);
if (legacyShouldTakeSnapshot(r)) {
legacyTakeSnapshot(r);
}

/* If the current term was updated, persist it. */
if (update.flags & RAFT_UPDATE_CURRENT_TERM) {
rv = r->io->set_term(r->io, raft_current_term(r));
if (rv != 0) {
goto err;
}
}

/* Check if there's a client request in the completion queue which has
* failed due to a RAFT_NOSPACE error. In that case we will not call the
* step_cb just yet, because otherwise cowsql/dqlite would notice that
* the leader has stepped down and immediately close all connections,
* without a chance of properly returning the error to the client. */
QUEUE_FOREACH (head, &r->legacy.requests) {
req = QUEUE_DATA(head, struct request, queue);
if (req->type == RAFT_COMMAND) {
if (((struct raft_apply *)req)->status == RAFT_NOSPACE) {
has_pending_no_space_failure = true;
break;
}
}
}

if (!has_pending_no_space_failure && r->legacy.step_cb != NULL) {
r->legacy.step_cb(r);
}

if (legacyShouldTakeSnapshot(r)) {
legacyTakeSnapshot(r);
/* If the current vote was updated, persist it. */
if (update.flags & RAFT_UPDATE_VOTED_FOR) {
rv = r->io->set_vote(r->io, raft_voted_for(r));
if (rv != 0) {
goto err;
}
}

/* If the current term was updated, persist it. */
if (update.flags & RAFT_UPDATE_CURRENT_TERM) {
rv = r->io->set_term(r->io, raft_current_term(r));
if (rv != 0) {
goto err;
}
/* Entries are forwarded only when update.entries.index is non-zero;
* presumably zero means "nothing to persist" -- TODO confirm. */
if (update.entries.index != 0) {
rv = ioForwardPersistEntries(r, update.entries.index,
update.entries.batch, update.entries.n);
if (rv != 0) {
goto err;
}
}

/* If the current vote was updated, persist it. */
if (update.flags & RAFT_UPDATE_VOTED_FOR) {
rv = r->io->set_vote(r->io, raft_voted_for(r));
if (rv != 0) {
goto err;
}
/* Send each message listed in the update, stopping at the first failure. */
for (j = 0; j < update.messages.n; j++) {
rv = ioSendMessage(r, &update.messages.batch[j]);
if (rv != 0) {
goto err;
}
}

if (update.entries.index != 0) {
rv =
ioForwardPersistEntries(r, update.entries.index,
update.entries.batch, update.entries.n);
if (rv != 0) {
goto err;
}
}
/* Dispatch every task returned by raft_step() to its legacy I/O handler. */
for (j = 0; j < n_tasks; j++) {
struct raft_task *task = &tasks[j];

for (j = 0; j < update.messages.n; j++) {
rv = ioSendMessage(r, &update.messages.batch[j]);
if (rv != 0) {
goto err;
}
/* Don't execute any further task if we're shutting down. */
if (r->close_cb != NULL) {
ioTaskDone(r, task, RAFT_CANCELED);
continue;
}

for (j = 0; j < n_tasks; j++) {
struct raft_task *task = &tasks[j];

/* Don't execute any further task if we're shutting down. */
if (r->close_cb != NULL) {
ioTaskDone(r, task, RAFT_CANCELED);
continue;
}
switch (task->type) {
case RAFT_PERSIST_SNAPSHOT:
rv = ioForwardPersistSnapshot(r, task);
break;
case RAFT_LOAD_SNAPSHOT:
rv = ioForwardLoadSnapshot(r, task);
break;
/* Any other task type trips the assert; rv is set to RAFT_INVALID
* for builds where assert() is compiled out. */
default:
rv = RAFT_INVALID;
assert(0);
break;
};

switch (task->type) {
case RAFT_PERSIST_SNAPSHOT:
rv = ioForwardPersistSnapshot(r, task);
break;
case RAFT_LOAD_SNAPSHOT:
rv = ioForwardLoadSnapshot(r, task);
break;
default:
rv = RAFT_INVALID;
assert(0);
break;
};

if (rv != 0) {
goto err;
}
if (rv != 0) {
goto err;
}
}

raft_free(events);
return 0;
err:
raft_free(events);

err:
/* Error exit: rv carries the non-zero code of whichever step failed. */
assert(rv != 0);
return rv;
}

0 comments on commit dcd47dd

Please sign in to comment.