Skip to content

Commit

Permalink
Fix concurrency issue on PageState; add chunk state for prepareCommit (
Browse files Browse the repository at this point in the history
…#3388)

* remove useless param in initReadState
* write state
* add locks
* fix null props
  • Loading branch information
ray6080 authored Apr 27, 2024
1 parent d9dcc0a commit 12b8495
Show file tree
Hide file tree
Showing 20 changed files with 408 additions and 447 deletions.
1 change: 1 addition & 0 deletions src/include/storage/buffer_manager/bm_file_handle.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ class BMFileHandle : public FileHandle {

private:
inline PageState* getPageState(common::page_idx_t pageIdx) {
std::shared_lock sLck{fhSharedMutex};
KU_ASSERT(pageIdx < numPages && pageStates[pageIdx]);
return pageStates[pageIdx].get();
}
Expand Down
3 changes: 2 additions & 1 deletion src/include/storage/stats/table_statistics_collection.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ class TablesStatistics {

void initTableStatisticsForWriteTrx();

void setToUpdated() { isUpdated = true; }

protected:
virtual std::unique_ptr<TableStatistics> constructTableStatistic(
catalog::TableCatalogEntry* tableEntry) = 0;
Expand All @@ -101,7 +103,6 @@ class TablesStatistics {

void initTableStatisticsForWriteTrxNoLock();

void setToUpdated() { isUpdated = true; }
void resetToNotUpdated() { isUpdated = false; }

protected:
Expand Down
88 changes: 41 additions & 47 deletions src/include/storage/store/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,17 @@ class Column {
friend class RelTableData;

public:
struct ReadState {
explicit ReadState() = default;
ReadState(ColumnChunkMetadata metadata, uint64_t numValuesPerPage)
struct ChunkState {
explicit ChunkState() = default;
ChunkState(ColumnChunkMetadata metadata, uint64_t numValuesPerPage)
: metadata{std::move(metadata)}, numValuesPerPage{numValuesPerPage} {}

ColumnChunkMetadata metadata;
uint64_t numValuesPerPage = UINT64_MAX;
common::node_group_idx_t nodeGroupIdx = common::INVALID_NODE_GROUP_IDX;
std::unique_ptr<ReadState> nullState = nullptr;
std::unique_ptr<ChunkState> nullState = nullptr;
// Used for struct/list/string columns.
std::vector<ReadState> childrenStates;
std::vector<ChunkState> childrenStates;
};

Column(std::string name, common::LogicalType dataType, const MetadataDAHInfo& metaDAHeaderInfo,
Expand All @@ -61,17 +61,16 @@ class Column {
virtual void batchLookup(transaction::Transaction* transaction,
const common::offset_t* nodeOffsets, size_t size, uint8_t* result);

virtual void initReadState(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, common::offset_t startOffsetInChunk,
ReadState& columnReadState);
virtual void initChunkState(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, ChunkState& state);

virtual void scan(transaction::Transaction* transaction, ReadState& readState,
virtual void scan(transaction::Transaction* transaction, ChunkState& state,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
virtual void lookup(transaction::Transaction* transaction, ReadState& readState,
virtual void lookup(transaction::Transaction* transaction, ChunkState& state,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);

// Scan from [startOffsetInGroup, endOffsetInGroup).
virtual void scan(transaction::Transaction* transaction, ReadState& readState,
virtual void scan(transaction::Transaction* transaction, ChunkState& state,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector);
// Scan from [startOffsetInGroup, endOffsetInGroup).
Expand Down Expand Up @@ -114,57 +113,50 @@ class Column {

inline std::string getName() const { return name; }

virtual void scan(transaction::Transaction* transaction, const ReadState& state,
virtual void scan(transaction::Transaction* transaction, const ChunkState& state,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup, uint8_t* result);

// Write a single value from the vectorToWriteFrom.
virtual void write(common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInChunk,
virtual void write(ChunkState& state, common::offset_t offsetInChunk,
common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom);
// Batch write to a set of sequential pages.
virtual void write(common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInChunk,
ColumnChunk* data, common::offset_t dataOffset, common::length_t numValues);
virtual void write(ChunkState& state, common::offset_t offsetInChunk, ColumnChunk* data,
common::offset_t dataOffset, common::length_t numValues);

// Append values to the end of the node group, resizing it if necessary
common::offset_t appendValues(common::node_group_idx_t nodeGroupIdx, const uint8_t* data,
common::offset_t appendValues(ChunkState& state, const uint8_t* data,
const common::NullMask* nullChunkData, common::offset_t numValues);

ReadState getReadState(transaction::TransactionType transactionType,
common::node_group_idx_t nodeGroupIdx) const;

virtual std::unique_ptr<ColumnChunk> getEmptyChunkForCommit(uint64_t capacity);
static void applyLocalChunkToColumnChunk(const ChunkCollection& localChunks,
ColumnChunk* columnChunk, const offset_to_row_idx_t& info);

protected:
virtual void scanInternal(transaction::Transaction* transaction, ReadState& readState,
virtual void scanInternal(transaction::Transaction* transaction, ChunkState& state,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
void scanUnfiltered(transaction::Transaction* transaction, PageCursor& pageCursor,
uint64_t numValuesToScan, common::ValueVector* resultVector,
const ColumnChunkMetadata& chunkMeta, uint64_t startPosInVector = 0);
void scanFiltered(transaction::Transaction* transaction, PageCursor& pageCursor,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector,
const ColumnChunkMetadata& chunkMeta);
virtual void lookupInternal(transaction::Transaction* transaction, ReadState& readState,
virtual void lookupInternal(transaction::Transaction* transaction, ChunkState& state,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector);
virtual void lookupValue(transaction::Transaction* transaction, ReadState& readState,
virtual void lookupValue(transaction::Transaction* transaction, ChunkState& state,
common::offset_t nodeOffset, common::ValueVector* resultVector, uint32_t posInVector);

void readFromPage(transaction::Transaction* transaction, common::page_idx_t pageIdx,
const std::function<void(uint8_t*)>& func);

virtual void writeValue(const ColumnChunkMetadata& chunkMeta,
common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInChunk,
virtual void writeValue(ChunkState& state, common::offset_t offsetInChunk,
common::ValueVector* vectorToWriteFrom, uint32_t posInVectorToWriteFrom);
virtual void writeValues(ReadState& state, common::offset_t offsetInChunk, const uint8_t* data,
virtual void writeValues(ChunkState& state, common::offset_t offsetInChunk, const uint8_t* data,
const common::NullMask* nullChunkData, common::offset_t dataOffset = 0,
common::offset_t numValues = 1);

// Produces a page cursor for the offset relative to the given node group
PageCursor getPageCursorForOffsetInGroup(common::offset_t offsetInChunk,
const ReadState& state);
// Produces a page cursor for the absolute node offset
PageCursor getPageCursorForOffset(transaction::TransactionType transactionType,
common::node_group_idx_t nodeGroupIdx, common::offset_t offsetInChunk);
const ChunkState& state);
void updatePageWithCursor(PageCursor cursor,
const std::function<void(uint8_t*, common::offset_t)>& writeOp);

Expand All @@ -185,32 +177,34 @@ class Column {
common::offset_t maxOffset);
bool checkUpdateInPlace(const ColumnChunkMetadata& metadata, const ChunkCollection& localChunks,
const offset_to_row_idx_t& writeInfo);
virtual bool canCommitInPlace(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, const ChunkCollection& localInsertChunks,

virtual bool canCommitInPlace(const ChunkState& state, const ChunkCollection& localInsertChunks,
const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks,
const offset_to_row_idx_t& updateInfo);
virtual bool canCommitInPlace(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, const std::vector<common::offset_t>& dstOffsets,
ColumnChunk* chunk, common::offset_t srcOffset);
virtual void commitLocalChunkInPlace(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, const ChunkCollection& localInsertChunks,
const offset_to_row_idx_t& insertInfo, const ChunkCollection& localUpdateChunks,
const offset_to_row_idx_t& updateInfo, const offset_set_t& deleteInfo);
virtual bool canCommitInPlace(const ChunkState& state,
const std::vector<common::offset_t>& dstOffsets, ColumnChunk* chunk,
common::offset_t srcOffset);

virtual void commitLocalChunkInPlace(transaction::Transaction* transaction, ChunkState& state,
const ChunkCollection& localInsertChunks, const offset_to_row_idx_t& insertInfo,
const ChunkCollection& localUpdateChunks, const offset_to_row_idx_t& updateInfo,
const offset_set_t& deleteInfo);
virtual void commitLocalChunkOutOfPlace(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, bool isNewNodeGroup,
const ChunkCollection& localInsertChunks, const offset_to_row_idx_t& insertInfo,
const ChunkCollection& localUpdateChunks, const offset_to_row_idx_t& updateInfo,
const offset_set_t& deleteInfo);
virtual void commitColumnChunkInPlace(common::node_group_idx_t nodeGroupIdx,

virtual void commitColumnChunkInPlace(ChunkState& state,
const std::vector<common::offset_t>& dstOffsets, ColumnChunk* chunk,
common::offset_t srcOffset);
virtual void commitColumnChunkOutOfPlace(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, bool isNewNodeGroup,
const std::vector<common::offset_t>& dstOffsets, ColumnChunk* chunk,
common::offset_t srcOffset);

void applyLocalChunkToColumn(common::node_group_idx_t nodeGroupIdx,
const ChunkCollection& localChunks, const offset_to_row_idx_t& info);
void applyLocalChunkToColumn(ChunkState& state, const ChunkCollection& localChunks,
const offset_to_row_idx_t& info);

// check if val is in range [start, end)
static inline bool isInRange(uint64_t val, uint64_t start, uint64_t end) {
Expand Down Expand Up @@ -245,23 +239,23 @@ class InternalIDColumn : public Column {
BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction, RWPropertyStats stats, bool enableCompression);

inline void scan(transaction::Transaction* transaction, ReadState& readState,
inline void scan(transaction::Transaction* transaction, ChunkState& state,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector) override {
Column::scan(transaction, readState, nodeIDVector, resultVector);
Column::scan(transaction, state, nodeIDVector, resultVector);
populateCommonTableID(resultVector);
}

inline void scan(transaction::Transaction* transaction, ReadState& readState,
inline void scan(transaction::Transaction* transaction, ChunkState& state,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector) override {
Column::scan(transaction, readState, startOffsetInGroup, endOffsetInGroup, resultVector,
Column::scan(transaction, state, startOffsetInGroup, endOffsetInGroup, resultVector,
offsetInVector);
populateCommonTableID(resultVector);
}

inline void lookup(transaction::Transaction* transaction, ReadState& readState,
inline void lookup(transaction::Transaction* transaction, ChunkState& state,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector) override {
Column::lookup(transaction, readState, nodeIDVector, resultVector);
Column::lookup(transaction, state, nodeIDVector, resultVector);
populateCommonTableID(resultVector);
}

Expand Down
29 changes: 12 additions & 17 deletions src/include/storage/store/dictionary_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,30 +15,26 @@ class DictionaryColumn {
BMFileHandle* dataFH, BMFileHandle* metadataFH, BufferManager* bufferManager, WAL* wal,
transaction::Transaction* transaction, RWPropertyStats stats, bool enableCompression);

void initReadState(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t startOffsetInChunk, Column::ReadState& columnReadState);
void initChunkState(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, Column::ChunkState& columnReadState);

void append(common::node_group_idx_t nodeGroupIdx, const DictionaryChunk& dictChunk);

void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
DictionaryChunk& dictChunk);
// Offsets to scan should be a sorted list of pairs mapping the index of the entry in the string
// dictionary (as read from the index column) to the output index in the result vector to store
// the string.
void scan(transaction::Transaction* transaction, const Column::ReadState& offsetState,
const Column::ReadState& dataState,
void scan(transaction::Transaction* transaction, const Column::ChunkState& offsetState,
const Column::ChunkState& dataState,
std::vector<std::pair<DictionaryChunk::string_index_t, uint64_t>>& offsetsToScan,
common::ValueVector* resultVector, const ColumnChunkMetadata& indexMeta);

DictionaryChunk::string_index_t append(common::node_group_idx_t nodeGroupIdx,
std::string_view val);
DictionaryChunk::string_index_t append(Column::ChunkState& state, std::string_view val);

bool canCommitInPlace(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, uint64_t numNewStrings,
bool canCommitInPlace(const Column::ChunkState& state, uint64_t numNewStrings,
uint64_t totalStringLengthToAdd);

uint64_t getNumValuesInOffsets(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx);

void prepareCommit();
void checkpointInMemory();
void rollbackInMemory();
Expand All @@ -47,17 +43,16 @@ class DictionaryColumn {
inline Column* getOffsetColumn() const { return offsetColumn.get(); }

private:
void scanOffsets(transaction::Transaction* transaction, const Column::ReadState& state,
void scanOffsets(transaction::Transaction* transaction, const Column::ChunkState& state,
DictionaryChunk::string_offset_t* offsets, uint64_t index, uint64_t numValues,
uint64_t dataSize);
void scanValueToVector(transaction::Transaction* transaction,
const Column::ReadState& dataState, uint64_t startOffset, uint64_t endOffset,
const Column::ChunkState& dataState, uint64_t startOffset, uint64_t endOffset,
common::ValueVector* resultVector, uint64_t offsetInVector);

bool canDataCommitInPlace(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, uint64_t totalStringLengthToAdd);
bool canOffsetCommitInPlace(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, uint64_t numNewStrings,
bool canDataCommitInPlace(const Column::ChunkState& dataState, uint64_t totalStringLengthToAdd);
bool canOffsetCommitInPlace(const Column::ChunkState& offsetState,
const Column::ChunkState& dataState, uint64_t numNewStrings,
uint64_t totalStringLengthToAdd);

private:
Expand Down
30 changes: 14 additions & 16 deletions src/include/storage/store/list_column.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,41 +53,41 @@ class ListColumn final : public Column {
BufferManager* bufferManager, WAL* wal, transaction::Transaction* transaction,
RWPropertyStats propertyStatistics, bool enableCompression);

void initReadState(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
common::offset_t startOffsetInChunk, ReadState& columnReadState) override;
void scan(transaction::Transaction* transaction, ReadState& readState,
void initChunkState(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, ChunkState& columnReadState) override;

void scan(transaction::Transaction* transaction, ChunkState& readState,
common::offset_t startOffsetInGroup, common::offset_t endOffsetInGroup,
common::ValueVector* resultVector, uint64_t offsetInVector = 0) override;

void scan(transaction::Transaction* transaction, common::node_group_idx_t nodeGroupIdx,
ColumnChunk* columnChunk, common::offset_t startOffset = 0,
common::offset_t endOffset = common::INVALID_OFFSET) override;

inline Column* getDataColumn() { return dataColumn.get(); }

protected:
void scanInternal(transaction::Transaction* transaction, ReadState& readState,
void scanInternal(transaction::Transaction* transaction, ChunkState& readState,
common::ValueVector* nodeIDVector, common::ValueVector* resultVector) override;

void lookupValue(transaction::Transaction* transaction, ReadState& readState,
void lookupValue(transaction::Transaction* transaction, ChunkState& readState,
common::offset_t nodeOffset, common::ValueVector* resultVector,
uint32_t posInVector) override;

void append(ColumnChunk* columnChunk, uint64_t nodeGroupIdx) override;

private:
void scanUnfiltered(transaction::Transaction* transaction, ReadState& readState,
void scanUnfiltered(transaction::Transaction* transaction, ChunkState& readState,
common::ValueVector* resultVector, const ListOffsetSizeInfo& listOffsetInfoInStorage);
void scanFiltered(transaction::Transaction* transaction, ReadState& readState,
void scanFiltered(transaction::Transaction* transaction, ChunkState& readState,
common::ValueVector* offsetVector, const ListOffsetSizeInfo& listOffsetInfoInStorage);

void prepareCommit() override;
void checkpointInMemory() override;
void rollbackInMemory() override;

common::offset_t readOffset(transaction::Transaction* transaction, const ReadState& readState,
common::offset_t readOffset(transaction::Transaction* transaction, const ChunkState& readState,
common::offset_t offsetInNodeGroup);
common::list_size_t readSize(transaction::Transaction* transaction, const ReadState& readState,
common::list_size_t readSize(transaction::Transaction* transaction, const ChunkState& readState,
common::offset_t offsetInNodeGroup);

ListOffsetSizeInfo getListOffsetSizeInfo(transaction::Transaction* transaction,
Expand All @@ -102,15 +102,13 @@ class ListColumn final : public Column {
common::node_group_idx_t nodeGroupIdx, const std::vector<common::offset_t>& dstOffsets,
ColumnChunk* chunk, common::offset_t startSrcOffset) override;

void prepareCommitForOffsetChunk(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, const std::vector<common::offset_t>& dstOffsets,
ColumnChunk* chunk, common::offset_t startSrcOffset);

void prepareCommitForOffsetChunk(transaction::Transaction* transaction, ChunkState& offsetState,
const std::vector<common::offset_t>& dstOffsets, ColumnChunk* chunk,
common::offset_t startSrcOffset);
void commitOffsetColumnChunkOutOfPlace(transaction::Transaction* transaction,
common::node_group_idx_t nodeGroupIdx, const std::vector<common::offset_t>& dstOffsets,
ColumnChunk* chunk, common::offset_t startSrcOffset);

void commitOffsetColumnChunkInPlace(common::node_group_idx_t nodeGroupIdx,
void commitOffsetColumnChunkInPlace(ChunkState& offsetChunk,
const std::vector<common::offset_t>& dstOffsets, ColumnChunk* chunk,
common::offset_t srcOffset);

Expand Down
Loading

0 comments on commit 12b8495

Please sign in to comment.