Skip to content

Commit

Permalink
client: Use a RAFT_SUBMIT event to add new entries via raft_step()
Browse files Browse the repository at this point in the history
The raft_apply() function now becomes just a regular consumer of the core
raft_step() API.

Signed-off-by: Free Ekanayaka <free@ekanayaka.io>
  • Loading branch information
freeekanayaka committed Dec 21, 2023
1 parent fe1b692 commit 9616ad5
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 30 deletions.
85 changes: 55 additions & 30 deletions src/client.c
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,48 @@

#define tracef(...) Tracef(r->tracer, __VA_ARGS__)

int ClientSubmit(struct raft *r, struct raft_entry *entries, unsigned n)
{
raft_index index;
unsigned i;
int rv;

assert(r != NULL);
assert(entries != NULL);
assert(n > 0);

if (r->state != RAFT_LEADER || r->transfer != NULL) {
rv = RAFT_NOTLEADER;
ErrMsgFromCode(r->errmsg, rv);
tracef("raft_apply not leader");
goto err;
}

/* Index of the first entry being appended. */
index = logLastIndex(r->log) + 1;

for (i = 0; i < n; i++) {
struct raft_entry *entry = &entries[i];
rv = logAppend(r->log, entry->term, entry->type, &entry->buf, NULL);
if (rv != 0) {
return rv;
}
}

rv = replicationTrigger(r, index);
if (rv != 0) {
goto err_after_log_append;
}

return 0;

err_after_log_append:
logDiscard(r->log, index);
err:
assert(rv != 0);
return rv;
}

int raft_apply(struct raft *r,
struct raft_apply *req,
const struct raft_buffer bufs[],
Expand All @@ -21,6 +63,7 @@ int raft_apply(struct raft *r,
{
raft_index index;
struct raft_event event;
struct raft_entry entry;
int rv;

tracef("raft_apply n %d", n);
Expand All @@ -29,49 +72,31 @@ int raft_apply(struct raft *r,
assert(bufs != NULL);
assert(n == 1);

if (r->state != RAFT_LEADER || r->transfer != NULL) {
rv = RAFT_NOTLEADER;
ErrMsgFromCode(r->errmsg, rv);
tracef("raft_apply not leader");
goto err;
}

/* Index of the first entry being appended. */
index = logLastIndex(r->log) + 1;
tracef("%u commands starting at %lld", n, index);
req->type = RAFT_COMMAND;
req->index = index;
req->cb = cb;

/* Append the new entries to the log. */
rv = logAppendCommands(r->log, r->current_term, bufs, n);
if (rv != 0) {
goto err;
}

QUEUE_PUSH(&r->leader_state.requests, &req->queue);

rv = replicationTrigger(r, index);
if (rv != 0) {
goto err_after_log_append;
}
entry.type = RAFT_COMMAND;
entry.term = r->current_term;
entry.buf = bufs[0];
entry.batch = NULL;

event.type = RAFT_SUBMIT;
event.time = r->io->time(r->io);
event.type = RAFT_SUBMIT;
event.submit.entries = &entry;
event.submit.n = 1;

rv = LegacyForwardToRaftIo(r, &event);
if (rv != 0) {
goto err_after_log_append;
return rv;
}

return 0;
QUEUE_PUSH(&r->leader_state.requests, &req->queue);

err_after_log_append:
logDiscard(r->log, index);
QUEUE_REMOVE(&req->queue);
err:
assert(rv != 0);
return rv;
return 0;
}

int clientBarrier(struct raft *r, struct raft_barrier *req, raft_barrier_cb cb)
Expand Down Expand Up @@ -134,7 +159,7 @@ int raft_barrier(struct raft *r, struct raft_barrier *req, raft_barrier_cb cb)
return rv;
}

event.type = RAFT_SUBMIT;
event.type = 255;
event.time = r->io->time(r->io);

rv = LegacyForwardToRaftIo(r, &event);
Expand Down Expand Up @@ -188,7 +213,7 @@ static int clientChangeConfiguration(

r->configuration_uncommitted_index = index;

event.type = RAFT_SUBMIT;
event.type = 255;
event.time = r->io->time(r->io);

rv = LegacyForwardToRaftIo(r, &event);
Expand Down
3 changes: 3 additions & 0 deletions src/client.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

#include "../include/raft.h"

/* Submit the given entries and start replicating them. */
int ClientSubmit(struct raft *r, struct raft_entry *entries, unsigned n);

/* Apply a barrier entry.
*
* This function holds the common logic shared by raft_barrier() and
Expand Down
4 changes: 4 additions & 0 deletions src/raft.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "assert.h"
#include "byte.h"
#include "client.h"
#include "configuration.h"
#include "convert.h"
#include "election.h"
Expand Down Expand Up @@ -294,6 +295,9 @@ int raft_step(struct raft *r,
case RAFT_TIMEOUT:
rv = Tick(r);
break;
case RAFT_SUBMIT:
rv = ClientSubmit(r, event->submit.entries, event->submit.n);
break;
default:
rv = 0;
break;
Expand Down

0 comments on commit 9616ad5

Please sign in to comment.