Merge pull request #59322 from ClickHouse/backport/23.12/58877
Backport #58877 to 23.12: Multiple read file log storage in mv
robot-ch-test-poll3 authored Jan 29, 2024
2 parents 5256766 + 4eda606 commit 93909fc
Showing 9 changed files with 285 additions and 86 deletions.
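In brief: the direct-read path shared by stream-like engines is extracted into a new query-plan step, ReadFromStreamLikeEngine, and StorageFileLog::read (like the existing Kafka step, here renamed to ReadFromStorageKafka) now adds such a step to the query plan instead of building a Pipe directly. Per the test added below (test_multiple_read_in_materialized_views), this lets a materialized-view query read the source table more than once.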
43 changes: 43 additions & 0 deletions src/Processors/QueryPlan/ReadFromStreamLikeEngine.cpp
@@ -0,0 +1,43 @@
#include <Processors/QueryPlan/ReadFromStreamLikeEngine.h>

#include <Interpreters/InterpreterSelectQuery.h>
#include <QueryPipeline/QueryPipelineBuilder.h>

namespace DB
{

namespace ErrorCodes
{
extern const int QUERY_NOT_ALLOWED;
}

ReadFromStreamLikeEngine::ReadFromStreamLikeEngine(
const Names & column_names_,
const StorageSnapshotPtr & storage_snapshot_,
std::shared_ptr<const StorageLimitsList> storage_limits_,
ContextPtr context_)
: ISourceStep{DataStream{.header = storage_snapshot_->getSampleBlockForColumns(column_names_)}}
, WithContext{context_}
, storage_limits{std::move(storage_limits_)}
{
}

void ReadFromStreamLikeEngine::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
if (!getContext()->getSettingsRef().stream_like_engine_allow_direct_select)
throw Exception(
ErrorCodes::QUERY_NOT_ALLOWED, "Direct select is not allowed. To enable use setting `stream_like_engine_allow_direct_select`");

auto pipe = makePipe();

/// Add storage limits.
for (const auto & processor : pipe.getProcessors())
processor->setStorageLimits(storage_limits);

/// Add to processors so that processor info is available through the EXPLAIN PIPELINE statement.
for (const auto & processor : pipe.getProcessors())
processors.emplace_back(processor);

pipeline.init(std::move(pipe));
}
}
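As the guard above implies, a plain SELECT on a stream-like table (FileLog, Kafka) is rejected unless the named setting is enabled. A minimal illustration (the table name is hypothetical; only the setting name comes from the code above):

-- Rejected with QUERY_NOT_ALLOWED by default; allowed with the setting:
SELECT * FROM default.file_log_table LIMIT 10
SETTINGS stream_like_engine_allow_direct_select = 1;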
25 changes: 25 additions & 0 deletions src/Processors/QueryPlan/ReadFromStreamLikeEngine.h
@@ -0,0 +1,25 @@
#pragma once

#include <Processors/QueryPlan/ISourceStep.h>
#include <Storages/IStorage.h>
#include <Storages/StorageSnapshot.h>

namespace DB
{
class ReadFromStreamLikeEngine : public ISourceStep, protected WithContext
{
public:
ReadFromStreamLikeEngine(
const Names & column_names_,
const StorageSnapshotPtr & storage_snapshot_,
std::shared_ptr<const StorageLimitsList> storage_limits_,
ContextPtr context_);

void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & /*settings*/) final;

protected:
virtual Pipe makePipe() = 0;

std::shared_ptr<const StorageLimitsList> storage_limits;
};
}
132 changes: 81 additions & 51 deletions src/Storages/FileLog/StorageFileLog.cpp
@@ -1,7 +1,7 @@
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <Disks/StoragePolicy.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
@@ -13,9 +13,12 @@
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTInsertQuery.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadFromStreamLikeEngine.h>
#include <QueryPipeline/Pipe.h>
#include <Storages/FileLog/FileLogSource.h>
#include <Storages/FileLog/StorageFileLog.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageMaterializedView.h>
#include <Storages/checkAndGetLiteralArgument.h>
@@ -50,6 +53,76 @@ namespace

static constexpr auto TMP_SUFFIX = ".tmp";


class ReadFromStorageFileLog final : public ReadFromStreamLikeEngine
{
public:
ReadFromStorageFileLog(
const Names & column_names_,
StoragePtr storage_,
const StorageSnapshotPtr & storage_snapshot_,
SelectQueryInfo & query_info,
ContextPtr context_)
: ReadFromStreamLikeEngine{column_names_, storage_snapshot_, query_info.storage_limits, context_}
, column_names{column_names_}
, storage{storage_}
, storage_snapshot{storage_snapshot_}
{
}

String getName() const override { return "ReadFromStorageFileLog"; }

private:
Pipe makePipe() final
{
auto & file_log = storage->as<StorageFileLog &>();
if (file_log.mv_attached)
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageFileLog with attached materialized views");

std::lock_guard lock(file_log.file_infos_mutex);
if (file_log.running_streams)
throw Exception(ErrorCodes::CANNOT_SELECT, "Another select query is running on this table, need to wait for it to finish.");

file_log.updateFileInfos();

/// No files to parse
if (file_log.file_infos.file_names.empty())
{
LOG_WARNING(file_log.log, "There is an idle table named {}, no files need to parse.", getName());
return Pipe{};
}

auto modified_context = Context::createCopy(getContext());

auto max_streams_number = std::min<UInt64>(file_log.filelog_settings->max_threads, file_log.file_infos.file_names.size());

/// Each stream is responsible for closing its files and storing metadata
file_log.openFilesAndSetPos();

Pipes pipes;
pipes.reserve(max_streams_number);
for (size_t stream_number = 0; stream_number < max_streams_number; ++stream_number)
{
pipes.emplace_back(std::make_shared<FileLogSource>(
file_log,
storage_snapshot,
modified_context,
column_names,
file_log.getMaxBlockSize(),
file_log.getPollTimeoutMillisecond(),
stream_number,
max_streams_number,
file_log.filelog_settings->handle_error_mode));
}

return Pipe::unitePipes(std::move(pipes));
}

const Names column_names;
StoragePtr storage;
StorageSnapshotPtr storage_snapshot;
};

StorageFileLog::StorageFileLog(
const StorageID & table_id_,
ContextPtr context_,
@@ -296,62 +369,19 @@ UInt64 StorageFileLog::getInode(const String & file_name)
return file_stat.st_ino;
}

Pipe StorageFileLog::read(
void StorageFileLog::read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & /* query_info */,
ContextPtr local_context,
SelectQueryInfo & query_info,
ContextPtr query_context,
QueryProcessingStage::Enum /* processed_stage */,
size_t /* max_block_size */,
size_t /* num_streams */)
{
/// If there are MVs dependent on this table, we just forbid reading
if (!local_context->getSettingsRef().stream_like_engine_allow_direct_select)
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED,
"Direct select is not allowed. To enable use setting `stream_like_engine_allow_direct_select`");

if (mv_attached)
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageFileLog with attached materialized views");

std::lock_guard lock(file_infos_mutex);
if (running_streams)
{
throw Exception(ErrorCodes::CANNOT_SELECT, "Another select query is running on this table, need to wait for it to finish.");
}

updateFileInfos();

/// No files to parse
if (file_infos.file_names.empty())
{
LOG_WARNING(log, "There is an idle table named {}, no files need to parse.", getName());
return Pipe{};
}

auto modified_context = Context::createCopy(local_context);

auto max_streams_number = std::min<UInt64>(filelog_settings->max_threads, file_infos.file_names.size());

/// Each stream is responsible for closing its files and storing metadata
openFilesAndSetPos();

Pipes pipes;
pipes.reserve(max_streams_number);
for (size_t stream_number = 0; stream_number < max_streams_number; ++stream_number)
{
pipes.emplace_back(std::make_shared<FileLogSource>(
*this,
storage_snapshot,
modified_context,
column_names,
getMaxBlockSize(),
getPollTimeoutMillisecond(),
stream_number,
max_streams_number,
filelog_settings->handle_error_mode));
}

return Pipe::unitePipes(std::move(pipes));
{
query_plan.addStep(
std::make_unique<ReadFromStorageFileLog>(column_names, shared_from_this(), storage_snapshot, query_info, std::move(query_context)));
}

void StorageFileLog::increaseStreams()
5 changes: 4 additions & 1 deletion src/Storages/FileLog/StorageFileLog.h
@@ -49,7 +49,8 @@ class StorageFileLog final : public IStorage, WithContext
void startup() override;
void shutdown(bool is_drop) override;

Pipe read(
void read(
QueryPlan & query_plan,
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
@@ -133,6 +134,8 @@ class StorageFileLog final : public IStorage, WithContext
const auto & getFileLogSettings() const { return filelog_settings; }

private:
friend class ReadFromStorageFileLog;

std::unique_ptr<FileLogSettings> filelog_settings;

const String path;
40 changes: 8 additions & 32 deletions src/Storages/Kafka/StorageKafka.cpp
@@ -42,6 +42,7 @@
#include <Common/Stopwatch.h>
#include <Common/formatReadable.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Processors/QueryPlan/ReadFromStreamLikeEngine.h>
#include <Common/logger_useful.h>
#include <Common/quoteString.h>
#include <Common/setThreadName.h>
@@ -175,62 +176,40 @@ struct StorageKafkaInterceptors
}
};

class ReadFromStorageKafkaStep final : public ISourceStep
class ReadFromStorageKafka final : public ReadFromStreamLikeEngine
{
public:
ReadFromStorageKafkaStep(
ReadFromStorageKafka(
const Names & column_names_,
StoragePtr storage_,
const StorageSnapshotPtr & storage_snapshot_,
SelectQueryInfo & query_info,
ContextPtr context_)
: ISourceStep{DataStream{.header = storage_snapshot_->getSampleBlockForColumns(column_names_)}}
: ReadFromStreamLikeEngine{column_names_, storage_snapshot_, query_info.storage_limits, context_}
, column_names{column_names_}
, storage{storage_}
, storage_snapshot{storage_snapshot_}
, storage_limits{query_info.storage_limits}
, context{context_}
{
}

String getName() const override { return "ReadFromStorageKafka"; }

void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override
{
auto pipe = makePipe();

/// Add storage limits.
for (const auto & processor : pipe.getProcessors())
processor->setStorageLimits(storage_limits);

/// Add to processors so that processor info is available through the EXPLAIN PIPELINE statement.
for (const auto & processor : pipe.getProcessors())
processors.emplace_back(processor);

pipeline.init(std::move(pipe));
}

private:
Pipe makePipe()
Pipe makePipe() final
{
auto & kafka_storage = storage->as<StorageKafka &>();
if (kafka_storage.num_created_consumers == 0)
return {};

if (!context->getSettingsRef().stream_like_engine_allow_direct_select)
throw Exception(
ErrorCodes::QUERY_NOT_ALLOWED,
"Direct select is not allowed. To enable use setting `stream_like_engine_allow_direct_select`");

if (kafka_storage.mv_attached)
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageKafka with attached materialized views");

ProfileEvents::increment(ProfileEvents::KafkaDirectReads);

/// Always use all consumers at once, otherwise SELECT may not read messages from all partitions.
Pipes pipes;
pipes.reserve(kafka_storage.num_created_consumers);
auto modified_context = Context::createCopy(context);
pipes.reserve(kafka_storage.num_consumers);
auto modified_context = Context::createCopy(getContext());
modified_context->applySettingsChanges(kafka_storage.settings_adjustments);

// Claim as many consumers as requested, but don't block
@@ -252,13 +231,10 @@ class ReadFromStorageKafkaStep final : public ISourceStep
LOG_DEBUG(kafka_storage.log, "Starting reading {} streams", pipes.size());
return Pipe::unitePipes(std::move(pipes));
}
ActionsDAGPtr buildFilterDAG();

const Names column_names;
StoragePtr storage;
StorageSnapshotPtr storage_snapshot;
std::shared_ptr<const StorageLimitsList> storage_limits;
ContextPtr context;
};

namespace
@@ -434,7 +410,7 @@ void StorageKafka::read(
size_t /* max_block_size */,
size_t /* num_streams */)
{
query_plan.addStep(std::make_unique<ReadFromStorageKafkaStep>(
query_plan.addStep(std::make_unique<ReadFromStorageKafka>(
column_names, shared_from_this(), storage_snapshot, query_info, std::move(query_context)));
}

4 changes: 2 additions & 2 deletions src/Storages/Kafka/StorageKafka.h
@@ -24,7 +24,7 @@ namespace DB
{

class StorageSystemKafkaConsumers;
class ReadFromStorageKafkaStep;
class ReadFromStorageKafka;

struct StorageKafkaInterceptors;

@@ -92,7 +92,7 @@ class StorageKafka final : public IStorage, WithContext
SafeConsumers getSafeConsumers() { return {shared_from_this(), std::unique_lock(mutex), all_consumers}; }

private:
friend class ReadFromStorageKafkaStep;
friend class ReadFromStorageKafka;

// Configuration and state
std::unique_ptr<KafkaSettings> kafka_settings;
9 changes: 9 additions & 0 deletions tests/integration/test_storage_kafka/test.py
@@ -5008,6 +5008,15 @@ def test_multiple_read_in_materialized_views(kafka_cluster, max_retries=15):
)
assert res == expected_result

kafka_delete_topic(admin_client, topic)
instance.query(
f"""
DROP TABLE test.kafka_multiple_read_input;
DROP TABLE test.kafka_multiple_read_table;
DROP TABLE test.kafka_multiple_read_mv;
"""
)
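For orientation, the tables dropped in this new cleanup block are created earlier in the same test. Their shape is roughly the following sketch (assumed from the table names and the PR title; the authoritative DDL and Kafka settings live earlier in test_multiple_read_in_materialized_views and may differ):

CREATE TABLE test.kafka_multiple_read_input (id Int64)
    ENGINE = Kafka
    SETTINGS kafka_broker_list = 'kafka1:19092',
             kafka_topic_list = 'multiple_read',
             kafka_group_name = 'multiple_read',
             kafka_format = 'JSONEachRow';

CREATE TABLE test.kafka_multiple_read_table (id Int64)
    ENGINE = MergeTree
    ORDER BY id;

-- The view's SELECT references the Kafka input twice; this is the
-- "multiple read" pattern the backported change makes work.
CREATE MATERIALIZED VIEW test.kafka_multiple_read_mv
    TO test.kafka_multiple_read_table AS
SELECT id FROM test.kafka_multiple_read_input
WHERE id NOT IN (
    SELECT id FROM test.kafka_multiple_read_table
    WHERE id IN (SELECT id FROM test.kafka_multiple_read_input)
);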


if __name__ == "__main__":
cluster.start()