Skip to content

Commit

Permalink
[refer #111] Persist logical replication files in WAL (#306)
Browse files Browse the repository at this point in the history
* [refer #111] Persist logical rep;lication files in WAL and include then in basebackup at PS

* Fix warnings

* Write origin logical record snapshot in WAL only if there are valid origins

* Store only logical replication slots

* Fix dropping replication slots

* Replace sprintf with snprintf to make Arnica happy

* Do not checkpoint replication origin at shutdown

* Add PreCheckPointGuts function to sync replication state before start of shutdown checkpoint

* Log heap rewrite file after creation.

---------

Co-authored-by: Konstantin Knizhnik <knizhnik@neon.tech>
Co-authored-by: Arseny Sher <sher-ars@yandex.ru>
  • Loading branch information
3 people authored and tristan957 committed Feb 5, 2024
1 parent 6b15d2d commit 7e5ed2a
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 6 deletions.
39 changes: 38 additions & 1 deletion src/backend/access/heap/rewriteheap.c
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/message.h"
#include "replication/slot.h"
#include "storage/bufmgr.h"
#include "storage/fd.h"
Expand Down Expand Up @@ -785,6 +786,36 @@ raw_heap_insert(RewriteState state, HeapTuple tup)
* ------------------------------------------------------------------------
*/

/*
* NEON: we need to persist mapping file in WAL
*/
static void
wallog_mapping_file(char const* path, int fd)
{
char prefix[MAXPGPATH];
snprintf(prefix, sizeof(prefix), "neon-file:%s", path);
if (fd < 0)
{
elog(DEBUG1, "neon: deleting contents of rewrite file %s", path);
/* unlink file */
LogLogicalMessage(prefix, NULL, 0, false);
}
else
{
off_t size = lseek(fd, 0, SEEK_END);
char* buf;
elog(DEBUG1, "neon: writing contents of rewrite file %s, size %ld", path, size);
if (size < 0)
elog(ERROR, "Failed to get size of mapping file: %m");
buf = palloc((size_t)size);
lseek(fd, 0, SEEK_SET);
if (read(fd, buf, (size_t)size) != size)
elog(ERROR, "Failed to read mapping file: %m");
LogLogicalMessage(prefix, buf, (size_t)size, false);
pfree(buf);
}
}

/*
* Do preparations for logging logical mappings during a rewrite if
* necessary. If we detect that we don't need to log anything we'll prevent
Expand Down Expand Up @@ -920,6 +951,7 @@ logical_heap_rewrite_flush_mappings(RewriteState state)
errmsg("could not write to file \"%s\", wrote %d of %d: %m", src->path,
written, len)));
src->off += len;
wallog_mapping_file(src->path, FileGetRawDesc(src->vfd));

XLogBeginInsert();
XLogRegisterData((char *) (&xlrec), sizeof(xlrec));
Expand Down Expand Up @@ -1006,7 +1038,7 @@ logical_rewrite_log_mapping(RewriteState state, TransactionId xid,
src->off = 0;
memcpy(src->path, path, sizeof(path));
src->vfd = PathNameOpenFile(path,
O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
O_CREAT | O_EXCL | O_RDWR | PG_BINARY);
if (src->vfd < 0)
ereport(ERROR,
(errcode_for_file_access(),
Expand Down Expand Up @@ -1172,6 +1204,8 @@ heap_xlog_logical_rewrite(XLogReaderState *r)
errmsg("could not fsync file \"%s\": %m", path)));
pgstat_report_wait_end();

wallog_mapping_file(path, fd);

if (CloseTransientFile(fd) != 0)
ereport(ERROR,
(errcode_for_file_access(),
Expand Down Expand Up @@ -1247,6 +1281,7 @@ CheckPointLogicalRewriteHeap(void)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not remove file \"%s\": %m", path)));
wallog_mapping_file(path, -1);
}
else
{
Expand Down Expand Up @@ -1275,6 +1310,8 @@ CheckPointLogicalRewriteHeap(void)
errmsg("could not fsync file \"%s\": %m", path)));
pgstat_report_wait_end();

wallog_mapping_file(path, fd);

if (CloseTransientFile(fd) != 0)
ereport(ERROR,
(errcode_for_file_access(),
Expand Down
35 changes: 30 additions & 5 deletions src/backend/access/transam/xlog.c
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,7 @@ static void CreateEndOfRecoveryRecord(void);
static XLogRecPtr CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn,
XLogRecPtr missingContrecPtr,
TimeLineID newTLI);
static void PreCheckPointGuts(int flags);
static void CheckPointGuts(XLogRecPtr checkPointRedo, int flags);
static void KeepLogSeg(XLogRecPtr recptr, XLogSegNo *logSegNo);
static XLogRecPtr XLogGetReplicationSlotMinimumLSN(void);
Expand Down Expand Up @@ -6686,6 +6687,11 @@ CreateCheckPoint(int flags)
*/
SyncPreCheckpoint();

/*
* NEON: perform checkpiont action requiring write to the WAL before we determine the REDO pointer.
*/
PreCheckPointGuts(flags);

/*
* Use a critical section to force system panic if we have trouble.
*/
Expand Down Expand Up @@ -7193,6 +7199,28 @@ CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn, XLogRecPtr pagePtr,
return recptr;
}

static void
CheckPointReplicationState(void)
{
CheckPointRelationMap();
CheckPointReplicationSlots();
CheckPointSnapBuild();
CheckPointLogicalRewriteHeap();
CheckPointReplicationOrigin();
}

/*
* NEON: we use logical records to persist information of about slots, origins, relation map...
* If it is done inside shutdown checkpoint, then Postgres panics: "concurrent write-ahead log activity while database system is shutting down"
* So it before checkpoint REDO position is determined.
*/
static void
PreCheckPointGuts(int flags)
{
if (flags & CHECKPOINT_IS_SHUTDOWN)
CheckPointReplicationState();
}

/*
* Flush all data in shared memory to disk, and fsync
*
Expand All @@ -7202,11 +7230,8 @@ CreateOverwriteContrecordRecord(XLogRecPtr aborted_lsn, XLogRecPtr pagePtr,
static void
CheckPointGuts(XLogRecPtr checkPointRedo, int flags)
{
CheckPointRelationMap();
CheckPointReplicationSlots();
CheckPointSnapBuild();
CheckPointLogicalRewriteHeap();
CheckPointReplicationOrigin();
if (!(flags & CHECKPOINT_IS_SHUTDOWN))
CheckPointReplicationState();

/* Write out all dirty data in SLRUs and the main buffer pool */
TRACE_POSTGRESQL_BUFFER_CHECKPOINT_START(flags);
Expand Down
19 changes: 19 additions & 0 deletions src/backend/replication/logical/origin.c
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
#include "nodes/execnodes.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/message.h"
#include "replication/origin.h"
#include "storage/condition_variable.h"
#include "storage/copydir.h"
Expand Down Expand Up @@ -562,10 +563,14 @@ CheckPointReplicationOrigin(void)
int i;
uint32 magic = REPLICATION_STATE_MAGIC;
pg_crc32c crc;
char *buf;
size_t chkp_size;

if (max_replication_slots == 0)
return;

buf = palloc(sizeof(magic) + max_replication_slots*sizeof(ReplicationStateOnDisk) + sizeof(crc));

INIT_CRC32C(crc);

/* make sure no old temp file is remaining */
Expand Down Expand Up @@ -599,6 +604,9 @@ CheckPointReplicationOrigin(void)
errmsg("could not write to file \"%s\": %m",
tmppath)));
}
memcpy(buf, &magic, sizeof magic);
chkp_size = sizeof(magic);

COMP_CRC32C(crc, &magic, sizeof(magic));

/* prevent concurrent creations/drops */
Expand Down Expand Up @@ -641,6 +649,8 @@ CheckPointReplicationOrigin(void)
errmsg("could not write to file \"%s\": %m",
tmppath)));
}
memcpy(buf + chkp_size, &disk_state, sizeof(disk_state));
chkp_size += sizeof(disk_state);

COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
}
Expand All @@ -660,6 +670,15 @@ CheckPointReplicationOrigin(void)
errmsg("could not write to file \"%s\": %m",
tmppath)));
}
if (chkp_size != sizeof(magic)) /* has some valid origins */
{
memcpy(buf + chkp_size, &crc, sizeof crc);
chkp_size += sizeof(crc);

/* NEON specific: persist snapshot in storage using logical message */
LogLogicalMessage("neon-file:pg_logical/replorigin_checkpoint", buf, chkp_size, false);
}
pfree(buf);

if (CloseTransientFile(tmpfd) != 0)
ereport(PANIC,
Expand Down
11 changes: 11 additions & 0 deletions src/backend/replication/logical/snapbuild.c
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/logical.h"
#include "replication/message.h"
#include "replication/reorderbuffer.h"
#include "replication/snapbuild.h"
#include "storage/block.h" /* debugging output */
Expand Down Expand Up @@ -1611,6 +1612,7 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
int fd;
char tmppath[MAXPGPATH];
char path[MAXPGPATH];
char prefix[MAXPGPATH];
int ret;
struct stat stat_buf;
Size sz;
Expand Down Expand Up @@ -1733,6 +1735,10 @@ SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
(errcode_for_file_access(),
errmsg("could not open file \"%s\": %m", tmppath)));

/* NEON specific: persist snapshot in storage using logical message */
snprintf(prefix, sizeof(prefix), "neon-file:%s", path);
LogLogicalMessage(prefix, (char*)ondisk, needed_length, false);

errno = 0;
pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
if ((write(fd, ondisk, needed_length)) != needed_length)
Expand Down Expand Up @@ -2039,6 +2045,7 @@ CheckPointSnapBuild(void)
DIR *snap_dir;
struct dirent *snap_de;
char path[MAXPGPATH + 21];
char prefix[MAXPGPATH + 31];

/*
* We start off with a minimum of the last redo pointer. No new
Expand Down Expand Up @@ -2097,6 +2104,10 @@ CheckPointSnapBuild(void)
{
elog(DEBUG1, "removing snapbuild snapshot %s", path);

/* NEON specific: delete file from storage using logical message */
snprintf(prefix, sizeof(prefix), "neon-file:%s", path);
LogLogicalMessage(prefix, NULL, 0, false);

/*
* It's not particularly harmful, though strange, if we can't
* remove the file here. Don't prevent the checkpoint from
Expand Down
19 changes: 19 additions & 0 deletions src/backend/replication/slot.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include "miscadmin.h"
#include "pgstat.h"
#include "replication/slot.h"
#include "replication/message.h"
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/proc.h"
Expand Down Expand Up @@ -683,6 +684,15 @@ ReplicationSlotDropPtr(ReplicationSlot *slot)
sprintf(path, "pg_replslot/%s", NameStr(slot->data.name));
sprintf(tmppath, "pg_replslot/%s.tmp", NameStr(slot->data.name));

if (SlotIsLogical(slot))
{
/* NEON specific: delete slot from storage using logical message */
char prefix[MAXPGPATH];
snprintf(prefix, sizeof(prefix), "neon-file:%s/state", path);
elog(LOG, "Drop replication slot %s", path);
LogLogicalMessage(prefix, NULL, 0, false);
}

/*
* Rename the slot directory on disk, so that we'll no longer recognize
* this as a valid slot. Note that if this fails, we've got to mark the
Expand Down Expand Up @@ -1649,6 +1659,15 @@ SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
ReplicationSlotOnDiskChecksummedSize);
FIN_CRC32C(cp.checksum);

if (SlotIsLogical(slot) && cp.slotdata.restart_lsn != InvalidXLogRecPtr)
{
/* NEON specific: persist slot in storage using logical message */
char prefix[MAXPGPATH];
snprintf(prefix, sizeof(prefix), "neon-file:%s", path);
elog(LOG, "Save replication slot at %s restart_lsn=%X/%X", path, LSN_FORMAT_ARGS(cp.slotdata.restart_lsn));
LogLogicalMessage(prefix, (char*)&cp, sizeof cp, false);
}

errno = 0;
pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
Expand Down

0 comments on commit 7e5ed2a

Please sign in to comment.