Skip to content

Commit

Permalink
Do not acquire lock for total_bytes/total_rows for Buffer engine
Browse files Browse the repository at this point in the history
When Buffer() is under preassure, acquiring per-layer lock may take
significant time. And so the following query may take significant amount of time:

    SELECT total_bytes, total_rows FROM system.tables WHERE engine='Buffer'
  • Loading branch information
azat committed May 12, 2021
1 parent 26a1277 commit 074b57f
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 23 deletions.
43 changes: 25 additions & 18 deletions src/Storages/StorageBuffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -539,8 +539,8 @@ class BufferBlockOutputStream : public IBlockOutputStream

size_t bytes = block.bytes();

storage.writes.rows += rows;
storage.writes.bytes += bytes;
storage.lifetime_writes.rows += rows;
storage.lifetime_writes.bytes += bytes;

/// If the block already exceeds the maximum limit, then we skip the buffer.
if (rows > storage.max_thresholds.rows || bytes > storage.max_thresholds.bytes)
Expand Down Expand Up @@ -606,6 +606,9 @@ class BufferBlockOutputStream : public IBlockOutputStream
if (!buffer.data)
{
buffer.data = sorted_block.cloneEmpty();

storage.total_writes.rows += buffer.data.rows();
storage.total_writes.bytes += buffer.data.allocatedBytes();
}
else if (storage.checkThresholds(buffer, /* direct= */true, current_time, sorted_block.rows(), sorted_block.bytes()))
{
Expand All @@ -620,7 +623,13 @@ class BufferBlockOutputStream : public IBlockOutputStream
if (!buffer.first_write_time)
buffer.first_write_time = current_time;

size_t old_rows = buffer.data.rows();
size_t old_bytes = buffer.data.allocatedBytes();

appendBlock(sorted_block, buffer.data);

storage.total_writes.rows += (buffer.data.rows() - old_rows);
storage.total_writes.bytes += (buffer.data.allocatedBytes() - old_bytes);
}
};

Expand Down Expand Up @@ -823,13 +832,20 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
buffer.data.swap(block_to_write);
buffer.first_write_time = 0;

CurrentMetrics::sub(CurrentMetrics::StorageBufferRows, block_to_write.rows());
CurrentMetrics::sub(CurrentMetrics::StorageBufferBytes, block_to_write.bytes());
size_t block_rows = block_to_write.rows();
size_t block_bytes = block_to_write.bytes();
size_t block_allocated_bytes = block_to_write.allocatedBytes();

CurrentMetrics::sub(CurrentMetrics::StorageBufferRows, block_rows);
CurrentMetrics::sub(CurrentMetrics::StorageBufferBytes, block_bytes);

ProfileEvents::increment(ProfileEvents::StorageBufferFlush);

if (!destination_id)
{
total_writes.rows -= block_rows;
total_writes.bytes -= block_allocated_bytes;

LOG_DEBUG(log, "Flushing buffer with {} rows (discarded), {} bytes, age {} seconds {}.", rows, bytes, time_passed, (check_thresholds ? "(bg)" : "(direct)"));
return;
}
Expand Down Expand Up @@ -866,6 +882,9 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
throw;
}

total_writes.rows -= block_rows;
total_writes.bytes -= block_allocated_bytes;

UInt64 milliseconds = watch.elapsedMilliseconds();
LOG_DEBUG(log, "Flushing buffer with {} rows, {} bytes, age {} seconds, took {} ms {}.", rows, bytes, time_passed, milliseconds, (check_thresholds ? "(bg)" : "(direct)"));
}
Expand Down Expand Up @@ -1022,24 +1041,12 @@ std::optional<UInt64> StorageBuffer::totalRows(const Settings & settings) const
if (!underlying_rows)
return underlying_rows;

UInt64 rows = 0;
for (const auto & buffer : buffers)
{
const auto lock(buffer.lockForReading());
rows += buffer.data.rows();
}
return rows + *underlying_rows;
return total_writes.rows + *underlying_rows;
}

std::optional<UInt64> StorageBuffer::totalBytes(const Settings & /*settings*/) const
{
UInt64 bytes = 0;
for (const auto & buffer : buffers)
{
const auto lock(buffer.lockForReading());
bytes += buffer.data.allocatedBytes();
}
return bytes;
return total_writes.bytes;
}

void StorageBuffer::alter(const AlterCommands & params, ContextPtr local_context, TableLockHolder &)
Expand Down
11 changes: 6 additions & 5 deletions src/Storages/StorageBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ friend class BufferBlockOutputStream;
std::optional<UInt64> totalRows(const Settings & settings) const override;
std::optional<UInt64> totalBytes(const Settings & settings) const override;

std::optional<UInt64> lifetimeRows() const override { return writes.rows; }
std::optional<UInt64> lifetimeBytes() const override { return writes.bytes; }
std::optional<UInt64> lifetimeRows() const override { return lifetime_writes.rows; }
std::optional<UInt64> lifetimeBytes() const override { return lifetime_writes.bytes; }


private:
Expand Down Expand Up @@ -143,12 +143,13 @@ friend class BufferBlockOutputStream;
StorageID destination_id;
bool allow_materialized;

/// Lifetime
struct LifeTimeWrites
struct Writes
{
std::atomic<size_t> rows = 0;
std::atomic<size_t> bytes = 0;
} writes;
};
Writes lifetime_writes;
Writes total_writes;

Poco::Logger * log;

Expand Down

0 comments on commit 074b57f

Please sign in to comment.