From 2f9a0202b583c56b1eea3cdeff85bc02c7e0b982 Mon Sep 17 00:00:00 2001 From: Matthias van de Meent Date: Thu, 4 Aug 2022 12:18:55 +0200 Subject: [PATCH 01/12] Pull 99% of walproposer code into extension. --- contrib/neon/Makefile | 11 +- contrib/neon/libpagestore.c | 9 +- .../neon}/libpqwalproposer/Makefile | 4 +- .../neon}/libpqwalproposer/libpqwalproposer.c | 2 +- contrib/neon/neon.c | 14 +- contrib/neon/neon.h | 11 + .../neon}/walproposer.c | 154 ++- .../neon}/walproposer.h | 33 +- contrib/neon/walproposer_utils.c | 1112 +++++++++++++++++ contrib/neon/walproposer_utils.h | 19 + src/Makefile | 1 - src/backend/access/transam/xloginsert.c | 2 +- src/backend/main/main.c | 2 +- src/backend/postmaster/bgworker.c | 5 +- src/backend/postmaster/postmaster.c | 7 +- src/backend/replication/Makefile | 3 +- src/backend/replication/walpropcompat.c | 96 ++ src/backend/replication/walproposer_utils.c | 402 ------ src/backend/replication/walsender.c | 283 ++--- src/backend/storage/ipc/ipci.c | 6 +- src/backend/tcop/postgres.c | 19 +- src/backend/utils/misc/guc.c | 35 +- src/include/replication/walpropshim.h | 11 + src/include/replication/walsender.h | 21 +- 24 files changed, 1540 insertions(+), 722 deletions(-) rename {src/backend/replication => contrib/neon}/libpqwalproposer/Makefile (91%) rename {src/backend/replication => contrib/neon}/libpqwalproposer/libpqwalproposer.c (99%) create mode 100644 contrib/neon/neon.h rename {src/backend/replication => contrib/neon}/walproposer.c (95%) rename {src/include/replication => contrib/neon}/walproposer.h (92%) create mode 100644 contrib/neon/walproposer_utils.c create mode 100644 contrib/neon/walproposer_utils.h create mode 100644 src/backend/replication/walpropcompat.c delete mode 100644 src/backend/replication/walproposer_utils.c create mode 100644 src/include/replication/walpropshim.h diff --git a/contrib/neon/Makefile b/contrib/neon/Makefile index b6f3cf400ff..e81c3660a0b 100644 --- a/contrib/neon/Makefile +++ b/contrib/neon/Makefile @@ -4,7 +4,13 @@ MODULE_big = neon OBJS = \ $(WIN32RES) \ - inmem_smgr.o libpagestore.o pagestore_smgr.o relsize_cache.o neon.o + inmem_smgr.o \ + libpagestore.o \ + pagestore_smgr.o \ + relsize_cache.o \ + neon.o \ + walproposer.o \ + walproposer_utils.o PG_CPPFLAGS = -I$(libpq_srcdir) SHLIB_LINK_INTERNAL = $(libpq) @@ -13,6 +19,9 @@ EXTENSION = neon DATA = neon--1.0.sql PGFILEDESC = "neon - cloud storage for PostgreSQL" +SUBDIRS = \ + libpqwalproposer + ifdef USE_PGXS PG_CONFIG = pg_config PGXS := $(shell $(PG_CONFIG) --pgxs) diff --git a/contrib/neon/libpagestore.c b/contrib/neon/libpagestore.c index 2621421532a..15a4a769934 100644 --- a/contrib/neon/libpagestore.c +++ b/contrib/neon/libpagestore.c @@ -26,11 +26,10 @@ #include "pgstat.h" #include "utils/guc.h" -#include "replication/walproposer.h" +#include "neon.h" +#include "walproposer.h" +#include "walproposer_utils.h" -PG_MODULE_MAGIC; - -void _PG_init(void); #define PageStoreTrace DEBUG5 @@ -355,7 +354,7 @@ substitute_pageserver_password(const char *page_server_connstring_raw) * Module initialization function */ void -_PG_init(void) +pg_init_libpagestore(void) { DefineCustomStringVariable("neon.pageserver_connstring", "connection string to the page server", diff --git a/src/backend/replication/libpqwalproposer/Makefile b/contrib/neon/libpqwalproposer/Makefile similarity index 91% rename from src/backend/replication/libpqwalproposer/Makefile rename to contrib/neon/libpqwalproposer/Makefile index c570160536f..08793c1eee8 100644 --- a/src/backend/replication/libpqwalproposer/Makefile +++ b/contrib/neon/libpqwalproposer/Makefile @@ -8,8 +8,8 @@ # #------------------------------------------------------------------------- -subdir = src/backend/replication/libpqwalproposer -top_builddir = ../../../.. +subdir = contrib/neon/libpqwalproposer +top_builddir = ../../.. include $(top_builddir)/src/Makefile.global override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS) diff --git a/src/backend/replication/libpqwalproposer/libpqwalproposer.c b/contrib/neon/libpqwalproposer/libpqwalproposer.c similarity index 99% rename from src/backend/replication/libpqwalproposer/libpqwalproposer.c rename to contrib/neon/libpqwalproposer/libpqwalproposer.c index a12a2ee04bc..f92498f7bc7 100644 --- a/src/backend/replication/libpqwalproposer/libpqwalproposer.c +++ b/contrib/neon/libpqwalproposer/libpqwalproposer.c @@ -1,6 +1,6 @@ #include "postgres.h" -#include "replication/walproposer.h" +#include "../walproposer.h" #include "libpq-fe.h" /* Required for anything that's dynamically loaded */ diff --git a/contrib/neon/neon.c b/contrib/neon/neon.c index c7c176dba7a..d90244804f4 100644 --- a/contrib/neon/neon.c +++ b/contrib/neon/neon.c @@ -17,11 +17,23 @@ #include "storage/bufmgr.h" #include "catalog/pg_type.h" #include "replication/walsender.h" -#include "replication/walproposer.h" #include "funcapi.h" #include "access/htup_details.h" #include "utils/pg_lsn.h" +#include "neon.h" +#include "walproposer.h" + +PG_MODULE_MAGIC; +void _PG_init(void); + + +void _PG_init(void) +{ + pg_init_libpagestore(); + pg_init_walproposer(); +} + PG_FUNCTION_INFO_V1(pg_cluster_size); PG_FUNCTION_INFO_V1(backpressure_lsns); diff --git a/contrib/neon/neon.h b/contrib/neon/neon.h new file mode 100644 index 00000000000..57be89da883 --- /dev/null +++ b/contrib/neon/neon.h @@ -0,0 +1,11 @@ +// +// Created by Matthias on 2022-08-04. +// + +#ifndef NEON_H +#define NEON_H + +extern void pg_init_libpagestore(void); +extern void pg_init_walproposer(void); + +#endif /* NEON_H */ diff --git a/src/backend/replication/walproposer.c b/contrib/neon/walproposer.c similarity index 95% rename from src/backend/replication/walproposer.c rename to contrib/neon/walproposer.c index 4b0567be707..4ae7ce763f9 100644 --- a/src/backend/replication/walproposer.c +++ b/contrib/neon/walproposer.c @@ -38,7 +38,6 @@ #include #include "access/xlogdefs.h" #include "access/xlogutils.h" -#include "replication/walproposer.h" #include "storage/latch.h" #include "miscadmin.h" #include "pgstat.h" @@ -51,11 +50,21 @@ #include "postmaster/postmaster.h" #include "storage/pmsignal.h" #include "storage/proc.h" +#include "storage/ipc.h" +#include "storage/lwlock.h" +#include "storage/shmem.h" +#include "storage/spin.h" #include "tcop/tcopprot.h" #include "utils/builtins.h" +#include "utils/guc.h" #include "utils/memutils.h" #include "utils/timestamp.h" +#include "neon.h" +#include "walproposer.h" +#include "walproposer_utils.h" +#include "replication/walpropshim.h" + char *wal_acceptors_list; int wal_acceptor_reconnect_timeout; @@ -102,14 +111,11 @@ static int n_votes = 0; static int n_connected = 0; static TimestampTz last_reconnect_attempt; -/* Set to true only in standalone run of `postgres --sync-safekeepers` (see comment on top) */ -static bool syncSafekeepers; - static WalproposerShmemState *walprop_shared; /* Prototypes for private functions */ -static void WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId); -static void WalProposerStart(void); +static void WalProposerInitImpl(XLogRecPtr flushRecPtr, uint64 systemId); +static void WalProposerStartImpl(void); static void WalProposerLoop(void); static void InitEventSet(void); static void UpdateEventSet(Safekeeper *sk, uint32 events); @@ -150,6 +156,88 @@ static bool AsyncWrite(Safekeeper *sk, void *msg, size_t msg_size, SafekeeperSta static bool AsyncFlush(Safekeeper *sk); +static void nwp_shmem_startup_hook(void); +static void nwp_register_gucs(void); +static void nwp_prepare_shmem(void); +static uint64 backpressure_lag_impl(void); + + +static shmem_startup_hook_type prev_shmem_startup_hook_type; + + + +void pg_init_walproposer(void) +{ + if (!process_shared_preload_libraries_in_progress) + return; + + nwp_register_gucs(); + + nwp_prepare_shmem(); + + delay_backend_us = &backpressure_lag_impl; + + WalProposerRegister(); + + WalProposerInit = &WalProposerInitImpl; + WalProposerStart = &WalProposerStartImpl; +} + +static void nwp_register_gucs(void) +{ + DefineCustomStringVariable( + "neon.safekeepers", + "List of Neon WAL acceptors (host:port)", + NULL, /* long_desc */ + &wal_acceptors_list, /* valueAddr */ + "", /* bootValue */ + PGC_POSTMASTER, + GUC_LIST_INPUT, /* extensions can't use GUC_LIST_QUOTE */ + NULL, NULL, NULL + ); + + DefineCustomIntVariable( + "neon.safekeeper_reconnect_timeout", + "Timeout for reconnecting to offline wal acceptor.", + NULL, + &wal_acceptor_reconnect_timeout, + 1000, 0, INT_MAX, /* default, min, max */ + PGC_SIGHUP, /* context */ + GUC_UNIT_MS, /* flags */ + NULL, NULL, NULL + ); + + DefineCustomIntVariable( + "neon.safekeeper_connect_timeout", + "Timeout after which give up connection attempt to safekeeper.", + NULL, + &wal_acceptor_connect_timeout, + 5000, 0, INT_MAX, + PGC_SIGHUP, + GUC_UNIT_MS, + NULL, NULL, NULL + ); + +} + +/* shmem handling */ + +static void nwp_prepare_shmem(void) +{ + RequestAddinShmemSpace(WalproposerShmemSize()); + + prev_shmem_startup_hook_type = shmem_startup_hook; + shmem_startup_hook = nwp_shmem_startup_hook; +} + +static void nwp_shmem_startup_hook(void) +{ + if (prev_shmem_startup_hook_type) + prev_shmem_startup_hook_type(); + + WalproposerShmemInit(); +} + /* * WAL proposer bgworker entry point. */ @@ -198,7 +286,6 @@ WalProposerSync(int argc, char *argv[]) { struct stat stat_buf; - syncSafekeepers = true; ThisTimeLineID = 1; InitStandaloneProcess(argv[0]); @@ -362,7 +449,7 @@ WalProposerRegister(void) memset(&bgw, 0, sizeof(bgw)); bgw.bgw_flags = BGWORKER_SHMEM_ACCESS; bgw.bgw_start_time = BgWorkerStart_RecoveryFinished; - snprintf(bgw.bgw_library_name, BGW_MAXLEN, "postgres"); + snprintf(bgw.bgw_library_name, BGW_MAXLEN, "neon"); snprintf(bgw.bgw_function_name, BGW_MAXLEN, "WalProposerMain"); snprintf(bgw.bgw_name, BGW_MAXLEN, "WAL proposer"); snprintf(bgw.bgw_type, BGW_MAXLEN, "WAL proposer"); @@ -374,7 +461,7 @@ WalProposerRegister(void) } static void -WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId) +WalProposerInitImpl(XLogRecPtr flushRecPtr, uint64 systemId) { char *host; char *sep; @@ -388,7 +475,6 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId) load_file("libpqwalreceiver", false); if (WalReceiverFunctions == NULL) elog(ERROR, "libpqwalreceiver didn't initialize correctly"); - load_file("neon", false); for (host = wal_acceptors_list; host != NULL && *host != '\0'; host = sep) { @@ -454,7 +540,7 @@ WalProposerInit(XLogRecPtr flushRecPtr, uint64 systemId) } static void -WalProposerStart(void) +WalProposerStartImpl(void) { /* Initiate connections to all safekeeper nodes */ @@ -1531,7 +1617,7 @@ WalProposerStartStreaming(XLogRecPtr startpos) cmd.slotname = WAL_PROPOSER_SLOT_NAME; cmd.timeline = greetRequest.timeline; cmd.startpoint = startpos; - StartReplication(&cmd); + StartProposerReplication(&cmd); } /* @@ -2348,3 +2434,47 @@ AsyncFlush(Safekeeper *sk) return false; } } + +// Check if we need to suspend inserts because of lagging replication. +static uint64 +backpressure_lag_impl(void) +{ + if (max_replication_apply_lag > 0 || max_replication_flush_lag > 0 || max_replication_write_lag > 0) + { + XLogRecPtr writePtr; + XLogRecPtr flushPtr; + XLogRecPtr applyPtr; + XLogRecPtr myFlushLsn = GetFlushRecPtr(); + + replication_feedback_get_lsns(&writePtr, &flushPtr, &applyPtr); +#define MB ((XLogRecPtr)1024*1024) + + elog(DEBUG2, "current flushLsn %X/%X ReplicationFeedback: write %X/%X flush %X/%X apply %X/%X", + LSN_FORMAT_ARGS(myFlushLsn), + LSN_FORMAT_ARGS(writePtr), + LSN_FORMAT_ARGS(flushPtr), + LSN_FORMAT_ARGS(applyPtr)); + + if ((writePtr != UnknownXLogRecPtr + && max_replication_write_lag > 0 + && myFlushLsn > writePtr + max_replication_write_lag*MB)) + { + return (myFlushLsn - writePtr - max_replication_write_lag*MB); + } + + if ((flushPtr != UnknownXLogRecPtr + && max_replication_flush_lag > 0 + && myFlushLsn > flushPtr + max_replication_flush_lag*MB)) + { + return (myFlushLsn - flushPtr - max_replication_flush_lag*MB); + } + + if ((applyPtr != UnknownXLogRecPtr + && max_replication_apply_lag > 0 + && myFlushLsn > applyPtr + max_replication_apply_lag*MB)) + { + return (myFlushLsn - applyPtr - max_replication_apply_lag*MB); + } + } + return 0; +} diff --git a/src/include/replication/walproposer.h b/contrib/neon/walproposer.h similarity index 92% rename from src/include/replication/walproposer.h rename to contrib/neon/walproposer.h index c5a5b76268e..a32b8315759 100644 --- a/src/include/replication/walproposer.h +++ b/contrib/neon/walproposer.h @@ -1,5 +1,5 @@ -#ifndef __WALPROPOSER_H__ -#define __WALPROPOSER_H__ +#ifndef __NEON_WALPROPOSER_H__ +#define __NEON_WALPROPOSER_H__ #include "access/xlogdefs.h" #include "postgres.h" @@ -361,36 +361,13 @@ typedef struct Safekeeper } Safekeeper; -int CompareLsn(const void *a, const void *b); -char* FormatSafekeeperState(SafekeeperState state); -void AssertEventsOkForState(uint32 events, Safekeeper* sk); -uint32 SafekeeperStateDesiredEvents(SafekeeperState state); -char* FormatEvents(uint32 events); -void WalProposerMain(Datum main_arg); +extern PGDLLIMPORT void WalProposerMain(Datum main_arg); void WalProposerBroadcast(XLogRecPtr startpos, XLogRecPtr endpos); -bool HexDecodeString(uint8 *result, char *input, int nbytes); -uint32 pq_getmsgint32_le(StringInfo msg); -uint64 pq_getmsgint64_le(StringInfo msg); -void pq_sendint32_le(StringInfo buf, uint32 i); -void pq_sendint64_le(StringInfo buf, uint64 i); void WalProposerPoll(void); void WalProposerRegister(void); -void XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr); -void XLogWalPropClose(XLogRecPtr recptr); -void ProcessStandbyReply(XLogRecPtr writePtr, - XLogRecPtr flushPtr, - XLogRecPtr applyPtr, - TimestampTz replyTime, - bool replyRequested); -void PhysicalConfirmReceivedLocation(XLogRecPtr lsn); -void ProcessStandbyHSFeedback(TimestampTz replyTime, - TransactionId feedbackXmin, - uint32 feedbackEpoch, - TransactionId feedbackCatalogXmin, - uint32 feedbackCatalogEpoch); void ParseReplicationFeedbackMessage(StringInfo reply_message, ReplicationFeedback *rf); -void StartReplication(StartReplicationCmd *cmd); +extern void StartProposerReplication(StartReplicationCmd *cmd); void WalProposerSync(int argc, char *argv[]); Size WalproposerShmemSize(void); @@ -562,4 +539,4 @@ typedef struct WalProposerFunctionsType */ extern PGDLLIMPORT WalProposerFunctionsType *WalProposerFunctions; -#endif +#endif /* __NEON_WALPROPOSER_H__ */ diff --git a/contrib/neon/walproposer_utils.c b/contrib/neon/walproposer_utils.c new file mode 100644 index 00000000000..05b2b90dee6 --- /dev/null +++ b/contrib/neon/walproposer_utils.c @@ -0,0 +1,1112 @@ +#include "postgres.h" + +#include "access/timeline.h" +#include "access/xlogutils.h" +#include "common/logging.h" +#include "common/ip.h" +#include "funcapi.h" +#include "libpq/libpq.h" +#include "libpq/pqformat.h" +#include "miscadmin.h" +#include "postmaster/interrupt.h" +#include "replication/slot.h" +#include "walproposer_utils.h" +/* WARNING - this may be broken */ +#include "replication/walsender_private.h" + +#include "storage/ipc.h" +#include "utils/builtins.h" +#include "utils/ps_status.h" + +#include "../../src/interfaces/libpq/libpq-fe.h" +#include +#include + +/* + * These variables are used similarly to openLogFile/SegNo, + * but for walproposer to write the XLOG during recovery. walpropFileTLI is the TimeLineID + * corresponding the filename of walpropFile. + */ +static int walpropFile = -1; +static TimeLineID walpropFileTLI = 0; +static XLogSegNo walpropSegNo = 0; + +/* START cloned file-local variables and functions from walsender.c */ + +/* + * xlogreader used for replication. Note that a WAL sender doing physical + * replication does not need xlogreader to read WAL, but it needs one to + * keep a state of its work. + */ +static XLogReaderState *xlogreader = NULL; + +/* + * These variables keep track of the state of the timeline we're currently + * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric, + * the timeline is not the latest timeline on this server, and the server's + * history forked off from that timeline at sendTimeLineValidUpto. + */ +static TimeLineID sendTimeLine = 0; +static TimeLineID sendTimeLineNextTLI = 0; +static bool sendTimeLineIsHistoric = false; +static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr; + +/* + * Timestamp of last ProcessRepliesIfAny() that saw a reply from the + * standby. Set to 0 if wal_sender_timeout doesn't need to be active. + */ +static TimestampTz last_reply_timestamp = 0; + +/* Have we sent a heartbeat message asking for reply, since last reply? */ +static bool waiting_for_ping_response = false; + +static bool streamingDoneSending; +static bool streamingDoneReceiving; + +/* Are we there yet? */ +static bool WalSndCaughtUp = false; + +/* Flags set by signal handlers for later service in main loop */ +static volatile sig_atomic_t got_SIGUSR2 = false; +static volatile sig_atomic_t got_STOPPING = false; + +/* + * How far have we sent WAL already? This is also advertised in + * MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.) + */ +static XLogRecPtr sentPtr = InvalidXLogRecPtr; + +/* + * This is set while we are streaming. When not set + * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set, + * the main loop is responsible for checking got_STOPPING and terminating when + * it's set (after streaming any remaining WAL). + */ +static volatile sig_atomic_t replication_active = false; + +typedef void (*WalSndSendDataCallback) (void); +static void WalSndLoop(WalSndSendDataCallback send_data); +static void XLogSendPhysical(void); +static XLogRecPtr GetStandbyFlushRecPtr(void); + +static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, + TimeLineID *tli_p); + +/* END cloned file-level variables and functions from walsender.c */ + +int +CompareLsn(const void *a, const void *b) +{ + XLogRecPtr lsn1 = *((const XLogRecPtr *) a); + XLogRecPtr lsn2 = *((const XLogRecPtr *) b); + + if (lsn1 < lsn2) + return -1; + else if (lsn1 == lsn2) + return 0; + else + return 1; +} + +/* Returns a human-readable string corresonding to the SafekeeperState + * + * The string should not be freed. + * + * The strings are intended to be used as a prefix to "state", e.g.: + * + * elog(LOG, "currently in %s state", FormatSafekeeperState(sk->state)); + * + * If this sort of phrasing doesn't fit the message, instead use something like: + * + * elog(LOG, "currently in state [%s]", FormatSafekeeperState(sk->state)); + */ +char* +FormatSafekeeperState(SafekeeperState state) +{ + char* return_val = NULL; + + switch (state) + { + case SS_OFFLINE: + return_val = "offline"; + break; + case SS_CONNECTING_READ: + case SS_CONNECTING_WRITE: + return_val = "connecting"; + break; + case SS_WAIT_EXEC_RESULT: + return_val = "receiving query result"; + break; + case SS_HANDSHAKE_RECV: + return_val = "handshake (receiving)"; + break; + case SS_VOTING: + return_val = "voting"; + break; + case SS_WAIT_VERDICT: + return_val = "wait-for-verdict"; + break; + case SS_SEND_ELECTED_FLUSH: + return_val = "send-announcement-flush"; + break; + case SS_IDLE: + return_val = "idle"; + break; + case SS_ACTIVE: + return_val = "active"; + break; + } + + Assert(return_val != NULL); + + return return_val; +} + +/* Asserts that the provided events are expected for given safekeeper's state */ +void +AssertEventsOkForState(uint32 events, Safekeeper* sk) +{ + uint32 expected = SafekeeperStateDesiredEvents(sk->state); + + /* The events are in-line with what we're expecting, under two conditions: + * (a) if we aren't expecting anything, `events` has no read- or + * write-ready component. + * (b) if we are expecting something, there's overlap + * (i.e. `events & expected != 0`) + */ + bool events_ok_for_state; /* long name so the `Assert` is more clear later */ + + if (expected == WL_NO_EVENTS) + events_ok_for_state = ((events & (WL_SOCKET_READABLE|WL_SOCKET_WRITEABLE)) == 0); + else + events_ok_for_state = ((events & expected) != 0); + + if (!events_ok_for_state) + { + /* To give a descriptive message in the case of failure, we use elog and + * then an assertion that's guaranteed to fail. */ + elog(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]", + FormatEvents(events), sk->host, sk->port, FormatSafekeeperState(sk->state)); + Assert(events_ok_for_state); + } +} + +/* Returns the set of events a safekeeper in this state should be waiting on + * + * This will return WL_NO_EVENTS (= 0) for some events. */ +uint32 +SafekeeperStateDesiredEvents(SafekeeperState state) +{ + uint32 result = WL_NO_EVENTS; + + /* If the state doesn't have a modifier, we can check the base state */ + switch (state) + { + /* Connecting states say what they want in the name */ + case SS_CONNECTING_READ: + result = WL_SOCKET_READABLE; + break; + case SS_CONNECTING_WRITE: + result = WL_SOCKET_WRITEABLE; + break; + + /* Reading states need the socket to be read-ready to continue */ + case SS_WAIT_EXEC_RESULT: + case SS_HANDSHAKE_RECV: + case SS_WAIT_VERDICT: + result = WL_SOCKET_READABLE; + break; + + /* Idle states use read-readiness as a sign that the connection has been + * disconnected. */ + case SS_VOTING: + case SS_IDLE: + result = WL_SOCKET_READABLE; + break; + + /* + * Flush states require write-ready for flushing. + * Active state does both reading and writing. + * + * TODO: SS_ACTIVE sometimes doesn't need to be write-ready. We should + * check sk->flushWrite here to set WL_SOCKET_WRITEABLE. + */ + case SS_SEND_ELECTED_FLUSH: + case SS_ACTIVE: + result = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE; + break; + + /* The offline state expects no events. */ + case SS_OFFLINE: + result = WL_NO_EVENTS; + break; + + default: + Assert(false); + break; + } + + return result; +} + +/* Returns a human-readable string corresponding to the event set + * + * If the events do not correspond to something set as the `events` field of a `WaitEvent`, the + * returned string may be meaingless. + * + * The string should not be freed. It should also not be expected to remain the same between + * function calls. */ +char* +FormatEvents(uint32 events) +{ + static char return_str[8]; + + /* Helper variable to check if there's extra bits */ + uint32 all_flags = WL_LATCH_SET + | WL_SOCKET_READABLE + | WL_SOCKET_WRITEABLE + | WL_TIMEOUT + | WL_POSTMASTER_DEATH + | WL_EXIT_ON_PM_DEATH + | WL_SOCKET_CONNECTED; + + /* The formatting here isn't supposed to be *particularly* useful -- it's just to give an + * sense of what events have been triggered without needing to remember your powers of two. */ + + return_str[0] = (events & WL_LATCH_SET ) ? 'L' : '_'; + return_str[1] = (events & WL_SOCKET_READABLE ) ? 'R' : '_'; + return_str[2] = (events & WL_SOCKET_WRITEABLE) ? 'W' : '_'; + return_str[3] = (events & WL_TIMEOUT ) ? 'T' : '_'; + return_str[4] = (events & WL_POSTMASTER_DEATH) ? 'D' : '_'; + return_str[5] = (events & WL_EXIT_ON_PM_DEATH) ? 'E' : '_'; + return_str[5] = (events & WL_SOCKET_CONNECTED) ? 'C' : '_'; + + if (events & (~all_flags)) + { + elog(WARNING, "Event formatting found unexpected component %d", + events & (~all_flags)); + return_str[6] = '*'; + return_str[7] = '\0'; + } + else + return_str[6] = '\0'; + + return (char *) &return_str; +} + +/* + * Convert a character which represents a hexadecimal digit to an integer. + * + * Returns -1 if the character is not a hexadecimal digit. + */ +static int +HexDecodeChar(char c) +{ + if (c >= '0' && c <= '9') + return c - '0'; + if (c >= 'a' && c <= 'f') + return c - 'a' + 10; + if (c >= 'A' && c <= 'F') + return c - 'A' + 10; + + return -1; +} + +/* + * Decode a hex string into a byte string, 2 hex chars per byte. + * + * Returns false if invalid characters are encountered; otherwise true. + */ +bool +HexDecodeString(uint8 *result, char *input, int nbytes) +{ + int i; + + for (i = 0; i < nbytes; ++i) + { + int n1 = HexDecodeChar(input[i * 2]); + int n2 = HexDecodeChar(input[i * 2 + 1]); + + if (n1 < 0 || n2 < 0) + return false; + result[i] = n1 * 16 + n2; + } + + return true; +} + +/* -------------------------------- + * pq_getmsgint32_le - get a binary 4-byte int from a message buffer in native (LE) order + * -------------------------------- + */ +uint32 +pq_getmsgint32_le(StringInfo msg) +{ + uint32 n32; + + pq_copymsgbytes(msg, (char *) &n32, sizeof(n32)); + + return n32; +} + +/* -------------------------------- + * pq_getmsgint64 - get a binary 8-byte int from a message buffer in native (LE) order + * -------------------------------- + */ +uint64 +pq_getmsgint64_le(StringInfo msg) +{ + uint64 n64; + + pq_copymsgbytes(msg, (char *) &n64, sizeof(n64)); + + return n64; +} + +/* append a binary [u]int32 to a StringInfo buffer in native (LE) order */ +void +pq_sendint32_le(StringInfo buf, uint32 i) +{ + enlargeStringInfo(buf, sizeof(uint32)); + memcpy(buf->data + buf->len, &i, sizeof(uint32)); + buf->len += sizeof(uint32); +} + +/* append a binary [u]int64 to a StringInfo buffer in native (LE) order */ +void +pq_sendint64_le(StringInfo buf, uint64 i) +{ + enlargeStringInfo(buf, sizeof(uint64)); + memcpy(buf->data + buf->len, &i, sizeof(uint64)); + buf->len += sizeof(uint64); +} + +/* + * Write XLOG data to disk. + */ +void +XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr) +{ + int startoff; + int byteswritten; + + while (nbytes > 0) + { + int segbytes; + + /* Close the current segment if it's completed */ + if (walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size)) + XLogWalPropClose(recptr); + + if (walpropFile < 0) + { + bool use_existent = true; + + /* Create/use new log file */ + XLByteToSeg(recptr, walpropSegNo, wal_segment_size); + walpropFile = XLogFileInit(walpropSegNo, &use_existent, false); + walpropFileTLI = ThisTimeLineID; + } + + /* Calculate the start offset of the received logs */ + startoff = XLogSegmentOffset(recptr, wal_segment_size); + + if (startoff + nbytes > wal_segment_size) + segbytes = wal_segment_size - startoff; + else + segbytes = nbytes; + + /* OK to write the logs */ + errno = 0; + + byteswritten = pg_pwrite(walpropFile, buf, segbytes, (off_t) startoff); + if (byteswritten <= 0) + { + char xlogfname[MAXFNAMELEN]; + int save_errno; + + /* if write didn't set errno, assume no disk space */ + if (errno == 0) + errno = ENOSPC; + + save_errno = errno; + XLogFileName(xlogfname, walpropFileTLI, walpropSegNo, wal_segment_size); + errno = save_errno; + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not write to log segment %s " + "at offset %u, length %lu: %m", + xlogfname, startoff, (unsigned long) segbytes))); + } + + /* Update state for write */ + recptr += byteswritten; + + nbytes -= byteswritten; + buf += byteswritten; + } + + /* + * Close the current segment if it's fully written up in the last cycle of + * the loop. + */ + if (walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size)) + { + XLogWalPropClose(recptr); + } +} + +/* + * Close the current segment. + */ +void +XLogWalPropClose(XLogRecPtr recptr) +{ + Assert(walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size)); + + if (close(walpropFile) != 0) + { + char xlogfname[MAXFNAMELEN]; + XLogFileName(xlogfname, walpropFileTLI, walpropSegNo, wal_segment_size); + + ereport(PANIC, + (errcode_for_file_access(), + errmsg("could not close log segment %s: %m", + xlogfname))); + } + + walpropFile = -1; +} + +/* START of cloned functions from walsender.c */ + +/* + * Handle START_REPLICATION command. + * + * At the moment, this never returns, but an ereport(ERROR) will take us back + * to the main loop. + */ +void +StartProposerReplication(StartReplicationCmd *cmd) +{ + StringInfoData buf; + XLogRecPtr FlushPtr; + + if (ThisTimeLineID == 0) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION"))); + + /* create xlogreader for physical replication */ + xlogreader = + XLogReaderAllocate(wal_segment_size, NULL, + XL_ROUTINE(.segment_open = WalSndSegmentOpen, + .segment_close = wal_segment_close), + NULL); + + if (!xlogreader) + ereport(ERROR, + (errcode(ERRCODE_OUT_OF_MEMORY), + errmsg("out of memory"))); + + /* + * We assume here that we're logging enough information in the WAL for + * log-shipping, since this is checked in PostmasterMain(). + * + * NOTE: wal_level can only change at shutdown, so in most cases it is + * difficult for there to be WAL data that we can still see that was + * written at wal_level='minimal'. + */ + + if (cmd->slotname) + { + ReplicationSlotAcquire(cmd->slotname, true); + if (SlotIsLogical(MyReplicationSlot)) + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("cannot use a logical replication slot for physical replication"))); + + /* + * We don't need to verify the slot's restart_lsn here; instead we + * rely on the caller requesting the starting point to use. If the + * WAL segment doesn't exist, we'll fail later. + */ + } + + /* + * Select the timeline. If it was given explicitly by the client, use + * that. Otherwise use the timeline of the last replayed record, which is + * kept in ThisTimeLineID. + */ + if (am_cascading_walsender) + { + /* this also updates ThisTimeLineID */ + FlushPtr = GetStandbyFlushRecPtr(); + } + else + FlushPtr = GetFlushRecPtr(); + + if (cmd->timeline != 0) + { + XLogRecPtr switchpoint; + + sendTimeLine = cmd->timeline; + if (sendTimeLine == ThisTimeLineID) + { + sendTimeLineIsHistoric = false; + sendTimeLineValidUpto = InvalidXLogRecPtr; + } + else + { + List *timeLineHistory; + + sendTimeLineIsHistoric = true; + + /* + * Check that the timeline the client requested exists, and the + * requested start location is on that timeline. + */ + timeLineHistory = readTimeLineHistory(ThisTimeLineID); + switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory, + &sendTimeLineNextTLI); + list_free_deep(timeLineHistory); + + /* + * Found the requested timeline in the history. Check that + * requested startpoint is on that timeline in our history. + * + * This is quite loose on purpose. We only check that we didn't + * fork off the requested timeline before the switchpoint. We + * don't check that we switched *to* it before the requested + * starting point. This is because the client can legitimately + * request to start replication from the beginning of the WAL + * segment that contains switchpoint, but on the new timeline, so + * that it doesn't end up with a partial segment. If you ask for + * too old a starting point, you'll get an error later when we + * fail to find the requested WAL segment in pg_wal. + * + * XXX: we could be more strict here and only allow a startpoint + * that's older than the switchpoint, if it's still in the same + * WAL segment. + */ + if (!XLogRecPtrIsInvalid(switchpoint) && + switchpoint < cmd->startpoint) + { + ereport(ERROR, + (errmsg("requested starting point %X/%X on timeline %u is not in this server's history", + LSN_FORMAT_ARGS(cmd->startpoint), + cmd->timeline), + errdetail("This server's history forked from timeline %u at %X/%X.", + cmd->timeline, + LSN_FORMAT_ARGS(switchpoint)))); + } + sendTimeLineValidUpto = switchpoint; + } + } + else + { + sendTimeLine = ThisTimeLineID; + sendTimeLineValidUpto = InvalidXLogRecPtr; + sendTimeLineIsHistoric = false; + } + + streamingDoneSending = streamingDoneReceiving = false; + + /* If there is nothing to stream, don't even enter COPY mode */ + if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto) + { + /* + * When we first start replication the standby will be behind the + * primary. For some applications, for example synchronous + * replication, it is important to have a clear state for this initial + * catchup mode, so we can trigger actions when we change streaming + * state later. We may stay in this state for a long time, which is + * exactly why we want to be able to monitor whether or not we are + * still here. + */ + WalSndSetState(WALSNDSTATE_CATCHUP); + + /* + * Don't allow a request to stream from a future point in WAL that + * hasn't been flushed to disk in this server yet. + */ + if (FlushPtr < cmd->startpoint) + { + ereport(ERROR, + (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X", + LSN_FORMAT_ARGS(cmd->startpoint), + LSN_FORMAT_ARGS(FlushPtr)))); + } + + /* Start streaming from the requested point */ + sentPtr = cmd->startpoint; + + /* Initialize shared memory status, too */ + SpinLockAcquire(&MyWalSnd->mutex); + MyWalSnd->sentPtr = sentPtr; + SpinLockRelease(&MyWalSnd->mutex); + + SyncRepInitConfig(); + + /* Main loop of walsender */ + replication_active = true; + + WalSndLoop(XLogSendPhysical); + + replication_active = false; + if (got_STOPPING) + proc_exit(0); + WalSndSetState(WALSNDSTATE_STARTUP); + + Assert(streamingDoneSending && streamingDoneReceiving); + } + + if (cmd->slotname) + ReplicationSlotRelease(); + + /* + * Copy is finished now. Send a single-row result set indicating the next + * timeline. + */ + if (sendTimeLineIsHistoric) + { + char startpos_str[8 + 1 + 8 + 1]; + DestReceiver *dest; + TupOutputState *tstate; + TupleDesc tupdesc; + Datum values[2]; + bool nulls[2]; + + snprintf(startpos_str, sizeof(startpos_str), "%X/%X", + LSN_FORMAT_ARGS(sendTimeLineValidUpto)); + + dest = CreateDestReceiver(DestRemoteSimple); + MemSet(nulls, false, sizeof(nulls)); + + /* + * Need a tuple descriptor representing two columns. int8 may seem + * like a surprising data type for this, but in theory int4 would not + * be wide enough for this, as TimeLineID is unsigned. + */ + tupdesc = CreateTemplateTupleDesc(2); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli", + INT8OID, -1, 0); + TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos", + TEXTOID, -1, 0); + + /* prepare for projection of tuple */ + tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual); + + values[0] = Int64GetDatum((int64) sendTimeLineNextTLI); + values[1] = CStringGetTextDatum(startpos_str); + + /* send it to dest */ + do_tup_output(tstate, values, nulls); + + end_tup_output(tstate); + } + + /* Send CommandComplete message */ + EndReplicationCommand("START_STREAMING"); +} + +/* + * Returns the latest point in WAL that has been safely flushed to disk, and + * can be sent to the standby. This should only be called when in recovery, + * ie. we're streaming to a cascaded standby. + * + * As a side-effect, ThisTimeLineID is updated to the TLI of the last + * replayed WAL record. + */ +static XLogRecPtr +GetStandbyFlushRecPtr(void) +{ + XLogRecPtr replayPtr; + TimeLineID replayTLI; + XLogRecPtr receivePtr; + TimeLineID receiveTLI; + XLogRecPtr result; + + /* + * We can safely send what's already been replayed. Also, if walreceiver + * is streaming WAL from the same timeline, we can send anything that it + * has streamed, but hasn't been replayed yet. + */ + + receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI); + replayPtr = GetXLogReplayRecPtr(&replayTLI); + + ThisTimeLineID = replayTLI; + + result = replayPtr; + if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr) + result = receivePtr; + + return result; +} + +/* XLogReaderRoutine->segment_open callback */ +static void +WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, + TimeLineID *tli_p) +{ + char path[MAXPGPATH]; + + /*------- + * When reading from a historic timeline, and there is a timeline switch + * within this segment, read from the WAL segment belonging to the new + * timeline. + * + * For example, imagine that this server is currently on timeline 5, and + * we're streaming timeline 4. The switch from timeline 4 to 5 happened at + * 0/13002088. In pg_wal, we have these files: + * + * ... + * 000000040000000000000012 + * 000000040000000000000013 + * 000000050000000000000013 + * 000000050000000000000014 + * ... + * + * In this situation, when requested to send the WAL from segment 0x13, on + * timeline 4, we read the WAL from file 000000050000000000000013. Archive + * recovery prefers files from newer timelines, so if the segment was + * restored from the archive on this server, the file belonging to the old + * timeline, 000000040000000000000013, might not exist. Their contents are + * equal up to the switchpoint, because at a timeline switch, the used + * portion of the old segment is copied to the new file. ------- + */ + *tli_p = sendTimeLine; + if (sendTimeLineIsHistoric) + { + XLogSegNo endSegNo; + + XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize); + if (nextSegNo == endSegNo) + *tli_p = sendTimeLineNextTLI; + } + + XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize); + state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY); + if (state->seg.ws_file >= 0) + return; + + /* + * If the file is not found, assume it's because the standby asked for a + * too old WAL segment that has already been removed or recycled. + */ + if (errno == ENOENT) + { + char xlogfname[MAXFNAMELEN]; + int save_errno = errno; + + XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size); + errno = save_errno; + ereport(ERROR, + (errcode_for_file_access(), + errmsg("requested WAL segment %s has already been removed", + xlogfname))); + } + else + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open file \"%s\": %m", + path))); +} + + +/* Main loop of walsender process that streams the WAL over Copy messages. */ +static void +WalSndLoop(WalSndSendDataCallback send_data) +{ + /* + * Initialize the last reply timestamp. That enables timeout processing + * from hereon. + */ + last_reply_timestamp = GetCurrentTimestamp(); + waiting_for_ping_response = false; + + /* + * Loop until we reach the end of this timeline or the client requests to + * stop streaming. + */ + for (;;) + { + /* Clear any already-pending wakeups */ + ResetLatch(MyLatch); + + CHECK_FOR_INTERRUPTS(); + + /* Process any requests or signals received recently */ + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + SyncRepInitConfig(); + } + + /* always true */ + if (am_wal_proposer) + { + send_data(); + if (WalSndCaughtUp) + { + if (MyWalSnd->state == WALSNDSTATE_CATCHUP) + WalSndSetState(WALSNDSTATE_STREAMING); + WalProposerPoll(); + WalSndCaughtUp = false; + } + continue; + } + } +} + +/* + * Send out the WAL in its normal physical/stored form. + * + * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk, + * but not yet sent to the client, and buffer it in the libpq output + * buffer. + * + * If there is no unsent WAL remaining, WalSndCaughtUp is set to true, + * otherwise WalSndCaughtUp is set to false. + */ +static void +XLogSendPhysical(void) +{ + XLogRecPtr SendRqstPtr; + XLogRecPtr startptr; + XLogRecPtr endptr; + Size nbytes; + XLogSegNo segno; + WALReadError errinfo; + + /* If requested switch the WAL sender to the stopping state. */ + if (got_STOPPING) + WalSndSetState(WALSNDSTATE_STOPPING); + + if (streamingDoneSending) + { + WalSndCaughtUp = true; + return; + } + + /* Figure out how far we can safely send the WAL. */ + if (sendTimeLineIsHistoric) + { + /* + * Streaming an old timeline that's in this server's history, but is + * not the one we're currently inserting or replaying. It can be + * streamed up to the point where we switched off that timeline. + */ + SendRqstPtr = sendTimeLineValidUpto; + } + else if (am_cascading_walsender) + { + /* + * Streaming the latest timeline on a standby. + * + * Attempt to send all WAL that has already been replayed, so that we + * know it's valid. If we're receiving WAL through streaming + * replication, it's also OK to send any WAL that has been received + * but not replayed. + * + * The timeline we're recovering from can change, or we can be + * promoted. In either case, the current timeline becomes historic. We + * need to detect that so that we don't try to stream past the point + * where we switched to another timeline. We check for promotion or + * timeline switch after calculating FlushPtr, to avoid a race + * condition: if the timeline becomes historic just after we checked + * that it was still current, it's still be OK to stream it up to the + * FlushPtr that was calculated before it became historic. + */ + bool becameHistoric = false; + + SendRqstPtr = GetStandbyFlushRecPtr(); + + if (!RecoveryInProgress()) + { + /* + * We have been promoted. RecoveryInProgress() updated + * ThisTimeLineID to the new current timeline. + */ + am_cascading_walsender = false; + becameHistoric = true; + } + else + { + /* + * Still a cascading standby. But is the timeline we're sending + * still the one recovery is recovering from? ThisTimeLineID was + * updated by the GetStandbyFlushRecPtr() call above. + */ + if (sendTimeLine != ThisTimeLineID) + becameHistoric = true; + } + + if (becameHistoric) + { + /* + * The timeline we were sending has become historic. Read the + * timeline history file of the new timeline to see where exactly + * we forked off from the timeline we were sending. + */ + List *history; + + history = readTimeLineHistory(ThisTimeLineID); + sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI); + + Assert(sendTimeLine < sendTimeLineNextTLI); + list_free_deep(history); + + sendTimeLineIsHistoric = true; + + SendRqstPtr = sendTimeLineValidUpto; + } + } + else + { + /* + * Streaming the current timeline on a primary. + * + * Attempt to send all data that's already been written out and + * fsync'd to disk. We cannot go further than what's been written out + * given the current implementation of WALRead(). And in any case + * it's unsafe to send WAL that is not securely down to disk on the + * primary: if the primary subsequently crashes and restarts, standbys + * must not have applied any WAL that got lost on the primary. + */ + SendRqstPtr = GetFlushRecPtr(); + } + + /* + * Record the current system time as an approximation of the time at which + * this WAL location was written for the purposes of lag tracking. + * + * In theory we could make XLogFlush() record a time in shmem whenever WAL + * is flushed and we could get that time as well as the LSN when we call + * GetFlushRecPtr() above (and likewise for the cascading standby + * equivalent), but rather than putting any new code into the hot WAL path + * it seems good enough to capture the time here. We should reach this + * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that + * may take some time, we read the WAL flush pointer and take the time + * very close to together here so that we'll get a later position if it is + * still moving. + * + * Because LagTrackerWrite ignores samples when the LSN hasn't advanced, + * this gives us a cheap approximation for the WAL flush time for this + * LSN. + * + * Note that the LSN is not necessarily the LSN for the data contained in + * the present message; it's the end of the WAL, which might be further + * ahead. All the lag tracking machinery cares about is finding out when + * that arbitrary LSN is eventually reported as written, flushed and + * applied, so that it can measure the elapsed time. + */ + LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp()); + + /* + * If this is a historic timeline and we've reached the point where we + * forked to the next timeline, stop streaming. + * + * Note: We might already have sent WAL > sendTimeLineValidUpto. The + * startup process will normally replay all WAL that has been received + * from the primary, before promoting, but if the WAL streaming is + * terminated at a WAL page boundary, the valid portion of the timeline + * might end in the middle of a WAL record. We might've already sent the + * first half of that partial WAL record to the cascading standby, so that + * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't + * replay the partial WAL record either, so it can still follow our + * timeline switch. + */ + if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr) + { + /* close the current file. */ + if (xlogreader->seg.ws_file >= 0) + wal_segment_close(xlogreader); + + /* Send CopyDone */ + pq_putmessage_noblock('c', NULL, 0); + streamingDoneSending = true; + + WalSndCaughtUp = true; + + elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)", + LSN_FORMAT_ARGS(sendTimeLineValidUpto), + LSN_FORMAT_ARGS(sentPtr)); + return; + } + + /* Do we have any work to do? */ + Assert(sentPtr <= SendRqstPtr); + if (SendRqstPtr <= sentPtr) + { + WalSndCaughtUp = true; + return; + } + + /* + * Figure out how much to send in one message. If there's no more than + * MAX_SEND_SIZE bytes to send, send everything. Otherwise send + * MAX_SEND_SIZE bytes, but round back to logfile or page boundary. + * + * The rounding is not only for performance reasons. Walreceiver relies on + * the fact that we never split a WAL record across two messages. Since a + * long WAL record is split at page boundary into continuation records, + * page boundary is always a safe cut-off point. We also assume that + * SendRqstPtr never points to the middle of a WAL record. + */ + startptr = sentPtr; + endptr = startptr; + endptr += MAX_SEND_SIZE; + + /* if we went beyond SendRqstPtr, back off */ + if (SendRqstPtr <= endptr) + { + endptr = SendRqstPtr; + if (sendTimeLineIsHistoric) + WalSndCaughtUp = false; + else + WalSndCaughtUp = true; + } + else + { + /* round down to page boundary. */ + endptr -= (endptr % XLOG_BLCKSZ); + WalSndCaughtUp = false; + } + + nbytes = endptr - startptr; + Assert(nbytes <= MAX_SEND_SIZE); + + /* always true */ + if (am_wal_proposer) + { + WalProposerBroadcast(startptr, endptr); + } + else + { + /* code removed for brevity */ + } + sentPtr = endptr; + + /* Update shared memory status */ + { + WalSnd *walsnd = MyWalSnd; + + SpinLockAcquire(&walsnd->mutex); + walsnd->sentPtr = sentPtr; + SpinLockRelease(&walsnd->mutex); + } + + /* Report progress of XLOG streaming in PS display */ + if (update_process_title) + { + char activitymsg[50]; + + snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X", + LSN_FORMAT_ARGS(sentPtr)); + set_ps_display(activitymsg); + } +} + diff --git a/contrib/neon/walproposer_utils.h b/contrib/neon/walproposer_utils.h new file mode 100644 index 00000000000..4771d3ff829 --- /dev/null +++ b/contrib/neon/walproposer_utils.h @@ -0,0 +1,19 @@ +#ifndef __NEON_WALPROPOSER_UTILS_H__ +#define __NEON_WALPROPOSER_UTILS_H__ + +#include "walproposer.h" + +int CompareLsn(const void *a, const void *b); +char* FormatSafekeeperState(SafekeeperState state); +void AssertEventsOkForState(uint32 events, Safekeeper* sk); +uint32 SafekeeperStateDesiredEvents(SafekeeperState state); +char* FormatEvents(uint32 events); +bool HexDecodeString(uint8 *result, char *input, int nbytes); +uint32 pq_getmsgint32_le(StringInfo msg); +uint64 pq_getmsgint64_le(StringInfo msg); +void pq_sendint32_le(StringInfo buf, uint32 i); +void pq_sendint64_le(StringInfo buf, uint64 i); +void XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr); +void XLogWalPropClose(XLogRecPtr recptr); + +#endif /* __NEON_WALPROPOSER_UTILS_H__ */ diff --git a/src/Makefile b/src/Makefile index 2f32e3d5137..79e274a4769 100644 --- a/src/Makefile +++ b/src/Makefile @@ -22,7 +22,6 @@ SUBDIRS = \ include \ interfaces \ backend/replication/libpqwalreceiver \ - backend/replication/libpqwalproposer \ backend/replication/pgoutput \ fe_utils \ bin \ diff --git a/src/backend/access/transam/xloginsert.c b/src/backend/access/transam/xloginsert.c index 2d3549d1e7f..48dc77b9e47 100644 --- a/src/backend/access/transam/xloginsert.c +++ b/src/backend/access/transam/xloginsert.c @@ -461,7 +461,7 @@ XLogInsert(RmgrId rmid, uint8 info) return EndPos; } - if (backpressure_lag() > 0) + if (delay_backend_us != NULL && delay_backend_us() > 0) { InterruptPending = true; } diff --git a/src/backend/main/main.c b/src/backend/main/main.c index eb98fca066f..fb2c29c06bb 100644 --- a/src/backend/main/main.c +++ b/src/backend/main/main.c @@ -35,7 +35,7 @@ #include "common/username.h" #include "port/atomics.h" #include "postmaster/postmaster.h" -#include "replication/walproposer.h" +#include "replication/walpropshim.h" #include "storage/spin.h" #include "tcop/tcopprot.h" #include "utils/help_config.h" diff --git a/src/backend/postmaster/bgworker.c b/src/backend/postmaster/bgworker.c index 2be49df0eb0..540a8454da2 100644 --- a/src/backend/postmaster/bgworker.c +++ b/src/backend/postmaster/bgworker.c @@ -22,7 +22,7 @@ #include "postmaster/postmaster.h" #include "replication/logicallauncher.h" #include "replication/logicalworker.h" -#include "replication/walproposer.h" +#include "replication/walpropshim.h" #include "storage/dsm.h" #include "storage/ipc.h" #include "storage/latch.h" @@ -129,9 +129,6 @@ static const struct }, { "ApplyWorkerMain", ApplyWorkerMain - }, - { - "WalProposerMain", WalProposerMain } }; diff --git a/src/backend/postmaster/postmaster.c b/src/backend/postmaster/postmaster.c index 3fd3d2b3bc4..caac7362872 100644 --- a/src/backend/postmaster/postmaster.c +++ b/src/backend/postmaster/postmaster.c @@ -117,7 +117,7 @@ #include "postmaster/syslogger.h" #include "replication/logicallauncher.h" #include "replication/walsender.h" -#include "replication/walproposer.h" +#include "replication/walpropshim.h" #include "storage/fd.h" #include "storage/ipc.h" #include "storage/pg_shmem.h" @@ -998,11 +998,6 @@ PostmasterMain(int argc, char *argv[]) */ ApplyLauncherRegister(); - /* - * Start WAL proposer bgworker is wal acceptors list is not empty - */ - WalProposerRegister(); - /* * process any libraries that should be preloaded at postmaster start */ diff --git a/src/backend/replication/Makefile b/src/backend/replication/Makefile index 23731a07576..4b7c3d32a4d 100644 --- a/src/backend/replication/Makefile +++ b/src/backend/replication/Makefile @@ -25,8 +25,7 @@ OBJS = \ walreceiver.o \ walreceiverfuncs.o \ walsender.o \ - walproposer.o \ - walproposer_utils.o + walpropcompat.o SUBDIRS = logical diff --git a/src/backend/replication/walpropcompat.c b/src/backend/replication/walpropcompat.c new file mode 100644 index 00000000000..8caf2460795 --- /dev/null +++ b/src/backend/replication/walpropcompat.c @@ -0,0 +1,96 @@ +#include "postgres.h" + +#include +#include +#include + +#include "access/xlog.h" +#include "access/xlog_internal.h" +#include "access/xlogdefs.h" +#include "miscadmin.h" +#include "postmaster/bgworker.h" +#include "postmaster/postmaster.h" +#include "storage/fd.h" +#include "utils/guc.h" +#include "replication/walpropshim.h" + +bool syncSafekeepers = false; +void (*WalProposerInit) (XLogRecPtr flushRecPtr, uint64 systemId) = NULL; +void (*WalProposerStart) (void) = NULL; + +/* + * Entry point for `postgres --sync-safekeepers`. + */ +void +WalProposerSync(int argc, char *argv[]) +{ + struct stat stat_buf; + + syncSafekeepers = true; + ThisTimeLineID = 1; + + InitStandaloneProcess(argv[0]); + + SetProcessingMode(InitProcessing); + + /* + * Set default values for command-line options. + */ + InitializeGUCOptions(); + + /* Acquire configuration parameters */ + if (!SelectConfigFiles(NULL, progname)) + exit(1); + + /* + * Imitate we are early in bootstrap loading shared_preload_libraries; + * zenith extension sets PGC_POSTMASTER gucs requiring this. + */ + process_shared_preload_libraries_in_progress = true; + + /* + * Initialize postmaster_alive_fds as WaitEventSet checks them. + * + * Copied from InitPostmasterDeathWatchHandle() + */ + if (pipe(postmaster_alive_fds) < 0) + ereport(FATAL, + (errcode_for_file_access(), + errmsg_internal("could not create pipe to monitor postmaster death: %m"))); + if (fcntl(postmaster_alive_fds[POSTMASTER_FD_WATCH], F_SETFL, O_NONBLOCK) == -1) + ereport(FATAL, + (errcode_for_socket_access(), + errmsg_internal("could not set postmaster death monitoring pipe to nonblocking mode: %m"))); + + ChangeToDataDir(); + + /* Create pg_wal directory, if it doesn't exist */ + if (stat(XLOGDIR, &stat_buf) != 0) + { + ereport(LOG, (errmsg("creating missing WAL directory \"%s\"", XLOGDIR))); + if (MakePGDirectory(XLOGDIR) < 0) + { + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not create directory \"%s\": %m", + XLOGDIR))); + exit(1); + } + } + + load_file("neon", false); + + if (NULL == WalProposerInit) + elog(ERROR, "Neon failed to register WalProposerInit"); + + if (NULL == WalProposerStart) + elog(ERROR, "Neon failed to register WalProposerStart"); + + WalProposerInit(0, 0); + + process_shared_preload_libraries_in_progress = false; + + BackgroundWorkerUnblockSignals(); + + WalProposerStart(); +} diff --git a/src/backend/replication/walproposer_utils.c b/src/backend/replication/walproposer_utils.c deleted file mode 100644 index c9ddafdee0c..00000000000 --- a/src/backend/replication/walproposer_utils.c +++ /dev/null @@ -1,402 +0,0 @@ -#include "postgres.h" - -#include "replication/walproposer.h" -#include "libpq/pqformat.h" -#include "common/logging.h" -#include "common/ip.h" -#include "../interfaces/libpq/libpq-fe.h" -#include -#include - -/* - * These variables are used similarly to openLogFile/SegNo, - * but for walproposer to write the XLOG during recovery. walpropFileTLI is the TimeLineID - * corresponding the filename of walpropFile. - */ -static int walpropFile = -1; -static TimeLineID walpropFileTLI = 0; -static XLogSegNo walpropSegNo = 0; - -int -CompareLsn(const void *a, const void *b) -{ - XLogRecPtr lsn1 = *((const XLogRecPtr *) a); - XLogRecPtr lsn2 = *((const XLogRecPtr *) b); - - if (lsn1 < lsn2) - return -1; - else if (lsn1 == lsn2) - return 0; - else - return 1; -} - -/* Returns a human-readable string corresonding to the SafekeeperState - * - * The string should not be freed. - * - * The strings are intended to be used as a prefix to "state", e.g.: - * - * elog(LOG, "currently in %s state", FormatSafekeeperState(sk->state)); - * - * If this sort of phrasing doesn't fit the message, instead use something like: - * - * elog(LOG, "currently in state [%s]", FormatSafekeeperState(sk->state)); - */ -char* -FormatSafekeeperState(SafekeeperState state) -{ - char* return_val = NULL; - - switch (state) - { - case SS_OFFLINE: - return_val = "offline"; - break; - case SS_CONNECTING_READ: - case SS_CONNECTING_WRITE: - return_val = "connecting"; - break; - case SS_WAIT_EXEC_RESULT: - return_val = "receiving query result"; - break; - case SS_HANDSHAKE_RECV: - return_val = "handshake (receiving)"; - break; - case SS_VOTING: - return_val = "voting"; - break; - case SS_WAIT_VERDICT: - return_val = "wait-for-verdict"; - break; - case SS_SEND_ELECTED_FLUSH: - return_val = "send-announcement-flush"; - break; - case SS_IDLE: - return_val = "idle"; - break; - case SS_ACTIVE: - return_val = "active"; - break; - } - - Assert(return_val != NULL); - - return return_val; -} - -/* Asserts that the provided events are expected for given safekeeper's state */ -void -AssertEventsOkForState(uint32 events, Safekeeper* sk) -{ - uint32 expected = SafekeeperStateDesiredEvents(sk->state); - - /* The events are in-line with what we're expecting, under two conditions: - * (a) if we aren't expecting anything, `events` has no read- or - * write-ready component. - * (b) if we are expecting something, there's overlap - * (i.e. `events & expected != 0`) - */ - bool events_ok_for_state; /* long name so the `Assert` is more clear later */ - - if (expected == WL_NO_EVENTS) - events_ok_for_state = ((events & (WL_SOCKET_READABLE|WL_SOCKET_WRITEABLE)) == 0); - else - events_ok_for_state = ((events & expected) != 0); - - if (!events_ok_for_state) - { - /* To give a descriptive message in the case of failure, we use elog and - * then an assertion that's guaranteed to fail. */ - elog(WARNING, "events %s mismatched for safekeeper %s:%s in state [%s]", - FormatEvents(events), sk->host, sk->port, FormatSafekeeperState(sk->state)); - Assert(events_ok_for_state); - } -} - -/* Returns the set of events a safekeeper in this state should be waiting on - * - * This will return WL_NO_EVENTS (= 0) for some events. */ -uint32 -SafekeeperStateDesiredEvents(SafekeeperState state) -{ - uint32 result = WL_NO_EVENTS; - - /* If the state doesn't have a modifier, we can check the base state */ - switch (state) - { - /* Connecting states say what they want in the name */ - case SS_CONNECTING_READ: - result = WL_SOCKET_READABLE; - break; - case SS_CONNECTING_WRITE: - result = WL_SOCKET_WRITEABLE; - break; - - /* Reading states need the socket to be read-ready to continue */ - case SS_WAIT_EXEC_RESULT: - case SS_HANDSHAKE_RECV: - case SS_WAIT_VERDICT: - result = WL_SOCKET_READABLE; - break; - - /* Idle states use read-readiness as a sign that the connection has been - * disconnected. */ - case SS_VOTING: - case SS_IDLE: - result = WL_SOCKET_READABLE; - break; - - /* - * Flush states require write-ready for flushing. - * Active state does both reading and writing. - * - * TODO: SS_ACTIVE sometimes doesn't need to be write-ready. We should - * check sk->flushWrite here to set WL_SOCKET_WRITEABLE. - */ - case SS_SEND_ELECTED_FLUSH: - case SS_ACTIVE: - result = WL_SOCKET_READABLE | WL_SOCKET_WRITEABLE; - break; - - /* The offline state expects no events. */ - case SS_OFFLINE: - result = WL_NO_EVENTS; - break; - - default: - Assert(false); - break; - } - - return result; -} - -/* Returns a human-readable string corresponding to the event set - * - * If the events do not correspond to something set as the `events` field of a `WaitEvent`, the - * returned string may be meaingless. - * - * The string should not be freed. It should also not be expected to remain the same between - * function calls. */ -char* -FormatEvents(uint32 events) -{ - static char return_str[8]; - - /* Helper variable to check if there's extra bits */ - uint32 all_flags = WL_LATCH_SET - | WL_SOCKET_READABLE - | WL_SOCKET_WRITEABLE - | WL_TIMEOUT - | WL_POSTMASTER_DEATH - | WL_EXIT_ON_PM_DEATH - | WL_SOCKET_CONNECTED; - - /* The formatting here isn't supposed to be *particularly* useful -- it's just to give an - * sense of what events have been triggered without needing to remember your powers of two. */ - - return_str[0] = (events & WL_LATCH_SET ) ? 'L' : '_'; - return_str[1] = (events & WL_SOCKET_READABLE ) ? 'R' : '_'; - return_str[2] = (events & WL_SOCKET_WRITEABLE) ? 'W' : '_'; - return_str[3] = (events & WL_TIMEOUT ) ? 'T' : '_'; - return_str[4] = (events & WL_POSTMASTER_DEATH) ? 'D' : '_'; - return_str[5] = (events & WL_EXIT_ON_PM_DEATH) ? 'E' : '_'; - return_str[5] = (events & WL_SOCKET_CONNECTED) ? 'C' : '_'; - - if (events & (~all_flags)) - { - elog(WARNING, "Event formatting found unexpected component %d", - events & (~all_flags)); - return_str[6] = '*'; - return_str[7] = '\0'; - } - else - return_str[6] = '\0'; - - return (char *) &return_str; -} - -/* - * Convert a character which represents a hexadecimal digit to an integer. - * - * Returns -1 if the character is not a hexadecimal digit. - */ -static int -HexDecodeChar(char c) -{ - if (c >= '0' && c <= '9') - return c - '0'; - if (c >= 'a' && c <= 'f') - return c - 'a' + 10; - if (c >= 'A' && c <= 'F') - return c - 'A' + 10; - - return -1; -} - -/* - * Decode a hex string into a byte string, 2 hex chars per byte. - * - * Returns false if invalid characters are encountered; otherwise true. - */ -bool -HexDecodeString(uint8 *result, char *input, int nbytes) -{ - int i; - - for (i = 0; i < nbytes; ++i) - { - int n1 = HexDecodeChar(input[i * 2]); - int n2 = HexDecodeChar(input[i * 2 + 1]); - - if (n1 < 0 || n2 < 0) - return false; - result[i] = n1 * 16 + n2; - } - - return true; -} - -/* -------------------------------- - * pq_getmsgint32_le - get a binary 4-byte int from a message buffer in native (LE) order - * -------------------------------- - */ -uint32 -pq_getmsgint32_le(StringInfo msg) -{ - uint32 n32; - - pq_copymsgbytes(msg, (char *) &n32, sizeof(n32)); - - return n32; -} - -/* -------------------------------- - * pq_getmsgint64 - get a binary 8-byte int from a message buffer in native (LE) order - * -------------------------------- - */ -uint64 -pq_getmsgint64_le(StringInfo msg) -{ - uint64 n64; - - pq_copymsgbytes(msg, (char *) &n64, sizeof(n64)); - - return n64; -} - -/* append a binary [u]int32 to a StringInfo buffer in native (LE) order */ -void -pq_sendint32_le(StringInfo buf, uint32 i) -{ - enlargeStringInfo(buf, sizeof(uint32)); - memcpy(buf->data + buf->len, &i, sizeof(uint32)); - buf->len += sizeof(uint32); -} - -/* append a binary [u]int64 to a StringInfo buffer in native (LE) order */ -void -pq_sendint64_le(StringInfo buf, uint64 i) -{ - enlargeStringInfo(buf, sizeof(uint64)); - memcpy(buf->data + buf->len, &i, sizeof(uint64)); - buf->len += sizeof(uint64); -} - -/* - * Write XLOG data to disk. - */ -void -XLogWalPropWrite(char *buf, Size nbytes, XLogRecPtr recptr) -{ - int startoff; - int byteswritten; - - while (nbytes > 0) - { - int segbytes; - - /* Close the current segment if it's completed */ - if (walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size)) - XLogWalPropClose(recptr); - - if (walpropFile < 0) - { - bool use_existent = true; - - /* Create/use new log file */ - XLByteToSeg(recptr, walpropSegNo, wal_segment_size); - walpropFile = XLogFileInit(walpropSegNo, &use_existent, false); - walpropFileTLI = ThisTimeLineID; - } - - /* Calculate the start offset of the received logs */ - startoff = XLogSegmentOffset(recptr, wal_segment_size); - - if (startoff + nbytes > wal_segment_size) - segbytes = wal_segment_size - startoff; - else - segbytes = nbytes; - - /* OK to write the logs */ - errno = 0; - - byteswritten = pg_pwrite(walpropFile, buf, segbytes, (off_t) startoff); - if (byteswritten <= 0) - { - char xlogfname[MAXFNAMELEN]; - int save_errno; - - /* if write didn't set errno, assume no disk space */ - if (errno == 0) - errno = ENOSPC; - - save_errno = errno; - XLogFileName(xlogfname, walpropFileTLI, walpropSegNo, wal_segment_size); - errno = save_errno; - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not write to log segment %s " - "at offset %u, length %lu: %m", - xlogfname, startoff, (unsigned long) segbytes))); - } - - /* Update state for write */ - recptr += byteswritten; - - nbytes -= byteswritten; - buf += byteswritten; - } - - /* - * Close the current segment if it's fully written up in the last cycle of - * the loop. - */ - if (walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size)) - { - XLogWalPropClose(recptr); - } -} - -/* - * Close the current segment. - */ -void -XLogWalPropClose(XLogRecPtr recptr) -{ - Assert(walpropFile >= 0 && !XLByteInSeg(recptr, walpropSegNo, wal_segment_size)); - - if (close(walpropFile) != 0) - { - char xlogfname[MAXFNAMELEN]; - XLogFileName(xlogfname, walpropFileTLI, walpropSegNo, wal_segment_size); - - ereport(PANIC, - (errcode_for_file_access(), - errmsg("could not close log segment %s: %m", - xlogfname))); - } - - walpropFile = -1; -} diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index a841beebf3f..1027d39a32c 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -73,7 +73,7 @@ #include "replication/slot.h" #include "replication/snapbuild.h" #include "replication/syncrep.h" -#include "replication/walproposer.h" +#include "replication/walpropshim.h" #include "replication/walreceiver.h" #include "replication/walsender.h" #include "replication/walsender_private.h" @@ -130,6 +130,12 @@ bool log_replication_commands = false; */ bool wake_wal_senders = false; + +/* + * Backpressure hook, detecting how much we should delay. + */ +uint64 (*delay_backend_us)(void) = NULL; + /* * xlogreader used for replication. Note that a WAL sender doing physical * replication does not need xlogreader to read WAL, but it needs one to @@ -239,7 +245,6 @@ void StartReplication(StartReplicationCmd *cmd); static void StartLogicalReplication(StartReplicationCmd *cmd); static void ProcessStandbyMessage(void); static void ProcessStandbyReplyMessage(void); -static void ProcessReplicationFeedbackMessage(void); static void ProcessStandbyHSFeedbackMessage(void); static void ProcessRepliesIfAny(void); static void ProcessPendingWrites(void); @@ -252,8 +257,6 @@ static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, Tran static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write); static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid); static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc); -static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); -static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch); static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo, @@ -711,14 +714,11 @@ StartReplication(StartReplicationCmd *cmd) WalSndSetState(WALSNDSTATE_CATCHUP); /* Send a CopyBothResponse message, and start streaming */ - if (!am_wal_proposer) - { - pq_beginmessage(&buf, 'W'); - pq_sendbyte(&buf, 0); - pq_sendint16(&buf, 0); - pq_endmessage(&buf); - pq_flush(); - } + pq_beginmessage(&buf, 'W'); + pq_sendbyte(&buf, 0); + pq_sendint16(&buf, 0); + pq_endmessage(&buf); + pq_flush(); /* * Don't allow a request to stream from a future point in WAL that @@ -1340,7 +1340,7 @@ ProcessPendingWrites(void) } /* Try to flush pending output to the client */ - if (!am_wal_proposer && pq_flush_if_writable() != 0) + if (pq_flush_if_writable() != 0) WalSndShutdown(); } @@ -1749,9 +1749,6 @@ ProcessRepliesIfAny(void) int r; bool received = false; - if (am_wal_proposer) - return; - last_processing = GetCurrentTimestamp(); /* @@ -1877,10 +1874,6 @@ ProcessStandbyMessage(void) ProcessStandbyHSFeedbackMessage(); break; - case 'z': - ProcessReplicationFeedbackMessage(); - break; - default: ereport(COMMERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), @@ -1953,28 +1946,6 @@ ProcessStandbyReplyMessage(void) LSN_FORMAT_ARGS(applyPtr)); } -// This message is a neon extension of postgres replication protocol -static void -ProcessReplicationFeedbackMessage(void) -{ - ReplicationFeedback rf; - - // consume message length - pq_getmsgint64(&reply_message); - - ParseReplicationFeedbackMessage(&reply_message, &rf); - - replication_feedback_set(&rf); - - SetZenithCurrentClusterSize(rf.currentClusterSize); - - ProcessStandbyReply(rf.ps_writelsn, - rf.ps_flushlsn, - rf.ps_applylsn, - rf.ps_replytime, - false); -} - void ProcessStandbyReply(XLogRecPtr writePtr, XLogRecPtr flushPtr, @@ -2058,13 +2029,6 @@ ProcessStandbyReply(XLogRecPtr writePtr, if (!am_cascading_walsender) SyncRepReleaseWaiters(); - /* - * walproposer use trunclateLsn instead of flushPtr for confirmed - * received location, so we shouldn't update restart_lsn here. - */ - if (am_wal_proposer) - return; - /* * Advance our local xmin horizon when the client confirmed a flush. */ @@ -2394,19 +2358,6 @@ WalSndLoop(WalSndSendDataCallback send_data) /* Check for input from the client */ ProcessRepliesIfAny(); - if (am_wal_proposer) - { - send_data(); - if (WalSndCaughtUp) - { - if (MyWalSnd->state == WALSNDSTATE_CATCHUP) - WalSndSetState(WALSNDSTATE_STREAMING); - WalProposerPoll(); - WalSndCaughtUp = false; - } - continue; - } - /* * If we have received CopyDone from the client, sent CopyDone * ourselves, and the output buffer is empty, it's time to exit @@ -2865,83 +2816,77 @@ XLogSendPhysical(void) nbytes = endptr - startptr; Assert(nbytes <= MAX_SEND_SIZE); - if (am_wal_proposer) - { - WalProposerBroadcast(startptr, endptr); - } + /* + * OK to read and send the slice. + */ + if (output_message.data) + resetStringInfo(&output_message); else - { - /* - * OK to read and send the slice. - */ - if (output_message.data) - resetStringInfo(&output_message); - else - initStringInfo(&output_message); - - pq_sendbyte(&output_message, 'w'); - pq_sendint64(&output_message, startptr); /* dataStart */ - pq_sendint64(&output_message, SendRqstPtr); /* walEnd */ - pq_sendint64(&output_message, 0); /* sendtime, filled in last */ + initStringInfo(&output_message); + + pq_sendbyte(&output_message, 'w'); + pq_sendint64(&output_message, startptr); /* dataStart */ + pq_sendint64(&output_message, SendRqstPtr); /* walEnd */ + pq_sendint64(&output_message, 0); /* sendtime, filled in last */ + + /* + * Read the log directly into the output buffer to avoid extra memcpy + * calls. + */ + enlargeStringInfo(&output_message, nbytes); + +retry: + if (!WALRead(xlogreader, + &output_message.data[output_message.len], + startptr, + nbytes, + xlogreader->seg.ws_tli, /* Pass the current TLI because + * only WalSndSegmentOpen controls + * whether new TLI is needed. */ + &errinfo)) + WALReadRaiseError(&errinfo); - /* - * Read the log directly into the output buffer to avoid extra memcpy - * calls. - */ - enlargeStringInfo(&output_message, nbytes); - - retry: - if (!WALRead(xlogreader, - &output_message.data[output_message.len], - startptr, - nbytes, - xlogreader->seg.ws_tli, /* Pass the current TLI because - * only WalSndSegmentOpen controls - * whether new TLI is needed. */ - &errinfo)) - WALReadRaiseError(&errinfo); - - /* See logical_read_xlog_page(). */ - XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize); - CheckXLogRemoved(segno, xlogreader->seg.ws_tli); + /* See logical_read_xlog_page(). */ + XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize); + CheckXLogRemoved(segno, xlogreader->seg.ws_tli); - /* - * During recovery, the currently-open WAL file might be replaced with the - * file of the same name retrieved from archive. So we always need to - * check what we read was valid after reading into the buffer. If it's - * invalid, we try to open and read the file again. - */ - if (am_cascading_walsender) - { - WalSnd *walsnd = MyWalSnd; - bool reload; + /* + * During recovery, the currently-open WAL file might be replaced with the + * file of the same name retrieved from archive. So we always need to + * check what we read was valid after reading into the buffer. If it's + * invalid, we try to open and read the file again. + */ + if (am_cascading_walsender) + { + WalSnd *walsnd = MyWalSnd; + bool reload; - SpinLockAcquire(&walsnd->mutex); - reload = walsnd->needreload; - walsnd->needreload = false; - SpinLockRelease(&walsnd->mutex); + SpinLockAcquire(&walsnd->mutex); + reload = walsnd->needreload; + walsnd->needreload = false; + SpinLockRelease(&walsnd->mutex); - if (reload && xlogreader->seg.ws_file >= 0) - { - wal_segment_close(xlogreader); + if (reload && xlogreader->seg.ws_file >= 0) + { + wal_segment_close(xlogreader); - goto retry; - } + goto retry; } + } - output_message.len += nbytes; - output_message.data[output_message.len] = '\0'; + output_message.len += nbytes; + output_message.data[output_message.len] = '\0'; - /* - * Fill the send timestamp last, so that it is taken as late as possible. - */ - resetStringInfo(&tmpbuf); - pq_sendint64(&tmpbuf, GetCurrentTimestamp()); - memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)], - tmpbuf.data, sizeof(int64)); + /* + * Fill the send timestamp last, so that it is taken as late as possible. + */ + resetStringInfo(&tmpbuf); + pq_sendint64(&tmpbuf, GetCurrentTimestamp()); + memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)], + tmpbuf.data, sizeof(int64)); + + pq_putmessage_noblock('d', output_message.data, output_message.len); - pq_putmessage_noblock('d', output_message.data, output_message.len); - } sentPtr = endptr; /* Update shared memory status */ @@ -3657,7 +3602,7 @@ WalSndKeepaliveIfNecessary(void) * eventually reported to have been written, flushed and applied by the * standby in a reply message. */ -static void +void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time) { bool buffer_full; @@ -3722,7 +3667,7 @@ LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time) * Return -1 if no new sample data is available, and otherwise the elapsed * time in microseconds. */ -static TimeOffset +TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now) { TimestampTz time = 0; @@ -3819,79 +3764,3 @@ LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now) Assert(time != 0); return now - time; } - -/* - * Get minimal write and flush LSN among all live replicas - */ -void -GetMinReplicaLsn(XLogRecPtr* write_lsn, XLogRecPtr* flush_lsn, XLogRecPtr* apply_lsn) -{ - XLogRecPtr min_write_lsn = UnknownXLogRecPtr; - XLogRecPtr min_flush_lsn = UnknownXLogRecPtr; - XLogRecPtr min_apply_lsn = UnknownXLogRecPtr; - for (int i = 0; i < max_wal_senders; i++) - { - WalSnd *walsnd = &WalSndCtl->walsnds[i]; - if (walsnd->state == WALSNDSTATE_STREAMING) - { - /* - * We assume that reads from walsnd->write/flush are atomic - * on all modern x64 systems, as these fields are uint64 and - * should be 8-bytes aligned. - */ - XLogRecPtr written = walsnd->write; - XLogRecPtr flushed = walsnd->flush; - XLogRecPtr applied = walsnd->apply; - min_write_lsn = Min(written, min_write_lsn); - min_flush_lsn = Min(flushed, min_flush_lsn); - min_apply_lsn = Min(applied, min_apply_lsn); - } - } - *write_lsn = min_write_lsn; - *flush_lsn = min_flush_lsn; - *apply_lsn = min_apply_lsn; -} - -// Check if we need to suspend inserts because of lagging replication. -uint64 -backpressure_lag(void) -{ - if (max_replication_apply_lag > 0 || max_replication_flush_lag > 0 || max_replication_write_lag > 0) - { - XLogRecPtr writePtr; - XLogRecPtr flushPtr; - XLogRecPtr applyPtr; - XLogRecPtr myFlushLsn = GetFlushRecPtr(); - - replication_feedback_get_lsns(&writePtr, &flushPtr, &applyPtr); - #define MB ((XLogRecPtr)1024*1024) - - elog(DEBUG2, "current flushLsn %X/%X ReplicationFeedback: write %X/%X flush %X/%X apply %X/%X", - LSN_FORMAT_ARGS(myFlushLsn), - LSN_FORMAT_ARGS(writePtr), - LSN_FORMAT_ARGS(flushPtr), - LSN_FORMAT_ARGS(applyPtr)); - - if ((writePtr != UnknownXLogRecPtr - && max_replication_write_lag > 0 - && myFlushLsn > writePtr + max_replication_write_lag*MB)) - { - return (myFlushLsn - writePtr - max_replication_write_lag*MB); - } - - if ((flushPtr != UnknownXLogRecPtr - && max_replication_flush_lag > 0 - && myFlushLsn > flushPtr + max_replication_flush_lag*MB)) - { - return (myFlushLsn - flushPtr - max_replication_flush_lag*MB); - } - - if ((applyPtr != UnknownXLogRecPtr - && max_replication_apply_lag > 0 - && myFlushLsn > applyPtr + max_replication_apply_lag*MB)) - { - return (myFlushLsn - applyPtr - max_replication_apply_lag*MB); - } - } - return 0; -} diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 233bd081f82..1caef56a6b4 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -34,7 +34,7 @@ #include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/walsender.h" -#include "replication/walproposer.h" +#include "replication/walpropshim.h" #include "storage/bufmgr.h" #include "storage/dsm.h" #include "storage/ipc.h" @@ -152,8 +152,6 @@ CreateSharedMemoryAndSemaphores(void) size = add_size(size, SyncScanShmemSize()); size = add_size(size, AsyncShmemSize()); - size = add_size(size, WalproposerShmemSize()); - #ifdef EXEC_BACKEND size = add_size(size, ShmemBackendArraySize()); #endif @@ -274,8 +272,6 @@ CreateSharedMemoryAndSemaphores(void) SyncScanShmemInit(); AsyncShmemInit(); - WalproposerShmemInit(); - #ifdef EXEC_BACKEND /* diff --git a/src/backend/tcop/postgres.c b/src/backend/tcop/postgres.c index f36bb512b67..b09dfb9f037 100644 --- a/src/backend/tcop/postgres.c +++ b/src/backend/tcop/postgres.c @@ -3388,15 +3388,20 @@ ProcessInterrupts(void) { ProcessInterrupts_pg(); - // Suspend writers until replicas catch up - lag = backpressure_lag(); - if (lag <= 0) - break; + if (delay_backend_us != NULL) + { + // Suspend writers until replicas catch up + lag = delay_backend_us(); + if (lag <= 0) + break; - set_ps_display("backpressure throttling"); + set_ps_display("backpressure throttling"); - elog(DEBUG2, "backpressure throttling: lag %lu", lag); - pg_usleep(BACK_PRESSURE_DELAY); + elog(DEBUG2, "backpressure throttling: lag %lu", lag); + pg_usleep(BACK_PRESSURE_DELAY); + } + else + break; } } diff --git a/src/backend/utils/misc/guc.c b/src/backend/utils/misc/guc.c index 368e91531ed..27d42102789 100644 --- a/src/backend/utils/misc/guc.c +++ b/src/backend/utils/misc/guc.c @@ -79,7 +79,7 @@ #include "replication/syncrep.h" #include "replication/walreceiver.h" #include "replication/walsender.h" -#include "replication/walproposer.h" +#include "replication/walpropshim.h" #include "storage/bufmgr.h" #include "storage/dsm_impl.h" #include "storage/fd.h" @@ -2298,28 +2298,6 @@ static struct config_int ConfigureNamesInt[] = NULL, NULL, NULL }, - { - {"wal_acceptor_reconnect", PGC_SIGHUP, REPLICATION_STANDBY, - gettext_noop("Timeout for reconnecting to offline wal acceptor."), - NULL, - GUC_UNIT_MS - }, - &wal_acceptor_reconnect_timeout, - 1000, 0, INT_MAX, - NULL, NULL, NULL - }, - - { - {"wal_acceptor_connect_timeout", PGC_SIGHUP, REPLICATION_STANDBY, - gettext_noop("Timeout after which give up connection attempt to safekeeper."), - NULL, - GUC_UNIT_MS - }, - &wal_acceptor_connect_timeout, - 5000, 0, INT_MAX, - NULL, NULL, NULL - }, - { {"max_connections", PGC_POSTMASTER, CONN_AUTH_SETTINGS, gettext_noop("Sets the maximum number of concurrent connections."), @@ -4660,17 +4638,6 @@ static struct config_string ConfigureNamesString[] = check_backtrace_functions, assign_backtrace_functions, NULL }, - { - {"safekeepers", PGC_POSTMASTER, UNGROUPED, - gettext_noop("List of Neon WAL acceptors (host:port)"), - NULL, - GUC_LIST_INPUT | GUC_LIST_QUOTE - }, - &wal_acceptors_list, - "", - NULL, NULL, NULL - }, - /* End-of-list marker */ { {NULL, 0, 0, NULL, NULL}, NULL, NULL, NULL, NULL, NULL diff --git a/src/include/replication/walpropshim.h b/src/include/replication/walpropshim.h new file mode 100644 index 00000000000..f78ecda2935 --- /dev/null +++ b/src/include/replication/walpropshim.h @@ -0,0 +1,11 @@ +#ifndef __WALPROPOSER_H__ +#define __WALPROPOSER_H__ + +/* Set to true only in standalone run of `postgres --sync-safekeepers` (see comment on top) */ +extern PGDLLIMPORT bool syncSafekeepers; +extern PGDLLIMPORT void (*WalProposerInit) (XLogRecPtr flushRecPtr, uint64 systemId); +extern PGDLLIMPORT void (*WalProposerStart) (void); + +void WalProposerSync(int argc, char *argv[]); + +#endif diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index fe21617994a..d42de4a8396 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -48,8 +48,25 @@ extern void WalSndInitStopping(void); extern void WalSndWaitStopping(void); extern void HandleWalSndInitStopping(void); extern void WalSndRqstFileReload(void); -extern void GetMinReplicaLsn(XLogRecPtr* write, XLogRecPtr* flush, XLogRecPtr* apply); -extern uint64 backpressure_lag(void); + +/* + * Hook to check for WAL receiving backpressure. + * Return value in microseconds */ +extern uint64 (*delay_backend_us)(void); + +/* newly added functions to support external (re)usage of these features */ +extern void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); +extern TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); +extern void ProcessStandbyReply(XLogRecPtr writePtr, XLogRecPtr flushPtr, + XLogRecPtr applyPtr, TimestampTz replyTime, + bool replyRequested); +void PhysicalConfirmReceivedLocation(XLogRecPtr lsn); +void ProcessStandbyHSFeedback(TimestampTz replyTime, + TransactionId feedbackXmin, + uint32 feedbackEpoch, + TransactionId feedbackCatalogXmin, + uint32 feedbackCatalogEpoch); + /* * Remember that we want to wakeup walsenders later * From 88a4089bebcd1a87f818b4246c987b81746e10a5 Mon Sep 17 00:00:00 2001 From: Matthias van de Meent Date: Fri, 5 Aug 2022 15:31:22 +0200 Subject: [PATCH 02/12] Fix unused variable warnings in walproposer_utils.c --- contrib/neon/walproposer_utils.c | 3 --- 1 file changed, 3 deletions(-) diff --git a/contrib/neon/walproposer_utils.c b/contrib/neon/walproposer_utils.c index 05b2b90dee6..b7bdd6221d7 100644 --- a/contrib/neon/walproposer_utils.c +++ b/contrib/neon/walproposer_utils.c @@ -489,7 +489,6 @@ XLogWalPropClose(XLogRecPtr recptr) void StartProposerReplication(StartReplicationCmd *cmd) { - StringInfoData buf; XLogRecPtr FlushPtr; if (ThisTimeLineID == 0) @@ -878,8 +877,6 @@ XLogSendPhysical(void) XLogRecPtr startptr; XLogRecPtr endptr; Size nbytes; - XLogSegNo segno; - WALReadError errinfo; /* If requested switch the WAL sender to the stopping state. */ if (got_STOPPING) From 25cca068353bace97e971c94169a1b61603ac78e Mon Sep 17 00:00:00 2001 From: Matthias van de Meent Date: Fri, 5 Aug 2022 15:50:13 +0200 Subject: [PATCH 03/12] Annotate nbytes to show it's used for asserts only, fixing one more warning. --- contrib/neon/walproposer_utils.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/contrib/neon/walproposer_utils.c b/contrib/neon/walproposer_utils.c index b7bdd6221d7..3cd05498882 100644 --- a/contrib/neon/walproposer_utils.c +++ b/contrib/neon/walproposer_utils.c @@ -876,7 +876,7 @@ XLogSendPhysical(void) XLogRecPtr SendRqstPtr; XLogRecPtr startptr; XLogRecPtr endptr; - Size nbytes; + Size nbytes PG_USED_FOR_ASSERTS_ONLY; /* If requested switch the WAL sender to the stopping state. */ if (got_STOPPING) From aaf7df42fef7a7bc06303296d97e3310353c5f68 Mon Sep 17 00:00:00 2001 From: Matthias van de Meent Date: Fri, 5 Aug 2022 21:51:14 +0200 Subject: [PATCH 04/12] Fix makefiles: - Include neon extensions into contrib Makefile - Configure libpqwalproposer more like other extensions --- contrib/Makefile | 3 ++ contrib/neon/Makefile | 2 -- contrib/neon/libpqwalproposer/Makefile | 39 +++++++++++--------------- 3 files changed, 19 insertions(+), 25 deletions(-) diff --git a/contrib/Makefile b/contrib/Makefile index f27e458482e..5ffaf14c632 100644 --- a/contrib/Makefile +++ b/contrib/Makefile @@ -26,6 +26,9 @@ SUBDIRS = \ isn \ lo \ ltree \ + neon \ + neon/libpqwalproposer \ + neon_test_utils \ oid2name \ old_snapshot \ pageinspect \ diff --git a/contrib/neon/Makefile b/contrib/neon/Makefile index e81c3660a0b..bc14325e9d1 100644 --- a/contrib/neon/Makefile +++ b/contrib/neon/Makefile @@ -19,8 +19,6 @@ EXTENSION = neon DATA = neon--1.0.sql PGFILEDESC = "neon - cloud storage for PostgreSQL" -SUBDIRS = \ - libpqwalproposer ifdef USE_PGXS PG_CONFIG = pg_config diff --git a/contrib/neon/libpqwalproposer/Makefile b/contrib/neon/libpqwalproposer/Makefile index 08793c1eee8..f9f90ab28d7 100644 --- a/contrib/neon/libpqwalproposer/Makefile +++ b/contrib/neon/libpqwalproposer/Makefile @@ -1,37 +1,30 @@ #------------------------------------------------------------------------- # # Makefile-- -# Makefile for src/backend/replication/libpqwalproposer +# Makefile for contrib/neon/libpqwalproposer # # IDENTIFICATION -# src/backend/replication/libpqwalproposer/Makefile +# contrib/neon/libpqwalproposer/Makefile # #------------------------------------------------------------------------- -subdir = contrib/neon/libpqwalproposer -top_builddir = ../../.. -include $(top_builddir)/src/Makefile.global - -override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS) - +MODULE_big = libpqwalproposer OBJS = \ $(WIN32RES) \ libpqwalproposer.o -SHLIB_LINK_INTERNAL = $(libpq) -SHLIB_LINK = $(filter -lintl, $(LIBS)) -SHLIB_PREREQS = submake-libpq PGFILEDESC = "libpqwalproposer - libpq interface for WAL proposer" -NAME = libpqwalproposer - -all: all-shared-lib -include $(top_srcdir)/src/Makefile.shlib - -install: all installdirs install-lib - -installdirs: installdirs-lib - -uninstall: uninstall-lib +PG_CPPFLAGS = -I$(libpq_srcdir) +SHLIB_LINK_INTERNAL = $(libpq) -clean distclean maintainer-clean: clean-lib - rm -f $(OBJS) +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +SHLIB_PREREQS = submake-libpq +subdir = contrib/neon/libpqwalproposer +top_builddir = ../../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif From 6e043476f5f81ab94f3381ee428feb9834c2201a Mon Sep 17 00:00:00 2001 From: Matthias van de Meent Date: Fri, 5 Aug 2022 22:13:52 +0200 Subject: [PATCH 05/12] Cleanup unused function WalProposerSync from walproposer.c The function is instead available in walpropcompat.c, which is the authoritative version. --- contrib/neon/walproposer.c | 68 -------------------------------------- contrib/neon/walproposer.h | 1 - 2 files changed, 69 deletions(-) diff --git a/contrib/neon/walproposer.c b/contrib/neon/walproposer.c index 4ae7ce763f9..049bb8569ac 100644 --- a/contrib/neon/walproposer.c +++ b/contrib/neon/walproposer.c @@ -278,74 +278,6 @@ WalProposerMain(Datum main_arg) WalProposerStart(); } -/* - * Entry point for `postgres --sync-safekeepers`. - */ -void -WalProposerSync(int argc, char *argv[]) -{ - struct stat stat_buf; - - ThisTimeLineID = 1; - - InitStandaloneProcess(argv[0]); - - SetProcessingMode(InitProcessing); - - /* - * Set default values for command-line options. - */ - InitializeGUCOptions(); - - /* Acquire configuration parameters */ - if (!SelectConfigFiles(NULL, progname)) - exit(1); - - /* - * Imitate we are early in bootstrap loading shared_preload_libraries; - * zenith extension sets PGC_POSTMASTER gucs requiring this. - */ - process_shared_preload_libraries_in_progress = true; - - /* - * Initialize postmaster_alive_fds as WaitEventSet checks them. - * - * Copied from InitPostmasterDeathWatchHandle() - */ - if (pipe(postmaster_alive_fds) < 0) - ereport(FATAL, - (errcode_for_file_access(), - errmsg_internal("could not create pipe to monitor postmaster death: %m"))); - if (fcntl(postmaster_alive_fds[POSTMASTER_FD_WATCH], F_SETFL, O_NONBLOCK) == -1) - ereport(FATAL, - (errcode_for_socket_access(), - errmsg_internal("could not set postmaster death monitoring pipe to nonblocking mode: %m"))); - - ChangeToDataDir(); - - /* Create pg_wal directory, if it doesn't exist */ - if (stat(XLOGDIR, &stat_buf) != 0) - { - ereport(LOG, (errmsg("creating missing WAL directory \"%s\"", XLOGDIR))); - if (MakePGDirectory(XLOGDIR) < 0) - { - ereport(ERROR, - (errcode_for_file_access(), - errmsg("could not create directory \"%s\": %m", - XLOGDIR))); - exit(1); - } - } - - WalProposerInit(0, 0); - - process_shared_preload_libraries_in_progress = false; - - BackgroundWorkerUnblockSignals(); - - WalProposerStart(); -} - /* * Create new AppendRequest message and start sending it. This function is * called from walsender every time the new WAL is available. diff --git a/contrib/neon/walproposer.h b/contrib/neon/walproposer.h index a32b8315759..3dd8bc19191 100644 --- a/contrib/neon/walproposer.h +++ b/contrib/neon/walproposer.h @@ -368,7 +368,6 @@ void WalProposerRegister(void); void ParseReplicationFeedbackMessage(StringInfo reply_message, ReplicationFeedback *rf); extern void StartProposerReplication(StartReplicationCmd *cmd); -void WalProposerSync(int argc, char *argv[]); Size WalproposerShmemSize(void); bool WalproposerShmemInit(void); From 73092bbe5727700a777a30d43eabe40f2b16ae12 Mon Sep 17 00:00:00 2001 From: Matthias van de Meent Date: Mon, 8 Aug 2022 15:59:16 +0200 Subject: [PATCH 06/12] Add comment about lack of PG timelines, and make StartReplication static again. --- contrib/neon/walproposer_utils.c | 3 +++ src/backend/replication/walsender.c | 4 ++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/contrib/neon/walproposer_utils.c b/contrib/neon/walproposer_utils.c index 3cd05498882..56db6f99a45 100644 --- a/contrib/neon/walproposer_utils.c +++ b/contrib/neon/walproposer_utils.c @@ -536,6 +536,9 @@ StartProposerReplication(StartReplicationCmd *cmd) * Select the timeline. If it was given explicitly by the client, use * that. Otherwise use the timeline of the last replayed record, which is * kept in ThisTimeLineID. + * + * Neon doesn't currently use PG Timelines, but it may in the future, so + * we keep this code around to lighten the load for when we need it. */ if (am_cascading_walsender) { diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 1027d39a32c..ce16a78a61c 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -241,7 +241,7 @@ static XLogRecPtr GetStandbyFlushRecPtr(void); static void IdentifySystem(void); static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd); static void DropReplicationSlot(DropReplicationSlotCmd *cmd); -void StartReplication(StartReplicationCmd *cmd); +static void StartReplication(StartReplicationCmd *cmd); static void StartLogicalReplication(StartReplicationCmd *cmd); static void ProcessStandbyMessage(void); static void ProcessStandbyReplyMessage(void); @@ -573,7 +573,7 @@ SendTimeLineHistory(TimeLineHistoryCmd *cmd) * At the moment, this never returns, but an ereport(ERROR) will take us back * to the main loop. */ -void +static void StartReplication(StartReplicationCmd *cmd) { StringInfoData buf; From 799666501b86cfad9e4866ca00913d33a45469a9 Mon Sep 17 00:00:00 2001 From: Matthias van de Meent Date: Tue, 9 Aug 2022 14:19:03 +0200 Subject: [PATCH 07/12] Fix some compiler warnings in vendor/postgres, and pull libpqwalproposer into vendor/neon --- contrib/Makefile | 2 +- contrib/neon/Makefile | 1 + .../{libpqwalproposer => }/libpqwalproposer.c | 9 ++---- contrib/neon/libpqwalproposer/Makefile | 30 ------------------- contrib/neon/neon.c | 1 + contrib/neon/neon.h | 1 + contrib/neon/pagestore_client.h | 2 +- contrib/neon/pagestore_smgr.c | 2 +- contrib/neon/walproposer_utils.c | 1 - 9 files changed, 9 insertions(+), 40 deletions(-) rename contrib/neon/{libpqwalproposer => }/libpqwalproposer.c (98%) delete mode 100644 contrib/neon/libpqwalproposer/Makefile diff --git a/contrib/Makefile b/contrib/Makefile index 5ffaf14c632..93b29294fd0 100644 --- a/contrib/Makefile +++ b/contrib/Makefile @@ -27,7 +27,7 @@ SUBDIRS = \ lo \ ltree \ neon \ - neon/libpqwalproposer \ +# neon/libpqwalproposer neon_test_utils \ oid2name \ old_snapshot \ diff --git a/contrib/neon/Makefile b/contrib/neon/Makefile index bc14325e9d1..d1f48a988c6 100644 --- a/contrib/neon/Makefile +++ b/contrib/neon/Makefile @@ -6,6 +6,7 @@ OBJS = \ $(WIN32RES) \ inmem_smgr.o \ libpagestore.o \ + libpqwalproposer.o \ pagestore_smgr.o \ relsize_cache.o \ neon.o \ diff --git a/contrib/neon/libpqwalproposer/libpqwalproposer.c b/contrib/neon/libpqwalproposer.c similarity index 98% rename from contrib/neon/libpqwalproposer/libpqwalproposer.c rename to contrib/neon/libpqwalproposer.c index f92498f7bc7..2b2b7a1a6a4 100644 --- a/contrib/neon/libpqwalproposer/libpqwalproposer.c +++ b/contrib/neon/libpqwalproposer.c @@ -1,11 +1,8 @@ #include "postgres.h" -#include "../walproposer.h" #include "libpq-fe.h" - -/* Required for anything that's dynamically loaded */ -PG_MODULE_MAGIC; -void _PG_init(void); +#include "neon.h" +#include "walproposer.h" /* Header in walproposer.h -- Wrapper struct to abstract away the libpq connection */ struct WalProposerConn @@ -46,7 +43,7 @@ static WalProposerFunctionsType PQWalProposerFunctions = { /* Module initialization */ void -_PG_init(void) +pg_init_libpqwalproposer(void) { if (WalProposerFunctions != NULL) elog(ERROR, "libpqwalproposer already loaded"); diff --git a/contrib/neon/libpqwalproposer/Makefile b/contrib/neon/libpqwalproposer/Makefile deleted file mode 100644 index f9f90ab28d7..00000000000 --- a/contrib/neon/libpqwalproposer/Makefile +++ /dev/null @@ -1,30 +0,0 @@ -#------------------------------------------------------------------------- -# -# Makefile-- -# Makefile for contrib/neon/libpqwalproposer -# -# IDENTIFICATION -# contrib/neon/libpqwalproposer/Makefile -# -#------------------------------------------------------------------------- - -MODULE_big = libpqwalproposer -OBJS = \ - $(WIN32RES) \ - libpqwalproposer.o -PGFILEDESC = "libpqwalproposer - libpq interface for WAL proposer" - -PG_CPPFLAGS = -I$(libpq_srcdir) -SHLIB_LINK_INTERNAL = $(libpq) - -ifdef USE_PGXS -PG_CONFIG = pg_config -PGXS := $(shell $(PG_CONFIG) --pgxs) -include $(PGXS) -else -SHLIB_PREREQS = submake-libpq -subdir = contrib/neon/libpqwalproposer -top_builddir = ../../.. -include $(top_builddir)/src/Makefile.global -include $(top_srcdir)/contrib/contrib-global.mk -endif diff --git a/contrib/neon/neon.c b/contrib/neon/neon.c index d90244804f4..94ff9851eae 100644 --- a/contrib/neon/neon.c +++ b/contrib/neon/neon.c @@ -31,6 +31,7 @@ void _PG_init(void); void _PG_init(void) { pg_init_libpagestore(); + pg_init_libpqwalproposer(); pg_init_walproposer(); } diff --git a/contrib/neon/neon.h b/contrib/neon/neon.h index 57be89da883..b98228c46b2 100644 --- a/contrib/neon/neon.h +++ b/contrib/neon/neon.h @@ -6,6 +6,7 @@ #define NEON_H extern void pg_init_libpagestore(void); +extern void pg_init_libpqwalproposer(void); extern void pg_init_walproposer(void); #endif /* NEON_H */ diff --git a/contrib/neon/pagestore_client.h b/contrib/neon/pagestore_client.h index 93ea6771eb9..f79a3c9142f 100644 --- a/contrib/neon/pagestore_client.h +++ b/contrib/neon/pagestore_client.h @@ -182,7 +182,7 @@ extern void zenith_write(SMgrRelation reln, ForkNumber forknum, extern void zenith_writeback(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, BlockNumber nblocks); extern BlockNumber zenith_nblocks(SMgrRelation reln, ForkNumber forknum); -extern int64 zenith_dbsize(Oid dbNode); +extern const int64 zenith_dbsize(Oid dbNode); extern void zenith_truncate(SMgrRelation reln, ForkNumber forknum, BlockNumber nblocks); extern void zenith_immedsync(SMgrRelation reln, ForkNumber forknum); diff --git a/contrib/neon/pagestore_smgr.c b/contrib/neon/pagestore_smgr.c index 5fdfea5e487..3e1b74dba7c 100644 --- a/contrib/neon/pagestore_smgr.c +++ b/contrib/neon/pagestore_smgr.c @@ -1336,7 +1336,7 @@ zenith_nblocks(SMgrRelation reln, ForkNumber forknum) /* * zenith_db_size() -- Get the size of the database in bytes. */ -int64 +const int64 zenith_dbsize(Oid dbNode) { ZenithResponse *resp; diff --git a/contrib/neon/walproposer_utils.c b/contrib/neon/walproposer_utils.c index 56db6f99a45..04973c293bc 100644 --- a/contrib/neon/walproposer_utils.c +++ b/contrib/neon/walproposer_utils.c @@ -67,7 +67,6 @@ static bool streamingDoneReceiving; static bool WalSndCaughtUp = false; /* Flags set by signal handlers for later service in main loop */ -static volatile sig_atomic_t got_SIGUSR2 = false; static volatile sig_atomic_t got_STOPPING = false; /* From 8cb526b8157a67ce190d250502de462bc64976b2 Mon Sep 17 00:00:00 2001 From: Matthias van de Meent Date: Tue, 9 Aug 2022 14:56:22 +0200 Subject: [PATCH 08/12] Directly initialize libpqwalproposer libpqwalproposer is not a separate binary anymore, so don't try to treat it as such. --- contrib/neon/walproposer.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/contrib/neon/walproposer.c b/contrib/neon/walproposer.c index 049bb8569ac..55c2726af07 100644 --- a/contrib/neon/walproposer.c +++ b/contrib/neon/walproposer.c @@ -400,7 +400,8 @@ WalProposerInitImpl(XLogRecPtr flushRecPtr, uint64 systemId) char *port; /* Load the libpq-specific functions */ - load_file("libpqwalproposer", false); + pg_init_libpqwalproposer(); + if (WalProposerFunctions == NULL) elog(ERROR, "libpqwalproposer didn't initialize correctly"); From 5de7cfd8f8e3a67d6ea0549490dc1245160c9a9a Mon Sep 17 00:00:00 2001 From: Matthias van de Meent Date: Tue, 9 Aug 2022 15:15:13 +0200 Subject: [PATCH 09/12] Remove initialization --- contrib/neon/walproposer.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/contrib/neon/walproposer.c b/contrib/neon/walproposer.c index 55c2726af07..e25c4d92295 100644 --- a/contrib/neon/walproposer.c +++ b/contrib/neon/walproposer.c @@ -400,8 +400,6 @@ WalProposerInitImpl(XLogRecPtr flushRecPtr, uint64 systemId) char *port; /* Load the libpq-specific functions */ - pg_init_libpqwalproposer(); - if (WalProposerFunctions == NULL) elog(ERROR, "libpqwalproposer didn't initialize correctly"); From aba4c5efa9988cda36d88e4dc4c3e88a00781c9f Mon Sep 17 00:00:00 2001 From: Matthias van de Meent Date: Tue, 9 Aug 2022 15:52:25 +0200 Subject: [PATCH 10/12] Fix issue with makefile that didn't get caught in the normal test envs. --- contrib/Makefile | 1 - 1 file changed, 1 deletion(-) diff --git a/contrib/Makefile b/contrib/Makefile index 93b29294fd0..9caec6cb81f 100644 --- a/contrib/Makefile +++ b/contrib/Makefile @@ -27,7 +27,6 @@ SUBDIRS = \ lo \ ltree \ neon \ -# neon/libpqwalproposer neon_test_utils \ oid2name \ old_snapshot \ From 691f6a6c16bae70bce5ea24a0e25d6e701f9661e Mon Sep 17 00:00:00 2001 From: Matthias van de Meent Date: Tue, 9 Aug 2022 18:20:02 +0200 Subject: [PATCH 11/12] Remove now unapplicable warning --- contrib/neon/walproposer_utils.c | 1 - 1 file changed, 1 deletion(-) diff --git a/contrib/neon/walproposer_utils.c b/contrib/neon/walproposer_utils.c index 04973c293bc..cd8fd556c22 100644 --- a/contrib/neon/walproposer_utils.c +++ b/contrib/neon/walproposer_utils.c @@ -11,7 +11,6 @@ #include "postmaster/interrupt.h" #include "replication/slot.h" #include "walproposer_utils.h" -/* WARNING - this may be broken */ #include "replication/walsender_private.h" #include "storage/ipc.h" From a9733854acb3bf018934f4df9f116ce8344901ef Mon Sep 17 00:00:00 2001 From: Matthias van de Meent Date: Thu, 18 Aug 2022 10:17:54 +0200 Subject: [PATCH 12/12] Various cleanups --- contrib/neon/neon.h | 13 ++++++++++--- src/backend/storage/ipc/ipci.c | 1 - src/include/replication/walpropshim.h | 10 +++++++++- src/include/replication/walsender.h | 2 +- 4 files changed, 20 insertions(+), 6 deletions(-) diff --git a/contrib/neon/neon.h b/contrib/neon/neon.h index b98228c46b2..2c66bc7bf05 100644 --- a/contrib/neon/neon.h +++ b/contrib/neon/neon.h @@ -1,6 +1,13 @@ -// -// Created by Matthias on 2022-08-04. -// +/*------------------------------------------------------------------------- + * + * neon.h + * Functions used in the initialization of this extension. + * + * IDENTIFICATION + * contrib/neon/neon.h + * + *------------------------------------------------------------------------- + */ #ifndef NEON_H #define NEON_H diff --git a/src/backend/storage/ipc/ipci.c b/src/backend/storage/ipc/ipci.c index 1caef56a6b4..de498c21dba 100644 --- a/src/backend/storage/ipc/ipci.c +++ b/src/backend/storage/ipc/ipci.c @@ -34,7 +34,6 @@ #include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/walsender.h" -#include "replication/walpropshim.h" #include "storage/bufmgr.h" #include "storage/dsm.h" #include "storage/ipc.h" diff --git a/src/include/replication/walpropshim.h b/src/include/replication/walpropshim.h index f78ecda2935..07757580cc9 100644 --- a/src/include/replication/walpropshim.h +++ b/src/include/replication/walpropshim.h @@ -1,7 +1,15 @@ +/* + * walpropshim.h + * various hooks for the walproposer component of the Neon extension. + */ + #ifndef __WALPROPOSER_H__ #define __WALPROPOSER_H__ -/* Set to true only in standalone run of `postgres --sync-safekeepers` (see comment on top) */ +/* + * Set to true only in standalone run of `postgres --sync-safekeepers`. + * See also the top comment in contrib/neon/walproposer.c + */ extern PGDLLIMPORT bool syncSafekeepers; extern PGDLLIMPORT void (*WalProposerInit) (XLogRecPtr flushRecPtr, uint64 systemId); extern PGDLLIMPORT void (*WalProposerStart) (void); diff --git a/src/include/replication/walsender.h b/src/include/replication/walsender.h index d42de4a8396..f902457c26b 100644 --- a/src/include/replication/walsender.h +++ b/src/include/replication/walsender.h @@ -54,7 +54,7 @@ extern void WalSndRqstFileReload(void); * Return value in microseconds */ extern uint64 (*delay_backend_us)(void); -/* newly added functions to support external (re)usage of these features */ +/* expose these so that they can be reused by the neon walproposer extension */ extern void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time); extern TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now); extern void ProcessStandbyReply(XLogRecPtr writePtr, XLogRecPtr flushPtr,