Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
kssenii committed Oct 18, 2023
1 parent eb4519b commit c4bad25
Showing 1 changed file with 10 additions and 2 deletions.
12 changes: 10 additions & 2 deletions src/Storages/S3Queue/StorageS3Queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,16 +354,24 @@ bool StorageS3Queue::streamToViews()
auto file_iterator = createFileIterator(s3queue_context, nullptr);

Pipes pipes;
pipes.reserve(s3queue_settings->s3queue_processing_threads_num);
for (size_t i = 0; i < s3queue_settings->s3queue_processing_threads_num; ++i)
{
auto source = createSource(file_iterator, block_io.pipeline.getHeader().getNames(), storage_snapshot, DBMS_DEFAULT_BUFFER_SIZE, s3queue_context);
auto source = createSource(
file_iterator, block_io.pipeline.getHeader().getNames(),
storage_snapshot, DBMS_DEFAULT_BUFFER_SIZE, s3queue_context);

pipes.emplace_back(std::move(source));
}
auto pipe = Pipe::unitePipes(std::move(pipes));

std::atomic_size_t rows = 0;
block_io.pipeline.complete(std::move(pipe));
block_io.pipeline.setNumThreads(s3queue_settings->s3queue_processing_threads_num);
block_io.pipeline.setConcurrencyControl(s3queue_context->getSettingsRef().use_concurrency_control);

std::atomic_size_t rows = 0;
block_io.pipeline.setProgressCallback([&](const Progress & progress) { rows += progress.read_rows.load(); });

CompletedPipelineExecutor executor(block_io.pipeline);
executor.execute();

Expand Down

0 comments on commit c4bad25

Please sign in to comment.