Skip to content

Commit

Permalink
Optimize walproposer starting streaming point.
Browse files Browse the repository at this point in the history
Safekeepers who are in the same epoch as donor definitely have correct WAL, so
we can send to them since their flushLsn. This required some additionall fuss
due to convention of always starting streaming at the record boundary.
  • Loading branch information
arssher committed Sep 16, 2021
1 parent 4c1cae9 commit a2e929e
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 23 deletions.
94 changes: 71 additions & 23 deletions src/backend/replication/walproposer.c
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,11 @@ HandleWalKeeperResponse(void)
truncateLsn = candidateTruncateLsn;
candidateTruncateLsn = InvalidXLogRecPtr;
}
for (int i = 0; i < n_walkeepers; i++)
{
if (msg->perSafekeeper[i])
free(msg->perSafekeeper[i]);
}
memset(msg, 0xDF, sizeof(WalMessage) + msg->size - sizeof(AppendRequestHeader));
free(msg);
}
Expand Down Expand Up @@ -677,6 +682,7 @@ CreateMessage(XLogRecPtr startpos, char* data, int len)
msg->size = sizeof(AppendRequestHeader) + len;
msg->next = NULL;
msg->ackMask = 0;
memset(&msg->perSafekeeper, '\0', sizeof(msg->perSafekeeper));
msg->req.tag = 'a';
msg->req.term = propTerm;
msg->req.epochStartLsn = propEpochStartLsn;
Expand Down Expand Up @@ -718,6 +724,7 @@ CreateMessageCommitLsnOnly(XLogRecPtr lsn)
msg->size = sizeof(AppendRequestHeader);
msg->next = NULL;
msg->ackMask = 0;
memset(&msg->perSafekeeper, '\0', sizeof(msg->perSafekeeper));
msg->req.tag = 'a';
msg->req.term = propTerm;
msg->req.epochStartLsn = propEpochStartLsn;
Expand Down Expand Up @@ -912,26 +919,54 @@ WalProposerRecovery(int donor, TimeLineID timeline, XLogRecPtr startpos, XLogRec

/*
* Start sending entries to everyone from the beginning (truncateLsn),
* except for donor who doesn't need recovery at all. We could do here
* better, taking into account commitLsn of safekeepers to avoid sending
* them excessive data, but this requires some effort (note also that we
* must always start sending from the beginning of the record).
*
* And note that we definitely can't pick up flushLsn of safekeeper and
* decide he already has everything before, as such WAL is generally
* entirely different than the correct (donor) one.
* except for those who lives in donor's epoch and thus for sure has correct
* WAL. We could do here even slightly better, taking into account commitLsn
* of the rest to avoid sending them excessive data.
*/
for (int i = 0; i < n_walkeepers; i++)
{
if (walkeeper[i].state == SS_IDLE && i != donor)
if (walkeeper[i].state != SS_IDLE)
continue;

if (walkeeper[i].voteResponse.epoch != donorEpoch)
{
SendMessageToNode(i, msgQueueHead);
}
}
/* Mark all recovery messages as already received by the donor. */
for (WalMessage *msg = msgQueueHead; msg != NULL; msg = msg->next)
{
msg->ackMask |= 1 << donor;
else
{
for (WalMessage *msg = msgQueueHead; msg != NULL; msg = msg->next)
{
if (msg->req.endLsn <= walkeeper[i].voteResponse.flushLsn)
{
/* message is already received by this walkeeper */
msg->ackMask |= 1 << i;
}
else
{
uint32 len;
uint32 size;

/*
* By convention we always stream since the beginning of the
* record, and flushLsn points to it -- form the message
* starting there.
*/
len = msg->req.endLsn - walkeeper[i].voteResponse.flushLsn;
size = sizeof(AppendRequestHeader) + len;
msg->perSafekeeper[i] = malloc(size);
*msg->perSafekeeper[i] = msg->req;
msg->perSafekeeper[i]->beginLsn =
walkeeper[i].voteResponse.flushLsn;
memcpy(&msg->perSafekeeper[i] + 1,
(char *) (&msg->req + 1) +
walkeeper[i].voteResponse.flushLsn -
msg->req.beginLsn,
len);
SendMessageToNode(i, msg);
break;
}
}
}
}
return true;
}
Expand Down Expand Up @@ -1206,11 +1241,17 @@ AdvancePollState(int i, uint32 events)
if (!AsyncRead(i, &wk->voteResponse, sizeof(wk->voteResponse)))
return;

elog(LOG,
"got VoteResponse from acceptor %s:%s, voteGiven=" UINT64_FORMAT ", epoch=" UINT64_FORMAT ", flushLsn=%X/%X, truncateLsn=%X/%X",
wk->host, wk->port, wk->voteResponse.voteGiven, wk->voteResponse.epoch,
LSN_FORMAT_ARGS(wk->voteResponse.flushLsn),
LSN_FORMAT_ARGS(wk->voteResponse.truncateLsn));

/*
* In case of acceptor rejecting our vote, bail out, but only if
* either it already lives in strictly higher term (concurrent
* compute spotted) or we are not elected yet and thus need the
* vote.
* In case of acceptor rejecting our vote, bail out, but only
* if either it already lives in strictly higher term
* (concurrent compute spotted) or we are not elected yet and
* thus need the vote.
*/
if ((!wk->voteResponse.voteGiven) &&
(wk->voteResponse.term > propTerm || n_votes < quorum))
Expand Down Expand Up @@ -1285,19 +1326,26 @@ AdvancePollState(int i, uint32 events)
case SS_SEND_WAL:
{
WalMessage* msg = wk->currMsg;
AppendRequestHeader *req = &msg->req;

/* if there is a message specially crafted for this safekeeper, send it */
if (msg->perSafekeeper[i])
req = msg->perSafekeeper[i];

elog(LOG,
"sending message with len %ld beginLsn=%X/%X "
"commitLsn=%X/%X truncateLsn=%X/%X to %s:%s",
"sending message with len %ld beginLsn=%X/%X commitLsn=%X/%X truncateLsn=%X/%X to %s:%s",
msg->size - sizeof(AppendRequestHeader),
LSN_FORMAT_ARGS(msg->req.beginLsn),
LSN_FORMAT_ARGS(msg->req.commitLsn),
LSN_FORMAT_ARGS(req->beginLsn),
LSN_FORMAT_ARGS(req->commitLsn),
LSN_FORMAT_ARGS(truncateLsn), wk->host, wk->port);

/* We write with msg->size here because the body of the message
* is stored after the end of the WalMessage struct, in the
* allocation for each msg */
if (!AsyncWrite(i, &msg->req, msg->size, SS_SEND_WAL_FLUSH, SS_RECV_FEEDBACK))
if (!AsyncWrite(i, req,
sizeof(AppendRequestHeader) + req->endLsn -
req->beginLsn,
SS_SEND_WAL_FLUSH, SS_RECV_FEEDBACK))
return;

break;
Expand Down
6 changes: 6 additions & 0 deletions src/include/replication/walproposer.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,12 @@ struct WalMessage
WalMessage* next; /* L1 list of messages */
uint32 size; /* message size */
uint32 ackMask; /* mask of receivers acknowledged receiving of this message */
/*
* By convention safekeeper starts receiving data since record boundary, we
* may need to send first message not from the chunk beginning for that;
* such trimmed message is formed here.
*/
AppendRequestHeader *perSafekeeper[MAX_WALKEEPERS];
AppendRequestHeader req; /* request to walkeeper (message header) */

/* PHANTOM FIELD:
Expand Down

0 comments on commit a2e929e

Please sign in to comment.