Skip to content

Commit 78e0afb

Browse files
committed
Store walproposer WAL on disk
1 parent 31dc24a commit 78e0afb

File tree

4 files changed

+240
-144
lines changed

4 files changed

+240
-144
lines changed

src/backend/replication/walproposer.c

Lines changed: 65 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,9 @@
3737

3838
#include <signal.h>
3939
#include <unistd.h>
40+
#include <sys/stat.h>
4041
#include "access/xlogdefs.h"
42+
#include "access/xlogutils.h"
4143
#include "replication/walproposer.h"
4244
#include "storage/latch.h"
4345
#include "miscadmin.h"
@@ -136,8 +138,7 @@ static void WalProposerStartStreaming(XLogRecPtr startpos);
136138
static void StartStreaming(Safekeeper *sk);
137139
static void SendMessageToNode(Safekeeper *sk, WalMessage *msg);
138140
static void BroadcastMessage(WalMessage *msg);
139-
static WalMessage * CreateMessage(XLogRecPtr startpos, char *data, int len);
140-
static WalMessage * CreateMessageCommitLsnOnly(XLogRecPtr lsn);
141+
static WalMessage * CreateMessage(XLogRecPtr startpos, XLogRecPtr endpos);
141142
static void HandleActiveState(Safekeeper *sk, uint32 events);
142143
static bool SendAppendRequests(Safekeeper *sk);
143144
static bool RecvAppendResponses(Safekeeper *sk);
@@ -198,7 +199,10 @@ WalProposerMain(Datum main_arg)
198199
void
199200
WalProposerSync(int argc, char *argv[])
200201
{
202+
struct stat stat_buf;
203+
201204
syncSafekeepers = true;
205+
ThisTimeLineID = 1;
202206

203207
InitStandaloneProcess(argv[0]);
204208

@@ -233,6 +237,22 @@ WalProposerSync(int argc, char *argv[])
233237
(errcode_for_socket_access(),
234238
errmsg_internal("could not set postmaster death monitoring pipe to nonblocking mode: %m")));
235239

240+
ChangeToDataDir();
241+
242+
/* Create pg_wal directory, if it doesn't exist */
243+
if (stat(XLOGDIR, &stat_buf) != 0)
244+
{
245+
ereport(LOG, (errmsg("creating missing WAL directory \"%s\"", XLOGDIR)));
246+
if (MakePGDirectory(XLOGDIR) < 0)
247+
{
248+
ereport(ERROR,
249+
(errcode_for_file_access(),
250+
errmsg("could not create directory \"%s\": %m",
251+
XLOGDIR)));
252+
exit(1);
253+
}
254+
}
255+
236256
WalProposerInit(0, 0);
237257

238258
process_shared_preload_libraries_in_progress = false;
@@ -247,9 +267,9 @@ WalProposerSync(int argc, char *argv[])
247267
* called from walsender every time the new WAL is available.
248268
*/
249269
void
250-
WalProposerBroadcast(XLogRecPtr startpos, char *data, int len)
270+
WalProposerBroadcast(XLogRecPtr startpos, XLogRecPtr endpos)
251271
{
252-
WalMessage *msg = CreateMessage(startpos, data, len);
272+
WalMessage *msg = CreateMessage(startpos, endpos);
253273

254274
if (msg != NULL)
255275
BroadcastMessage(msg);
@@ -305,7 +325,7 @@ WalProposerPoll(void)
305325
*/
306326
if (lastSentLsn != InvalidXLogRecPtr)
307327
{
308-
BroadcastMessage(CreateMessageCommitLsnOnly(lastSentLsn));
328+
BroadcastMessage(CreateMessage(lastSentLsn, lastSentLsn));
309329
}
310330
}
311331
}
@@ -379,6 +399,9 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId)
379399
*/
380400
safekeeper[n_safekeepers].conninfo[0] = '\0';
381401
initStringInfo(&safekeeper[n_safekeepers].outbuf);
402+
safekeeper[n_safekeepers].xlogreader = XLogReaderAllocate(wal_segment_size, NULL, XL_ROUTINE(.segment_open = wal_segment_open, .segment_close = wal_segment_close), NULL);
403+
if (safekeeper[n_safekeepers].xlogreader == NULL)
404+
elog(FATAL, "Failed to allocate xlog reader");
382405
safekeeper[n_safekeepers].flushWrite = false;
383406
safekeeper[n_safekeepers].currMsg = NULL;
384407
safekeeper[n_safekeepers].startStreamingAt = InvalidXLogRecPtr;
@@ -1095,7 +1118,7 @@ HandleElectedProposer(void)
10951118
* sync-safekeepers, which doesn't generate any real new
10961119
* records. Will go away once we switch to async acks.
10971120
*/
1098-
BroadcastMessage(CreateMessageCommitLsnOnly(propEpochStartLsn));
1121+
BroadcastMessage(CreateMessage(propEpochStartLsn, propEpochStartLsn));
10991122

11001123
/* keep polling until all safekeepers are synced */
11011124
return;
@@ -1251,7 +1274,13 @@ WalProposerRecovery(int donor, TimeLineID timeline, XLogRecPtr startpos, XLogRec
12511274
sizeof rec_start_lsn);
12521275
rec_start_lsn = pg_ntoh64(rec_start_lsn);
12531276
rec_end_lsn = rec_start_lsn + len - XLOG_HDR_SIZE;
1254-
(void) CreateMessage(rec_start_lsn, buf, len);
1277+
1278+
/* write WAL to disk */
1279+
XLogWalPropWrite(&buf[XLOG_HDR_SIZE], len - XLOG_HDR_SIZE, rec_start_lsn);
1280+
1281+
/* add AppendRequest to the queue */
1282+
(void) CreateMessage(rec_start_lsn, rec_end_lsn);
1283+
12551284
ereport(DEBUG1,
12561285
(errmsg("Recover message %X/%X length %d",
12571286
LSN_FORMAT_ARGS(rec_start_lsn), len)));
@@ -1477,85 +1506,39 @@ BroadcastMessage(WalMessage *msg)
14771506
}
14781507

14791508
static WalMessage *
1480-
CreateMessage(XLogRecPtr startpos, char *data, int len)
1509+
CreateMessage(XLogRecPtr startpos, XLogRecPtr endpos)
14811510
{
14821511
/* Create new message and append it to message queue */
14831512
WalMessage *msg;
1484-
XLogRecPtr endpos;
14851513

1486-
len -= XLOG_HDR_SIZE;
1487-
endpos = startpos + len;
1488-
if (msgQueueTail && msgQueueTail->req.endLsn >= endpos)
1514+
Assert(endpos >= startpos);
1515+
1516+
if (msgQueueTail && msgQueueTail->req.endLsn > endpos)
14891517
{
14901518
/* Message already queued */
14911519
return NULL;
14921520
}
1493-
Assert(len >= 0);
1494-
msg = (WalMessage *) malloc(sizeof(WalMessage) + len);
1521+
1522+
msg = (WalMessage *) malloc(sizeof(WalMessage));
14951523
if (msgQueueTail != NULL)
14961524
msgQueueTail->next = msg;
14971525
else
14981526
msgQueueHead = msg;
14991527
msgQueueTail = msg;
15001528

1501-
msg->size = sizeof(AppendRequestHeader) + len;
15021529
msg->next = NULL;
15031530
msg->req.tag = 'a';
15041531
msg->req.term = propTerm;
15051532
msg->req.epochStartLsn = propEpochStartLsn;
15061533
msg->req.beginLsn = startpos;
15071534
msg->req.endLsn = endpos;
15081535
msg->req.proposerId = greetRequest.proposerId;
1509-
memcpy(&msg->req + 1, data + XLOG_HDR_SIZE, len);
15101536

15111537
Assert(msg->req.endLsn >= lastSentLsn);
15121538
lastSentLsn = msg->req.endLsn;
15131539
return msg;
15141540
}
15151541

1516-
/*
1517-
* Create WAL message with no data, just to let the safekeepers
1518-
* know that commit lsn has advanced.
1519-
*/
1520-
static WalMessage *
1521-
CreateMessageCommitLsnOnly(XLogRecPtr lsn)
1522-
{
1523-
/* Create new message and append it to message queue */
1524-
WalMessage *msg;
1525-
1526-
msg = (WalMessage *) malloc(sizeof(WalMessage));
1527-
if (msgQueueTail != NULL)
1528-
msgQueueTail->next = msg;
1529-
else
1530-
msgQueueHead = msg;
1531-
msgQueueTail = msg;
1532-
1533-
msg->size = sizeof(AppendRequestHeader);
1534-
msg->next = NULL;
1535-
msg->req.tag = 'a';
1536-
msg->req.term = propTerm;
1537-
msg->req.epochStartLsn = propEpochStartLsn;
1538-
1539-
/*
1540-
* This serves two purposes: 1) After all msgs from previous epochs are
1541-
* pushed we queue empty WalMessage with lsn set to epochStartLsn which
1542-
* commands to switch the epoch, which allows to do the switch without
1543-
* creating new epoch records (we especially want to avoid such in --sync
1544-
* mode). Walproposer can advance commit_lsn only after the switch, so
1545-
* this lsn (reported back) also is the first possible advancement point.
1546-
* 2) Maintain common invariant of queue entries sorted by LSN.
1547-
*/
1548-
msg->req.beginLsn = lsn;
1549-
msg->req.endLsn = lsn;
1550-
msg->req.proposerId = greetRequest.proposerId;
1551-
1552-
/*
1553-
* truncateLsn and commitLsn are set just before the message sent, in
1554-
* SendAppendRequests()
1555-
*/
1556-
return msg;
1557-
}
1558-
15591542
/*
15601543
* Process all events happened in SS_ACTIVE state, update event set after that.
15611544
*/
@@ -1600,6 +1583,7 @@ SendAppendRequests(Safekeeper *sk)
16001583
WalMessage *msg;
16011584
AppendRequestHeader *req;
16021585
PGAsyncWriteResult writeResult;
1586+
WALReadError errinfo;
16031587

16041588
if (sk->flushWrite)
16051589
{
@@ -1629,20 +1613,11 @@ SendAppendRequests(Safekeeper *sk)
16291613
*/
16301614
if (sk->startStreamingAt > msg->req.beginLsn)
16311615
{
1632-
uint32 len;
1633-
uint32 size;
1634-
16351616
Assert(sk->startStreamingAt < req->endLsn);
16361617

1637-
len = msg->req.endLsn - sk->startStreamingAt;
1638-
size = sizeof(AppendRequestHeader) + len;
1639-
req = malloc(size);
1618+
req = (AppendRequestHeader *) malloc(sizeof(AppendRequestHeader));
16401619
*req = msg->req;
16411620
req->beginLsn = sk->startStreamingAt;
1642-
memcpy(req + 1,
1643-
(char *) (&msg->req + 1) + sk->startStreamingAt -
1644-
msg->req.beginLsn,
1645-
len);
16461621
}
16471622

16481623
ereport(DEBUG2,
@@ -1653,12 +1628,25 @@ SendAppendRequests(Safekeeper *sk)
16531628
LSN_FORMAT_ARGS(req->commitLsn),
16541629
LSN_FORMAT_ARGS(truncateLsn), sk->host, sk->port)));
16551630

1656-
/*
1657-
* We write with msg->size here because the body of the
1658-
* message is stored after the end of the WalMessage
1659-
* struct, in the allocation for each msg
1660-
*/
1661-
writeResult = walprop_async_write(sk->conn, req, sizeof(AppendRequestHeader) + req->endLsn - req->beginLsn);
1631+
resetStringInfo(&sk->outbuf);
1632+
1633+
/* write AppendRequest header */
1634+
appendBinaryStringInfo(&sk->outbuf, (char*) req, sizeof(AppendRequestHeader));
1635+
1636+
/* write the WAL itself */
1637+
enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn);
1638+
if (!WALRead(sk->xlogreader,
1639+
&sk->outbuf.data[sk->outbuf.len],
1640+
req->beginLsn,
1641+
req->endLsn - req->beginLsn,
1642+
ThisTimeLineID,
1643+
&errinfo))
1644+
{
1645+
WALReadRaiseError(&errinfo);
1646+
}
1647+
sk->outbuf.len += req->endLsn - req->beginLsn;
1648+
1649+
writeResult = walprop_async_write(sk->conn, sk->outbuf.data, sk->outbuf.len);
16621650

16631651
/* Free up resources */
16641652
if (req != &msg->req)
@@ -1738,7 +1726,7 @@ RecvAppendResponses(Safekeeper *sk)
17381726
minQuorumLsn = GetAcknowledgedByQuorumWALPosition();
17391727
if (minQuorumLsn > lastSentCommitLsn)
17401728
{
1741-
BroadcastMessage(CreateMessageCommitLsnOnly(lastSentLsn));
1729+
BroadcastMessage(CreateMessage(lastSentLsn, lastSentLsn));
17421730
lastSentCommitLsn = minQuorumLsn;
17431731
}
17441732

@@ -2042,7 +2030,7 @@ HandleSafekeeperResponse(void)
20422030
WalMessage *msg = msgQueueHead;
20432031
msgQueueHead = msg->next;
20442032

2045-
memset(msg, 0xDF, sizeof(WalMessage) + msg->size - sizeof(AppendRequestHeader));
2033+
memset(msg, 0xDF, sizeof(WalMessage));
20462034
free(msg);
20472035
}
20482036
if (!msgQueueHead) /* queue is empty */

src/backend/replication/walproposer_utils.c

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,15 @@
88
#include <netinet/tcp.h>
99
#include <unistd.h>
1010

11+
/*
12+
* These variables are used similarly to openLogFile/SegNo,
13+
* but for walproposer to write the XLOG during recovery. recvFileTLI is the TimeLineID
14+
* corresponding to the filename of recvFile.
15+
*/
16+
static int recvFile = -1;
17+
static TimeLineID recvFileTLI = 0;
18+
static XLogSegNo recvSegNo = 0;
19+
1120
int
1221
CompareLsn(const void *a, const void *b)
1322
{
@@ -294,3 +303,100 @@ pq_sendint64_le(StringInfo buf, uint64 i)
294303
memcpy(buf->data + buf->len, &i, sizeof(uint64));
295304
buf->len += sizeof(uint64);
296305
}
306+
307+
/*
308+
* Write XLOG data to disk.
309+
*/
310+
void
311+
XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr)
312+
{
313+
int startoff;
314+
int byteswritten;
315+
316+
while (nbytes > 0)
317+
{
318+
int segbytes;
319+
320+
/* Close the current segment if it's completed */
321+
if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
322+
XLogWalPropClose(recptr);
323+
324+
if (recvFile < 0)
325+
{
326+
bool use_existent = true;
327+
328+
/* Create/use new log file */
329+
XLByteToSeg(recptr, recvSegNo, wal_segment_size);
330+
recvFile = XLogFileInit(recvSegNo, &use_existent, false);
331+
recvFileTLI = ThisTimeLineID;
332+
}
333+
334+
/* Calculate the start offset of the received logs */
335+
startoff = XLogSegmentOffset(recptr, wal_segment_size);
336+
337+
if (startoff + nbytes > wal_segment_size)
338+
segbytes = wal_segment_size - startoff;
339+
else
340+
segbytes = nbytes;
341+
342+
/* OK to write the logs */
343+
errno = 0;
344+
345+
byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
346+
if (byteswritten <= 0)
347+
{
348+
char xlogfname[MAXFNAMELEN];
349+
int save_errno;
350+
351+
/* if write didn't set errno, assume no disk space */
352+
if (errno == 0)
353+
errno = ENOSPC;
354+
355+
save_errno = errno;
356+
XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
357+
errno = save_errno;
358+
ereport(PANIC,
359+
(errcode_for_file_access(),
360+
errmsg("could not write to log segment %s "
361+
"at offset %u, length %lu: %m",
362+
xlogfname, startoff, (unsigned long) segbytes)));
363+
}
364+
365+
/* Update state for write */
366+
recptr += byteswritten;
367+
368+
nbytes -= byteswritten;
369+
buf += byteswritten;
370+
}
371+
372+
/*
373+
* Close the current segment if it's fully written up in the last cycle of
374+
* the loop.
375+
*/
376+
if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
377+
{
378+
XLogWalPropClose(recptr);
379+
}
380+
}
381+
382+
/*
383+
* Close the current segment.
384+
*/
385+
void
386+
XLogWalPropClose(XLogRecPtr recptr)
387+
{
388+
Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size));
389+
390+
if (close(recvFile) != 0)
391+
{
392+
char xlogfname[MAXFNAMELEN];
393+
XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
394+
395+
ereport(PANIC,
396+
(errcode_for_file_access(),
397+
errmsg("could not close log segment %s: %m",
398+
xlogfname)));
399+
}
400+
401+
recvFile = -1;
402+
}

0 commit comments

Comments
 (0)