Skip to content

Commit

Permalink
PendingDelete: expand the pending deletes interface
Browse files Browse the repository at this point in the history
  • Loading branch information
jiaqizho committed May 21, 2024
1 parent fd453bf commit e545a96
Show file tree
Hide file tree
Showing 7 changed files with 223 additions and 230 deletions.
173 changes: 88 additions & 85 deletions src/backend/catalog/storage.c
Original file line number Diff line number Diff line change
Expand Up @@ -41,33 +41,6 @@
/* GUC variables */
int wal_skip_threshold = 2048; /* in kilobytes */

/*
* We keep a list of all relations (represented as RelFileNode values)
* that have been created or deleted in the current transaction. When
* a relation is created, we create the physical file immediately, but
* remember it so that we can delete the file again if the current
* transaction is aborted. Conversely, a deletion request is NOT
* executed immediately, but is just entered in the list. When and if
* the transaction commits, we can delete the physical file.
*
* To handle subtransactions, every entry is marked with its transaction
* nesting level. At subtransaction commit, we reassign the subtransaction's
* entries to the parent nesting level. At subtransaction abort, we can
* immediately execute the abort-time actions for all entries of the current
* nesting level.
*
* NOTE: the list is kept in TopMemoryContext to be sure it won't disappear
* unbetimes. It'd probably be OK to keep it in TopTransactionContext,
* but I'm being paranoid.
*/

typedef struct PendingRelDelete
{
RelFileNodePendingDelete relnode; /* relation that may need to be deleted */
bool atCommit; /* T=delete at commit; F=delete at abort */
int nestLevel; /* xact nesting level of request */
struct PendingRelDelete *next; /* linked-list link */
} PendingRelDelete;

typedef struct PendingRelSync
{
Expand All @@ -79,6 +52,39 @@ static PendingRelDelete *pendingDeletes = NULL; /* head of linked list */
HTAB *pendingSyncHash = NULL;


static
void
StoargeDestroyPendingRelDelete(PendingRelDelete *reldelete)
{
pfree(reldelete);
}

static
void
StorageDoPendingRelDelete(PendingRelDelete *delete)
{
SMgrRelation srel;

/*
* GPDB: backend can only be TempRelBackendId or InvalidBackendId for a
* given relfile since we don't tie temp relations to their backends.
*/
srel = smgropen(delete->relnode.node,
delete->relnode.isTempRelation ?
TempRelBackendId : InvalidBackendId,
delete->relnode.smgr_which, NULL);
smgrdounlinkall(&srel, 1, false);

smgrclose(srel);
}

struct PendingRelDeleteAction storage_pending_rel_deletes_action = {
.flags = PENDING_REL_DELETE_NEED_PRESERVE | PENDING_REL_DELETE_NEED_XLOG | PENDING_REL_DELETE_NEED_SYNC,
.destroy_pending_rel_delete = StoargeDestroyPendingRelDelete,
.do_pending_rel_delete = StorageDoPendingRelDelete
};


/*
* AddPendingSync
* Queue an at-commit fsync.
Expand Down Expand Up @@ -160,8 +166,8 @@ RelationCreateStorage(RelFileNode rnode, char relpersistence, SMgrImpl smgr_whic
pending->atCommit = false; /* delete if abort */
pending->nestLevel = GetCurrentTransactionNestLevel();
pending->relnode.smgr_which = smgr_which;
pending->next = pendingDeletes;
pendingDeletes = pending;
pending->action = &storage_pending_rel_deletes_action;
RegisterPendingDelete(pending);

if (relpersistence == RELPERSISTENCE_PERMANENT && !XLogIsNeeded())
{
Expand Down Expand Up @@ -210,8 +216,8 @@ RelationDropStorage(Relation rel)
pending->nestLevel = GetCurrentTransactionNestLevel();
pending->relnode.smgr_which =
RelationIsAppendOptimized(rel) ? SMGR_AO : SMGR_MD;
pending->next = pendingDeletes;
pendingDeletes = pending;
pending->action = &storage_pending_rel_deletes_action;
RegisterPendingDelete(pending);

/*
* NOTE: if the relation was created in this transaction, it will now be
Expand Down Expand Up @@ -254,6 +260,11 @@ RelationPreserveStorage(RelFileNode rnode, bool atCommit)
for (pending = pendingDeletes; pending != NULL; pending = next)
{
next = pending->next;
Assert(pending->action);
if (!(pending->action->flags & PENDING_REL_DELETE_NEED_PRESERVE)) {
continue;
}

if (RelFileNodeEquals(rnode, pending->relnode.node)
&& pending->atCommit == atCommit)
{
Expand Down Expand Up @@ -337,13 +348,13 @@ RelationTruncate(Relation rel, BlockNumber nblocks)
* is in progress.
*
* The truncation operation might drop buffers that the checkpoint
* otherwise would have flushed. If it does, then it's essential that
* the files actually get truncated on disk before the checkpoint record
* is written. Otherwise, if reply begins from that checkpoint, the
* otherwise would have flushed. If it does, then it's essential that the
* files actually get truncated on disk before the checkpoint record is
* written. Otherwise, if reply begins from that checkpoint, the
* to-be-truncated blocks might still exist on disk but have older
* contents than expected, which can cause replay to fail. It's OK for
* the blocks to not exist on disk at all, but not for them to have the
* wrong contents.
* contents than expected, which can cause replay to fail. It's OK for the
* blocks to not exist on disk at all, but not for them to have the wrong
* contents.
*/
Assert(!MyProc->delayChkptEnd);
MyProc->delayChkptEnd = true;
Expand Down Expand Up @@ -583,11 +594,12 @@ SerializePendingSyncs(Size maxSize, char *startAddress)
(void) hash_search(tmphash, &sync->rnode, HASH_ENTER, NULL);

/* remove deleted rnodes */
for (delete = pendingDeletes; delete != NULL; delete = delete->next)
if (delete->atCommit)
for (delete = pendingDeletes; delete != NULL; delete = delete->next) {
Assert(delete->action);
if (delete->atCommit && (delete->action->flags & PENDING_REL_DELETE_NEED_SYNC))
(void) hash_search(tmphash, (void *) &delete->relnode,
HASH_REMOVE, NULL);

}
hash_seq_init(&scan, tmphash);
while ((src = (RelFileNode *) hash_seq_search(&scan)))
*dest++ = *src;
Expand Down Expand Up @@ -616,6 +628,15 @@ RestorePendingSyncs(char *startAddress)
AddPendingSync(rnode);
}

void
RegisterPendingDelete(struct PendingRelDelete *delete)
{
Assert(delete);
Assert(delete->action);
delete->next = pendingDeletes;
pendingDeletes = delete;
}

/*
* smgrDoPendingDeletes() -- Take care of relation deletes at end of xact.
*
Expand All @@ -634,11 +655,6 @@ smgrDoPendingDeletes(bool isCommit)
PendingRelDelete *pending;
PendingRelDelete *prev;
PendingRelDelete *next;
int nrels = 0,
maxrels = 0;
SMgrRelation *srels = NULL;

UFileDoDeletesActions(isCommit);

prev = NULL;
for (pending = pendingDeletes; pending != NULL; pending = next)
Expand All @@ -657,46 +673,21 @@ smgrDoPendingDeletes(bool isCommit)
else
pendingDeletes = next;
/* do deletion if called for */

if (pending->atCommit == isCommit)
{
SMgrRelation srel;
/* GPDB: backend can only be TempRelBackendId or
* InvalidBackendId for a given relfile since we don't tie temp
* relations to their backends. */
srel = smgropen(pending->relnode.node,
pending->relnode.isTempRelation ?
TempRelBackendId : InvalidBackendId,
pending->relnode.smgr_which, NULL);

/* allocate the initial array, or extend it, if needed */
if (maxrels == 0)
{
maxrels = 8;
srels = palloc(sizeof(SMgrRelation) * maxrels);
}
else if (maxrels <= nrels)
{
maxrels *= 2;
srels = repalloc(srels, sizeof(SMgrRelation) * maxrels);
}

srels[nrels++] = srel;
Assert(pending->action);
Assert(pending->action->do_pending_rel_delete);
pending->action->do_pending_rel_delete(pending);
}

/* must explicitly free the list entry */
pfree(pending);
Assert(pending->action);
Assert(pending->action->destroy_pending_rel_delete);
pending->action->destroy_pending_rel_delete(pending);
/* prev does not change */
}
}

if (nrels > 0)
{
smgrdounlinkall(srels, nrels, false);

for (int i = 0; i < nrels; i++)
smgrclose(srels[i]);

pfree(srels);
}
}

/*
Expand Down Expand Up @@ -734,11 +725,12 @@ smgrDoPendingSyncs(bool isCommit, bool isParallelWorker)
}

/* Skip syncing nodes that smgrDoPendingDeletes() will delete. */
for (pending = pendingDeletes; pending != NULL; pending = pending->next)
if (pending->atCommit)
for (pending = pendingDeletes; pending != NULL; pending = pending->next) {
Assert(pending->action);
if (pending->atCommit && (pending->action->flags == PENDING_REL_DELETE_NEED_SYNC))
(void) hash_search(pendingSyncHash, (void *) &pending->relnode,
HASH_REMOVE, NULL);

}
hash_seq_init(&scan, pendingSyncHash);
while ((pendingsync = (PendingRelSync *) hash_seq_search(&scan)))
{
Expand Down Expand Up @@ -872,6 +864,12 @@ smgrGetPendingDeletes(bool forCommit, RelFileNodePendingDelete **ptr)
nrels = 0;
for (pending = pendingDeletes; pending != NULL; pending = pending->next)
{
Assert(pending->action);
if (!(pending->action->flags & PENDING_REL_DELETE_NEED_XLOG)) {
/* should not reocrd xlog expect pg relation */
continue;
}

if (pending->nestLevel >= nestLevel && pending->atCommit == forCommit
/*
* Cloudberry allows transactions that access temporary tables to be
Expand All @@ -890,6 +888,11 @@ smgrGetPendingDeletes(bool forCommit, RelFileNodePendingDelete **ptr)
*ptr = rptr;
for (pending = pendingDeletes; pending != NULL; pending = pending->next)
{
Assert(pending->action);
if (!(pending->action->flags & PENDING_REL_DELETE_NEED_XLOG)) {
continue;
}

if (pending->nestLevel >= nestLevel && pending->atCommit == forCommit
/*
* Keep this loop condition identical to above
Expand All @@ -903,6 +906,7 @@ smgrGetPendingDeletes(bool forCommit, RelFileNodePendingDelete **ptr)
}
return nrels;
}

/*
* PostPrepare_smgr -- Clean up after a successful PREPARE
*
Expand All @@ -921,7 +925,9 @@ PostPrepare_smgr(void)
next = pending->next;
pendingDeletes = next;
/* must explicitly free the list entry */
pfree(pending);
Assert(pending->action);
Assert(pending->action->destroy_pending_rel_delete);
pending->action->destroy_pending_rel_delete(pending);
}
}

Expand All @@ -936,8 +942,6 @@ AtSubCommit_smgr(void)
int nestLevel = GetCurrentTransactionNestLevel();
PendingRelDelete *pending;

UFileAtSubCommitSmgr();

for (pending = pendingDeletes; pending != NULL; pending = pending->next)
{
if (pending->nestLevel >= nestLevel)
Expand All @@ -955,7 +959,6 @@ AtSubCommit_smgr(void)
void
AtSubAbort_smgr(void)
{
UFileAtSubAbortSmgr();
smgrDoPendingDeletes(false);
}

Expand Down
Loading

0 comments on commit e545a96

Please sign in to comment.