diff --git a/src/backend/replication/walproposer.c b/src/backend/replication/walproposer.c index dfc4a538918..637dd22a9fe 100644 --- a/src/backend/replication/walproposer.c +++ b/src/backend/replication/walproposer.c @@ -37,7 +37,9 @@ #include #include +#include #include "access/xlogdefs.h" +#include "access/xlogutils.h" #include "replication/walproposer.h" #include "storage/latch.h" #include "miscadmin.h" @@ -73,12 +75,8 @@ WalProposerFunctionsType *WalProposerFunctions = NULL; static int n_safekeepers = 0; static int quorum = 0; static Safekeeper safekeeper[MAX_SAFEKEEPERS]; -static WalMessage *msgQueueHead; -static WalMessage *msgQueueTail; -static XLogRecPtr lastSentLsn; /* WAL has been appended to msg queue up to - * this point */ -static XLogRecPtr lastSentCommitLsn; /* last commitLsn broadcast to - * safekeepers */ +static XLogRecPtr availableLsn; /* WAL has been generated up to this point */ +static XLogRecPtr lastSentCommitLsn; /* last commitLsn broadcast to safekeepers */ static ProposerGreeting greetRequest; static VoteRequest voteRequest; /* Vote request for safekeeper */ static WaitEventSet *waitEvents; @@ -134,10 +132,8 @@ static bool WalProposerRecovery(int donor, TimeLineID timeline, XLogRecPtr start static void SendProposerElected(Safekeeper *sk); static void WalProposerStartStreaming(XLogRecPtr startpos); static void StartStreaming(Safekeeper *sk); -static void SendMessageToNode(Safekeeper *sk, WalMessage *msg); -static void BroadcastMessage(WalMessage *msg); -static WalMessage * CreateMessage(XLogRecPtr startpos, char *data, int len); -static WalMessage * CreateMessageCommitLsnOnly(XLogRecPtr lsn); +static void SendMessageToNode(Safekeeper *sk); +static void BroadcastAppendRequest(void); static void HandleActiveState(Safekeeper *sk, uint32 events); static bool SendAppendRequests(Safekeeper *sk); static bool RecvAppendResponses(Safekeeper *sk); @@ -198,7 +194,10 @@ WalProposerMain(Datum main_arg) void WalProposerSync(int argc, char *argv[]) { + 
struct stat stat_buf; + syncSafekeepers = true; + ThisTimeLineID = 1; InitStandaloneProcess(argv[0]); @@ -233,6 +232,22 @@ WalProposerSync(int argc, char *argv[]) (errcode_for_socket_access(), errmsg_internal("could not set postmaster death monitoring pipe to nonblocking mode: %m"))); + ChangeToDataDir(); + + /* Create pg_wal directory, if it doesn't exist */ + if (stat(XLOGDIR, &stat_buf) != 0) + { + ereport(LOG, (errmsg("creating missing WAL directory \"%s\"", XLOGDIR))); + if (MakePGDirectory(XLOGDIR) < 0) + { + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create directory \"%s\": %m", + XLOGDIR))); + exit(1); + } + } + WalProposerInit(0, 0); process_shared_preload_libraries_in_progress = false; @@ -247,12 +262,11 @@ WalProposerSync(int argc, char *argv[]) * called from walsender every time the new WAL is available. */ void -WalProposerBroadcast(XLogRecPtr startpos, char *data, int len) +WalProposerBroadcast(XLogRecPtr startpos, XLogRecPtr endpos) { - WalMessage *msg = CreateMessage(startpos, data, len); - - if (msg != NULL) - BroadcastMessage(msg); + Assert(startpos == availableLsn && endpos >= availableLsn); + availableLsn = endpos; + BroadcastAppendRequest(); } /* @@ -303,9 +317,9 @@ WalProposerPoll(void) * If no WAL was generated during timeout (and we have already * collected the quorum), then send pool message */ - if (lastSentLsn != InvalidXLogRecPtr) + if (availableLsn != InvalidXLogRecPtr) { - BroadcastMessage(CreateMessageCommitLsnOnly(lastSentLsn)); + BroadcastAppendRequest(); } } } @@ -379,9 +393,12 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId) */ safekeeper[n_safekeepers].conninfo[0] = '\0'; initStringInfo(&safekeeper[n_safekeepers].outbuf); + safekeeper[n_safekeepers].xlogreader = XLogReaderAllocate(wal_segment_size, NULL, XL_ROUTINE(.segment_open = wal_segment_open, .segment_close = wal_segment_close), NULL); + if (safekeeper[n_safekeepers].xlogreader == NULL) + elog(FATAL, "Failed to allocate xlog reader"); 
safekeeper[n_safekeepers].flushWrite = false; - safekeeper[n_safekeepers].currMsg = NULL; safekeeper[n_safekeepers].startStreamingAt = InvalidXLogRecPtr; + safekeeper[n_safekeepers].streamingAt = InvalidXLogRecPtr; n_safekeepers += 1; } if (n_safekeepers < 1) @@ -513,7 +530,7 @@ ShutdownConnection(Safekeeper *sk) sk->conn = NULL; sk->state = SS_OFFLINE; sk->flushWrite = false; - sk->currMsg = NULL; + sk->streamingAt = InvalidXLogRecPtr; if (sk->voteResponse.termHistory.entries) pfree(sk->voteResponse.termHistory.entries); @@ -1087,13 +1104,13 @@ HandleElectedProposer(void) if (syncSafekeepers) { /* - * Queue empty message to enforce receiving feedback - * even from nodes who are fully recovered; this is - * required to learn they switched epoch which finishes - * sync-safeekepers who doesn't generate any real new - * records. Will go away once we switch to async acks. - */ - BroadcastMessage(CreateMessageCommitLsnOnly(propEpochStartLsn)); + * Send empty message to enforce receiving feedback + * even from nodes who are fully recovered; this is + * required to learn they switched epoch which finishes + * sync-safekeepers, which doesn't generate any real new + * records. Will go away once we switch to async acks. + */ + BroadcastAppendRequest(); /* keep polling until all safekeepers are synced */ return; @@ -1172,6 +1189,12 @@ DetermineEpochStartLsn(void) Assert((truncateLsn != InvalidXLogRecPtr) || (syncSafekeepers && truncateLsn == propEpochStartLsn)); + /* + * We will be generating WAL since propEpochStartLsn, so we should set + * availableLsn to mark this LSN as the latest available position. + */ + availableLsn = propEpochStartLsn; + /* * Proposer's term history is the donor's + its own entry.
*/ @@ -1249,7 +1272,10 @@ WalProposerRecovery(int donor, TimeLineID timeline, XLogRecPtr startpos, XLogRec sizeof rec_start_lsn); rec_start_lsn = pg_ntoh64(rec_start_lsn); rec_end_lsn = rec_start_lsn + len - XLOG_HDR_SIZE; - (void) CreateMessage(rec_start_lsn, buf, len); + + /* write WAL to disk */ + XLogWalPropWrite(&buf[XLOG_HDR_SIZE], len - XLOG_HDR_SIZE, rec_start_lsn); + ereport(DEBUG1, (errmsg("Recover message %X/%X length %d", LSN_FORMAT_ARGS(rec_start_lsn), len))); @@ -1370,7 +1396,7 @@ SendProposerElected(Safekeeper *sk) } } - Assert(msgQueueHead == NULL || sk->startStreamingAt >= msgQueueHead->req.beginLsn); + Assert(sk->startStreamingAt >= truncateLsn && sk->startStreamingAt <= availableLsn); msg.tag = 'e'; msg.term = propTerm; @@ -1422,39 +1448,29 @@ WalProposerStartStreaming(XLogRecPtr startpos) static void StartStreaming(Safekeeper *sk) { - WalMessage *startMsg = msgQueueHead; - /* * This is the only entrypoint to state SS_ACTIVE. It's executed * exactly once for a connection. */ sk->state = SS_ACTIVE; - - while (startMsg != NULL && startMsg->req.endLsn <= sk->startStreamingAt) - startMsg = startMsg->next; - - /* We should always have WAL to start from sk->startStreamingAt */ - Assert(startMsg == NULL || startMsg->req.beginLsn <= sk->startStreamingAt); + sk->streamingAt = sk->startStreamingAt; /* event set will be updated inside SendMessageToNode */ - SendMessageToNode(sk, startMsg); + SendMessageToNode(sk); } /* - * Start sending message to the particular node. Always updates event set. + * Try to send message to the particular node. Always updates event set. Will + * send at least one message, if socket is ready. * * Can be used only for safekeepers in SS_ACTIVE state. State can be changed * in case of errors. 
*/ static void -SendMessageToNode(Safekeeper *sk, WalMessage *msg) +SendMessageToNode(Safekeeper *sk) { - /* we shouldn't be already sending something */ - Assert(sk->currMsg == NULL); Assert(sk->state == SS_ACTIVE); - sk->currMsg = msg; - /* Note: we always send everything to the safekeeper until WOULDBLOCK or nothing left to send */ HandleActiveState(sk, WL_SOCKET_WRITEABLE); } @@ -1463,95 +1479,25 @@ SendMessageToNode(Safekeeper *sk, WalMessage *msg) * Broadcast new message to all caught-up safekeepers */ static void -BroadcastMessage(WalMessage *msg) +BroadcastAppendRequest() { for (int i = 0; i < n_safekeepers; i++) - { - if (safekeeper[i].state == SS_ACTIVE && safekeeper[i].currMsg == NULL) - { - SendMessageToNode(&safekeeper[i], msg); - } - } -} - -static WalMessage * -CreateMessage(XLogRecPtr startpos, char *data, int len) -{ - /* Create new message and append it to message queue */ - WalMessage *msg; - XLogRecPtr endpos; - - len -= XLOG_HDR_SIZE; - endpos = startpos + len; - if (msgQueueTail && msgQueueTail->req.endLsn >= endpos) - { - /* Message already queued */ - return NULL; - } - Assert(len >= 0); - msg = (WalMessage *) malloc(sizeof(WalMessage) + len); - if (msgQueueTail != NULL) - msgQueueTail->next = msg; - else - msgQueueHead = msg; - msgQueueTail = msg; - - msg->size = sizeof(AppendRequestHeader) + len; - msg->next = NULL; - msg->req.tag = 'a'; - msg->req.term = propTerm; - msg->req.epochStartLsn = propEpochStartLsn; - msg->req.beginLsn = startpos; - msg->req.endLsn = endpos; - msg->req.proposerId = greetRequest.proposerId; - memcpy(&msg->req + 1, data + XLOG_HDR_SIZE, len); - - Assert(msg->req.endLsn >= lastSentLsn); - lastSentLsn = msg->req.endLsn; - return msg; + if (safekeeper[i].state == SS_ACTIVE) + SendMessageToNode(&safekeeper[i]); } -/* - * Create WAL message with no data, just to let the safekeepers - * know that commit lsn has advanced. 
- */ -static WalMessage * -CreateMessageCommitLsnOnly(XLogRecPtr lsn) +static void +PrepareAppendRequest(AppendRequestHeader *req, XLogRecPtr beginLsn, XLogRecPtr endLsn) { - /* Create new message and append it to message queue */ - WalMessage *msg; - - msg = (WalMessage *) malloc(sizeof(WalMessage)); - if (msgQueueTail != NULL) - msgQueueTail->next = msg; - else - msgQueueHead = msg; - msgQueueTail = msg; - - msg->size = sizeof(AppendRequestHeader); - msg->next = NULL; - msg->req.tag = 'a'; - msg->req.term = propTerm; - msg->req.epochStartLsn = propEpochStartLsn; - - /* - * This serves two purposes: 1) After all msgs from previous epochs are - * pushed we queue empty WalMessage with lsn set to epochStartLsn which - * commands to switch the epoch, which allows to do the switch without - * creating new epoch records (we especially want to avoid such in --sync - * mode). Walproposer can advance commit_lsn only after the switch, so - * this lsn (reported back) also is the first possible advancement point. - * 2) Maintain common invariant of queue entries sorted by LSN. - */ - msg->req.beginLsn = lsn; - msg->req.endLsn = lsn; - msg->req.proposerId = greetRequest.proposerId; - - /* - * truncateLsn and commitLsn are set just before the message sent, in - * SendAppendRequests() - */ - return msg; + Assert(endLsn >= beginLsn); + req->tag = 'a'; + req->term = propTerm; + req->epochStartLsn = propEpochStartLsn; + req->beginLsn = beginLsn; + req->endLsn = endLsn; + req->commitLsn = GetAcknowledgedByQuorumWALPosition(); + req->truncateLsn = truncateLsn; + req->proposerId = greetRequest.proposerId; } /* @@ -1574,20 +1520,22 @@ HandleActiveState(Safekeeper *sk, uint32 events) * We should wait for WL_SOCKET_WRITEABLE event if we have unflushed data * in the buffer. * - * sk->currMsg checks if we have pending unsent messages. This check isn't - * necessary now, because we always send queue messages immediately after - * creation. 
But it's good to have it here in case we change this behavior + * LSN comparison checks if we have pending unsent messages. This check isn't + * necessary now, because we always send append messages immediately after + * arrival. But it's good to have it here in case we change this behavior * in the future. */ - if (sk->currMsg != NULL || sk->flushWrite) + if (sk->streamingAt != availableLsn || sk->flushWrite) newEvents |= WL_SOCKET_WRITEABLE; UpdateEventSet(sk, newEvents); } /* - * Send queue messages starting from sk->currMsg until the end or non-writable + * Send WAL messages starting from sk->streamingAt until the end or non-writable * socket, whichever comes first. Caller should take care of updating event set. + * Even if no unsent WAL is available, at least one empty message will be sent + * as a heartbeat, if socket is ready. * * Can change state if Async* functions encounter errors and reset connection. * Returns false in this case, true otherwise. @@ -1595,9 +1543,11 @@ HandleActiveState(Safekeeper *sk, uint32 events) static bool SendAppendRequests(Safekeeper *sk) { - WalMessage *msg; + XLogRecPtr endLsn; AppendRequestHeader *req; PGAsyncWriteResult writeResult; + WALReadError errinfo; + bool sentAnything = false; if (sk->flushWrite) { @@ -1612,37 +1562,21 @@ SendAppendRequests(Safekeeper *sk) sk->flushWrite = false; } - while (sk->currMsg) + while (sk->streamingAt != availableLsn || !sentAnything) { - msg = sk->currMsg; - req = &msg->req; + sentAnything = true; - req->commitLsn = GetAcknowledgedByQuorumWALPosition(); - req->truncateLsn = truncateLsn; + endLsn = sk->streamingAt; + endLsn += MAX_SEND_SIZE; - /* - * If we need to send this message not from the beginning, - * form the cut version. Only happens for the first - * message. 
- */ - if (sk->startStreamingAt > msg->req.beginLsn) - { - uint32 len; - uint32 size; - - Assert(sk->startStreamingAt < req->endLsn); - - len = msg->req.endLsn - sk->startStreamingAt; - size = sizeof(AppendRequestHeader) + len; - req = malloc(size); - *req = msg->req; - req->beginLsn = sk->startStreamingAt; - memcpy(req + 1, - (char *) (&msg->req + 1) + sk->startStreamingAt - - msg->req.beginLsn, - len); + /* if we went beyond available WAL, back off */ + if (endLsn > availableLsn) { + endLsn = availableLsn; } + req = &sk->appendRequest; + PrepareAppendRequest(&sk->appendRequest, sk->streamingAt, endLsn); + ereport(DEBUG2, (errmsg("sending message len %ld beginLsn=%X/%X endLsn=%X/%X commitLsn=%X/%X truncateLsn=%X/%X to %s:%s", req->endLsn - req->beginLsn, @@ -1651,19 +1585,28 @@ SendAppendRequests(Safekeeper *sk) LSN_FORMAT_ARGS(req->commitLsn), LSN_FORMAT_ARGS(truncateLsn), sk->host, sk->port))); - /* - * We write with msg->size here because the body of the - * message is stored after the end of the WalMessage - * struct, in the allocation for each msg - */ - writeResult = walprop_async_write(sk->conn, req, sizeof(AppendRequestHeader) + req->endLsn - req->beginLsn); - - /* Free up resources */ - if (req != &msg->req) - free(req); + resetStringInfo(&sk->outbuf); + /* write AppendRequest header */ + appendBinaryStringInfo(&sk->outbuf, (char*) req, sizeof(AppendRequestHeader)); + + /* write the WAL itself */ + enlargeStringInfo(&sk->outbuf, req->endLsn - req->beginLsn); + if (!WALRead(sk->xlogreader, + &sk->outbuf.data[sk->outbuf.len], + req->beginLsn, + req->endLsn - req->beginLsn, + ThisTimeLineID, + &errinfo)) + { + WALReadRaiseError(&errinfo); + } + sk->outbuf.len += req->endLsn - req->beginLsn; + + writeResult = walprop_async_write(sk->conn, sk->outbuf.data, sk->outbuf.len); + /* Mark current message as sent, whatever the result is */ - sk->currMsg = sk->currMsg->next; + sk->streamingAt = endLsn; switch (writeResult) { @@ -1719,6 +1662,13 @@ 
RecvAppendResponses(Safekeeper *sk) if (!AsyncReadMessage(sk, (AcceptorProposerMessage *) &sk->appendResponse)) break; + ereport(DEBUG2, + (errmsg("received message term=" INT64_FORMAT " flushLsn=%X/%X commitLsn=%X/%X from %s:%s", + sk->appendResponse.term, + LSN_FORMAT_ARGS(sk->appendResponse.flushLsn), + LSN_FORMAT_ARGS(sk->appendResponse.commitLsn), + sk->host, sk->port))); + readAnything = true; } @@ -1729,14 +1679,11 @@ RecvAppendResponses(Safekeeper *sk) /* * Also send the new commit lsn to all the safekeepers. - * - * FIXME: This is redundant for safekeepers that have other - * outbound messages pending. */ minQuorumLsn = GetAcknowledgedByQuorumWALPosition(); if (minQuorumLsn > lastSentCommitLsn) { - BroadcastMessage(CreateMessageCommitLsnOnly(lastSentLsn)); + BroadcastAppendRequest(); lastSentCommitLsn = minQuorumLsn; } @@ -2029,25 +1976,16 @@ HandleSafekeeperResponse(void) */ minFlushLsn = CalculateMinFlushLsn(); if (minFlushLsn > truncateLsn) - truncateLsn = minFlushLsn; - - /* - * Cleanup message queue up to truncateLsn. These messages were processed - * by all safekeepers because they all reported flushLsn greater than endLsn. - */ - while (msgQueueHead != NULL && msgQueueHead->req.endLsn < truncateLsn) { - WalMessage *msg = msgQueueHead; - msgQueueHead = msg->next; + truncateLsn = minFlushLsn; - memset(msg, 0xDF, sizeof(WalMessage) + msg->size - sizeof(AppendRequestHeader)); - free(msg); + /* + * Advance the replication slot to free up old WAL files. Note + * that slot doesn't exist if we are in syncSafekeepers mode. 
+ */ + if (MyReplicationSlot) + PhysicalConfirmReceivedLocation(truncateLsn); } - if (!msgQueueHead) /* queue is empty */ - msgQueueTail = NULL; - - /* truncateLsn always points to the first chunk in the queue */ - Assert(msgQueueHead == NULL || (truncateLsn >= msgQueueHead->req.beginLsn && truncateLsn <= msgQueueHead->req.endLsn)); /* * Generally sync is done when majority switched the epoch so we committed diff --git a/src/backend/replication/walproposer_utils.c b/src/backend/replication/walproposer_utils.c index 7a593a71778..c9ddafdee0c 100644 --- a/src/backend/replication/walproposer_utils.c +++ b/src/backend/replication/walproposer_utils.c @@ -8,6 +8,15 @@ #include #include +/* + * These variables are used similarly to openLogFile/SegNo, + * but for walproposer to write the XLOG during recovery. walpropFileTLI is the TimeLineID + * corresponding to the filename of walpropFile. + */ +static int walpropFile = -1; +static TimeLineID walpropFileTLI = 0; +static XLogSegNo walpropSegNo = 0; + int CompareLsn(const void *a, const void *b) { @@ -294,3 +303,100 @@ pq_sendint64_le(StringInfo buf, uint64 i) memcpy(buf->data + buf->len, &i, sizeof(uint64)); buf->len += sizeof(uint64); } + +/* + * Write XLOG data to disk.
+ */ +void +XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr) +{ + int startoff; + int byteswritten; + + while (nbytes > 0) + { + int segbytes; + + /* Close the current segment if it's completed */ + if (walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size)) + XLogWalPropClose(recptr); + + if (walpropFile < 0) + { + bool use_existent = true; + + /* Create/use new log file */ + XLByteToSeg(recptr, walpropSegNo, wal_segment_size); + walpropFile = XLogFileInit(walpropSegNo, &use_existent, false); + walpropFileTLI = ThisTimeLineID; + } + + /* Calculate the start offset of the received logs */ + startoff = XLogSegmentOffset(recptr, wal_segment_size); + + if (startoff + nbytes > wal_segment_size) + segbytes = wal_segment_size - startoff; + else + segbytes = nbytes; + + /* OK to write the logs */ + errno = 0; + + byteswritten = pg_pwrite(walpropFile, buf, segbytes, (off_t) startoff); + if (byteswritten <= 0) + { + char xlogfname[MAXFNAMELEN]; + int save_errno; + + /* if write didn't set errno, assume no disk space */ + if (errno == 0) + errno = ENOSPC; + + save_errno = errno; + XLogFileName(xlogfname, walpropFileTLI, walpropSegNo, wal_segment_size); + errno = save_errno; + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not write to log segment %s " + "at offset %u, length %lu: %m", + xlogfname, startoff, (unsigned long) segbytes))); + } + + /* Update state for write */ + recptr += byteswritten; + + nbytes -= byteswritten; + buf += byteswritten; + } + + /* + * Close the current segment if it's fully written up in the last cycle of + * the loop. + */ + if (walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size)) + { + XLogWalPropClose(recptr); + } +} + +/* + * Close the current segment. 
+ */ +void +XLogWalPropClose(XLogRecPtr recptr) +{ + Assert(walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size)); + + if (close(walpropFile) != 0) + { + char xlogfname[MAXFNAMELEN]; + XLogFileName(xlogfname, walpropFileTLI, walpropSegNo, wal_segment_size); + + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not close log segment %s: %m", + xlogfname))); + } + + walpropFile = -1; +} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index f649302ba9e..e46870e70f8 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -1864,7 +1864,7 @@ ProcessStandbyMessage(void) /* * Remember that a walreceiver just confirmed receipt of lsn `lsn`. */ -static void +void PhysicalConfirmReceivedLocation(XLogRecPtr lsn) { bool changed = false; @@ -2030,6 +2030,13 @@ ProcessStandbyReply(XLogRecPtr writePtr, if (!am_cascading_walsender) SyncRepReleaseWaiters(); + /* + * walproposer uses truncateLsn instead of flushPtr for confirmed + * received location, so we shouldn't update restart_lsn here. + */ + if (am_wal_proposer) + return; + /* * Advance our local xmin horizon when the client confirmed a flush. */ @@ -2830,73 +2837,73 @@ XLogSendPhysical(void) nbytes = endptr - startptr; Assert(nbytes <= MAX_SEND_SIZE); - /* - * OK to read and send the slice. - */ - if (output_message.data) - resetStringInfo(&output_message); + if (am_wal_proposer) + { + WalProposerBroadcast(startptr, endptr); + } else - initStringInfo(&output_message); - - pq_sendbyte(&output_message, 'w'); - pq_sendint64(&output_message, startptr); /* dataStart */ - pq_sendint64(&output_message, SendRqstPtr); /* walEnd */ - pq_sendint64(&output_message, 0); /* sendtime, filled in last */ - - /* - * Read the log directly into the output buffer to avoid extra memcpy - * calls. - */ - enlargeStringInfo(&output_message, nbytes); + { + /* + * OK to read and send the slice.
+ */ + if (output_message.data) + resetStringInfo(&output_message); + else + initStringInfo(&output_message); -retry: - if (!WALRead(xlogreader, - &output_message.data[output_message.len], - startptr, - nbytes, - xlogreader->seg.ws_tli, /* Pass the current TLI because - * only WalSndSegmentOpen controls - * whether new TLI is needed. */ - &errinfo)) - WALReadRaiseError(&errinfo); + pq_sendbyte(&output_message, 'w'); + pq_sendint64(&output_message, startptr); /* dataStart */ + pq_sendint64(&output_message, SendRqstPtr); /* walEnd */ + pq_sendint64(&output_message, 0); /* sendtime, filled in last */ - /* See logical_read_xlog_page(). */ - XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize); - CheckXLogRemoved(segno, xlogreader->seg.ws_tli); + /* + * Read the log directly into the output buffer to avoid extra memcpy + * calls. + */ + enlargeStringInfo(&output_message, nbytes); + + retry: + if (!WALRead(xlogreader, + &output_message.data[output_message.len], + startptr, + nbytes, + xlogreader->seg.ws_tli, /* Pass the current TLI because + * only WalSndSegmentOpen controls + * whether new TLI is needed. */ + &errinfo)) + WALReadRaiseError(&errinfo); + + /* See logical_read_xlog_page(). */ + XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize); + CheckXLogRemoved(segno, xlogreader->seg.ws_tli); - /* - * During recovery, the currently-open WAL file might be replaced with the - * file of the same name retrieved from archive. So we always need to - * check what we read was valid after reading into the buffer. If it's - * invalid, we try to open and read the file again. - */ - if (am_cascading_walsender) - { - WalSnd *walsnd = MyWalSnd; - bool reload; + /* + * During recovery, the currently-open WAL file might be replaced with the + * file of the same name retrieved from archive. So we always need to + * check what we read was valid after reading into the buffer. If it's + * invalid, we try to open and read the file again. 
+ */ + if (am_cascading_walsender) + { + WalSnd *walsnd = MyWalSnd; + bool reload; - SpinLockAcquire(&walsnd->mutex); - reload = walsnd->needreload; - walsnd->needreload = false; - SpinLockRelease(&walsnd->mutex); + SpinLockAcquire(&walsnd->mutex); + reload = walsnd->needreload; + walsnd->needreload = false; + SpinLockRelease(&walsnd->mutex); - if (reload && xlogreader->seg.ws_file >= 0) - { - wal_segment_close(xlogreader); + if (reload && xlogreader->seg.ws_file >= 0) + { + wal_segment_close(xlogreader); - goto retry; + goto retry; + } } - } - output_message.len += nbytes; - output_message.data[output_message.len] = '\0'; + output_message.len += nbytes; + output_message.data[output_message.len] = '\0'; - if (am_wal_proposer) - { - WalProposerBroadcast(startptr, output_message.data, output_message.len); - } - else - { /* * Fill the send timestamp last, so that it is taken as late as possible. */ diff --git a/src/include/replication/walproposer.h b/src/include/replication/walproposer.h index 159af4f4bdc..538dcf6c5b6 100644 --- a/src/include/replication/walproposer.h +++ b/src/include/replication/walproposer.h @@ -14,6 +14,7 @@ #define SK_PROTOCOL_VERSION 1 #define MAX_SAFEKEEPERS 32 +#define MAX_SEND_SIZE (XLOG_BLCKSZ * 16) /* max size of a single WAL message */ #define XLOG_HDR_SIZE (1+8*3) /* 'w' + startPos + walEnd + timestamp */ #define XLOG_HDR_START_POS 1 /* offset of start position in wal sender message header */ #define XLOG_HDR_END_POS (1+8) /* offset of end position in wal sender message header */ @@ -251,23 +252,6 @@ typedef struct AppendRequestHeader pg_uuid_t proposerId; /* for monitoring/debugging */ } AppendRequestHeader; -/* - * All copy data message ('w') are linked in L1 send list and asynchronously sent to receivers. - * When message is sent to all receivers, it is removed from send list. 
- */ -struct WalMessage -{ - WalMessage* next; /* L1 list of messages */ - uint32 size; /* message size */ - AppendRequestHeader req; /* request to safekeeper (message header) */ - - /* PHANTOM FIELD: - * - * All WalMessages are allocated with exactly (size - sizeof(AppendRequestHeader)) additional bytes - * after them, containing the body of the message. This allocation is done in `CreateMessage` - * (for body len > 0) and `CreateMessageVCLOnly` (for body len == 0). */ -}; - /* * Hot standby feedback received from replica */ @@ -342,20 +326,29 @@ typedef struct Safekeeper * reach SS_ACTIVE; not before. */ WalProposerConn* conn; + /* + * Temporary buffer for the message being sent to the safekeeper. + */ StringInfoData outbuf; + /* + * WAL reader, allocated for each safekeeper. + */ + XLogReaderState* xlogreader; - bool flushWrite; /* set to true if we need to call AsyncFlush, to flush pending messages */ - WalMessage* currMsg; /* message that wasn't sent yet or NULL, if we have nothing to send */ - - int eventPos; /* position in wait event set. Equal to -1 if no event */ - SafekeeperState state; /* safekeeper state machine state */ - AcceptorGreeting greetResponse; /* acceptor greeting */ - VoteResponse voteResponse; /* the vote */ - AppendResponse appendResponse; /* feedback to master */ /* * Streaming will start here; must be record boundary. */ XLogRecPtr startStreamingAt; + + bool flushWrite; /* set to true if we need to call AsyncFlush, to flush pending messages */ + XLogRecPtr streamingAt; /* current streaming position */ + AppendRequestHeader appendRequest; /* request for sending to safekeeper */ + + int eventPos; /* position in wait event set. 
Equal to -1 if no event */ + SafekeeperState state; /* safekeeper state machine state */ + AcceptorGreeting greetResponse; /* acceptor greeting */ + VoteResponse voteResponse; /* the vote */ + AppendResponse appendResponse; /* feedback for master */ } Safekeeper; @@ -365,19 +358,22 @@ void AssertEventsOkForState(uint32 events, Safekeeper* sk); uint32 SafekeeperStateDesiredEvents(SafekeeperState state); char* FormatEvents(uint32 events); void WalProposerMain(Datum main_arg); -void WalProposerBroadcast(XLogRecPtr startpos, char* data, int len); +void WalProposerBroadcast(XLogRecPtr startpos, XLogRecPtr endpos); bool HexDecodeString(uint8 *result, char *input, int nbytes); uint32 pq_getmsgint32_le(StringInfo msg); uint64 pq_getmsgint64_le(StringInfo msg); -void pq_sendint32_le(StringInfo buf, uint32 i); -void pq_sendint64_le(StringInfo buf, uint64 i); +void pq_sendint32_le(StringInfo buf, uint32 i); +void pq_sendint64_le(StringInfo buf, uint64 i); void WalProposerPoll(void); void WalProposerRegister(void); +void XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr); +void XLogWalPropClose(XLogRecPtr recptr); void ProcessStandbyReply(XLogRecPtr writePtr, XLogRecPtr flushPtr, XLogRecPtr applyPtr, TimestampTz replyTime, bool replyRequested); +void PhysicalConfirmReceivedLocation(XLogRecPtr lsn); void ProcessStandbyHSFeedback(TimestampTz replyTime, TransactionId feedbackXmin, uint32 feedbackEpoch,