@@ -81,7 +81,6 @@ static AppendResponse lastFeedback;
81
81
* record-aligned (first record which might not yet received by someone).
82
82
*/
83
83
static XLogRecPtr truncateLsn ;
84
- static XLogRecPtr candidateTruncateLsn ;
85
84
static VoteRequest voteRequest ; /* Vote request for walkeeper */
86
85
static term_t propTerm ; /* term of the proposer */
87
86
static XLogRecPtr propEpochStartLsn ; /* epoch start lsn of the proposer */
@@ -150,6 +149,26 @@ CalculateDiskConsistentLsn(void)
150
149
return lsn ;
151
150
}
152
151
152
+ /*
153
+ * Get minimum of flushed LSNs of all safekeepers, which is the LSN of the
154
+ * last WAL record that can be safely discarded.
155
+ */
156
+ static XLogRecPtr
157
+ CalculateMinFlushLsn (void )
158
+ {
159
+ XLogRecPtr lsn = UnknownXLogRecPtr ;
160
+ for (int i = 0 ; i < n_walkeepers ; i ++ )
161
+ {
162
+ /* We can't rely on safekeeper flushLsn if it has wrong epoch */
163
+ if (walkeeper [i ].feedback .epoch != propTerm )
164
+ return 0 ;
165
+
166
+ if (walkeeper [i ].feedback .flushLsn < lsn )
167
+ lsn = walkeeper [i ].feedback .flushLsn ;
168
+ }
169
+ return lsn ;
170
+ }
171
+
153
172
/* Initializes the internal event set, provided that it is currently null */
154
173
static void
155
174
InitEventSet (void )
@@ -357,7 +376,7 @@ HandleWalKeeperResponse(void)
357
376
HotStandbyFeedback hsFeedback ;
358
377
XLogRecPtr minQuorumLsn ;
359
378
XLogRecPtr diskConsistentLsn ;
360
- WalMessage * msgQueueAck ;
379
+ XLogRecPtr minFlushLsn ;
361
380
362
381
minQuorumLsn = GetAcknowledgedByQuorumWALPosition ();
363
382
diskConsistentLsn = CalculateDiskConsistentLsn ();
@@ -389,39 +408,24 @@ HandleWalKeeperResponse(void)
389
408
EpochFromFullTransactionId (hsFeedback .catalog_xmin ));
390
409
}
391
410
392
- /* Advance truncateLsn */
393
- msgQueueAck = msgQueueHead ;
394
- while (msgQueueAck != NULL && msgQueueAck -> ackMask == ((1 << n_walkeepers ) - 1 ))
395
- {
396
- /*
397
- * This piece is received by everyone; try to advance truncateLsn, but
398
- * hold it back to nearest commitLsn. Thus we will always start
399
- * streaming from the beginning of the record, which simplifies
400
- * decoding on the far end.
401
- *
402
- * This also prevents surprising violation of truncateLsn <= commitLsn
403
- * invariant which might occur because 1) truncateLsn can be advanced
404
- * immediately once chunk is broadcast to all safekeepers, and
405
- * commitLsn generally can't be advanced based on feedback from
406
- * safekeeper who is still in the previous epoch (similar to 'leader
407
- * can't commit entries from previous term' in Raft); 2) chunks we
408
- * read from WAL and send are plain sheets of bytes, but safekeepers
409
- * ack only on commit boundaries.
410
- */
411
- if (msgQueueAck -> req .endLsn >= minQuorumLsn && minQuorumLsn != InvalidXLogRecPtr )
412
- {
413
- truncateLsn = minQuorumLsn ;
414
- candidateTruncateLsn = InvalidXLogRecPtr ;
415
- }
416
- else if (msgQueueAck -> req .endLsn >= candidateTruncateLsn &&
417
- candidateTruncateLsn != InvalidXLogRecPtr )
418
- {
419
- truncateLsn = candidateTruncateLsn ;
420
- candidateTruncateLsn = InvalidXLogRecPtr ;
421
- }
422
-
423
- msgQueueAck = msgQueueAck -> next ;
424
- }
411
+ /*
412
+ * Try to advance truncateLsn to minFlushLsn, which is the last record
413
+ * flushed to all safekeepers. We must always start streaming from the
414
+ * beginning of the record, which simplifies decoding on the far end.
415
+ *
416
+ * Advanced truncateLsn should be not further than nearest commitLsn.
417
+ * This prevents surprising violation of truncateLsn <= commitLsn
418
+ * invariant which might occur because 1) truncateLsn can be advanced
419
+ * immediately once chunk is broadcast to all safekeepers, and
420
+ * commitLsn generally can't be advanced based on feedback from
421
+ * safekeeper who is still in the previous epoch (similar to 'leader
422
+ * can't commit entries from previous term' in Raft); 2) chunks we
423
+ * read from WAL and send are plain sheets of bytes, but safekeepers
424
+ * ack only on record boundaries.
425
+ */
426
+ minFlushLsn = CalculateMinFlushLsn ();
427
+ if (minFlushLsn > truncateLsn )
428
+ truncateLsn = minFlushLsn ;
425
429
426
430
/* Cleanup message queue up to truncateLsn, but only messages received by everyone */
427
431
while (msgQueueHead != NULL && msgQueueHead -> ackMask == ((1 << n_walkeepers ) - 1 ) && msgQueueHead -> req .endLsn <= truncateLsn )
@@ -1604,17 +1608,6 @@ AdvancePollState(int i, uint32 events)
1604
1608
if (minQuorumLsn > lastSentCommitLsn )
1605
1609
{
1606
1610
BroadcastMessage (CreateMessageCommitLsnOnly (lastSentLsn ));
1607
-
1608
- /*
1609
- * commitLsn is always the record boundary; remember
1610
- * it so we can advance truncateLsn there. But do so
1611
- * only if previous value is applied, otherwise it
1612
- * might never catch up.
1613
- */
1614
- if (candidateTruncateLsn == InvalidXLogRecPtr )
1615
- {
1616
- candidateTruncateLsn = minQuorumLsn ;
1617
- }
1618
1611
lastSentCommitLsn = minQuorumLsn ;
1619
1612
}
1620
1613
break ;
0 commit comments