Skip to content

Commit

Permalink
v1: Use the struct raft_update entries queue when persisting entries
Browse files Browse the repository at this point in the history
Signed-off-by: Free Ekanayaka <free@ekanayaka.io>
  • Loading branch information
freeekanayaka committed Dec 21, 2023
1 parent 7c24ed3 commit d913113
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 30 deletions.
42 changes: 31 additions & 11 deletions src/legacy.c
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,33 @@ struct ioForwardPersistEntries
{
struct raft_io_append append;
struct raft *r;
struct raft_task task;
raft_index index;
struct raft_entry *entries;
unsigned n;
};

static void ioForwardPersistEntriesCb(struct raft_io_append *append, int status)
{
struct ioForwardPersistEntries *req = append->data;
struct raft *r = req->r;
struct raft_task task = req->task;
struct raft_task task;
struct raft_persist_entries *params;

task.type = RAFT_PERSIST_ENTRIES;
params = &task.persist_entries;
params->index = req->index;
params->entries = req->entries;
params->n = req->n;

raft_free(req);
ioTaskDone(r, &task, status);
}

static int ioForwardPersistEntries(struct raft *r, struct raft_task *task)
static int ioForwardPersistEntries(struct raft *r,
raft_index index,
struct raft_entry *entries,
unsigned n)
{
struct raft_persist_entries *params = &task->persist_entries;
struct ioForwardPersistEntries *req;
int rv;

Expand All @@ -100,15 +112,17 @@ static int ioForwardPersistEntries(struct raft *r, struct raft_task *task)
return RAFT_NOMEM;
}
req->r = r;
req->task = *task;
req->index = index;
req->entries = entries;
req->n = n;
req->append.data = req;

rv = r->io->truncate(r->io, params->index);
rv = r->io->truncate(r->io, index);
if (rv != 0) {
goto err;
}

rv = r->io->append(r->io, &req->append, params->entries, params->n,
rv = r->io->append(r->io, &req->append, entries, n,
ioForwardPersistEntriesCb);
if (rv != 0) {
goto err;
Expand All @@ -118,7 +132,7 @@ static int ioForwardPersistEntries(struct raft *r, struct raft_task *task)

err:
raft_free(req);
ErrMsgTransferf(r->io->errmsg, r->errmsg, "append %u entries", params->n);
ErrMsgTransferf(r->io->errmsg, r->errmsg, "append %u entries", n);
return rv;
}

Expand Down Expand Up @@ -555,6 +569,15 @@ int LegacyForwardToRaftIo(struct raft *r, struct raft_event *event)
}
}

if (update.entries.index != 0) {
rv =
ioForwardPersistEntries(r, update.entries.index,
update.entries.batch, update.entries.n);
if (rv != 0) {
goto err;
}
}

for (j = 0; j < update.messages.n; j++) {
rv = ioSendMessage(r, &update.messages.batch[j]);
if (rv != 0) {
Expand All @@ -572,9 +595,6 @@ int LegacyForwardToRaftIo(struct raft *r, struct raft_event *event)
}

switch (task->type) {
case RAFT_PERSIST_ENTRIES:
rv = ioForwardPersistEntries(r, task);
break;
case RAFT_PERSIST_SNAPSHOT:
rv = ioForwardPersistSnapshot(r, task);
break;
Expand Down
23 changes: 4 additions & 19 deletions src/task.c
Original file line number Diff line number Diff line change
Expand Up @@ -85,28 +85,13 @@ int TaskPersistEntries(struct raft *r,
struct raft_entry entries[],
unsigned n)
{
struct raft_task *task;
struct raft_persist_entries *params;
int rv;

task = taskAppend(r);
if (task == NULL) {
rv = RAFT_NOMEM;
goto err;
}

task->type = RAFT_PERSIST_ENTRIES;
assert(r->entries_index == 0);

params = &task->persist_entries;
params->index = index;
params->entries = entries;
params->n = n;
r->entries_index = index;
r->entries = entries;
r->n_entries = n;

return 0;

err:
assert(rv == RAFT_NOMEM);
return rv;
}

int TaskPersistSnapshot(struct raft *r,
Expand Down

0 comments on commit d913113

Please sign in to comment.