Skip to content

Commit

Permalink
Fix truncateLsn update (#101)
Browse files Browse the repository at this point in the history
truncateLsn is now advanced to `Min(walkeeper[i].feedback.flushLsn)`, taking epochs into account.
  • Loading branch information
petuhovskiy authored and MMeent committed Jul 7, 2022
1 parent 827f3f0 commit 4bd4f68
Showing 1 changed file with 39 additions and 46 deletions.
85 changes: 39 additions & 46 deletions src/backend/replication/walproposer.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,6 @@ static AppendResponse lastFeedback;
* record-aligned (first record which might not yet have been received by someone).
*/
static XLogRecPtr truncateLsn;
static XLogRecPtr candidateTruncateLsn;
static VoteRequest voteRequest; /* Vote request for walkeeper */
static term_t propTerm; /* term of the proposer */
static XLogRecPtr propEpochStartLsn; /* epoch start lsn of the proposer */
Expand Down Expand Up @@ -150,6 +149,26 @@ CalculateDiskConsistentLsn(void)
return lsn;
}

/*
 * Compute the smallest flushLsn reported in the feedback of all safekeepers.
 * WAL up to that position has been flushed everywhere, so it is the furthest
 * point to which WAL can be safely discarded.
 *
 * Returns 0 as soon as any safekeeper's feedback carries an epoch different
 * from propTerm: its flushLsn cannot be trusted, so no minimum is reported.
 *
 * The scan starts from UnknownXLogRecPtr and only ever lowers the value, so
 * that sentinel is returned unchanged when no safekeeper reports anything
 * smaller (including when n_walkeepers is zero).
 * NOTE(review): this presumes UnknownXLogRecPtr is the largest possible
 * XLogRecPtr -- confirm against its definition.
 */
static XLogRecPtr
CalculateMinFlushLsn(void)
{
	XLogRecPtr	minFlush = UnknownXLogRecPtr;
	int			idx = 0;

	while (idx < n_walkeepers)
	{
		/* A safekeeper in a different epoch invalidates the whole result */
		if (walkeeper[idx].feedback.epoch != propTerm)
			return 0;

		/* Keep the running minimum */
		minFlush = (walkeeper[idx].feedback.flushLsn < minFlush)
			? walkeeper[idx].feedback.flushLsn
			: minFlush;

		idx++;
	}

	return minFlush;
}

/* Initializes the internal event set, provided that it is currently null */
static void
InitEventSet(void)
Expand Down Expand Up @@ -357,7 +376,7 @@ HandleWalKeeperResponse(void)
HotStandbyFeedback hsFeedback;
XLogRecPtr minQuorumLsn;
XLogRecPtr diskConsistentLsn;
WalMessage *msgQueueAck;
XLogRecPtr minFlushLsn;

minQuorumLsn = GetAcknowledgedByQuorumWALPosition();
diskConsistentLsn = CalculateDiskConsistentLsn();
Expand Down Expand Up @@ -389,39 +408,24 @@ HandleWalKeeperResponse(void)
EpochFromFullTransactionId(hsFeedback.catalog_xmin));
}

/* Advance truncateLsn */
msgQueueAck = msgQueueHead;
while (msgQueueAck != NULL && msgQueueAck->ackMask == ((1 << n_walkeepers) - 1))
{
/*
* This piece is received by everyone; try to advance truncateLsn, but
* hold it back to nearest commitLsn. Thus we will always start
* streaming from the beginning of the record, which simplifies
* decoding on the far end.
*
* This also prevents surprising violation of truncateLsn <= commitLsn
* invariant which might occur because 1) truncateLsn can be advanced
* immediately once chunk is broadcast to all safekeepers, and
* commitLsn generally can't be advanced based on feedback from
* safekeeper who is still in the previous epoch (similar to 'leader
* can't commit entries from previous term' in Raft); 2) chunks we
* read from WAL and send are plain sheets of bytes, but safekeepers
* ack only on commit boundaries.
*/
if (msgQueueAck->req.endLsn >= minQuorumLsn && minQuorumLsn != InvalidXLogRecPtr)
{
truncateLsn = minQuorumLsn;
candidateTruncateLsn = InvalidXLogRecPtr;
}
else if (msgQueueAck->req.endLsn >= candidateTruncateLsn &&
candidateTruncateLsn != InvalidXLogRecPtr)
{
truncateLsn = candidateTruncateLsn;
candidateTruncateLsn = InvalidXLogRecPtr;
}

msgQueueAck = msgQueueAck->next;
}
/*
* Try to advance truncateLsn to minFlushLsn, which is the last record
* flushed to all safekeepers. We must always start streaming from the
* beginning of the record, which simplifies decoding on the far end.
*
* The advanced truncateLsn should be no further than the nearest commitLsn.
* This prevents surprising violation of truncateLsn <= commitLsn
* invariant which might occur because 1) truncateLsn can be advanced
* immediately once chunk is broadcast to all safekeepers, and
* commitLsn generally can't be advanced based on feedback from
* safekeeper who is still in the previous epoch (similar to 'leader
* can't commit entries from previous term' in Raft); 2) chunks we
* read from WAL and send are plain sheets of bytes, but safekeepers
* ack only on record boundaries.
*/
minFlushLsn = CalculateMinFlushLsn();
if (minFlushLsn > truncateLsn)
truncateLsn = minFlushLsn;

/* Cleanup message queue up to truncateLsn, but only messages received by everyone */
while (msgQueueHead != NULL && msgQueueHead->ackMask == ((1 << n_walkeepers) - 1) && msgQueueHead->req.endLsn <= truncateLsn)
Expand Down Expand Up @@ -1604,17 +1608,6 @@ AdvancePollState(int i, uint32 events)
if (minQuorumLsn > lastSentCommitLsn)
{
BroadcastMessage(CreateMessageCommitLsnOnly(lastSentLsn));

/*
* commitLsn is always the record boundary; remember
* it so we can advance truncateLsn there. But do so
* only if previous value is applied, otherwise it
* might never catch up.
*/
if (candidateTruncateLsn == InvalidXLogRecPtr)
{
candidateTruncateLsn = minQuorumLsn;
}
lastSentCommitLsn = minQuorumLsn;
}
break;
Expand Down

0 comments on commit 4bd4f68

Please sign in to comment.