diff --git a/.github/workflows/ci-workflow.yml b/.github/workflows/ci-workflow.yml index 7d17ee2ae9d..ef91677aa06 100644 --- a/.github/workflows/ci-workflow.yml +++ b/.github/workflows/ci-workflow.yml @@ -198,6 +198,37 @@ jobs: - name: Java test run: make javatest + clang-build-test-with-tsan: + name: clang build and test with tsan + needs: [clang-format, sanity-checks, python-lint-check] + runs-on: kuzu-self-hosted-testing + env: + NUM_THREADS: 32 + TEST_JOBS: 16 + CC: clang + CXX: clang++ + UW_S3_ACCESS_KEY_ID: ${{ secrets.UW_S3_ACCESS_KEY_ID }} + UW_S3_SECRET_ACCESS_KEY: ${{ secrets.UW_S3_SECRET_ACCESS_KEY }} + AWS_S3_ACCESS_KEY_ID: ${{ secrets.AWS_S3_ACCESS_KEY_ID }} + AWS_S3_SECRET_ACCESS_KEY: ${{ secrets.AWS_S3_SECRET_ACCESS_KEY }} + AWS_ACCESS_KEY_ID: ${{ secrets.AWS_S3_ACCESS_KEY_ID }} + AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_S3_SECRET_ACCESS_KEY }} + RUN_ID: "$(hostname)-$(date +%s)" + steps: + - uses: actions/checkout@v3 + + - name: Ensure Python dependencies + run: | + pip install torch~=2.0.0 --extra-index-url https://download.pytorch.org/whl/cpu + pip install --user -r tools/python_api/requirements_dev.txt -f https://data.pyg.org/whl/torch-2.0.0+cpu.html + + - name: Ensure Node.js dependencies + run: npm install --include=dev + working-directory: tools/nodejs_api + + - name: Test + run: make test TSAN=1 + msvc-build-test: name: msvc build & test needs: [clang-format, sanity-checks, python-lint-check] diff --git a/CMakeLists.txt b/CMakeLists.txt index d37ba66b063..e0a418e1ab1 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -124,6 +124,7 @@ if(NOT MSVC) endif() if(${ENABLE_THREAD_SANITIZER}) + add_compile_definitions(KUZU_TSAN) if(MSVC) message(FATAL_ERROR "Thread sanitizer is not supported on MSVC") else() diff --git a/src/include/common/constants.h b/src/include/common/constants.h index 6964fdadae9..96931ec3fd3 100644 --- a/src/include/common/constants.h +++ b/src/include/common/constants.h @@ -74,6 +74,8 @@ struct BufferPoolConstants { // The default max size for a VMRegion. #ifdef __32BIT__ static constexpr uint64_t DEFAULT_VM_REGION_MAX_SIZE = (uint64_t)1 << 30; // (1GB) +#elif KUZU_TSAN + static constexpr uint64_t DEFAULT_VM_REGION_MAX_SIZE = (uint64_t)1 << 36; // (64GB) #else static constexpr uint64_t DEFAULT_VM_REGION_MAX_SIZE = (uint64_t)1 << 43; // (8TB) #endif diff --git a/src/include/processor/operator/mask.h b/src/include/processor/operator/mask.h index a7871261a87..d148720396e 100644 --- a/src/include/processor/operator/mask.h +++ b/src/include/processor/operator/mask.h @@ -9,7 +9,7 @@ namespace processor { // Note: Classes in this file are NOT thread-safe. struct MaskUtil { - static inline common::offset_t getMorselIdx(common::offset_t offset) { + static common::offset_t getMorselIdx(common::offset_t offset) { return offset >> common::DEFAULT_VECTOR_CAPACITY_LOG_2; } }; @@ -23,8 +23,22 @@ struct MaskData { std::fill(data, data + size, 0); } - inline void setMask(uint64_t pos, uint8_t maskValue) const { data[pos] = maskValue; } - inline bool isMasked(uint64_t pos, uint8_t trueMaskVal) const { +#if KUZU_TSAN +#if defined(__has_feature) && __has_feature(thread_sanitizer) + __attribute__((no_sanitize("thread"))) +#endif +#endif + void + setMask(uint64_t pos, uint8_t maskValue) const { + data[pos] = maskValue; + } +#if KUZU_TSAN +#if defined(__has_feature) && __has_feature(thread_sanitizer) + __attribute__((no_sanitize("thread"))) +#endif +#endif + bool + isMasked(uint64_t pos, uint8_t trueMaskVal) const { return data[pos] == trueMaskVal; } @@ -37,7 +51,7 @@ class MaskCollection { public: MaskCollection() : numMasks{0} {} - inline void init(common::offset_t maxOffset) { + void init(common::offset_t maxOffset) { std::unique_lock lck{mtx}; if (maskData != nullptr) { // MaskCollection might be initialized repeatedly. return; @@ -45,17 +59,18 @@ class MaskCollection { maskData = std::make_unique(maxOffset + 1); } - inline bool isMasked(common::offset_t offset) { return maskData->isMasked(offset, numMasks); } + bool isMasked(common::offset_t offset) { return maskData->isMasked(offset, numMasks); } + // Increment mask value for the given nodeOffset if its current mask value is equal to // the specified `currentMaskValue`. - inline void incrementMaskValue(common::offset_t offset, uint8_t currentMaskValue) { + void incrementMaskValue(common::offset_t offset, uint8_t currentMaskValue) { if (maskData->isMasked(offset, currentMaskValue)) { maskData->setMask(offset, currentMaskValue + 1); } } - inline uint8_t getNumMasks() const { return numMasks; } - inline void incrementNumMasks() { numMasks++; } + uint8_t getNumMasks() const { return numMasks; } + void incrementNumMasks() { numMasks++; } private: std::mutex mtx; @@ -75,8 +90,8 @@ class NodeSemiMask { virtual uint8_t getNumMasks() const = 0; virtual void incrementNumMasks() = 0; - inline bool isEnabled() const { return getNumMasks() > 0; } - inline storage::NodeTable* getNodeTable() const { return nodeTable; } + bool isEnabled() const { return getNumMasks() > 0; } + storage::NodeTable* getNodeTable() const { return nodeTable; } protected: storage::NodeTable* nodeTable; @@ -88,7 +103,7 @@ class NodeOffsetSemiMask : public NodeSemiMask { offsetMask = std::make_unique(); } - inline void init(transaction::Transaction* trx) override { + void init(transaction::Transaction* trx) override { auto maxNodeOffset = nodeTable->getMaxNodeOffset(trx); if (maxNodeOffset == common::INVALID_OFFSET) { return; @@ -96,16 +111,14 @@ class NodeOffsetSemiMask : public NodeSemiMask { offsetMask->init(nodeTable->getMaxNodeOffset(trx) + 1); } - inline void incrementMaskValue(common::offset_t nodeOffset, uint8_t currentMaskValue) override { + void incrementMaskValue(common::offset_t nodeOffset, uint8_t currentMaskValue) override { offsetMask->incrementMaskValue(nodeOffset, currentMaskValue); } - inline uint8_t getNumMasks() const override { return offsetMask->getNumMasks(); } - inline void incrementNumMasks() override { offsetMask->incrementNumMasks(); } + uint8_t getNumMasks() const override { return offsetMask->getNumMasks(); } + void incrementNumMasks() override { offsetMask->incrementNumMasks(); } - inline bool isNodeMasked(common::offset_t nodeOffset) { - return offsetMask->isMasked(nodeOffset); - } + bool isNodeMasked(common::offset_t nodeOffset) { return offsetMask->isMasked(nodeOffset); } private: std::unique_ptr offsetMask; @@ -118,7 +131,7 @@ class NodeOffsetAndMorselSemiMask : public NodeSemiMask { morselMask = std::make_unique(); } - inline void init(transaction::Transaction* trx) override { + void init(transaction::Transaction* trx) override { auto maxNodeOffset = nodeTable->getMaxNodeOffset(trx); if (maxNodeOffset == common::INVALID_OFFSET) { return; @@ -129,23 +142,19 @@ class NodeOffsetAndMorselSemiMask : public NodeSemiMask { // Note: blindly update mask does not parallelize well, so we minimize write by first checking // if the mask is set to true (mask value is equal to the expected currentMaskValue) or not. - inline void incrementMaskValue(uint64_t nodeOffset, uint8_t currentMaskValue) override { + void incrementMaskValue(uint64_t nodeOffset, uint8_t currentMaskValue) override { offsetMask->incrementMaskValue(nodeOffset, currentMaskValue); morselMask->incrementMaskValue(MaskUtil::getMorselIdx(nodeOffset), currentMaskValue); } - inline uint8_t getNumMasks() const override { return offsetMask->getNumMasks(); } - inline void incrementNumMasks() override { + uint8_t getNumMasks() const override { return offsetMask->getNumMasks(); } + void incrementNumMasks() override { offsetMask->incrementNumMasks(); morselMask->incrementNumMasks(); } - inline bool isMorselMasked(common::offset_t morselIdx) { - return morselMask->isMasked(morselIdx); - } - inline bool isNodeMasked(common::offset_t nodeOffset) { - return offsetMask->isMasked(nodeOffset); - } + bool isMorselMasked(common::offset_t morselIdx) { return morselMask->isMasked(morselIdx); } + bool isNodeMasked(common::offset_t nodeOffset) { return offsetMask->isMasked(nodeOffset); } private: std::unique_ptr offsetMask; diff --git a/src/include/storage/stats/property_statistics.h b/src/include/storage/stats/property_statistics.h index afbfdeaab94..2fb03402fa9 100644 --- a/src/include/storage/stats/property_statistics.h +++ b/src/include/storage/stats/property_statistics.h @@ -18,7 +18,15 @@ class PropertyStatistics { void serialize(common::Serializer& serializer) const; static std::unique_ptr deserialize(common::Deserializer& deserializer); - inline void setHasNull() { mayHaveNullValue = true; } +#if KUZU_TSAN +#if defined(__has_feature) && __has_feature(thread_sanitizer) + __attribute__((no_sanitize("thread"))) +#endif +#endif + void + setHasNull() { + mayHaveNullValue = true; + } private: // Stores whether or not the property is known to have contained a null value diff --git a/src/include/storage/stats/table_statistics_collection.h b/src/include/storage/stats/table_statistics_collection.h index dc2b84c13a0..f241182dc39 100644 --- a/src/include/storage/stats/table_statistics_collection.h +++ b/src/include/storage/stats/table_statistics_collection.h @@ -81,7 +81,15 @@ class TablesStatistics { void initTableStatisticsForWriteTrx(); - void setToUpdated() { isUpdated = true; } +#if KUZU_TSAN +#if defined(__has_feature) && __has_feature(thread_sanitizer) + __attribute__((no_sanitize("thread"))) +#endif +#endif + void + setToUpdated() { + isUpdated = true; + } protected: virtual std::unique_ptr constructTableStatistic( diff --git a/src/processor/operator/physical_operator.cpp b/src/processor/operator/physical_operator.cpp index df0f508137e..373c3eb8d9e 100644 --- a/src/processor/operator/physical_operator.cpp +++ b/src/processor/operator/physical_operator.cpp @@ -186,7 +186,11 @@ bool PhysicalOperator::getNextTuple(ExecutionContext* context) { } metrics->executionTime.start(); auto result = getNextTuplesInternal(context); +#if KUZU_TSAN +#if !defined(__has_feature) || !__has_feature(thread_sanitizer) context->clientContext->getProgressBar()->updateProgress(getProgress(context)); +#endif +#endif metrics->executionTime.stop(); return result; } diff --git a/src/storage/stats/property_statistics.cpp b/src/storage/stats/property_statistics.cpp index 953b5f00b3f..28ae9b3e42d 100644 --- a/src/storage/stats/property_statistics.cpp +++ b/src/storage/stats/property_statistics.cpp @@ -43,14 +43,15 @@ void RWPropertyStats::setHasNull(const transaction::Transaction& transaction) { // TODO(Guodong): INVALID_PROPERTY_ID is used here because we have a column, i.e., nbrIDColumn, // not exposed as property in table schema, but still have nullColumn. Should be fixed once we // properly align properties and chunks. - if (propertyID != common::INVALID_PROPERTY_ID) { - KU_ASSERT(tablesStatistics); - auto& propStats = - tablesStatistics->getPropertyStatisticsForTable(transaction, tableID, propertyID); - if (!propStats.mayHaveNull()) { - propStats.setHasNull(); - tablesStatistics->setToUpdated(); - } + if (propertyID == common::INVALID_PROPERTY_ID) { + return; + } + KU_ASSERT(tablesStatistics); + auto& propStats = + tablesStatistics->getPropertyStatisticsForTable(transaction, tableID, propertyID); + if (!propStats.mayHaveNull()) { + propStats.setHasNull(); + tablesStatistics->setToUpdated(); } } diff --git a/src/storage/stats/table_statistics_collection.cpp b/src/storage/stats/table_statistics_collection.cpp index 2ae00f39fa3..286096edbb6 100644 --- a/src/storage/stats/table_statistics_collection.cpp +++ b/src/storage/stats/table_statistics_collection.cpp @@ -77,7 +77,7 @@ void TablesStatistics::setPropertyStatisticsForTable(table_id_t tableID, propert KU_ASSERT(readWriteVersion && readWriteVersion->tableStatisticPerTable.contains(tableID)); setToUpdated(); auto tableStatistics = readWriteVersion->tableStatisticPerTable.at(tableID).get(); - tableStatistics->setPropertyStatistics(propertyID, stats); + tableStatistics->setPropertyStatistics(propertyID, std::move(stats)); } std::unique_ptr TablesStatistics::createMetadataDAHInfo(