diff --git a/dbms/src/Operators/DMSegmentThreadSourceOp.cpp b/dbms/src/Operators/DMSegmentThreadSourceOp.cpp index 7f848598a97..05d7a9d07aa 100644 --- a/dbms/src/Operators/DMSegmentThreadSourceOp.cpp +++ b/dbms/src/Operators/DMSegmentThreadSourceOp.cpp @@ -76,7 +76,7 @@ OperatorStatus DMSegmentThreadSourceOp::readImpl(Block & block) OperatorStatus DMSegmentThreadSourceOp::executeIOImpl() { - if unlikely (done) + if (unlikely(done)) return OperatorStatus::HAS_OUTPUT; while (!cur_stream) @@ -92,7 +92,7 @@ OperatorStatus DMSegmentThreadSourceOp::executeIOImpl() auto block_size = std::max( expected_block_size, - static_cast(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows)); + static_cast(dm_context->global_context.getSettingsRef().dt_segment_stable_pack_rows)); cur_stream = task->segment->getInputStream( read_mode, *dm_context, diff --git a/dbms/src/Operators/DMSegmentThreadSourceOp.h b/dbms/src/Operators/DMSegmentThreadSourceOp.h index 222cb507a98..3e89c01fcce 100644 --- a/dbms/src/Operators/DMSegmentThreadSourceOp.h +++ b/dbms/src/Operators/DMSegmentThreadSourceOp.h @@ -15,7 +15,7 @@ #pragma once #include -#include +#include #include #include diff --git a/dbms/src/Server/DTTool/DTToolBench.cpp b/dbms/src/Server/DTTool/DTToolBench.cpp index 6b35ea7a266..f8c6e8f956f 100644 --- a/dbms/src/Server/DTTool/DTToolBench.cpp +++ b/dbms/src/Server/DTTool/DTToolBench.cpp @@ -365,7 +365,7 @@ int benchEntry(const std::vector & opts) auto storage_pool = std::make_shared(*db_context, NullspaceID, /*ns_id*/ 1, *path_pool, "test.t1"); auto dm_settings = DB::DM::DeltaMergeStore::Settings{}; - auto dm_context = std::make_unique( // + auto dm_context = DB::DM::DMContext::createUnique( *db_context, path_pool, storage_pool, diff --git a/dbms/src/Server/tests/gtest_dttool.cpp b/dbms/src/Server/tests/gtest_dttool.cpp index 208859e1484..ff01c39acc6 100644 --- a/dbms/src/Server/tests/gtest_dttool.cpp +++ b/dbms/src/Server/tests/gtest_dttool.cpp @@ -84,7 +84,7 @@ struct DTToolTest : public DB::base::TiFlashStorageTestBasic auto storage_pool = std::make_shared(*db_context, NullspaceID, /*ns_id*/ 1, *path_pool, "test.t1"); auto dm_settings = DB::DM::DeltaMergeStore::Settings{}; - auto dm_context = std::make_unique( // + auto dm_context = DB::DM::DMContext::createUnique( *db_context, path_pool, storage_pool, diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp index 9278bfe6c6d..1675ceddcfa 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp @@ -36,9 +36,9 @@ ColumnFileBig::ColumnFileBig(const DMContext & context, const DMFilePtr & file_, calculateStat(context); } -void ColumnFileBig::calculateStat(const DMContext & context) +void ColumnFileBig::calculateStat(const DMContext & dm_context) { - auto index_cache = context.db_context.getGlobalContext().getMinMaxIndexCache(); + auto index_cache = dm_context.global_context.getMinMaxIndexCache(); auto pack_filter = DMFilePackFilter::loadFrom( file, @@ -47,10 +47,10 @@ void ColumnFileBig::calculateStat(const DMContext & context) {segment_range}, EMPTY_RS_OPERATOR, {}, - context.db_context.getFileProvider(), - context.getReadLimiter(), - context.scan_context, - /*tracing_id*/ context.tracing_id); + dm_context.global_context.getFileProvider(), + dm_context.getReadLimiter(), + dm_context.scan_context, + /*tracing_id*/ dm_context.tracing_id); std::tie(valid_rows, valid_bytes) = pack_filter.validRowsAndBytes(); } @@ -79,7 +79,7 @@ void ColumnFileBig::serializeMetadata(WriteBuffer & buf, bool /*save_schema*/) c } ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata( - const DMContext & context, // + const DMContext & dm_context, // const RowKeyRange & segment_range, ReadBuffer & buf) { @@ -92,13 +92,16 @@ ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata( String file_parent_path; DMFilePtr dmfile; - auto remote_data_store = context.db_context.getSharedContextDisagg()->remote_data_store; - auto path_delegate = context.path_pool->getStableDiskDelegator(); + auto remote_data_store = dm_context.global_context.getSharedContextDisagg()->remote_data_store; + auto path_delegate = dm_context.path_pool->getStableDiskDelegator(); if (remote_data_store) { - auto wn_ps = context.db_context.getWriteNodePageStorage(); + auto wn_ps = dm_context.global_context.getWriteNodePageStorage(); auto full_page_id = UniversalPageIdFormat::toFullPageId( - UniversalPageIdFormat::toFullPrefix(context.keyspace_id, StorageType::Data, context.physical_table_id), + UniversalPageIdFormat::toFullPrefix( + dm_context.keyspace_id, + StorageType::Data, + dm_context.physical_table_id), file_page_id); auto full_external_id = wn_ps->getNormalPageId(full_page_id); auto local_external_id = UniversalPageIdFormat::getU64ID(full_external_id); @@ -112,10 +115,10 @@ ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata( } else { - auto file_id = context.storage_pool->dataReader()->getNormalPageId(file_page_id); - file_parent_path = context.path_pool->getStableDiskDelegator().getDTFilePath(file_id); + auto file_id = dm_context.storage_pool->dataReader()->getNormalPageId(file_page_id); + file_parent_path = dm_context.path_pool->getStableDiskDelegator().getDTFilePath(file_id); dmfile = DMFile::restore( - context.db_context.getFileProvider(), + dm_context.global_context.getFileProvider(), file_id, file_page_id, file_parent_path, @@ -129,7 +132,7 @@ ColumnFilePersistedPtr ColumnFileBig::deserializeMetadata( } ColumnFilePersistedPtr ColumnFileBig::createFromCheckpoint( - DMContext & context, // + DMContext & dm_context, // const RowKeyRange & target_range, ReadBuffer & buf, UniversalPageStoragePtr temp_ps, @@ -143,21 +146,21 @@ ColumnFilePersistedPtr ColumnFileBig::createFromCheckpoint( readIntBinary(valid_bytes, buf); auto remote_page_id = UniversalPageIdFormat::toFullPageId( - UniversalPageIdFormat::toFullPrefix(context.keyspace_id, StorageType::Data, context.physical_table_id), + UniversalPageIdFormat::toFullPrefix(dm_context.keyspace_id, StorageType::Data, dm_context.physical_table_id), file_page_id); auto remote_data_location = temp_ps->getCheckpointLocation(remote_page_id); auto data_key_view = S3::S3FilenameView::fromKey(*(remote_data_location->data_file_id)).asDataFile(); auto file_oid = data_key_view.getDMFileOID(); auto data_key = data_key_view.toFullKey(); - auto delegator = context.path_pool->getStableDiskDelegator(); - auto new_local_page_id = context.storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); + auto delegator = dm_context.path_pool->getStableDiskDelegator(); + auto new_local_page_id = dm_context.storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); PS::V3::CheckpointLocation loc{ .data_file_id = std::make_shared(data_key), .offset_in_file = 0, .size_in_file = 0, }; wbs.data.putRemoteExternal(new_local_page_id, loc); - auto remote_data_store = context.db_context.getSharedContextDisagg()->remote_data_store; + auto remote_data_store = dm_context.global_context.getSharedContextDisagg()->remote_data_store; auto prepared = remote_data_store->prepareDMFile(file_oid, new_local_page_id); auto dmfile = prepared->restore(DMFile::ReadMetaMode::all()); wbs.writeLogAndData(); @@ -172,10 +175,13 @@ void ColumnFileBigReader::initStream() if (file_stream) return; - DMFileBlockInputStreamBuilder builder(context.db_context); - file_stream - = builder.setTracingID(context.tracing_id) - .build(column_file.getFile(), *col_defs, RowKeyRanges{column_file.segment_range}, context.scan_context); + DMFileBlockInputStreamBuilder builder(dm_context.global_context); + file_stream = builder.setTracingID(dm_context.tracing_id) + .build( + column_file.getFile(), + *col_defs, + RowKeyRanges{column_file.segment_range}, + dm_context.scan_context); header = file_stream->getHeader(); // If we only need to read pk and version columns, then cache columns data in memory. @@ -383,7 +389,7 @@ size_t ColumnFileBigReader::skipNextBlock() ColumnFileReaderPtr ColumnFileBigReader::createNewReader(const ColumnDefinesPtr & new_col_defs) { // Currently we don't reuse the cache data. - return std::make_shared(context, column_file, new_col_defs); + return std::make_shared(dm_context, column_file, new_col_defs); } } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h index be45911ae8e..41a289b19e2 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h @@ -114,7 +114,7 @@ class ColumnFileBig : public ColumnFilePersisted class ColumnFileBigReader : public ColumnFileReader { private: - const DMContext & context; + const DMContext & dm_context; const ColumnFileBig & column_file; const ColumnDefinesPtr col_defs; @@ -155,7 +155,7 @@ class ColumnFileBigReader : public ColumnFileReader const DMContext & context_, const ColumnFileBig & column_file_, const ColumnDefinesPtr & col_defs_) - : context(context_) + : dm_context(context_) , column_file(column_file_) , col_defs(col_defs_) { diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp index deb055e7105..8f38b1b1d64 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.h index efe1542ae6d..85cf08bb7dd 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFilePersisted.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSchema.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSchema.cpp index a906b00e27f..a969b9fcc19 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSchema.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSchema.cpp @@ -15,6 +15,7 @@ #include #include #include +#include namespace DB { @@ -129,9 +130,9 @@ ColumnFileSchemaPtr SharedBlockSchemas::getOrCreate(const Block & block) return schema; } -std::shared_ptr getSharedBlockSchemas(const DMContext & context) +std::shared_ptr getSharedBlockSchemas(const DMContext & dm_context) { - return context.db_context.getSharedBlockSchemas(); + return dm_context.global_context.getSharedBlockSchemas(); } } // namespace DM -} // namespace DB \ No newline at end of file +} // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h index 192befe7821..06256545a83 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSchema.h @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h index 47f6a78bae2..adc565b2543 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h @@ -15,6 +15,7 @@ #pragma once #include +#include #include namespace DB diff --git a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp index 4800dc9b0c7..9526e78d217 100644 --- a/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp +++ b/dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileTiny.cpp @@ -277,13 +277,13 @@ ColumnFileTinyPtr ColumnFileTiny::writeColumnFile( } PageIdU64 ColumnFileTiny::writeColumnFileData( - const DMContext & context, + const DMContext & dm_context, const Block & block, size_t offset, size_t limit, WriteBatches & wbs) { - auto page_id = context.storage_pool->newLogPageId(); + auto page_id = dm_context.storage_pool->newLogPageId(); MemoryWriteBuffer write_buf; PageFieldSizes col_data_sizes; @@ -296,8 +296,8 @@ PageIdU64 ColumnFileTiny::writeColumnFileData( col.type, offset, limit, - context.db_context.getSettingsRef().dt_compression_method, - context.db_context.getSettingsRef().dt_compression_level); + dm_context.global_context.getSettingsRef().dt_compression_method, + dm_context.global_context.getSettingsRef().dt_compression_level); size_t serialized_size = write_buf.count() - last_buf_size; RUNTIME_CHECK_MSG( serialized_size != 0, diff --git a/dbms/src/Storages/DeltaMerge/DMContext.cpp b/dbms/src/Storages/DeltaMerge/DMContext.cpp index f7cd0232c40..7159672bf20 100644 --- a/dbms/src/Storages/DeltaMerge/DMContext.cpp +++ b/dbms/src/Storages/DeltaMerge/DMContext.cpp @@ -19,11 +19,11 @@ namespace DB::DM { WriteLimiterPtr DMContext::getWriteLimiter() const { - return db_context.getWriteLimiter(); + return global_context.getWriteLimiter(); } ReadLimiterPtr DMContext::getReadLimiter() const { - return db_context.getReadLimiter(); + return global_context.getReadLimiter(); } } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/DMContext.h b/dbms/src/Storages/DeltaMerge/DMContext.h index 9257d08eaa1..16fdd020b2c 100644 --- a/dbms/src/Storages/DeltaMerge/DMContext.h +++ b/dbms/src/Storages/DeltaMerge/DMContext.h @@ -16,9 +16,10 @@ #include #include -#include +#include #include #include +#include #include #include @@ -35,15 +36,13 @@ namespace DM class StoragePool; using StoragePoolPtr = std::shared_ptr; using NotCompress = std::unordered_set; -struct DMContext; -using DMContextPtr = std::shared_ptr; /** * This context object carries table infos. And those infos are only meaningful to current context. */ struct DMContext : private boost::noncopyable { - const Context & db_context; + const Context & global_context; // leaving these pointers possible to be nullptr is dangerous for only reading from/writing to local storage. Find a better way to handle it later StoragePathPoolPtr path_pool; @@ -93,8 +92,66 @@ struct DMContext : private boost::noncopyable const ScanContextPtr scan_context; public: + static DMContextPtr create( + const Context & session_context_, + const StoragePathPoolPtr & path_pool_, + const StoragePoolPtr & storage_pool_, + DB::Timestamp min_version_, + KeyspaceID keyspace_id_, + TableID physical_table_id_, + bool is_common_handle_, + size_t rowkey_column_size_, + const DB::Settings & settings, + const ScanContextPtr & scan_context_, + const String & tracing_id_) + { + return std::shared_ptr(new DMContext( + session_context_, + path_pool_, + storage_pool_, + min_version_, + keyspace_id_, + physical_table_id_, + is_common_handle_, + rowkey_column_size_, + settings, + scan_context_, + tracing_id_)); + } + + static std::unique_ptr createUnique( + const Context & session_context_, + const StoragePathPoolPtr & path_pool_, + const StoragePoolPtr & storage_pool_, + DB::Timestamp min_version_, + KeyspaceID keyspace_id_, + TableID physical_table_id_, + bool is_common_handle_, + size_t rowkey_column_size_, + const DB::Settings & settings) + { + return std::unique_ptr(new DMContext( + session_context_, + path_pool_, + storage_pool_, + min_version_, + keyspace_id_, + physical_table_id_, + is_common_handle_, + rowkey_column_size_, + settings, + nullptr, + "")); + } + + WriteLimiterPtr getWriteLimiter() const; + ReadLimiterPtr getReadLimiter() const; + + DM::DMConfigurationOpt createChecksumConfig() const { return DMChecksumConfig::fromDBContext(global_context); } + +private: DMContext( - const Context & db_context_, + const Context & session_context_, const StoragePathPoolPtr & path_pool_, const StoragePoolPtr & storage_pool_, const DB::Timestamp min_version_, @@ -103,9 +160,9 @@ struct DMContext : private boost::noncopyable bool is_common_handle_, size_t rowkey_column_size_, const DB::Settings & settings, - const ScanContextPtr scan_context_ = nullptr, + const ScanContextPtr & scan_context_ = nullptr, const String & tracing_id_ = "") - : db_context(db_context_) + : global_context(session_context_.getGlobalContext()) // always save the global context , path_pool(path_pool_) , storage_pool(storage_pool_) , min_version(min_version_) @@ -131,11 +188,6 @@ struct DMContext : private boost::noncopyable , tracing_id(tracing_id_) , scan_context(scan_context_ ? scan_context_ : std::make_shared()) {} - - WriteLimiterPtr getWriteLimiter() const; - ReadLimiterPtr getReadLimiter() const; - - DM::DMConfigurationOpt createChecksumConfig() const { return DMChecksumConfig::fromDBContext(db_context); } }; } // namespace DM diff --git a/dbms/src/Storages/DeltaMerge/DMContext_fwd.h b/dbms/src/Storages/DeltaMerge/DMContext_fwd.h new file mode 100644 index 00000000000..2a8ebce59c8 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/DMContext_fwd.h @@ -0,0 +1,25 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include + +namespace DB::DM +{ + +struct DMContext; +using DMContextPtr = std::shared_ptr; + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/DMDecoratorStreams.h b/dbms/src/Storages/DeltaMerge/DMDecoratorStreams.h index 06ea409d389..1021ad8fb6f 100644 --- a/dbms/src/Storages/DeltaMerge/DMDecoratorStreams.h +++ b/dbms/src/Storages/DeltaMerge/DMDecoratorStreams.h @@ -16,7 +16,6 @@ #include #include -#include #include #include @@ -43,7 +42,7 @@ class DMDeleteFilterBlockInputStream : public IBlockInputStream children.emplace_back(input); delete_col_pos = input->getHeader().getPositionByName(TAG_COLUMN_NAME); } - ~DMDeleteFilterBlockInputStream() + ~DMDeleteFilterBlockInputStream() override { LOG_TRACE( log, diff --git a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h index 735beec5512..f1e076214c5 100644 --- a/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h +++ b/dbms/src/Storages/DeltaMerge/DMSegmentThreadInputStream.h @@ -16,7 +16,7 @@ #include #include -#include +#include #include #include #include @@ -91,7 +91,7 @@ class DMSegmentThreadInputStream : public IProfilingBlockInputStream auto block_size = std::max( expected_block_size, - static_cast(dm_context->db_context.getSettingsRef().dt_segment_stable_pack_rows)); + static_cast(dm_context->global_context.getSettingsRef().dt_segment_stable_pack_rows)); cur_stream = task->segment->getInputStream( read_mode, *dm_context, diff --git a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp index fa3a79e663e..94f16efdea7 100644 --- a/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/Delta/DeltaValueSpace.cpp @@ -51,13 +51,13 @@ DeltaValueSpace::DeltaValueSpace(ColumnFilePersistedSetPtr && persisted_file_set , log(Logger::get()) {} -void DeltaValueSpace::abandon(DMContext & context) +void DeltaValueSpace::abandon(DMContext & dm_context) { bool v = false; if (!abandoned.compare_exchange_strong(v, true)) throw Exception("Try to abandon a already abandoned DeltaValueSpace", ErrorCodes::LOGICAL_ERROR); - if (auto manager = context.db_context.getDeltaIndexManager(); manager) + if (auto manager = dm_context.global_context.getDeltaIndexManager(); manager) manager->deleteRef(delta_index); } @@ -88,7 +88,7 @@ template struct CloneColumnFilesHelper { static std::vector clone( - DMContext & context, + DMContext & dm_context, const std::vector & src, const RowKeyRange & target_range, WriteBatches & wbs); @@ -96,7 +96,7 @@ struct CloneColumnFilesHelper template std::vector CloneColumnFilesHelper::clone( - DMContext & context, + DMContext & dm_context, const std::vector & src, const RowKeyRange & target_range, WriteBatches & wbs) @@ -132,15 +132,15 @@ std::vector CloneColumnFilesHelper::clone( else if (auto * t = column_file->tryToTinyFile(); t) { // Use a newly created page_id to reference the data page_id of current column file. - PageIdU64 new_data_page_id = context.storage_pool->newLogPageId(); + PageIdU64 new_data_page_id = dm_context.storage_pool->newLogPageId(); wbs.log.putRefPage(new_data_page_id, t->getDataPageId()); auto new_column_file = t->cloneWith(new_data_page_id); cloned.push_back(new_column_file); } else if (auto * f = column_file->tryToBigFile(); f) { - auto delegator = context.path_pool->getStableDiskDelegator(); - auto new_page_id = context.storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); + auto delegator = dm_context.path_pool->getStableDiskDelegator(); + auto new_page_id = dm_context.storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); // Note that the file id may has already been mark as deleted. We must // create a reference to the page id itself instead of create a reference // to the file id. @@ -148,18 +148,18 @@ std::vector CloneColumnFilesHelper::clone( auto file_id = f->getFile()->fileId(); auto old_dmfile = f->getFile(); auto file_parent_path = old_dmfile->parentPath(); - if (!context.db_context.getSharedContextDisagg()->remote_data_store) + if (!dm_context.global_context.getSharedContextDisagg()->remote_data_store) { RUNTIME_CHECK(file_parent_path == delegator.getDTFilePath(file_id)); } auto new_file = DMFile::restore( - context.db_context.getFileProvider(), + dm_context.global_context.getFileProvider(), file_id, /* page_id= */ new_page_id, file_parent_path, DMFile::ReadMetaMode::all()); - auto new_column_file = f->cloneWith(context, new_file, target_range); + auto new_column_file = f->cloneWith(dm_context, new_file, target_range); cloned.push_back(new_column_file); } else diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h b/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h index 83541bdb849..f6d501ebc32 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeHelpers.h @@ -12,10 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include - -#include - #pragma once #include @@ -30,6 +26,8 @@ #include #include +#include + namespace DB { namespace DM diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index de097c74b86..66bc52ebb78 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -449,13 +449,11 @@ DMContextPtr DeltaMergeStore::newDMContext( const String & tracing_id, ScanContextPtr scan_context_) { - std::shared_lock lock(read_write_mutex); - // Here we use global context from db_context, instead of db_context directly. // Because db_context could be a temporary object and won't last long enough during the query process. // Like the context created by InterpreterSelectWithUnionQuery. - auto * ctx = new DMContext( - db_context.getGlobalContext(), + return DMContext::create( + db_context, path_pool, storage_pool, latest_gc_safe_point.load(std::memory_order_acquire), @@ -466,7 +464,6 @@ DMContextPtr DeltaMergeStore::newDMContext( db_settings, scan_context_, tracing_id); - return DMContextPtr(ctx); } inline Block getSubBlock(const Block & block, size_t offset, size_t limit) @@ -1375,17 +1372,17 @@ Remote::DisaggPhysicalTableReadSnapshotPtr DeltaMergeStore::writeNodeBuildRemote size_t forceMergeDeltaRows(const DMContextPtr & dm_context) { - return dm_context->db_context.getSettingsRef().dt_segment_force_merge_delta_rows; + return dm_context->global_context.getSettingsRef().dt_segment_force_merge_delta_rows; } size_t forceMergeDeltaBytes(const DMContextPtr & dm_context) { - return dm_context->db_context.getSettingsRef().dt_segment_force_merge_delta_size; + return dm_context->global_context.getSettingsRef().dt_segment_force_merge_delta_size; } size_t forceMergeDeltaDeletes(const DMContextPtr & dm_context) { - return dm_context->db_context.getSettingsRef().dt_segment_force_merge_delta_deletes; + return dm_context->global_context.getSettingsRef().dt_segment_force_merge_delta_deletes; } void DeltaMergeStore::waitForWrite(const DMContextPtr & dm_context, const SegmentPtr & segment) @@ -1406,9 +1403,9 @@ void DeltaMergeStore::waitForWrite(const DMContextPtr & dm_context, const Segmen // The speed of delta merge in a very bad situation we assume. It should be a very conservative value. const size_t k10mb = 10 << 20; - size_t stop_write_delta_rows = dm_context->db_context.getSettingsRef().dt_segment_stop_write_delta_rows; - size_t stop_write_delta_bytes = dm_context->db_context.getSettingsRef().dt_segment_stop_write_delta_size; - size_t wait_duration_factor = dm_context->db_context.getSettingsRef().dt_segment_wait_duration_factor; + size_t stop_write_delta_rows = dm_context->global_context.getSettingsRef().dt_segment_stop_write_delta_rows; + size_t stop_write_delta_bytes = dm_context->global_context.getSettingsRef().dt_segment_stop_write_delta_size; + size_t wait_duration_factor = dm_context->global_context.getSettingsRef().dt_segment_wait_duration_factor; size_t sleep_ms; if (delta_rows >= stop_write_delta_rows || delta_bytes >= stop_write_delta_bytes) @@ -1528,7 +1525,7 @@ bool DeltaMergeStore::checkSegmentUpdate( && std::max(static_cast(column_file_count) - delta_last_try_compact_column_files, 0) >= 15; // Don't do background place index if we limit DeltaIndex cache. - bool should_place_delta_index = !dm_context->db_context.isDeltaIndexLimited() + bool should_place_delta_index = !dm_context->global_context.isDeltaIndexLimited() && (delta_rows - placed_delta_rows >= delta_cache_limit_rows * 3 && delta_rows - delta_last_try_place_delta_index_rows >= delta_cache_limit_rows); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp index 34e3a4cfe58..d37c70897c7 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp @@ -108,7 +108,7 @@ void DeltaMergeStore::cleanPreIngestFiles( { auto dm_context = newDMContext(db_context, db_settings); auto delegate = dm_context->path_pool->getStableDiskDelegator(); - auto file_provider = dm_context->db_context.getFileProvider(); + auto file_provider = dm_context->global_context.getFileProvider(); for (const auto & f : external_files) { @@ -144,7 +144,7 @@ Segments DeltaMergeStore::ingestDTFilesUsingColumnFile( bool clear_data_in_range) { auto delegate = dm_context->path_pool->getStableDiskDelegator(); - auto file_provider = dm_context->db_context.getFileProvider(); + auto file_provider = dm_context->global_context.getFileProvider(); Segments updated_segments; RowKeyRange cur_range = range; @@ -452,7 +452,7 @@ bool DeltaMergeStore::ingestDTFileIntoSegmentUsingSplit( */ auto delegate = dm_context.path_pool->getStableDiskDelegator(); - auto file_provider = dm_context.db_context.getFileProvider(); + auto file_provider = dm_context.global_context.getFileProvider(); WriteBatches wbs(*storage_pool, dm_context.getWriteLimiter()); @@ -605,7 +605,7 @@ UInt64 DeltaMergeStore::ingestFiles( } // Check whether all external files are contained by the range. - if (dm_context->db_context.getSettingsRef().dt_enable_ingest_check) + if (dm_context->global_context.getSettingsRef().dt_enable_ingest_check) { for (const auto & ext_file : external_files) { @@ -627,17 +627,17 @@ UInt64 DeltaMergeStore::ingestFiles( EventRecorder write_block_recorder(ProfileEvents::DMWriteFile, ProfileEvents::DMWriteFileNS); auto delegate = dm_context->path_pool->getStableDiskDelegator(); - auto file_provider = dm_context->db_context.getFileProvider(); + auto file_provider = dm_context->global_context.getFileProvider(); size_t rows = 0; size_t bytes = 0; size_t bytes_on_disk = 0; - auto remote_data_store = dm_context->db_context.getSharedContextDisagg()->remote_data_store; + auto remote_data_store = dm_context->global_context.getSharedContextDisagg()->remote_data_store; StoreID store_id = InvalidStoreID; if (remote_data_store) { - store_id = dm_context->db_context.getTMTContext().getKVStore()->getStoreID(); + store_id = dm_context->global_context.getTMTContext().getKVStore()->getStoreID(); } DMFiles files; @@ -770,7 +770,7 @@ UInt64 DeltaMergeStore::ingestFiles( // Assume that one segment get compacted after file ingested, `gc_handle` gc the // DTFiles before they get applied to all segments. Then we will apply some // deleted DTFiles to other segments. - if (auto data_store = dm_context->db_context.getSharedContextDisagg()->remote_data_store; !data_store) + if (auto data_store = dm_context->global_context.getSharedContextDisagg()->remote_data_store; !data_store) { for (auto & file : files) file->enableGC(); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp index f2f16246919..a9d3d073244 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalBg.cpp @@ -267,7 +267,7 @@ void DeltaMergeStore::setUpBackgroundTask(const DMContextPtr & dm_context) // we must make the callbacks safe. ExternalPageCallbacks callbacks; callbacks.prefix = storage_pool->getNamespaceID(); - if (auto data_store = dm_context->db_context.getSharedContextDisagg()->remote_data_store; !data_store) + if (auto data_store = dm_context->global_context.getSharedContextDisagg()->remote_data_store; !data_store) { callbacks.scanner = LocalDMFileGcScanner(std::weak_ptr(path_pool), global_context.getFileProvider()); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp index 7995a3280ed..c376ffa9245 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp @@ -222,7 +222,7 @@ SegmentPair DeltaMergeStore::segmentSplit( } if constexpr (DM_RUN_CHECK) - check(dm_context.db_context); + check(dm_context.global_context); return {new_left, new_right}; } @@ -362,7 +362,7 @@ SegmentPtr DeltaMergeStore::segmentMerge( GET_METRIC(tiflash_storage_throughput_rows, type_merge).Increment(delta_rows); if constexpr (DM_RUN_CHECK) - check(dm_context.db_context); + check(dm_context.global_context); return merged; } @@ -520,7 +520,7 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta( GET_METRIC(tiflash_storage_throughput_rows, type_delta_merge).Increment(delta_rows); if constexpr (DM_RUN_CHECK) - check(dm_context.db_context); + check(dm_context.global_context); return new_segment; } @@ -630,7 +630,7 @@ SegmentPtr DeltaMergeStore::segmentIngestData( } if constexpr (DM_RUN_CHECK) - check(dm_context.db_context); + check(dm_context.global_context); return new_segment; } @@ -690,7 +690,7 @@ SegmentPtr DeltaMergeStore::segmentDangerouslyReplaceDataFromCheckpoint( wbs.writeRemoves(); if constexpr (DM_RUN_CHECK) - check(dm_context.db_context); + check(dm_context.global_context); return new_segment; } diff --git a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h index 3391af14444..76deb7f4500 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFilePackFilter.h @@ -19,7 +19,6 @@ #include #include #include -#include #include #include #include diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h index 40f64253a2f..14a226871c9 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileReader.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileReader.h @@ -16,7 +16,6 @@ #include #include -#include #include #include #include diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.h b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.h index 28a58b27301..16bc5a5d73f 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.h +++ b/dbms/src/Storages/DeltaMerge/Remote/RNWorkerPrepareStreams.h @@ -37,7 +37,7 @@ class RNWorkerPrepareStreams protected: SegmentReadTaskPtr doWork(const SegmentReadTaskPtr & task) override { - const auto & settings = task->dm_context->db_context.getSettingsRef(); + const auto & settings = task->dm_context->global_context.getSettingsRef(); task->initInputStream( *columns_to_read, read_tso, diff --git a/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp b/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp index 73143aac892..ff2f0b346c2 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/Serializer.cpp @@ -127,7 +127,7 @@ SegmentSnapshotPtr Serializer::deserializeSegmentSnapshotFrom( segment_range = RowKeyRange::deserialize(rb); } - auto data_store = dm_context.db_context.getSharedContextDisagg()->remote_data_store; + auto data_store = dm_context.global_context.getSharedContextDisagg()->remote_data_store; auto delta_snap = std::make_shared(CurrentMetrics::DT_SnapshotOfDisaggReadNodeRead); delta_snap->is_update = false; @@ -138,7 +138,7 @@ SegmentSnapshotPtr Serializer::deserializeSegmentSnapshotFrom( // Note: At this moment, we still cannot read from `delta_snap->mem_table_snap` and `delta_snap->persisted_files_snap`, // because they are constructed using ColumnFileDataProviderNop. - auto delta_index_cache = dm_context.db_context.getSharedContextDisagg()->rn_delta_index_cache; + auto delta_index_cache = dm_context.global_context.getSharedContextDisagg()->rn_delta_index_cache; if (delta_index_cache) { delta_snap->shared_delta_index = delta_index_cache->getDeltaIndex({ diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index 5ebcc502d43..d295849a7ad 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -131,9 +131,9 @@ DMFilePtr writeIntoNewDMFile( file_id, parent_path, dm_context.createChecksumConfig(), - dm_context.db_context.getSettingsRef().dt_small_file_size_threshold, - dm_context.db_context.getSettingsRef().dt_merged_file_max_size); - auto output_stream = std::make_shared(dm_context.db_context, dmfile, *schema_snap); + dm_context.global_context.getSettingsRef().dt_small_file_size_threshold, + dm_context.global_context.getSettingsRef().dt_merged_file_max_size); + auto output_stream = std::make_shared(dm_context.global_context, dmfile, *schema_snap); const auto * mvcc_stream = typeid_cast *>(input_stream.get()); @@ -186,36 +186,36 @@ DMFilePtr writeIntoNewDMFile( } StableValueSpacePtr createNewStable( // - DMContext & context, + DMContext & dm_context, const ColumnDefinesPtr & schema_snap, const BlockInputStreamPtr & input_stream, PageIdU64 stable_id, WriteBatches & wbs) { - auto delegator = context.path_pool->getStableDiskDelegator(); + auto delegator = dm_context.path_pool->getStableDiskDelegator(); auto store_path = delegator.choosePath(); - PageIdU64 dtfile_id = context.storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); + PageIdU64 dtfile_id = dm_context.storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); DMFilePtr dtfile; try { - dtfile = writeIntoNewDMFile(context, schema_snap, input_stream, dtfile_id, store_path); + dtfile = writeIntoNewDMFile(dm_context, schema_snap, input_stream, dtfile_id, store_path); auto stable = std::make_shared(stable_id); - stable->setFiles({dtfile}, RowKeyRange::newAll(context.is_common_handle, context.rowkey_column_size)); + stable->setFiles({dtfile}, RowKeyRange::newAll(dm_context.is_common_handle, dm_context.rowkey_column_size)); stable->saveMeta(wbs.meta); - if (auto data_store = context.db_context.getSharedContextDisagg()->remote_data_store; !data_store) + if (auto data_store = dm_context.global_context.getSharedContextDisagg()->remote_data_store; !data_store) { wbs.data.putExternal(dtfile_id, 0); delegator.addDTFile(dtfile_id, dtfile->getBytesOnDisk(), store_path); } else { - auto store_id = context.db_context.getTMTContext().getKVStore()->getStoreID(); + auto store_id = dm_context.global_context.getTMTContext().getKVStore()->getStoreID(); Remote::DMFileOID oid{ .store_id = store_id, - .keyspace_id = context.keyspace_id, - .table_id = context.physical_table_id, + .keyspace_id = dm_context.keyspace_id, + .table_id = dm_context.physical_table_id, .file_id = dtfile_id, }; data_store->putDMFile(dtfile, oid, /*switch_to_remote*/ true); @@ -233,7 +233,7 @@ StableValueSpacePtr createNewStable( // { if (dtfile) { - dtfile->remove(context.db_context.getFileProvider()); + dtfile->remove(dm_context.global_context.getFileProvider()); } throw; } @@ -394,7 +394,7 @@ Segment::SegmentMetaInfos Segment::readAllSegmentsMetaInfoInRange( // const RowKeyRange & target_range, const CheckpointInfoPtr & checkpoint_info) { - auto fap_context = context.db_context.getSharedContextDisagg()->fap_context; + auto fap_context = context.global_context.getSharedContextDisagg()->fap_context; // If cache is empty, we read from DELTA_MERGE_FIRST_SEGMENT_ID to the end and build the cache. // Otherwise, we just read the segment that cover the range. @@ -604,7 +604,7 @@ bool Segment::isDefinitelyEmpty(DMContext & dm_context, const SegmentSnapshotPtr SkippableBlockInputStreams streams; for (const auto & file : segment_snap->stable->getDMFiles()) { - DMFileBlockInputStreamBuilder builder(dm_context.db_context); + DMFileBlockInputStreamBuilder builder(dm_context.global_context); auto stream = builder .setRowsThreshold( std::numeric_limits::max()) // TODO: May be we could have some better settings @@ -764,8 +764,11 @@ BlockInputStreamPtr Segment::getInputStream( UInt64 max_version, size_t expected_block_size) { - auto clipped_block_rows - = clipBlockRows(dm_context.db_context, expected_block_size, columns_to_read, segment_snap->stable->stable); + auto clipped_block_rows = clipBlockRows( // + dm_context.global_context, + expected_block_size, + columns_to_read, + segment_snap->stable->stable); switch (read_mode) { case ReadMode::Normal: @@ -786,7 +789,12 @@ BlockInputStreamPtr Segment::getInputStream( filter ? filter->rs_operator : EMPTY_RS_OPERATOR, clipped_block_rows); case ReadMode::Raw: - return getInputStreamModeRaw(dm_context, columns_to_read, segment_snap, read_ranges, clipped_block_rows); + return getInputStreamModeRaw( // + dm_context, + columns_to_read, + segment_snap, + read_ranges, + clipped_block_rows); case ReadMode::Bitmap: return getBitmapFilterInputStream( dm_context, @@ -1289,7 +1297,7 @@ SegmentPtr Segment::dangerouslyReplaceDataFromCheckpoint( // Always create a ref to the file to allow `data_file` being shared. auto new_page_id = storage_pool->newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__); auto ref_file = DMFile::restore( - dm_context.db_context.getFileProvider(), + dm_context.global_context.getFileProvider(), data_file->fileId(), new_page_id, data_file->parentPath(), @@ -1319,7 +1327,7 @@ SegmentPtr Segment::dangerouslyReplaceDataFromCheckpoint( auto new_data_page_id = storage_pool->newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__); auto old_data_page_id = b->getDataPageId(); wbs.data.putRefPage(new_data_page_id, old_data_page_id); - auto wn_ps = dm_context.db_context.getWriteNodePageStorage(); + auto wn_ps = dm_context.global_context.getWriteNodePageStorage(); auto full_page_id = UniversalPageIdFormat::toFullPageId( UniversalPageIdFormat::toFullPrefix( dm_context.keyspace_id, @@ -1330,7 +1338,7 @@ SegmentPtr Segment::dangerouslyReplaceDataFromCheckpoint( auto data_key_view = S3::S3FilenameView::fromKey(*(remote_data_location->data_file_id)).asDataFile(); auto file_oid = data_key_view.getDMFileOID(); RUNTIME_CHECK(file_oid.file_id == b->getFile()->fileId(), file_oid.file_id, b->getFile()->fileId()); - auto remote_data_store = dm_context.db_context.getSharedContextDisagg()->remote_data_store; + auto remote_data_store = dm_context.global_context.getSharedContextDisagg()->remote_data_store; RUNTIME_CHECK(remote_data_store != nullptr); auto prepared = remote_data_store->prepareDMFile(file_oid, new_data_page_id); auto dmfile = prepared->restore(DMFile::ReadMetaMode::all()); @@ -1445,7 +1453,7 @@ std::optional Segment::getSplitPointFast(DMContext & dm_context, co if (unlikely(!read_file)) throw Exception("Logical error: failed to find split point"); - DMFileBlockInputStreamBuilder builder(dm_context.db_context); + DMFileBlockInputStreamBuilder builder(dm_context.global_context); auto stream = builder.setColumnCache(stable_snap->getColumnCaches()[file_index]) .setReadPacks(read_pack) .setTracingID(fmt::format("{}-getSplitPointFast", dm_context.tracing_id)) @@ -1710,7 +1718,7 @@ Segment::prepareSplitLogical( // auto ori_page_id = dmfile->pageId(); auto file_id = dmfile->fileId(); auto file_parent_path = dmfile->parentPath(); - if (!dm_context.db_context.getSharedContextDisagg()->remote_data_store) + if (!dm_context.global_context.getSharedContextDisagg()->remote_data_store) { RUNTIME_CHECK(file_parent_path == delegate.getDTFilePath(file_id)); } @@ -1726,13 +1734,13 @@ Segment::prepareSplitLogical( // wbs.removed_data.delPage(ori_page_id); auto my_dmfile = DMFile::restore( - dm_context.db_context.getFileProvider(), + dm_context.global_context.getFileProvider(), file_id, /* page_id= */ my_dmfile_page_id, file_parent_path, DMFile::ReadMetaMode::all()); auto other_dmfile = DMFile::restore( - dm_context.db_context.getFileProvider(), + dm_context.global_context.getFileProvider(), file_id, /* page_id= */ other_dmfile_page_id, file_parent_path, @@ -2344,13 +2352,13 @@ Segment::ReadInfo Segment::getReadInfo( { LOG_DEBUG(segment_snap->log, "Segment updated delta index"); // Update cache size. - if (auto cache = dm_context.db_context.getSharedContextDisagg()->rn_delta_index_cache; cache) + if (auto cache = dm_context.global_context.getSharedContextDisagg()->rn_delta_index_cache; cache) cache->setDeltaIndex(segment_snap->delta->getSharedDeltaIndex()); } } // Refresh the reference in DeltaIndexManager, so that the index can be properly managed. - if (auto manager = dm_context.db_context.getDeltaIndexManager(); manager) + if (auto manager = dm_context.global_context.getDeltaIndexManager(); manager) manager->refreshRef(segment_snap->delta->getSharedDeltaIndex()); return ReadInfo( @@ -2796,13 +2804,13 @@ std::pair, std::vector> parseDMFilePackInfo( { DMFilePackFilter pack_filter = DMFilePackFilter::loadFrom( dmfile, - dm_context.db_context.getMinMaxIndexCache(), + dm_context.global_context.getMinMaxIndexCache(), /*set_cache_if_miss*/ true, read_ranges, filter, /*read_pack*/ {}, - dm_context.db_context.getFileProvider(), - dm_context.db_context.getReadLimiter(), + dm_context.global_context.getFileProvider(), + dm_context.global_context.getReadLimiter(), dm_context.scan_context, dm_context.tracing_id); const auto & use_packs = pack_filter.getUsePacksConst(); diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp index 63b96ba3610..ed3d711b713 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTask.cpp @@ -46,7 +46,7 @@ SegmentReadTask::SegmentReadTask( const SegmentSnapshotPtr & read_snapshot_, const DMContextPtr & dm_context_, const RowKeyRanges & ranges_) - : store_id(dm_context_->db_context.getTMTContext().getKVStore()->getStoreID()) + : store_id(dm_context_->global_context.getTMTContext().getKVStore()->getStoreID()) , segment(segment_) , read_snapshot(read_snapshot_) , dm_context(dm_context_) @@ -78,7 +78,7 @@ SegmentReadTask::SegmentReadTask( auto rb = ReadBufferFromString(proto.key_range()); auto segment_range = RowKeyRange::deserialize(rb); - dm_context = std::make_shared( + dm_context = DMContext::create( db_context, /* path_pool */ nullptr, /* storage_pool */ nullptr, @@ -215,7 +215,7 @@ void SegmentReadTask::initColumnFileDataProvider(const Remote::RNLocalPageCacheG RUNTIME_CHECK(std::dynamic_pointer_cast(data_provider)); RUNTIME_CHECK(extra_remote_info.has_value()); - auto page_cache = dm_context->db_context.getSharedContextDisagg()->rn_page_cache; + auto page_cache = dm_context->global_context.getSharedContextDisagg()->rn_page_cache; data_provider = std::make_shared( page_cache, pages_guard, @@ -246,7 +246,7 @@ void SegmentReadTask::initInputStream( // Exception DT_DELTA_INDEX_ERROR raised. Reset delta index and try again. DeltaIndex empty_delta_index; read_snapshot->delta->getSharedDeltaIndex()->swap(empty_delta_index); - if (auto cache = dm_context->db_context.getSharedContextDisagg()->rn_delta_index_cache; cache) + if (auto cache = dm_context->global_context.getSharedContextDisagg()->rn_delta_index_cache; cache) { cache->setDeltaIndex(read_snapshot->delta->getSharedDeltaIndex()); } @@ -402,7 +402,7 @@ Remote::RNLocalPageCache::OccupySpaceResult SegmentReadTask::blockingOccupySpace GET_METRIC(tiflash_disaggregated_breakdown_duration_seconds, type_cache_occupy) .Observe(w_occupy.elapsedSeconds()); }); - auto page_cache = dm_context->db_context.getSharedContextDisagg()->rn_page_cache; + auto page_cache = dm_context->global_context.getSharedContextDisagg()->rn_page_cache; auto scan_context = dm_context->scan_context; return page_cache->occupySpace(cf_tiny_oids, extra_remote_info->remote_page_sizes, scan_context); } @@ -455,7 +455,7 @@ void SegmentReadTask::doFetchPages(const disaggregated::FetchDisaggPagesRequest UInt64 wait_write_page_ns = 0; Stopwatch sw_total; - const auto * cluster = dm_context->db_context.getTMTContext().getKVCluster(); + const auto * cluster = dm_context->global_context.getTMTContext().getKVCluster(); pingcap::kv::RpcCall rpc( cluster->rpc_client, extra_remote_info->store_address); @@ -511,7 +511,7 @@ void SegmentReadTask::doFetchPages(const disaggregated::FetchDisaggPagesRequest if (write_page_task == nullptr) { write_page_task = std::make_unique( - dm_context->db_context.getSharedContextDisagg()->rn_page_cache.get()); + dm_context->global_context.getSharedContextDisagg()->rn_page_cache.get()); } auto & remote_page = write_page_task->remote_pages.emplace_front(); // NOLINT(bugprone-use-after-move) bool parsed = remote_page.ParseFromString(page); @@ -547,7 +547,7 @@ void SegmentReadTask::doFetchPages(const disaggregated::FetchDisaggPagesRequest auto page_id = Remote::RNLocalPageCache::buildCacheId(oid); write_page_task->wb .putPage(page_id, 0, std::move(read_buffer), remote_page.data().size(), std::move(field_sizes)); - auto write_batch_limit_size = dm_context->db_context.getSettingsRef().dt_write_page_cache_limit_size; + auto write_batch_limit_size = dm_context->global_context.getSettingsRef().dt_write_page_cache_limit_size; if (write_page_task->wb.getTotalDataSize() >= write_batch_limit_size) { write_page_results.push_back( diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTask.h b/dbms/src/Storages/DeltaMerge/SegmentReadTask.h index ed407e462d8..f971a5a3fc9 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTask.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTask.h @@ -14,6 +14,7 @@ #pragma once +#include #include #include #include diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp index 57c70cf0161..e1935648c2b 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.cpp @@ -95,7 +95,7 @@ BlockInputStreamPtr SegmentReadTaskPool::buildInputStream(SegmentReadTaskPtr & t filter, read_mode, expected_block_size, - t->dm_context->db_context.getSettingsRef().dt_enable_delta_index_error_fallback); + t->dm_context->global_context.getSettingsRef().dt_enable_delta_index_error_fallback); BlockInputStreamPtr stream = std::make_shared( t->getInputStream(), extra_table_id_index, diff --git a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h index 30b1a5fe628..e3024ec37ff 100644 --- a/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h +++ b/dbms/src/Storages/DeltaMerge/SegmentReadTaskPool.h @@ -14,7 +14,7 @@ #pragma once #include -#include +#include #include #include #include @@ -248,4 +248,4 @@ class SegmentReadTaskPool : private boost::noncopyable using SegmentReadTaskPoolPtr = std::shared_ptr; using SegmentReadTaskPools = std::vector; -} // namespace DB::DM \ No newline at end of file +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp index ebe6d697ec9..2f7321af09d 100644 --- a/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp +++ b/dbms/src/Storages/DeltaMerge/StableValueSpace.cpp @@ -53,7 +53,7 @@ void StableValueSpace::setFiles(const DMFiles & files_, const RowKeyRange & rang } else if (dm_context != nullptr) { - auto index_cache = dm_context->db_context.getGlobalContext().getMinMaxIndexCache(); + auto index_cache = dm_context->global_context.getGlobalContext().getMinMaxIndexCache(); for (const auto & file : files_) { auto pack_filter = DMFilePackFilter::loadFrom( @@ -63,7 +63,7 @@ void StableValueSpace::setFiles(const DMFiles & files_, const RowKeyRange & rang {range}, EMPTY_RS_OPERATOR, {}, - dm_context->db_context.getFileProvider(), + dm_context->global_context.getFileProvider(), dm_context->getReadLimiter(), dm_context->scan_context, dm_context->tracing_id); @@ -92,11 +92,11 @@ void StableValueSpace::saveMeta(WriteBatchWrapper & meta_wb) meta_wb.putPage(id, 0, buf.tryGetReadBuffer(), data_size); } -StableValueSpacePtr StableValueSpace::restore(DMContext & context, PageIdU64 id) +StableValueSpacePtr StableValueSpace::restore(DMContext & dm_context, PageIdU64 id) { auto stable = std::make_shared(id); - Page page = context.storage_pool->metaReader()->read(id); // not limit restore + Page page = dm_context.storage_pool->metaReader()->read(id); // not limit restore ReadBufferFromMemory buf(page.data.begin(), page.data.size()); UInt64 version, valid_rows, valid_bytes, size; readIntBinary(version, buf); @@ -107,26 +107,29 @@ StableValueSpacePtr StableValueSpace::restore(DMContext & context, PageIdU64 id) readIntBinary(valid_bytes, buf); readIntBinary(size, buf); UInt64 page_id; - auto remote_data_store = context.db_context.getSharedContextDisagg()->remote_data_store; + auto remote_data_store = dm_context.global_context.getSharedContextDisagg()->remote_data_store; for (size_t i = 0; i < size; ++i) { readIntBinary(page_id, buf); DMFilePtr dmfile; - auto path_delegate = context.path_pool->getStableDiskDelegator(); + auto path_delegate = dm_context.path_pool->getStableDiskDelegator(); if (remote_data_store) { - auto wn_ps = context.db_context.getWriteNodePageStorage(); + auto wn_ps = dm_context.global_context.getWriteNodePageStorage(); auto full_page_id = UniversalPageIdFormat::toFullPageId( - UniversalPageIdFormat::toFullPrefix(context.keyspace_id, StorageType::Data, context.physical_table_id), + UniversalPageIdFormat::toFullPrefix( + dm_context.keyspace_id, + StorageType::Data, + dm_context.physical_table_id), page_id); auto full_external_id = wn_ps->getNormalPageId(full_page_id); auto local_external_id = UniversalPageIdFormat::getU64ID(full_external_id); auto remote_data_location = wn_ps->getCheckpointLocation(full_page_id); const auto & lock_key_view = S3::S3FilenameView::fromKey(*(remote_data_location->data_file_id)); auto file_oid = lock_key_view.asDataFile().getDMFileOID(); - RUNTIME_CHECK(file_oid.keyspace_id == context.keyspace_id); - RUNTIME_CHECK(file_oid.table_id == context.physical_table_id); + RUNTIME_CHECK(file_oid.keyspace_id == dm_context.keyspace_id); + RUNTIME_CHECK(file_oid.table_id == dm_context.physical_table_id); auto prepared = remote_data_store->prepareDMFile(file_oid, page_id); dmfile = prepared->restore(DMFile::ReadMetaMode::all()); // gc only begin to run after restore so we can safely call addRemoteDTFileIfNotExists here @@ -134,10 +137,10 @@ StableValueSpacePtr StableValueSpace::restore(DMContext & context, PageIdU64 id) } else { - auto file_id = context.storage_pool->dataReader()->getNormalPageId(page_id); + auto file_id = dm_context.storage_pool->dataReader()->getNormalPageId(page_id); auto file_parent_path = path_delegate.getDTFilePath(file_id); dmfile = DMFile::restore( - context.db_context.getFileProvider(), + dm_context.global_context.getFileProvider(), file_id, page_id, file_parent_path, @@ -155,7 +158,7 @@ StableValueSpacePtr StableValueSpace::restore(DMContext & context, PageIdU64 id) } StableValueSpacePtr StableValueSpace::createFromCheckpoint( // - DMContext & context, + DMContext & dm_context, UniversalPageStoragePtr temp_ps, PageIdU64 stable_id, WriteBatches & wbs) @@ -163,7 +166,7 @@ StableValueSpacePtr StableValueSpace::createFromCheckpoint( // auto stable = std::make_shared(stable_id); auto stable_page_id = UniversalPageIdFormat::toFullPageId( - UniversalPageIdFormat::toFullPrefix(context.keyspace_id, StorageType::Meta, context.physical_table_id), + UniversalPageIdFormat::toFullPrefix(dm_context.keyspace_id, StorageType::Meta, dm_context.physical_table_id), stable_id); auto page = temp_ps->read(stable_page_id); ReadBufferFromMemory buf(page.data.begin(), page.data.size()); @@ -180,20 +183,23 @@ StableValueSpacePtr StableValueSpace::createFromCheckpoint( // readIntBinary(size, buf); } - auto remote_data_store = context.db_context.getSharedContextDisagg()->remote_data_store; + auto remote_data_store = dm_context.global_context.getSharedContextDisagg()->remote_data_store; for (size_t i = 0; i < size; ++i) { UInt64 page_id; readIntBinary(page_id, buf); auto full_page_id = UniversalPageIdFormat::toFullPageId( - UniversalPageIdFormat::toFullPrefix(context.keyspace_id, StorageType::Data, context.physical_table_id), + UniversalPageIdFormat::toFullPrefix( + dm_context.keyspace_id, + StorageType::Data, + dm_context.physical_table_id), page_id); auto remote_data_location = temp_ps->getCheckpointLocation(full_page_id); auto data_key_view = S3::S3FilenameView::fromKey(*(remote_data_location->data_file_id)).asDataFile(); auto file_oid = data_key_view.getDMFileOID(); auto data_key = data_key_view.toFullKey(); - auto delegator = context.path_pool->getStableDiskDelegator(); - auto new_local_page_id = context.storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); + auto delegator = dm_context.path_pool->getStableDiskDelegator(); + auto new_local_page_id = dm_context.storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); PS::V3::CheckpointLocation loc{ .data_file_id = std::make_shared(data_key), .offset_in_file = 0, @@ -266,16 +272,16 @@ String StableValueSpace::getDMFilesString() return s; } -void StableValueSpace::enableDMFilesGC(DMContext & context) +void StableValueSpace::enableDMFilesGC(DMContext & dm_context) { - if (auto data_store = context.db_context.getSharedContextDisagg()->remote_data_store; !data_store) + if (auto data_store = dm_context.global_context.getSharedContextDisagg()->remote_data_store; !data_store) { for (auto & file : files) file->enableGC(); } else { - auto delegator = context.path_pool->getStableDiskDelegator(); + auto delegator = dm_context.path_pool->getStableDiskDelegator(); for (auto & file : files) delegator.enableGCForRemoteDTFile(file->fileId()); } @@ -327,7 +333,7 @@ void StableValueSpace::calculateStableProperty( // // If we pass `segment_range` instead, // then the returned stream is a `SkippableBlockInputStream` which will complicate the implementation - DMFileBlockInputStreamBuilder builder(context.db_context); + DMFileBlockInputStreamBuilder builder(context.global_context); BlockInputStreamPtr data_stream = builder .setRowsThreshold(std::numeric_limits::max()) // because we just read one pack at a time @@ -361,12 +367,12 @@ void StableValueSpace::calculateStableProperty( } auto pack_filter = DMFilePackFilter::loadFrom( file, - context.db_context.getGlobalContext().getMinMaxIndexCache(), + context.global_context.getMinMaxIndexCache(), /*set_cache_if_miss*/ false, {rowkey_range}, EMPTY_RS_OPERATOR, {}, - context.db_context.getFileProvider(), + context.global_context.getFileProvider(), context.getReadLimiter(), context.scan_context, context.tracing_id); @@ -464,7 +470,7 @@ SkippableBlockInputStreamPtr StableValueSpace::Snapshot::getInputStream( rows.reserve(stable->files.size()); for (size_t i = 0; i < stable->files.size(); i++) { - DMFileBlockInputStreamBuilder builder(context.db_context); + DMFileBlockInputStreamBuilder builder(context.global_context); builder.enableCleanRead(enable_handle_clean_read, is_fast_scan, enable_del_clean_read, max_data_version) .setRSOperator(filter) .setColumnCache(column_caches[i]) @@ -507,12 +513,12 @@ RowsAndBytes StableValueSpace::Snapshot::getApproxRowsAndBytes(const DMContext & { auto filter = DMFilePackFilter::loadFrom( f, - context.db_context.getGlobalContext().getMinMaxIndexCache(), + context.global_context.getMinMaxIndexCache(), /*set_cache_if_miss*/ false, {range}, RSOperatorPtr{}, IdSetPtr{}, - context.db_context.getFileProvider(), + context.global_context.getFileProvider(), context.getReadLimiter(), context.scan_context, context.tracing_id); @@ -552,12 +558,12 @@ StableValueSpace::Snapshot::getAtLeastRowsAndBytes(const DMContext & context, co const auto & file = stable->files[file_idx]; auto filter = DMFilePackFilter::loadFrom( file, - context.db_context.getGlobalContext().getMinMaxIndexCache(), + context.global_context.getMinMaxIndexCache(), /*set_cache_if_miss*/ false, {range}, RSOperatorPtr{}, IdSetPtr{}, - context.db_context.getFileProvider(), + context.global_context.getFileProvider(), context.getReadLimiter(), context.scan_context, context.tracing_id); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp index c42ffcc2c03..1b16de9176c 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_column_file.cpp @@ -59,7 +59,7 @@ class ColumnFileTest : public DB::base::TiFlashStorageTestBasic = std::make_shared(db_context->getPathPool().withTable("test", "DMFile_Test", false)); storage_pool = std::make_shared(*db_context, NullspaceID, /*ns_id*/ 100, *path_pool, "test.t1"); column_cache = std::make_shared(); - dm_context = std::make_unique( // + dm_context = DMContext::createUnique( *db_context, path_pool, storage_pool, diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_value_space.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_value_space.cpp index 7512a01082f..aff4cef9bc2 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_value_space.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_value_space.cpp @@ -103,7 +103,7 @@ class DeltaValueSpaceTest : public DB::base::TiFlashStorageTestBasic { *table_columns = *columns; - dm_context = std::make_unique( + dm_context = DMContext::createUnique( *db_context, storage_path_pool, storage_pool, diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp index 1f820429a40..e3517096815 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_file.cpp @@ -129,7 +129,7 @@ class DMFileTest if (table_columns != cols) *table_columns = *cols; *path_pool = db_context->getPathPool().withTable("test", "t1", false); - dm_context = std::make_unique( // + dm_context = DMContext::createUnique( *db_context, path_pool, storage_pool, @@ -1460,7 +1460,7 @@ class DMFileClusteredIndexTest *table_columns = *cols; - dm_context = std::make_unique( // + dm_context = DMContext::createUnique( *db_context, path_pool, storage_pool, diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index 3b1ffe340e6..f57d0fc4f01 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -100,7 +100,7 @@ class SegmentTest : public DB::base::TiFlashStorageTestBasic { *table_columns = *columns; - dm_context = std::make_unique( + dm_context = DMContext::createUnique( *db_context, storage_path_pool, storage_pool, @@ -930,7 +930,7 @@ try ASSERT_EQ(c1->size(), c2->size()); - for (Int64 i = 0; i < Int64(c1->size()); i++) + for (Int64 i = 0; i < static_cast(c1->size()); i++) { if (iter1->name == DMTestEnv::pk_name) { @@ -986,7 +986,7 @@ CATCH TEST_F(SegmentTest, MassiveSplit) try { - Settings settings = dmContext().db_context.getSettings(); + Settings settings = dmContext().global_context.getSettings(); settings.dt_segment_limit_rows = 11; settings.dt_segment_delta_limit_rows = 7; @@ -1015,8 +1015,8 @@ try // if pk % 5 < 2, then the record would be deleted // if pk % 5 >= 2, then the record would be reserved HandleRange del{ - Int64((num_batches_written - 1) * num_rows_per_write), - Int64((num_batches_written - 1) * num_rows_per_write + 2)}; + static_cast((num_batches_written - 1) * num_rows_per_write), + static_cast((num_batches_written - 1) * num_rows_per_write + 2)}; segment->write(dmContext(), {RowKeyRange::fromHandleRange(del)}); } @@ -1029,7 +1029,7 @@ try i < num_batches_written * num_rows_per_write; i++) { - temp.push_back(Int64(i)); + temp.push_back(static_cast(i)); } { @@ -1157,7 +1157,7 @@ try case SegmentTestMode::V3_FileOnly: { auto delegate = dmContext().path_pool->getStableDiskDelegator(); - auto file_provider = dmContext().db_context.getFileProvider(); + auto file_provider = dmContext().global_context.getFileProvider(); auto [range, file_ids] = genDMFile(dmContext(), block); auto file_id = file_ids[0]; auto file_parent_path = delegate.getDTFilePath(file_id); @@ -1548,7 +1548,7 @@ CATCH TEST_F(SegmentTest, CalculateDTFileProperty) try { - Settings settings = dmContext().db_context.getSettings(); + Settings settings = dmContext().global_context.getSettings(); settings.dt_segment_stable_pack_rows = 10; segment = reload(DMTestEnv::getDefaultColumns(), std::move(settings)); @@ -1589,7 +1589,7 @@ CATCH TEST_F(SegmentTest, CalculateDTFilePropertyWithPropertyFileDeleted) try { - Settings settings = dmContext().db_context.getSettings(); + Settings settings = dmContext().global_context.getSettings(); settings.dt_segment_stable_pack_rows = 10; segment = reload(DMTestEnv::getDefaultColumns(), std::move(settings)); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp index a3cbd382104..1964f6854dd 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_common_handle.cpp @@ -77,7 +77,7 @@ class SegmentCommonHandleTest : public DB::base::TiFlashStorageTestBasic { *table_columns_ = *columns; - dm_context_ = std::make_unique( + dm_context_ = DMContext::createUnique( *db_context, path_pool, storage_pool, @@ -925,7 +925,7 @@ CATCH TEST_F(SegmentCommonHandleTest, MassiveSplit) try { - Settings settings = dmContext().db_context.getSettings(); + Settings settings = dmContext().global_context.getSettings(); settings.dt_segment_limit_rows = 11; settings.dt_segment_delta_limit_rows = 7; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_s3.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_s3.cpp index 6a55cb0a076..54d6a54a56d 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_s3.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment_s3.cpp @@ -142,7 +142,7 @@ class SegmentTestS3 : public DB::base::TiFlashStorageTestBasic { *table_columns = *columns; - dm_context = std::make_unique( + dm_context = DMContext::createUnique( *db_context, storage_path_pool, storage_pool, @@ -210,7 +210,7 @@ try std::vector ordered_segments = {left, right}; segment = Segment::merge(dmContext(), tableColumns(), ordered_segments); - auto wn_ps = dmContext().db_context.getWriteNodePageStorage(); + auto wn_ps = dmContext().global_context.getWriteNodePageStorage(); wn_ps->gc(/*not_skip*/ true); { auto valid_external_ids = wn_ps->page_directory->getAliveExternalIds( diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task_pool.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task_pool.cpp index 97d088df1d1..2e9d3aeeb7a 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task_pool.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_read_task_pool.cpp @@ -49,7 +49,7 @@ class SegmentReadTasksWrapperTest : public SegmentTestBasic { auto dm_context = createDMContext(); return GlobalSegmentID{ - .store_id = dm_context->db_context.getTMTContext().getKVStore()->getStoreID(), + .store_id = dm_context->global_context.getTMTContext().getKVStore()->getStoreID(), .keyspace_id = dm_context->keyspace_id, .physical_table_id = dm_context->physical_table_id, .segment_id = seg_id, @@ -121,4 +121,4 @@ TEST_F(SegmentReadTasksWrapperTest, Ordered) ASSERT_EQ(tasks_wrapper.nextTask(), nullptr); } -} // namespace DB::DM::tests \ No newline at end of file +} // namespace DB::DM::tests diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp index 9fc8f8e0c0f..cade5b28589 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp @@ -522,7 +522,7 @@ void SegmentTestBasic::ingestDTFileIntoDelta( auto ref_id = storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); wbs.data.putRefPage(ref_id, dm_file->pageId()); auto ref_file = DMFile::restore( - dm_context->db_context.getFileProvider(), + dm_context->global_context.getFileProvider(), file_id, ref_id, parent_path, @@ -581,7 +581,7 @@ void SegmentTestBasic::ingestDTFileByReplace( auto ref_id = storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); wbs.data.putRefPage(ref_id, dm_file->pageId()); auto ref_file = DMFile::restore( - dm_context->db_context.getFileProvider(), + dm_context->global_context.getFileProvider(), file_id, ref_id, parent_path, @@ -848,7 +848,7 @@ void SegmentTestBasic::reloadDMContext() std::unique_ptr SegmentTestBasic::createDMContext() { - return std::make_unique( + return DMContext::createUnique( *db_context, storage_path_pool, storage_pool,