Skip to content

Commit

Permalink
Use interface for storage interactions in Appendonly TAM
Browse files Browse the repository at this point in the history
fix test
  • Loading branch information
reshke committed Sep 30, 2024
1 parent 5f180c5 commit 5e1a0dd
Show file tree
Hide file tree
Showing 25 changed files with 237 additions and 63 deletions.
4 changes: 2 additions & 2 deletions src/backend/access/aocs/aocs_compaction.c
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ AOCSCompaction_DropSegmentFile(Relation aorel, int segno)
if (fd >= 0)
{
TruncateAOSegmentFile(fd, aorel, pseudoSegNo, 0);
CloseAOSegmentFile(fd);
CloseAOSegmentFile(fd, aorel);
}
else
{
Expand Down Expand Up @@ -147,7 +147,7 @@ AOCSSegmentFileTruncateToEOF(Relation aorel, int segno, AOCSVPInfo *vpinfo)
if (fd >= 0)
{
TruncateAOSegmentFile(fd, aorel, fileSegNo, segeof);
CloseAOSegmentFile(fd);
CloseAOSegmentFile(fd, aorel);

elogif(Debug_appendonly_print_compaction, LOG,
"Successfully truncated AO COL relation \"%s.%s\", relation id %u, relfilenode %lu column #%d, logical segment #%d (physical segment file #%d, logical EOF " INT64_FORMAT ")",
Expand Down
35 changes: 27 additions & 8 deletions src/backend/access/aocs/aocsam.c
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ aocs_delete_hook_type aocs_delete_hook = NULL;
*/
static void
open_datumstreamread_segfile(
char *basepath, RelFileNode node,
char *basepath,
const struct f_smgr_ao *smgrAO,
RelFileNode node,
AOCSFileSegInfo *segInfo,
DatumStreamRead *ds,
int colNo)
Expand Down Expand Up @@ -118,7 +120,9 @@ open_all_datumstreamread_segfiles(Relation rel,
{
AttrNumber attno = proj_atts[i];

open_datumstreamread_segfile(basepath, rel->rd_node, segInfo, ds[attno], attno);
RelationOpenSmgr(rel);

open_datumstreamread_segfile(basepath, rel->rd_smgr->smgr_ao, rel->rd_node, segInfo, ds[attno], attno);
datumstreamread_block(ds[attno], blockDirectory, attno);
}

Expand All @@ -139,6 +143,8 @@ open_ds_write(Relation rel, DatumStreamWrite **ds, TupleDesc relationTupleDesc,
rnode.node = rel->rd_node;
rnode.backend = rel->rd_backend;

RelationOpenSmgr(rel);

/* open datum streams. It will open segment file underneath */
for (int i = 0; i < nvp; ++i)
{
Expand Down Expand Up @@ -179,7 +185,8 @@ open_ds_write(Relation rel, DatumStreamWrite **ds, TupleDesc relationTupleDesc,
RelationGetRelationName(rel),
/* title */ titleBuf.data,
XLogIsNeeded() && RelationNeedsWAL(rel),
&rnode);
&rnode,
rel->rd_smgr->smgr_ao);

}
}
Expand Down Expand Up @@ -229,6 +236,8 @@ open_ds_read(Relation rel, DatumStreamRead **ds, TupleDesc relationTupleDesc,
for (AttrNumber attno = 0; attno < relationTupleDesc->natts; attno++)
ds[attno] = NULL;

RelationOpenSmgr(rel);

/* And then initialize the data streams for those columns we need */
for (AttrNumber i = 0; i < num_proj_atts; i++)
{
Expand Down Expand Up @@ -270,7 +279,8 @@ open_ds_read(Relation rel, DatumStreamRead **ds, TupleDesc relationTupleDesc,
attr,
RelationGetRelationName(rel),
/* title */ titleBuf.data,
&rel->rd_node);
&rel->rd_node,
rel->rd_smgr->smgr_ao);
}
}

Expand Down Expand Up @@ -1410,7 +1420,9 @@ openFetchSegmentFile(AOCSFetchDesc aocsFetchDesc,
if (logicalEof == 0)
return false;

open_datumstreamread_segfile(aocsFetchDesc->basepath, aocsFetchDesc->relation->rd_node,
RelationOpenSmgr(aocsFetchDesc->relation);

open_datumstreamread_segfile(aocsFetchDesc->basepath, aocsFetchDesc->relation->rd_smgr->smgr_ao, aocsFetchDesc->relation->rd_node,
fsInfo,
datumStreamFetchDesc->datumStream,
colNo);
Expand Down Expand Up @@ -1568,7 +1580,7 @@ aocs_fetch_init(Relation relation,
TupleDescAttr(tupleDesc, colno),
relation->rd_rel->relname.data,
/* title */ titleBuf.data,
&relation->rd_node);
&relation->rd_node, relation->rd_smgr->smgr_ao);

}
if (opts[colno])
Expand Down Expand Up @@ -1944,13 +1956,16 @@ aocs_begin_headerscan(Relation rel, int colno)
ao_attr.overflowSize = 0;
ao_attr.safeFSWriteSize = 0;
hdesc = palloc(sizeof(AOCSHeaderScanDescData));

RelationOpenSmgr(rel);

AppendOnlyStorageRead_Init(&hdesc->ao_read,
NULL, //current memory context
opts[colno]->blocksize,
RelationGetRelationName(rel),
"ALTER TABLE ADD COLUMN scan",
&ao_attr,
&rel->rd_node);
&rel->rd_node, rel->rd_smgr->smgr_ao);
hdesc->colno = colno;
return hdesc;
}
Expand Down Expand Up @@ -2035,6 +2050,9 @@ aocs_addcol_init(Relation rel,
NULL);

iattr = rel->rd_att->natts - num_newcols;

RelationOpenSmgr(rel);

for (i = 0; i < num_newcols; ++i, ++iattr)
{
Form_pg_attribute attr = TupleDescAttr(rel->rd_att, iattr);
Expand All @@ -2050,7 +2068,8 @@ aocs_addcol_init(Relation rel,
attr, RelationGetRelationName(rel),
titleBuf.data,
XLogIsNeeded() && RelationNeedsWAL(rel),
&rnode);
&rnode,
rel->rd_smgr->smgr_ao);
}
return desc;
}
Expand Down
2 changes: 1 addition & 1 deletion src/backend/access/aocs/aocsam_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -1365,7 +1365,7 @@ aoco_relation_copy_data(Relation rel, const RelFileNode *newrnode)
*/
RelationCreateStorage(*newrnode, rel->rd_rel->relpersistence, SMGR_AO, rel);

copy_append_only_data(rel->rd_node, *newrnode, rel->rd_backend, rel->rd_rel->relpersistence);
copy_append_only_data(rel->rd_node, *newrnode, rel->rd_smgr, dstrel, rel->rd_backend, rel->rd_rel->relpersistence);

/*
* For append-optimized tables, no forks other than the main fork should
Expand Down
15 changes: 15 additions & 0 deletions src/backend/access/aocs/test/aocsam_test.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "postgres.h"
#include "utils/memutils.h"
#include "storage/smgr.h"

#include "../aocsam.c"

Expand All @@ -19,15 +20,23 @@ test__aocs_begin_headerscan(void **state)
{
AOCSHeaderScanDesc desc;
RelationData reldata;
SMgrRelationData smgrdata;
FormData_pg_class pgclass;

memset(&reldata, 0, sizeof(SMgrRelationData));

reldata.rd_rel = &pgclass;
reldata.rd_id = 12345;
StdRdOptions opt;

opt.blocksize = 8192 * 5;
StdRdOptions *opts[1];

smgrdata.smgr_ao = smgrAOGetDefault();
reldata.rd_smgr = &smgrdata;
reldata.rd_backend = InvalidBackendId;


opts[0] = &opt;

strncpy(&pgclass.relname.data[0], "mock_relation", 13);
Expand Down Expand Up @@ -63,6 +72,7 @@ test__aocs_addcol_init(void **state)
{
AOCSAddColumnDesc desc;
RelationData reldata;
SMgrRelationData smgrdata;
int nattr = 5;
StdRdOptions **opts =
(StdRdOptions **) malloc(sizeof(StdRdOptions *) * nattr);
Expand Down Expand Up @@ -98,6 +108,8 @@ test__aocs_addcol_init(void **state)
expect_value(create_datumstreamwrite, needsWAL, true);
expect_any(create_datumstreamwrite, rnode);
expect_any(create_datumstreamwrite, rnode);
expect_any(create_datumstreamwrite, smgrAO);
expect_any(create_datumstreamwrite, smgrAO);
expect_any_count(create_datumstreamwrite, attr, 2);
expect_any_count(create_datumstreamwrite, relname, 2);
expect_any_count(create_datumstreamwrite, title, 2);
Expand All @@ -112,6 +124,9 @@ test__aocs_addcol_init(void **state)
memset(reldata.rd_att->attrs, 0, sizeof(Form_pg_attribute *) * nattr);
reldata.rd_att->natts = nattr;

smgrdata.smgr_ao = smgrAOGetDefault();
reldata.rd_smgr = &smgrdata;

expect_value(GetAppendOnlyEntryAttributes, relid, 12345);
expect_any(GetAppendOnlyEntryAttributes, blocksize);
expect_any(GetAppendOnlyEntryAttributes, safefswritesize);
Expand Down
38 changes: 22 additions & 16 deletions src/backend/access/appendonly/aomd.c
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ OpenAOSegmentFile(Relation rel,
File fd;

errno = 0;
fd = PathNameOpenFile(filepathname, fileFlags);
fd = rel->rd_smgr->smgr_ao->smgr_AORelOpenSegFile(filepathname, fileFlags);
if (fd < 0)
{
if (logicalEof == 0 && errno == ENOENT)
Expand All @@ -168,9 +168,9 @@ OpenAOSegmentFile(Relation rel,
* Close an Append Only relation file segment
*/
void
CloseAOSegmentFile(File fd)
CloseAOSegmentFile(File fd, Relation rel)
{
FileClose(fd);
rel->rd_smgr->smgr_ao->smgr_FileClose(fd);
}

/*
Expand All @@ -188,7 +188,7 @@ TruncateAOSegmentFile(File fd, Relation rel, int32 segFileNum, int64 offset)
* Call the 'fd' module with a 64-bit length since AO segment files
* can be multi-gigabyte to the terabytes...
*/
if (FileTruncate(fd, offset, WAIT_EVENT_DATA_FILE_TRUNCATE) != 0)
if (rel->rd_smgr->smgr_ao->smgr_FileTruncate(fd, offset, WAIT_EVENT_DATA_FILE_TRUNCATE) != 0)
ereport(ERROR,
(errmsg("\"%s\": failed to truncate data after eof: %m",
relname)));
Expand Down Expand Up @@ -363,7 +363,8 @@ mdunlink_ao_perFile(const int segno, void *ctx)

static void
copy_file(char *srcsegpath, char *dstsegpath,
RelFileNode dst, int segfilenum, bool use_wal)
RelFileNode dst, SMgrRelation srcSMGR, SMgrRelation dstSMGR,
int segfilenum, bool use_wal)
{
File srcFile;
File dstFile;
Expand All @@ -372,7 +373,7 @@ copy_file(char *srcsegpath, char *dstsegpath,
char *buffer = palloc(BLCKSZ);
int dstflags;

srcFile = PathNameOpenFile(srcsegpath, O_RDONLY | PG_BINARY);
srcFile = srcSMGR->smgr_ao->smgr_AORelOpenSegFile(srcsegpath, O_RDONLY | PG_BINARY);
if (srcFile < 0)
ereport(ERROR,
(errcode_for_file_access(),
Expand All @@ -387,13 +388,13 @@ copy_file(char *srcsegpath, char *dstsegpath,
if (segfilenum)
dstflags |= O_CREAT;

dstFile = PathNameOpenFile(dstsegpath, dstflags);
dstFile = dstSMGR->smgr_ao->smgr_AORelOpenSegFile(dstsegpath, dstflags);
if (dstFile < 0)
ereport(ERROR,
(errcode_for_file_access(),
(errmsg("could not create destination file %s: %m", dstsegpath))));

left = FileDiskSize(srcFile);
left = srcSMGR->smgr_ao->smgr_FileDiskSize(srcFile);
if (left < 0)
ereport(ERROR,
(errcode_for_file_access(),
Expand All @@ -407,13 +408,13 @@ copy_file(char *srcsegpath, char *dstsegpath,
CHECK_FOR_INTERRUPTS();

len = Min(left, BLCKSZ);
if (FileRead(srcFile, buffer, len, offset, WAIT_EVENT_DATA_FILE_READ) != len)
if (srcSMGR->smgr_ao->smgr_FileRead(srcFile, buffer, len, offset, WAIT_EVENT_DATA_FILE_READ) != len)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read %d bytes from file \"%s\": %m",
len, srcsegpath)));

if (FileWrite(dstFile, buffer, len, offset, WAIT_EVENT_DATA_FILE_WRITE) != len)
if (dstSMGR->smgr_ao->smgr_FileWrite(dstFile, buffer, len, offset, WAIT_EVENT_DATA_FILE_WRITE) != len)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not write %d bytes to file \"%s\": %m",
Expand All @@ -425,19 +426,21 @@ copy_file(char *srcsegpath, char *dstsegpath,
left -= len;
}

if (FileSync(dstFile, WAIT_EVENT_DATA_FILE_IMMEDIATE_SYNC) != 0)
if (dstSMGR->smgr_ao->smgr_FileSync(dstFile, WAIT_EVENT_DATA_FILE_IMMEDIATE_SYNC) != 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not fsync file \"%s\": %m",
dstsegpath)));
FileClose(srcFile);
FileClose(dstFile);
srcSMGR->smgr_ao->smgr_FileClose(srcFile);
dstSMGR->smgr_ao->smgr_FileClose(dstFile);
pfree(buffer);
}

struct copy_append_only_data_callback_ctx {
char *srcPath;
char *dstPath;
SMgrRelation srcSMGR;
SMgrRelation dstSMGR;
RelFileNode src;
RelFileNode dst;
bool useWal;
Expand All @@ -449,6 +452,7 @@ struct copy_append_only_data_callback_ctx {
*/
void
copy_append_only_data(RelFileNode src, RelFileNode dst,
SMgrRelation srcSMGR, SMgrRelation dstSMGR,
BackendId backendid, char relpersistence)
{
char *srcPath;
Expand All @@ -464,10 +468,12 @@ copy_append_only_data(RelFileNode src, RelFileNode dst,
srcPath = relpathbackend(src, backendid, MAIN_FORKNUM);
dstPath = relpathbackend(dst, backendid, MAIN_FORKNUM);

copy_file(srcPath, dstPath, dst, 0, useWal);
copy_file(srcPath, dstPath, dst, srcSMGR, dstSMGR, 0, useWal);

copyFiles.srcPath = srcPath;
copyFiles.dstPath = dstPath;
copyFiles.srcSMGR = srcSMGR;
copyFiles.dstSMGR = dstSMGR;
copyFiles.src = src;
copyFiles.dst = dst;
copyFiles.useWal = useWal;
Expand Down Expand Up @@ -502,7 +508,7 @@ copy_append_only_data_perFile(const int segno, void *ctx)
return false;
}
sprintf(dstSegPath, "%s.%u", copyFiles->dstPath, segno);
copy_file(srcSegPath, dstSegPath, copyFiles->dst, segno, copyFiles->useWal);
copy_file(srcSegPath, dstSegPath, copyFiles->dst, copyFiles->srcSMGR, copyFiles->dstSMGR, segno, copyFiles->useWal);

return true;
}
Expand Down Expand Up @@ -571,7 +577,7 @@ truncate_ao_perFile(const int segno, void *ctx)
if (fd >= 0)
{
TruncateAOSegmentFile(fd, aorel, segno, 0);
CloseAOSegmentFile(fd);
CloseAOSegmentFile(fd, aorel);
}
else
{
Expand Down
4 changes: 2 additions & 2 deletions src/backend/access/appendonly/appendonly_compaction.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ AppendOnlyCompaction_DropSegmentFile(Relation aorel, int segno)
if (fd >= 0)
{
TruncateAOSegmentFile(fd, aorel, fileSegNo, 0);
CloseAOSegmentFile(fd);
CloseAOSegmentFile(fd, aorel);
}
else
{
Expand Down Expand Up @@ -256,7 +256,7 @@ AppendOnlySegmentFileTruncateToEOF(Relation aorel, int segno, int64 segeof)
if (fd >= 0)
{
TruncateAOSegmentFile(fd, aorel, fileSegNo, segeof);
CloseAOSegmentFile(fd);
CloseAOSegmentFile(fd, aorel);

elogif(Debug_appendonly_print_compaction, LOG,
"Successfully truncated AO ROW relation \"%s.%s\", relation id %u, relfilenode %lu (physical segment file #%d, logical EOF " INT64_FORMAT ")",
Expand Down
Loading

0 comments on commit 5e1a0dd

Please sign in to comment.