diff --git a/src/backend/access/aocs/aocs_compaction.c b/src/backend/access/aocs/aocs_compaction.c index e80887af9ba..8c7e301f2d4 100644 --- a/src/backend/access/aocs/aocs_compaction.c +++ b/src/backend/access/aocs/aocs_compaction.c @@ -84,7 +84,7 @@ AOCSCompaction_DropSegmentFile(Relation aorel, int segno) if (fd >= 0) { TruncateAOSegmentFile(fd, aorel, pseudoSegNo, 0); - CloseAOSegmentFile(fd); + CloseAOSegmentFile(fd, aorel); } else { @@ -147,7 +147,7 @@ AOCSSegmentFileTruncateToEOF(Relation aorel, int segno, AOCSVPInfo *vpinfo) if (fd >= 0) { TruncateAOSegmentFile(fd, aorel, fileSegNo, segeof); - CloseAOSegmentFile(fd); + CloseAOSegmentFile(fd, aorel); elogif(Debug_appendonly_print_compaction, LOG, "Successfully truncated AO COL relation \"%s.%s\", relation id %u, relfilenode %lu column #%d, logical segment #%d (physical segment file #%d, logical EOF " INT64_FORMAT ")", diff --git a/src/backend/access/aocs/aocsam.c b/src/backend/access/aocs/aocsam.c index 9f87ae55b11..f49a7333cb5 100644 --- a/src/backend/access/aocs/aocsam.c +++ b/src/backend/access/aocs/aocsam.c @@ -75,7 +75,9 @@ aocs_delete_hook_type aocs_delete_hook = NULL; */ static void open_datumstreamread_segfile( - char *basepath, RelFileNode node, + char *basepath, + const struct f_smgr_ao *smgrAO, + RelFileNode node, AOCSFileSegInfo *segInfo, DatumStreamRead *ds, int colNo) @@ -118,7 +120,9 @@ open_all_datumstreamread_segfiles(Relation rel, { AttrNumber attno = proj_atts[i]; - open_datumstreamread_segfile(basepath, rel->rd_node, segInfo, ds[attno], attno); + RelationOpenSmgr(rel); + + open_datumstreamread_segfile(basepath, rel->rd_smgr->smgr_ao, rel->rd_node, segInfo, ds[attno], attno); datumstreamread_block(ds[attno], blockDirectory, attno); } @@ -139,6 +143,8 @@ open_ds_write(Relation rel, DatumStreamWrite **ds, TupleDesc relationTupleDesc, rnode.node = rel->rd_node; rnode.backend = rel->rd_backend; + RelationOpenSmgr(rel); + /* open datum streams. 
It will open segment file underneath */ for (int i = 0; i < nvp; ++i) { @@ -179,7 +185,8 @@ open_ds_write(Relation rel, DatumStreamWrite **ds, TupleDesc relationTupleDesc, RelationGetRelationName(rel), /* title */ titleBuf.data, XLogIsNeeded() && RelationNeedsWAL(rel), - &rnode); + &rnode, + rel->rd_smgr->smgr_ao); } } @@ -229,6 +236,8 @@ open_ds_read(Relation rel, DatumStreamRead **ds, TupleDesc relationTupleDesc, for (AttrNumber attno = 0; attno < relationTupleDesc->natts; attno++) ds[attno] = NULL; + RelationOpenSmgr(rel); + /* And then initialize the data streams for those columns we need */ for (AttrNumber i = 0; i < num_proj_atts; i++) { @@ -270,7 +279,8 @@ open_ds_read(Relation rel, DatumStreamRead **ds, TupleDesc relationTupleDesc, attr, RelationGetRelationName(rel), /* title */ titleBuf.data, - &rel->rd_node); + &rel->rd_node, + rel->rd_smgr->smgr_ao); } } @@ -1410,7 +1420,9 @@ openFetchSegmentFile(AOCSFetchDesc aocsFetchDesc, if (logicalEof == 0) return false; - open_datumstreamread_segfile(aocsFetchDesc->basepath, aocsFetchDesc->relation->rd_node, + RelationOpenSmgr(aocsFetchDesc->relation); + + open_datumstreamread_segfile(aocsFetchDesc->basepath, aocsFetchDesc->relation->rd_smgr->smgr_ao, aocsFetchDesc->relation->rd_node, fsInfo, datumStreamFetchDesc->datumStream, colNo); @@ -1568,7 +1580,7 @@ aocs_fetch_init(Relation relation, TupleDescAttr(tupleDesc, colno), relation->rd_rel->relname.data, /* title */ titleBuf.data, - &relation->rd_node); + &relation->rd_node, relation->rd_smgr->smgr_ao); } if (opts[colno]) @@ -1944,13 +1956,16 @@ aocs_begin_headerscan(Relation rel, int colno) ao_attr.overflowSize = 0; ao_attr.safeFSWriteSize = 0; hdesc = palloc(sizeof(AOCSHeaderScanDescData)); + + RelationOpenSmgr(rel); + AppendOnlyStorageRead_Init(&hdesc->ao_read, NULL, //current memory context opts[colno]->blocksize, RelationGetRelationName(rel), "ALTER TABLE ADD COLUMN scan", &ao_attr, - &rel->rd_node); + &rel->rd_node, rel->rd_smgr->smgr_ao); hdesc->colno = 
colno; return hdesc; } @@ -2035,6 +2050,9 @@ aocs_addcol_init(Relation rel, NULL); iattr = rel->rd_att->natts - num_newcols; + + RelationOpenSmgr(rel); + for (i = 0; i < num_newcols; ++i, ++iattr) { Form_pg_attribute attr = TupleDescAttr(rel->rd_att, iattr); @@ -2050,7 +2068,8 @@ aocs_addcol_init(Relation rel, attr, RelationGetRelationName(rel), titleBuf.data, XLogIsNeeded() && RelationNeedsWAL(rel), - &rnode); + &rnode, + rel->rd_smgr->smgr_ao); } return desc; } diff --git a/src/backend/access/aocs/aocsam_handler.c b/src/backend/access/aocs/aocsam_handler.c index 508473b70bf..8183b008281 100644 --- a/src/backend/access/aocs/aocsam_handler.c +++ b/src/backend/access/aocs/aocsam_handler.c @@ -1365,7 +1365,7 @@ aoco_relation_copy_data(Relation rel, const RelFileNode *newrnode) */ RelationCreateStorage(*newrnode, rel->rd_rel->relpersistence, SMGR_AO, rel); - copy_append_only_data(rel->rd_node, *newrnode, rel->rd_backend, rel->rd_rel->relpersistence); + copy_append_only_data(rel->rd_node, *newrnode, rel->rd_smgr, dstrel, rel->rd_backend, rel->rd_rel->relpersistence); /* * For append-optimized tables, no forks other than the main fork should diff --git a/src/backend/access/aocs/test/aocsam_test.c b/src/backend/access/aocs/test/aocsam_test.c index b3c38cd8b77..0ba869f020b 100644 --- a/src/backend/access/aocs/test/aocsam_test.c +++ b/src/backend/access/aocs/test/aocsam_test.c @@ -5,6 +5,7 @@ #include "postgres.h" #include "utils/memutils.h" +#include "storage/smgr.h" #include "../aocsam.c" @@ -19,8 +20,11 @@ test__aocs_begin_headerscan(void **state) { AOCSHeaderScanDesc desc; RelationData reldata; + SMgrRelationData smgrdata; FormData_pg_class pgclass; + memset(&reldata, 0, sizeof(reldata)); + reldata.rd_rel = &pgclass; reldata.rd_id = 12345; StdRdOptions opt; @@ -28,6 +32,11 @@ test__aocs_begin_headerscan(void **state) opt.blocksize = 8192 * 5; StdRdOptions *opts[1]; + smgrdata.smgr_ao = smgrAOGetDefault(); + reldata.rd_smgr = &smgrdata; + reldata.rd_backend = 
InvalidBackendId; + + opts[0] = &opt; strncpy(&pgclass.relname.data[0], "mock_relation", 13); @@ -63,6 +72,7 @@ test__aocs_addcol_init(void **state) { AOCSAddColumnDesc desc; RelationData reldata; + SMgrRelationData smgrdata; int nattr = 5; StdRdOptions **opts = (StdRdOptions **) malloc(sizeof(StdRdOptions *) * nattr); @@ -98,6 +108,8 @@ test__aocs_addcol_init(void **state) expect_value(create_datumstreamwrite, needsWAL, true); expect_any(create_datumstreamwrite, rnode); expect_any(create_datumstreamwrite, rnode); + expect_any(create_datumstreamwrite, smgrAO); + expect_any(create_datumstreamwrite, smgrAO); expect_any_count(create_datumstreamwrite, attr, 2); expect_any_count(create_datumstreamwrite, relname, 2); expect_any_count(create_datumstreamwrite, title, 2); @@ -112,6 +124,9 @@ test__aocs_addcol_init(void **state) memset(reldata.rd_att->attrs, 0, sizeof(Form_pg_attribute *) * nattr); reldata.rd_att->natts = nattr; + smgrdata.smgr_ao = smgrAOGetDefault(); + reldata.rd_smgr = &smgrdata; + expect_value(GetAppendOnlyEntryAttributes, relid, 12345); expect_any(GetAppendOnlyEntryAttributes, blocksize); expect_any(GetAppendOnlyEntryAttributes, safefswritesize); diff --git a/src/backend/access/appendonly/aomd.c b/src/backend/access/appendonly/aomd.c index c147562f7f9..1eb91ddf9cb 100644 --- a/src/backend/access/appendonly/aomd.c +++ b/src/backend/access/appendonly/aomd.c @@ -149,7 +149,7 @@ OpenAOSegmentFile(Relation rel, File fd; errno = 0; - fd = PathNameOpenFile(filepathname, fileFlags); + fd = rel->rd_smgr->smgr_ao->smgr_AORelOpenSegFile(filepathname, fileFlags); if (fd < 0) { if (logicalEof == 0 && errno == ENOENT) @@ -168,9 +168,9 @@ OpenAOSegmentFile(Relation rel, * Close an Append Only relation file segment */ void -CloseAOSegmentFile(File fd) +CloseAOSegmentFile(File fd, Relation rel) { - FileClose(fd); + rel->rd_smgr->smgr_ao->smgr_FileClose(fd); } /* @@ -188,7 +188,7 @@ TruncateAOSegmentFile(File fd, Relation rel, int32 segFileNum, int64 offset) * Call the 
'fd' module with a 64-bit length since AO segment files * can be multi-gigabyte to the terabytes... */ - if (FileTruncate(fd, offset, WAIT_EVENT_DATA_FILE_TRUNCATE) != 0) + if (rel->rd_smgr->smgr_ao->smgr_FileTruncate(fd, offset, WAIT_EVENT_DATA_FILE_TRUNCATE) != 0) ereport(ERROR, (errmsg("\"%s\": failed to truncate data after eof: %m", relname))); @@ -363,7 +363,8 @@ mdunlink_ao_perFile(const int segno, void *ctx) static void copy_file(char *srcsegpath, char *dstsegpath, - RelFileNode dst, int segfilenum, bool use_wal) + RelFileNode dst, SMgrRelation srcSMGR, SMgrRelation dstSMGR, + int segfilenum, bool use_wal) { File srcFile; File dstFile; @@ -372,7 +373,7 @@ copy_file(char *srcsegpath, char *dstsegpath, char *buffer = palloc(BLCKSZ); int dstflags; - srcFile = PathNameOpenFile(srcsegpath, O_RDONLY | PG_BINARY); + srcFile = srcSMGR->smgr_ao->smgr_AORelOpenSegFile(srcsegpath, O_RDONLY | PG_BINARY); if (srcFile < 0) ereport(ERROR, (errcode_for_file_access(), @@ -387,13 +388,13 @@ copy_file(char *srcsegpath, char *dstsegpath, if (segfilenum) dstflags |= O_CREAT; - dstFile = PathNameOpenFile(dstsegpath, dstflags); + dstFile = dstSMGR->smgr_ao->smgr_AORelOpenSegFile(dstsegpath, dstflags); if (dstFile < 0) ereport(ERROR, (errcode_for_file_access(), (errmsg("could not create destination file %s: %m", dstsegpath)))); - left = FileDiskSize(srcFile); + left = srcSMGR->smgr_ao->smgr_FileDiskSize(srcFile); if (left < 0) ereport(ERROR, (errcode_for_file_access(), @@ -407,13 +408,13 @@ copy_file(char *srcsegpath, char *dstsegpath, CHECK_FOR_INTERRUPTS(); len = Min(left, BLCKSZ); - if (FileRead(srcFile, buffer, len, offset, WAIT_EVENT_DATA_FILE_READ) != len) + if (srcSMGR->smgr_ao->smgr_FileRead(srcFile, buffer, len, offset, WAIT_EVENT_DATA_FILE_READ) != len) ereport(ERROR, (errcode_for_file_access(), errmsg("could not read %d bytes from file \"%s\": %m", len, srcsegpath))); - if (FileWrite(dstFile, buffer, len, offset, WAIT_EVENT_DATA_FILE_WRITE) != len) + if 
(dstSMGR->smgr_ao->smgr_FileWrite(dstFile, buffer, len, offset, WAIT_EVENT_DATA_FILE_WRITE) != len) ereport(ERROR, (errcode_for_file_access(), errmsg("could not write %d bytes to file \"%s\": %m", @@ -425,19 +426,21 @@ copy_file(char *srcsegpath, char *dstsegpath, left -= len; } - if (FileSync(dstFile, WAIT_EVENT_DATA_FILE_IMMEDIATE_SYNC) != 0) + if (dstSMGR->smgr_ao->smgr_FileSync(dstFile, WAIT_EVENT_DATA_FILE_IMMEDIATE_SYNC) != 0) ereport(ERROR, (errcode_for_file_access(), errmsg("could not fsync file \"%s\": %m", dstsegpath))); - FileClose(srcFile); - FileClose(dstFile); + srcSMGR->smgr_ao->smgr_FileClose(srcFile); + dstSMGR->smgr_ao->smgr_FileClose(dstFile); pfree(buffer); } struct copy_append_only_data_callback_ctx { char *srcPath; char *dstPath; + SMgrRelation srcSMGR; + SMgrRelation dstSMGR; RelFileNode src; RelFileNode dst; bool useWal; @@ -449,6 +452,7 @@ struct copy_append_only_data_callback_ctx { */ void copy_append_only_data(RelFileNode src, RelFileNode dst, + SMgrRelation srcSMGR, SMgrRelation dstSMGR, BackendId backendid, char relpersistence) { char *srcPath; @@ -464,10 +468,12 @@ copy_append_only_data(RelFileNode src, RelFileNode dst, srcPath = relpathbackend(src, backendid, MAIN_FORKNUM); dstPath = relpathbackend(dst, backendid, MAIN_FORKNUM); - copy_file(srcPath, dstPath, dst, 0, useWal); + copy_file(srcPath, dstPath, dst, srcSMGR, dstSMGR, 0, useWal); copyFiles.srcPath = srcPath; copyFiles.dstPath = dstPath; + copyFiles.srcSMGR = srcSMGR; + copyFiles.dstSMGR = dstSMGR; copyFiles.src = src; copyFiles.dst = dst; copyFiles.useWal = useWal; @@ -502,7 +508,7 @@ copy_append_only_data_perFile(const int segno, void *ctx) return false; } sprintf(dstSegPath, "%s.%u", copyFiles->dstPath, segno); - copy_file(srcSegPath, dstSegPath, copyFiles->dst, segno, copyFiles->useWal); + copy_file(srcSegPath, dstSegPath, copyFiles->dst, copyFiles->srcSMGR, copyFiles->dstSMGR, segno, copyFiles->useWal); return true; } @@ -571,7 +577,7 @@ truncate_ao_perFile(const int 
segno, void *ctx) if (fd >= 0) { TruncateAOSegmentFile(fd, aorel, segno, 0); - CloseAOSegmentFile(fd); + CloseAOSegmentFile(fd, aorel); } else { diff --git a/src/backend/access/appendonly/appendonly_compaction.c b/src/backend/access/appendonly/appendonly_compaction.c index 8fe266a38ea..7011af8db62 100644 --- a/src/backend/access/appendonly/appendonly_compaction.c +++ b/src/backend/access/appendonly/appendonly_compaction.c @@ -91,7 +91,7 @@ AppendOnlyCompaction_DropSegmentFile(Relation aorel, int segno) if (fd >= 0) { TruncateAOSegmentFile(fd, aorel, fileSegNo, 0); - CloseAOSegmentFile(fd); + CloseAOSegmentFile(fd, aorel); } else { @@ -256,7 +256,7 @@ AppendOnlySegmentFileTruncateToEOF(Relation aorel, int segno, int64 segeof) if (fd >= 0) { TruncateAOSegmentFile(fd, aorel, fileSegNo, segeof); - CloseAOSegmentFile(fd); + CloseAOSegmentFile(fd, aorel); elogif(Debug_appendonly_print_compaction, LOG, "Successfully truncated AO ROW relation \"%s.%s\", relation id %u, relfilenode %lu (physical segment file #%d, logical EOF " INT64_FORMAT ")", diff --git a/src/backend/access/appendonly/appendonlyam.c b/src/backend/access/appendonly/appendonlyam.c index 32a306f3ae0..c9f1a0dd63c 100755 --- a/src/backend/access/appendonly/appendonlyam.c +++ b/src/backend/access/appendonly/appendonlyam.c @@ -165,6 +165,7 @@ SetNextFileSegForRead(AppendOnlyScanDesc scan) if (!scan->initedStorageRoutines) { PGFunction *fns = NULL; + RelationOpenSmgr(reln); AppendOnlyStorageRead_Init( &scan->storageRead, @@ -173,7 +174,7 @@ SetNextFileSegForRead(AppendOnlyScanDesc scan) NameStr(scan->aos_rd->rd_rel->relname), scan->title, &scan->storageAttributes, - &scan->aos_rd->rd_node); + &scan->aos_rd->rd_node, reln->rd_smgr->smgr_ao); /* * There is no guarantee that the current memory context will be @@ -287,6 +288,8 @@ SetNextFileSegForRead(AppendOnlyScanDesc scan) Assert(scan->initedStorageRoutines); + + RelationOpenSmgr(reln); AppendOnlyStorageRead_OpenFile( &scan->storageRead, scan->aos_filenamepath, @@ 
-2219,6 +2222,8 @@ appendonly_fetch_init(Relation relation, aoFetchDesc->lastSequence[segno] = ReadLastSequence(aoFormData.segrelid, segno); } + RelationOpenSmgr(relation); + AppendOnlyStorageRead_Init( &aoFetchDesc->storageRead, aoFetchDesc->initContext, @@ -2226,7 +2231,7 @@ appendonly_fetch_init(Relation relation, NameStr(aoFetchDesc->relation->rd_rel->relname), aoFetchDesc->title, &aoFetchDesc->storageAttributes, - &relation->rd_node); + &relation->rd_node, relation->rd_smgr->smgr_ao); fns = get_funcs_for_compression(NameStr(aoFormData.compresstype)); @@ -2758,6 +2763,8 @@ appendonly_insert_init(Relation rel, int segno) RelationGetRelationName(aoInsertDesc->aoi_rel)); aoInsertDesc->title = titleBuf.data; + RelationOpenSmgr(rel); + AppendOnlyStorageWrite_Init( &aoInsertDesc->storageWrite, NULL, @@ -2765,7 +2772,7 @@ appendonly_insert_init(Relation rel, int segno) RelationGetRelationName(aoInsertDesc->aoi_rel), aoInsertDesc->title, &aoInsertDesc->storageAttributes, - XLogIsNeeded() && RelationNeedsWAL(aoInsertDesc->aoi_rel)); + XLogIsNeeded() && RelationNeedsWAL(aoInsertDesc->aoi_rel), rel->rd_smgr->smgr_ao); aoInsertDesc->storageWrite.compression_functions = fns; aoInsertDesc->storageWrite.compressionState = cs; diff --git a/src/backend/access/appendonly/appendonlyam_handler.c b/src/backend/access/appendonly/appendonlyam_handler.c index 4478e2958c7..bb840f106ce 100644 --- a/src/backend/access/appendonly/appendonlyam_handler.c +++ b/src/backend/access/appendonly/appendonlyam_handler.c @@ -1231,7 +1231,7 @@ appendonly_relation_copy_data(Relation rel, const RelFileNode *newrnode) */ RelationCreateStorage(*newrnode, rel->rd_rel->relpersistence, SMGR_AO, rel); - copy_append_only_data(rel->rd_node, *newrnode, rel->rd_backend, rel->rd_rel->relpersistence); + copy_append_only_data(rel->rd_node, *newrnode, rel->rd_smgr, dstrel, rel->rd_backend, rel->rd_rel->relpersistence); /* * For append-optimized tables, no forks other than the main fork should diff --git 
a/src/backend/cdb/cdbappendonlystorageread.c b/src/backend/cdb/cdbappendonlystorageread.c index 76f9b4c8f59..acf368eccf7 100755 --- a/src/backend/cdb/cdbappendonlystorageread.c +++ b/src/backend/cdb/cdbappendonlystorageread.c @@ -63,7 +63,8 @@ AppendOnlyStorageRead_Init(AppendOnlyStorageRead *storageRead, char *relationName, char *title, AppendOnlyStorageAttributes *storageAttributes, - RelFileNode *relFileNode) + RelFileNode *relFileNode, + const struct f_smgr_ao *smgrAO) { uint8 *memory; int32 memoryLen; @@ -117,7 +118,8 @@ AppendOnlyStorageRead_Init(AppendOnlyStorageRead *storageRead, storageRead->maxBufferLen, storageRead->largeReadLen, relationName, - relFileNode); + relFileNode, + smgrAO); elogif(Debug_appendonly_print_scan || Debug_appendonly_print_read_block, LOG, "Append-Only Storage Read initialize for table '%s' " @@ -135,6 +137,8 @@ AppendOnlyStorageRead_Init(AppendOnlyStorageRead *storageRead, MemoryContextSwitchTo(oldMemoryContext); storageRead->isActive = true; + + storageRead->smgrAO = smgrAO; } /* @@ -244,7 +248,7 @@ AppendOnlyStorageRead_DoOpenFile(AppendOnlyStorageRead *storageRead, /* * Open the file for read. 
*/ - file = PathNameOpenFile(filePathName, fileFlags); + file = storageRead->smgrAO->smgr_AORelOpenSegFile(filePathName, fileFlags); return file; } diff --git a/src/backend/cdb/cdbappendonlystoragewrite.c b/src/backend/cdb/cdbappendonlystoragewrite.c index 16e5569378d..f2df686f17d 100755 --- a/src/backend/cdb/cdbappendonlystoragewrite.c +++ b/src/backend/cdb/cdbappendonlystoragewrite.c @@ -69,7 +69,8 @@ AppendOnlyStorageWrite_Init(AppendOnlyStorageWrite *storageWrite, char *relationName, char *title, AppendOnlyStorageAttributes *storageAttributes, - bool needsWAL) + bool needsWAL, + const struct f_smgr_ao *smgrAO) { uint8 *memory; int32 memoryLen; @@ -82,10 +83,14 @@ AppendOnlyStorageWrite_Init(AppendOnlyStorageWrite *storageWrite, Assert(relationName != NULL); Assert(storageAttributes != NULL); + Assert(smgrAO != NULL); + /* UNDONE: Range check fields in storageAttributes */ MemSet(storageWrite, 0, sizeof(AppendOnlyStorageWrite)); + storageWrite->smgrAO = smgrAO; + storageWrite->maxBufferLen = maxBufferLen; if (memoryContext == NULL) @@ -147,7 +152,8 @@ AppendOnlyStorageWrite_Init(AppendOnlyStorageWrite *storageWrite, memoryLen, storageWrite->maxBufferWithCompressionOverrrunLen, storageWrite->maxLargeWriteLen, - relationName); + relationName, + smgrAO); elogif(Debug_appendonly_print_insert || Debug_appendonly_print_append_block, LOG, "Append-Only Storage Write initialize for table '%s' (compression = %s, compression level %d, maximum buffer length %d, large write length %d)", @@ -319,7 +325,7 @@ AppendOnlyStorageWrite_OpenFile(AppendOnlyStorageWrite *storageWrite, errno = 0; int fileFlags = O_RDWR | PG_BINARY; - file = PathNameOpenFile(path, fileFlags); + file = storageWrite->smgrAO->smgr_AORelOpenSegFile(path, fileFlags); if (file < 0) ereport(ERROR, (errcode_for_file_access(), diff --git a/src/backend/cdb/cdbappendonlyxlog.c b/src/backend/cdb/cdbappendonlyxlog.c index 23cec513ac1..68cea8e3fa2 100644 --- a/src/backend/cdb/cdbappendonlyxlog.c +++ 
b/src/backend/cdb/cdbappendonlyxlog.c @@ -67,6 +67,13 @@ ao_insert_replay(XLogReaderState *record) xl_ao_insert *xlrec = (xl_ao_insert *) XLogRecGetData(record); char *buffer = (char *) xlrec + SizeOfAOInsert; uint32 len = XLogRecGetDataLen(record) - SizeOfAOInsert; + SMgrRelation smgr; + + /* + * Open the relation at smgr level through the AO storage manager (SMGR_AO); + * the segment file is then accessed via its smgr_ao callbacks. + */ + smgr = smgropen(xlrec->target.node, InvalidBackendId, SMGR_AO, NULL); dbPath = GetDatabasePath(xlrec->target.node.dbNode, xlrec->target.node.spcNode); @@ -82,14 +89,14 @@ ao_insert_replay(XLogReaderState *record) /* When writing from the beginning of the file, it might not exist yet. Create it. */ if (xlrec->target.offset == 0) fileFlags |= O_CREAT; - file = PathNameOpenFile(path, fileFlags); + file = smgr->smgr_ao->smgr_AORelOpenSegFile(path, fileFlags); if (file < 0) { XLogAOSegmentFile(xlrec->target.node, xlrec->target.segment_filenum); return; } - written_len = FileWrite(file, buffer, len, xlrec->target.offset, + written_len = smgr->smgr_ao->smgr_FileWrite(file, buffer, len, xlrec->target.offset, WAIT_EVENT_COPY_FILE_WRITE); if (written_len < 0 || written_len != len) { @@ -104,7 +111,7 @@ ao_insert_replay(XLogReaderState *record) xlrec->target.segment_filenum, file); - FileClose(file); + smgr->smgr_ao->smgr_FileClose(file); } /* @@ -130,9 +137,17 @@ ao_truncate_replay(XLogReaderState *record) char *dbPath; char path[MAXPGPATH]; File file; + SMgrRelation smgr; xl_ao_truncate *xlrec = (xl_ao_truncate*) XLogRecGetData(record); + /* + * Open the relation at smgr level through the AO storage manager (SMGR_AO); + * the segment file is then accessed via its smgr_ao callbacks.
+ */ + smgr = smgropen(xlrec->target.node, InvalidBackendId, SMGR_AO, NULL); + + dbPath = GetDatabasePath(xlrec->target.node.dbNode, xlrec->target.node.spcNode); @@ -143,7 +158,7 @@ ao_truncate_replay(XLogReaderState *record) pfree(dbPath); dbPath = NULL; - file = PathNameOpenFile(path, O_RDWR | PG_BINARY); + file = smgr->smgr_ao->smgr_AORelOpenSegFile(path, O_RDWR | PG_BINARY); if (file < 0) { /* diff --git a/src/backend/cdb/cdbbufferedappend.c b/src/backend/cdb/cdbbufferedappend.c index 96a5e7edf41..dac215be58d 100644 --- a/src/backend/cdb/cdbbufferedappend.c +++ b/src/backend/cdb/cdbbufferedappend.c @@ -58,7 +58,8 @@ BufferedAppendInit(BufferedAppend *bufferedAppend, int32 memoryLen, int32 maxBufferWithCompressionOverrrunLen, int32 maxLargeWriteLen, - char *relationName) + char *relationName, + const struct f_smgr_ao *smgrAO) { Assert(bufferedAppend != NULL); Assert(memory != NULL); @@ -100,6 +101,8 @@ BufferedAppendInit(BufferedAppend *bufferedAppend, bufferedAppend->file = -1; bufferedAppend->filePathName = NULL; bufferedAppend->fileLen = 0; + + bufferedAppend->smgrAO = smgrAO; } /* @@ -158,7 +161,7 @@ BufferedAppendWrite(BufferedAppend *bufferedAppend, bool needsWAL) { int32 byteswritten; - byteswritten = FileWrite(bufferedAppend->file, + byteswritten = bufferedAppend->smgrAO->smgr_FileWrite(bufferedAppend->file, (char *) largeWriteMemory + bytestotal, bytesleft, bufferedAppend->largeWritePosition + bytestotal, diff --git a/src/backend/cdb/cdbbufferedread.c b/src/backend/cdb/cdbbufferedread.c index 00126630077..0232490db55 100644 --- a/src/backend/cdb/cdbbufferedread.c +++ b/src/backend/cdb/cdbbufferedread.c @@ -55,14 +55,14 @@ BufferedReadMemoryLen( * determine the amount of memory to supply. 
*/ void -BufferedReadInit( - BufferedRead *bufferedRead, +BufferedReadInit(BufferedRead *bufferedRead, uint8 *memory, int32 memoryLen, int32 maxBufferLen, int32 maxLargeReadLen, char *relationName, - RelFileNode *file_node) + RelFileNode *file_node, + const struct f_smgr_ao *smgr) { Assert(bufferedRead != NULL); Assert(memory != NULL); @@ -113,6 +113,8 @@ BufferedReadInit( */ bufferedRead->haveTemporaryLimitInEffect = false; bufferedRead->temporaryLimitFileLen = 0; + + bufferedRead->smgrAO = smgr; } /* @@ -178,7 +180,7 @@ BufferedReadIo( offset = 0; while (largeReadLen > 0) { - int actualLen = FileRead(bufferedRead->file, + int actualLen = bufferedRead->smgrAO->smgr_FileRead(bufferedRead->file, (char *) largeReadMemory, largeReadLen, bufferedRead->fileOff, diff --git a/src/backend/cdb/test/cdbbufferedread_test.c b/src/backend/cdb/test/cdbbufferedread_test.c index 75493ed3622..de43ef4a115 100644 --- a/src/backend/cdb/test/cdbbufferedread_test.c +++ b/src/backend/cdb/test/cdbbufferedread_test.c @@ -17,12 +17,13 @@ test__BufferedReadInit__IsConsistent(void **state) int32 maxBufferLen = 128; int32 maxLargeReadLen = 128; RelFileNode file_node = {0}; + const struct f_smgr_ao *smgrAO = smgrAOGetDefault(); memset(bufferedRead, 0 , sizeof(BufferedRead)); /* * Call the function so as to set the above values. 
*/ - BufferedReadInit(bufferedRead, memory, memoryLen, maxBufferLen, maxLargeReadLen, relname, &file_node); + BufferedReadInit(bufferedRead, memory, memoryLen, maxBufferLen, maxLargeReadLen, relname, &file_node, smgrAO); /* * Check for consistency */ @@ -47,12 +48,13 @@ test__BufferedReadUseBeforeBuffer__IsNextReadLenZero(void **state) int32 nextBufferLen; int32 maxReadAheadLen = 64; RelFileNode file_node = {0}; + const struct f_smgr_ao *smgrAO = smgrAOGetDefault(); memset(bufferedRead, 0 , sizeof(BufferedRead)); /* * Initialize the buffer */ - BufferedReadInit(bufferedRead, memory, memoryLen, maxBufferLen, maxLargeReadLen, relname, &file_node); + BufferedReadInit(bufferedRead, memory, memoryLen, maxBufferLen, maxLargeReadLen, relname, &file_node, smgrAO); /* * filling up the bufferedRead struct */ diff --git a/src/backend/storage/smgr/smgr.c b/src/backend/storage/smgr/smgr.c index 11cfbf6e777..1bbe884bc3a 100644 --- a/src/backend/storage/smgr/smgr.c +++ b/src/backend/storage/smgr/smgr.c @@ -100,6 +100,20 @@ static const f_smgr smgrsw[] = { } }; +static const f_smgr_ao smgrswao[] = { + /* regular file */ + { + .smgr_FileClose = FileClose, + .smgr_FileDiskSize = FileDiskSize, + .smgr_FileTruncate = FileTruncate, + .smgr_AORelOpenSegFile = PathNameOpenFile, + .smgr_FileWrite = FileWrite, + .smgr_FileRead = FileRead, + .smgr_FileSync = FileSync, + }, +}; + + static const int NSmgr = lengthof(smgrsw); /* @@ -155,6 +169,11 @@ smgrshutdown(int code, Datum arg) } } +const struct f_smgr_ao * +smgrAOGetDefault(void) { + return &smgrswao[0]; +} + /* * smgropen() -- Return an SMgrRelation object, creating it if need be. * @@ -204,6 +223,8 @@ smgropen(RelFileNode rnode, BackendId backend, SMgrImpl which, Relation rel) dlist_push_tail(&unowned_relns, &reln->node); reln->smgr = &smgrsw[reln->smgr_which]; + reln->smgr_ao = &smgrswao[0]; + /* * hook for other storage managers. 
*/ @@ -211,6 +232,7 @@ smgropen(RelFileNode rnode, BackendId backend, SMgrImpl which, Relation rel) (*smgr_hook) (reln, backend, which, rel); Assert(reln->smgr); + Assert(reln->smgr_ao); (*reln->smgr).smgr_open(reln); } diff --git a/src/backend/utils/datumstream/datumstream.c b/src/backend/utils/datumstream/datumstream.c index 5588d978208..2fd3ab0775a 100644 --- a/src/backend/utils/datumstream/datumstream.c +++ b/src/backend/utils/datumstream/datumstream.c @@ -500,7 +500,8 @@ create_datumstreamwrite( char *relname, char *title, bool needsWAL, - RelFileNodeBackend *rnode) + RelFileNodeBackend *rnode, + const struct f_smgr_ao *smgrAO) { DatumStreamWrite *acc = palloc0(sizeof(DatumStreamWrite)); @@ -569,7 +570,8 @@ create_datumstreamwrite( relname, title, &acc->ao_attr, - needsWAL); + needsWAL, + smgrAO); acc->ao_write.compression_functions = compressionFunctions; acc->ao_write.compressionState = compressionState; @@ -645,7 +647,7 @@ create_datumstreamread( Form_pg_attribute attr, char *relname, char *title, - RelFileNode *relFileNode) + RelFileNode *relFileNode, const struct f_smgr_ao *smgrAO) { DatumStreamRead *acc = palloc0(sizeof(DatumStreamRead)); @@ -702,7 +704,8 @@ create_datumstreamread( relname, title, &acc->ao_attr, - relFileNode); + relFileNode, + smgrAO); acc->ao_read.compression_functions = compressionFunctions; acc->ao_read.compressionState = compressionState; diff --git a/src/include/access/aomd.h b/src/include/access/aomd.h index 717cb70f79a..6390d42c0ae 100644 --- a/src/include/access/aomd.h +++ b/src/include/access/aomd.h @@ -17,6 +17,7 @@ #include "htup_details.h" #include "storage/fd.h" +#include "storage/smgr.h" #include "utils/rel.h" extern int AOSegmentFilePathNameLen(Relation rel); @@ -39,7 +40,7 @@ extern File OpenAOSegmentFile(Relation rel, char *filepathname, int64 logicalEof); -extern void CloseAOSegmentFile(File fd); +extern void CloseAOSegmentFile(File fd, Relation rel); extern void TruncateAOSegmentFile(File fd, @@ -53,7 +54,8 @@ extern 
void mdunlink_ao(RelFileNodeBackend rnode, ForkNumber forkNumber, bool isRedo); extern void -copy_append_only_data(RelFileNode src, RelFileNode dst, BackendId backendid, char relpersistence); +copy_append_only_data(RelFileNode src, RelFileNode dst, + SMgrRelation srcSMGR, SMgrRelation dstSMGR, BackendId backendid, char relpersistence); /* * return value should be true if the callback was able to find the given diff --git a/src/include/cdb/cdbappendonlystorageread.h b/src/include/cdb/cdbappendonlystorageread.h index 0f2cccfea31..879d054e524 100755 --- a/src/include/cdb/cdbappendonlystorageread.h +++ b/src/include/cdb/cdbappendonlystorageread.h @@ -16,6 +16,7 @@ #include "catalog/pg_appendonly.h" #include "catalog/pg_compression.h" +#include "storage/smgr.h" #include "cdb/cdbappendonlystorage.h" #include "cdb/cdbappendonlystoragelayer.h" #include "cdb/cdbbufferedread.h" @@ -191,6 +192,7 @@ typedef struct AppendOnlyStorageRead * pointers. The array index * corresponds to COMP_FUNC_* */ + const struct f_smgr_ao *smgrAO; } AppendOnlyStorageRead; extern void AppendOnlyStorageRead_Init(AppendOnlyStorageRead *storageRead, @@ -198,7 +200,7 @@ extern void AppendOnlyStorageRead_Init(AppendOnlyStorageRead *storageRead, int32 maxBufferLen, char *relationName, char *title, AppendOnlyStorageAttributes *storageAttributes, - RelFileNode *relFileNode); + RelFileNode *relFileNode, const struct f_smgr_ao *smgrAO); extern char *AppendOnlyStorageRead_RelationName(AppendOnlyStorageRead *storageRead); extern char *AppendOnlyStorageRead_SegmentFileName(AppendOnlyStorageRead *storageRead); diff --git a/src/include/cdb/cdbappendonlystoragewrite.h b/src/include/cdb/cdbappendonlystoragewrite.h index acbdfe0211b..d7493f6103e 100755 --- a/src/include/cdb/cdbappendonlystoragewrite.h +++ b/src/include/cdb/cdbappendonlystoragewrite.h @@ -176,6 +176,8 @@ typedef struct AppendOnlyStorageWrite bool needsWAL; + const struct f_smgr_ao *smgrAO; + } AppendOnlyStorageWrite; extern void 
AppendOnlyStorageWrite_Init(AppendOnlyStorageWrite *storageWrite, @@ -184,7 +186,9 @@ extern void AppendOnlyStorageWrite_Init(AppendOnlyStorageWrite *storageWrite, char *relationName, char *title, AppendOnlyStorageAttributes *storageAttributes, - bool needsWAL); + bool needsWAL, + const struct f_smgr_ao *smgrAO); + extern void AppendOnlyStorageWrite_FinishSession(AppendOnlyStorageWrite *storageWrite); extern void AppendOnlyStorageWrite_TransactionCreateFile(AppendOnlyStorageWrite *storageWrite, diff --git a/src/include/cdb/cdbbufferedappend.h b/src/include/cdb/cdbbufferedappend.h index 985632a096d..27950032c98 100644 --- a/src/include/cdb/cdbbufferedappend.h +++ b/src/include/cdb/cdbbufferedappend.h @@ -79,6 +79,7 @@ typedef struct BufferedAppend int64 fileLen; int64 fileLen_uncompressed; /* for calculating compress ratio */ + const struct f_smgr_ao *smgrAO; } BufferedAppend; /* @@ -102,7 +103,8 @@ extern void BufferedAppendInit( int32 memoryLen, int32 maxBufferWithCompressionOverrrunLen, int32 maxLargeWriteLen, - char *relationName); + char *relationName, + const struct f_smgr_ao *smgrAO); /* * Takes an open file handle for the next file. diff --git a/src/include/cdb/cdbbufferedread.h b/src/include/cdb/cdbbufferedread.h index 7e176698380..d4824aed406 100644 --- a/src/include/cdb/cdbbufferedread.h +++ b/src/include/cdb/cdbbufferedread.h @@ -80,6 +80,7 @@ typedef struct BufferedRead bool haveTemporaryLimitInEffect; int64 temporaryLimitFileLen; + const struct f_smgr_ao *smgrAO; } BufferedRead; /* @@ -104,7 +105,8 @@ extern void BufferedReadInit( int32 maxBufferLen, int32 maxLargeReadLen, char *relationName, - RelFileNode *file_node); + RelFileNode *file_node, + const struct f_smgr_ao *smgr); /* * Takes an open file handle for the next file. 
diff --git a/src/include/storage/smgr.h b/src/include/storage/smgr.h index 3751900cd05..10370ef5262 100644 --- a/src/include/storage/smgr.h +++ b/src/include/storage/smgr.h @@ -21,6 +21,7 @@ #include "storage/block.h" #include "storage/relfilenode.h" #include "storage/dbdirnode.h" +#include "storage/fd.h" #include "utils/relcache.h" typedef enum SMgrImplementation @@ -73,6 +74,8 @@ typedef struct SMgrRelationData char smgr_relpersistence; /* pointer to storage manager */ const struct f_smgr *smgr; + /* pointer to AO storage manager */ + const struct f_smgr_ao *smgr_ao; /* * Fields below here are intended to be private to smgr.c and its @@ -127,12 +130,25 @@ typedef struct f_smgr void (*smgr_immedsync) (SMgrRelation reln, ForkNumber forknum); } f_smgr; +typedef struct f_smgr_ao { + off_t (*smgr_FileDiskSize) (File file); + void (*smgr_FileClose) (File file); + int (*smgr_FileTruncate) (File file, int64 offset, uint32 wait_event_info); + File (*smgr_AORelOpenSegFile) (const char *filePath, int fileFlags); + int (*smgr_FileWrite) (File file, char *buffer, int amount, off_t offset, uint32 wait_event_info); + int (*smgr_FileRead) (File file, char *buffer, int amount, off_t offset, uint32 wait_event_info); + int (*smgr_FileSync) (File file, uint32 wait_event_info); +} f_smgr_ao; + + typedef void (*smgr_init_hook_type) (void); typedef void (*smgr_hook_type) (SMgrRelation reln, BackendId backend, SMgrImpl which, Relation rel); typedef void (*smgr_shutdown_hook_type) (void); +typedef void (*smgrao_hook_type)(SMgrRelation reln, BackendId backend, SMgrImpl which, Relation rel); extern PGDLLIMPORT smgr_init_hook_type smgr_init_hook; extern PGDLLIMPORT smgr_hook_type smgr_hook; extern PGDLLIMPORT smgr_shutdown_hook_type smgr_shutdown_hook; +extern PGDLLIMPORT smgrao_hook_type smgrao_hook; extern bool smgr_is_heap_relation(SMgrRelation reln); @@ -166,6 +182,8 @@ extern void smgrtruncate(SMgrRelation reln, ForkNumber *forknum, extern void smgrimmedsync(SMgrRelation reln,
ForkNumber forknum); extern void AtEOXact_SMgr(void); +extern const struct f_smgr_ao * smgrAOGetDefault(void); + /* * Hook for plugins to collect statistics from storage functions diff --git a/src/include/utils/datumstream.h b/src/include/utils/datumstream.h index 5c98fc4f93b..99590d83f76 100644 --- a/src/include/utils/datumstream.h +++ b/src/include/utils/datumstream.h @@ -263,7 +263,8 @@ extern DatumStreamWrite *create_datumstreamwrite( char *relname, char *title, bool needsWAL, - RelFileNodeBackend *rnode); + RelFileNodeBackend *rnode, + const struct f_smgr_ao *smgrAO); extern DatumStreamRead *create_datumstreamread( char *compName, @@ -274,7 +275,7 @@ extern DatumStreamRead *create_datumstreamread( Form_pg_attribute attr, char *relname, char *title, - RelFileNode *relFileNode); + RelFileNode *relFileNode, const struct f_smgr_ao *smgrAO); extern void datumstreamwrite_open_file( DatumStreamWrite * ds, diff --git a/src/test/regress/expected/bfv_copy.out b/src/test/regress/expected/bfv_copy.out new file mode 100644 index 00000000000..cc589487128 --- /dev/null +++ b/src/test/regress/expected/bfv_copy.out @@ -0,0 +1,26 @@ +CREATE TABLE copy_converse_varify_error(a int, b text); +COPY copy_converse_varify_error FROM '/home/reshke/cbdb/src/test/regress/data/copy_converse_varify_error.data' +WITH(FORMAT text, delimiter '|', "null" E'\\N', newline 'LF', escape 'OFF') +LOG ERRORS SEGMENT REJECT LIMIT 10 ROWS; +NOTICE: found 5 data formatting errors (5 or more input rows), rejected related input data +SELECT * FROM copy_converse_varify_error; + a | b +---+------------- + 1 | + 4 | aabbccdd + 7 | 123aabbccdd +(3 rows) + +DROP TABLE copy_converse_varify_error; +CREATE TABLE copy_eol_on_nextrawpage(b text); +COPY copy_eol_on_nextrawpage FROM '/home/reshke/cbdb/src/test/regress/data/eol_on_next_raw_page.data' +WITH(FORMAT text, delimiter '|', "null" E'\\N', newline 'LF', escape 'OFF') +LOG ERRORS SEGMENT REJECT LIMIT 10 ROWS; +NOTICE: found 1 data formatting errors (1 or more 
input rows), rejected related input data +SELECT count(*) FROM copy_eol_on_nextrawpage; + count +------- + 15 +(1 row) + +DROP TABLE copy_eol_on_nextrawpage; diff --git a/src/test/regress/sql/bfv_copy.sql b/src/test/regress/sql/bfv_copy.sql new file mode 100644 index 00000000000..9a1331b56aa --- /dev/null +++ b/src/test/regress/sql/bfv_copy.sql @@ -0,0 +1,13 @@ +CREATE TABLE copy_converse_varify_error(a int, b text); +COPY copy_converse_varify_error FROM '/home/reshke/cbdb/src/test/regress/data/copy_converse_varify_error.data' +WITH(FORMAT text, delimiter '|', "null" E'\\N', newline 'LF', escape 'OFF') +LOG ERRORS SEGMENT REJECT LIMIT 10 ROWS; +SELECT * FROM copy_converse_varify_error; +DROP TABLE copy_converse_varify_error; + +CREATE TABLE copy_eol_on_nextrawpage(b text); +COPY copy_eol_on_nextrawpage FROM '/home/reshke/cbdb/src/test/regress/data/eol_on_next_raw_page.data' +WITH(FORMAT text, delimiter '|', "null" E'\\N', newline 'LF', escape 'OFF') +LOG ERRORS SEGMENT REJECT LIMIT 10 ROWS; +SELECT count(*) FROM copy_eol_on_nextrawpage; +DROP TABLE copy_eol_on_nextrawpage;