Skip to content

Commit

Permalink
(cloud-merge) Add file cache stats for queries to audit log
Browse files Browse the repository at this point in the history
  • Loading branch information
Lchangliang committed Apr 25, 2024
1 parent 3034ac3 commit 229468a
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 1 deletion.
10 changes: 10 additions & 0 deletions be/src/runtime/query_statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ void QueryStatistics::merge(const QueryStatistics& other) {
cpu_nanos += other.cpu_nanos.load(std::memory_order_relaxed);
shuffle_send_bytes += other.shuffle_send_bytes.load(std::memory_order_relaxed);
shuffle_send_rows += other.shuffle_send_rows.load(std::memory_order_relaxed);
_scan_bytes_from_local_storage +=
other._scan_bytes_from_local_storage.load(std::memory_order_relaxed);
_scan_bytes_from_remote_storage +=
other._scan_bytes_from_remote_storage.load(std::memory_order_relaxed);

int64_t other_peak_mem = other.max_peak_memory_bytes.load(std::memory_order_relaxed);
if (other_peak_mem > this->max_peak_memory_bytes) {
Expand All @@ -51,6 +55,8 @@ void QueryStatistics::to_pb(PQueryStatistics* statistics) {
statistics->set_cpu_ms(cpu_nanos / NANOS_PER_MILLIS);
statistics->set_returned_rows(returned_rows);
statistics->set_max_peak_memory_bytes(max_peak_memory_bytes);
statistics->set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage);
statistics->set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage);
}

void QueryStatistics::to_thrift(TQueryStatistics* statistics) const {
Expand All @@ -64,12 +70,16 @@ void QueryStatistics::to_thrift(TQueryStatistics* statistics) const {
current_used_memory_bytes.load(std::memory_order_relaxed));
statistics->__set_shuffle_send_bytes(shuffle_send_bytes.load(std::memory_order_relaxed));
statistics->__set_shuffle_send_rows(shuffle_send_rows.load(std::memory_order_relaxed));
statistics->__set_scan_bytes_from_remote_storage(_scan_bytes_from_remote_storage);
statistics->__set_scan_bytes_from_local_storage(_scan_bytes_from_local_storage);
}

void QueryStatistics::from_pb(const PQueryStatistics& statistics) {
scan_rows = statistics.scan_rows();
scan_bytes = statistics.scan_bytes();
cpu_nanos = statistics.cpu_ms() * NANOS_PER_MILLIS;
_scan_bytes_from_local_storage = statistics.scan_bytes_from_local_storage();
_scan_bytes_from_remote_storage = statistics.scan_bytes_from_remote_storage();
}

void QueryStatistics::merge(QueryStatisticsRecvr* recvr) {
Expand Down
12 changes: 12 additions & 0 deletions be/src/runtime/query_statistics.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,14 @@ class QueryStatistics {
this->shuffle_send_rows.fetch_add(delta_rows, std::memory_order_relaxed);
}

void add_scan_bytes_from_local_storage(int64_t scan_bytes_from_local_storage) {
this->_scan_bytes_from_local_storage += scan_bytes_from_local_storage;
}

void add_scan_bytes_from_remote_storage(int64_t scan_bytes_from_remote_storage) {
this->_scan_bytes_from_remote_storage += scan_bytes_from_remote_storage;
}

void set_returned_rows(int64_t num_rows) { this->returned_rows = num_rows; }

void set_max_peak_memory_bytes(int64_t max_peak_memory_bytes) {
Expand All @@ -95,6 +103,8 @@ class QueryStatistics {
cpu_nanos.store(0, std::memory_order_relaxed);
shuffle_send_bytes.store(0, std::memory_order_relaxed);
shuffle_send_rows.store(0, std::memory_order_relaxed);
_scan_bytes_from_local_storage.store(0);
_scan_bytes_from_remote_storage.store(0);

returned_rows = 0;
max_peak_memory_bytes.store(0, std::memory_order_relaxed);
Expand All @@ -120,6 +130,8 @@ class QueryStatistics {
std::atomic<int64_t> scan_rows;
std::atomic<int64_t> scan_bytes;
std::atomic<int64_t> cpu_nanos;
std::atomic<int64_t> _scan_bytes_from_local_storage;
std::atomic<int64_t> _scan_bytes_from_remote_storage;
// number rows returned by query.
// only set once by result sink when closing.
int64_t returned_rows;
Expand Down
6 changes: 6 additions & 0 deletions be/src/vec/exec/scan/new_olap_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -688,6 +688,12 @@ void NewOlapScanner::_collect_profile_before_close() {
tablet->query_scan_bytes->increment(_compressed_bytes_read);
tablet->query_scan_rows->increment(_raw_rows_read);
tablet->query_scan_count->increment(1);
if (_query_statistics) {
_query_statistics->add_scan_bytes_from_local_storage(
stats.file_cache_stats.bytes_read_from_local);
_query_statistics->add_scan_bytes_from_remote_storage(
stats.file_cache_stats.bytes_read_from_remote);
}
}

} // namespace doris::vectorized
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ public enum EventType {
// note: newly added fields should be always before fuzzyVariables
@AuditField(value = "FuzzyVariables")
public String fuzzyVariables = "";
@AuditField(value = "scanBytesFromLocalStorage")
public long scanBytesFromLocalStorage = -1;
@AuditField(value = "scanBytesFromRemoteStorage")
public long scanBytesFromRemoteStorage = -1;

public long pushToAuditLogQueueTime;

Expand Down Expand Up @@ -251,6 +255,16 @@ public AuditEventBuilder setWorkloadGroup(String workloadGroup) {
return this;
}

public AuditEventBuilder setScanBytesFromLocalStorage(long scanBytesFromLocalStorage) {
auditEvent.scanBytesFromLocalStorage = scanBytesFromLocalStorage;
return this;
}

public AuditEventBuilder setScanBytesFromRemoteStorage(long scanBytesFromRemoteStorage) {
auditEvent.scanBytesFromRemoteStorage = scanBytesFromRemoteStorage;
return this;
}

public AuditEvent build() {
return this.auditEvent;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@ private static void logAuditLogImpl(ConnectContext ctx, String origStmt, Stateme
auditEventBuilder.setSqlDigest(sqlDigest);
}
}
auditEventBuilder.setIsQuery(true);
auditEventBuilder.setIsQuery(true)
.setScanBytesFromLocalStorage(statistics == null ? 0 : statistics.getScanBytesFromLocalStorage())
.setScanBytesFromRemoteStorage(statistics == null ? 0 : statistics.getScanBytesFromRemoteStorage());
} else {
auditEventBuilder.setIsQuery(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ protected void runOneCycle() {
if (queryStats != null) {
auditEvent.scanRows = queryStats.scan_rows;
auditEvent.scanBytes = queryStats.scan_bytes;
auditEvent.scanBytesFromLocalStorage = queryStats.scan_bytes_from_local_storage;
auditEvent.scanBytesFromRemoteStorage = queryStats.scan_bytes_from_remote_storage;
auditEvent.peakMemoryBytes = queryStats.max_peak_memory_bytes;
auditEvent.cpuTimeMs = queryStats.cpu_ms;
auditEvent.shuffleSendBytes = queryStats.shuffle_send_bytes;
Expand Down Expand Up @@ -176,6 +178,8 @@ public Map<Long, Map<String, TQueryStatistics>> getBeQueryStatsMap() {
private void mergeQueryStatistics(TQueryStatistics dst, TQueryStatistics src) {
dst.scan_rows += src.scan_rows;
dst.scan_bytes += src.scan_bytes;
dst.scan_bytes_from_local_storage += src.scan_bytes_from_local_storage;
dst.scan_bytes_from_remote_storage += src.scan_bytes_from_remote_storage;
dst.cpu_ms += src.cpu_ms;
dst.shuffle_send_bytes += src.shuffle_send_bytes;
dst.shuffle_send_rows += src.shuffle_send_rows;
Expand Down
2 changes: 2 additions & 0 deletions gensrc/proto/data.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ message PQueryStatistics {
optional int64 cpu_ms = 4;
optional int64 max_peak_memory_bytes = 5;
repeated PNodeStatistics nodes_statistics = 6;
optional int64 scan_bytes_from_local_storage = 7;
optional int64 scan_bytes_from_remote_storage = 8;
}

message PRowBatch {
Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/FrontendService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,8 @@ struct TQueryStatistics {
7: optional i64 workload_group_id
8: optional i64 shuffle_send_bytes
9: optional i64 shuffle_send_rows
10: optional i64 scan_bytes_from_local_storage
11: optional i64 scan_bytes_from_remote_storage
}

struct TReportWorkloadRuntimeStatusParams {
Expand Down

0 comments on commit 229468a

Please sign in to comment.