Skip to content
39 changes: 25 additions & 14 deletions be/src/exec/schema_scanner/schema_processlist_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,21 @@ namespace doris {
#include "common/compile_check_begin.h"

std::vector<SchemaScanner::ColumnDesc> SchemaProcessListScanner::_s_processlist_columns = {
{"CURRENT_CONNECTED", TYPE_VARCHAR, sizeof(StringRef), false},
{"ID", TYPE_LARGEINT, sizeof(int128_t), false},
{"USER", TYPE_VARCHAR, sizeof(StringRef), false},
{"HOST", TYPE_VARCHAR, sizeof(StringRef), false},
{"LOGIN_TIME", TYPE_DATETIMEV2, sizeof(DateTimeV2ValueType), false},
{"CATALOG", TYPE_VARCHAR, sizeof(StringRef), false},
{"DB", TYPE_VARCHAR, sizeof(StringRef), false},
{"COMMAND", TYPE_VARCHAR, sizeof(StringRef), false},
{"TIME", TYPE_INT, sizeof(int32_t), false},
{"STATE", TYPE_VARCHAR, sizeof(StringRef), false},
{"QUERY_ID", TYPE_VARCHAR, sizeof(StringRef), false},
{"INFO", TYPE_VARCHAR, sizeof(StringRef), false},
{"FE", TYPE_VARCHAR, sizeof(StringRef), false},
{"CLOUD_CLUSTER", TYPE_VARCHAR, sizeof(StringRef), false}};
{"CurrentConnected", TYPE_VARCHAR, sizeof(StringRef), false}, // 0
{"Id", TYPE_LARGEINT, sizeof(int128_t), false}, // 1
{"User", TYPE_VARCHAR, sizeof(StringRef), false}, // 2
{"Host", TYPE_VARCHAR, sizeof(StringRef), false}, // 3
{"LoginTime", TYPE_DATETIMEV2, sizeof(DateTimeV2ValueType), false}, // 4
{"Catalog", TYPE_VARCHAR, sizeof(StringRef), false}, // 5
{"Db", TYPE_VARCHAR, sizeof(StringRef), false}, // 6
{"Command", TYPE_VARCHAR, sizeof(StringRef), false}, // 7
{"Time", TYPE_INT, sizeof(int32_t), false}, // 8
{"State", TYPE_VARCHAR, sizeof(StringRef), false}, // 9
{"QueryId", TYPE_VARCHAR, sizeof(StringRef), false}, // 10
{"TraceId", TYPE_VARCHAR, sizeof(StringRef), false}, // 11
{"Info", TYPE_VARCHAR, sizeof(StringRef), false}, // 12
{"FE", TYPE_VARCHAR, sizeof(StringRef), false}, // 13
{"CloudCluster", TYPE_VARCHAR, sizeof(StringRef), false}}; // 14

SchemaProcessListScanner::SchemaProcessListScanner()
: SchemaScanner(_s_processlist_columns, TSchemaTableType::SCH_PROCESSLIST) {}
Expand All @@ -62,6 +63,16 @@ Status SchemaProcessListScanner::start(RuntimeState* state) {
TShowProcessListResult tmp_ret;
RETURN_IF_ERROR(
SchemaHelper::show_process_list(fe_addr.hostname, fe_addr.port, request, &tmp_ret));

// Check and adjust the number of columns in each row to ensure 15 columns
// This is compatible with newly added column "trace id". #51400
for (auto& row : tmp_ret.process_list) {
if (row.size() == 14) {
// Insert an empty string at position 11 (index 11) for the TRACE_ID column
row.insert(row.begin() + 11, "");
}
}

_process_list_result.process_list.insert(_process_list_result.process_list.end(),
tmp_ret.process_list.begin(),
tmp_ret.process_list.end());
Expand Down
9 changes: 9 additions & 0 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1382,4 +1382,13 @@ Status FragmentMgr::get_realtime_exec_status(const TUniqueId& query_id,
return Status::OK();
}

Status FragmentMgr::get_query_statistics(const TUniqueId& query_id, TQueryStatistics* query_stats) {
if (query_stats == nullptr) {
return Status::InvalidArgument("query_stats is nullptr");
}

return ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_query_statistics(
print_id(query_id), query_stats);
}

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/runtime/fragment_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,8 @@ class FragmentMgr : public RestMonitorIface {

Status get_realtime_exec_status(const TUniqueId& query_id,
TReportExecStatusParams* exec_status);
// get the query statistics of with a given query id
Status get_query_statistics(const TUniqueId& query_id, TQueryStatistics* query_stats);

std::shared_ptr<QueryContext> get_query_ctx(const TUniqueId& query_id);

Expand Down
13 changes: 13 additions & 0 deletions be/src/runtime/runtime_query_statistics_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,19 @@ void RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block* blo
}
}

Status RuntimeQueryStatisticsMgr::get_query_statistics(const std::string& query_id,
TQueryStatistics* query_stats) {
std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock);

auto resource_ctx = _resource_contexts_map.find(query_id);
if (resource_ctx == _resource_contexts_map.end()) {
return Status::InternalError("failed to find query with id {}", query_id);
}

resource_ctx->second->to_thrift_query_statistics(query_stats);
return Status::OK();
}

void RuntimeQueryStatisticsMgr::get_tasks_resource_context(
std::vector<std::shared_ptr<ResourceContext>>& resource_ctxs) {
std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock);
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/runtime_query_statistics_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class RuntimeQueryStatisticsMgr {

// used for backend_active_tasks
void get_active_be_tasks_block(vectorized::Block* block);
Status get_query_statistics(const std::string& query_id, TQueryStatistics* query_stats);

// used for MemoryReclamation
void get_tasks_resource_context(std::vector<std::shared_ptr<ResourceContext>>& resource_ctxs);
Expand Down Expand Up @@ -98,4 +99,4 @@ class RuntimeQueryStatisticsMgr {
_load_channel_profile_map;
};

} // namespace doris
} // namespace doris
25 changes: 17 additions & 8 deletions be/src/service/backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1321,19 +1321,28 @@ void BaseBackendService::get_realtime_exec_status(TGetRealtimeExecStatusResponse

std::unique_ptr<TReportExecStatusParams> report_exec_status_params =
std::make_unique<TReportExecStatusParams>();
Status st = ExecEnv::GetInstance()->fragment_mgr()->get_realtime_exec_status(
request.id, report_exec_status_params.get());
std::unique_ptr<TQueryStatistics> query_stats = std::make_unique<TQueryStatistics>();

if (!st.ok()) {
response.__set_status(st.to_thrift());
return;
std::string req_type = request.__isset.req_type ? request.req_type : "profile";
Status st;
if (req_type == "stats") {
st = ExecEnv::GetInstance()->fragment_mgr()->get_query_statistics(request.id,
query_stats.get());
if (st.ok()) {
response.__set_query_stats(*query_stats);
}
} else {
// default is "profile"
st = ExecEnv::GetInstance()->fragment_mgr()->get_realtime_exec_status(
request.id, report_exec_status_params.get());
if (st.ok()) {
response.__set_report_exec_status_params(*report_exec_status_params);
}
}

report_exec_status_params->__set_query_id(TUniqueId());
report_exec_status_params->__set_done(false);

response.__set_status(Status::OK().to_thrift());
response.__set_report_exec_status_params(*report_exec_status_params);
response.__set_status(st.to_thrift());
}

void BaseBackendService::get_dictionary_status(TDictionaryStatusList& result,
Expand Down
32 changes: 17 additions & 15 deletions fe/fe-core/src/main/java/org/apache/doris/catalog/SchemaTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -524,22 +524,24 @@ public class SchemaTable extends Table {
.column("IS_MUTABLE", ScalarType.createType(PrimitiveType.BOOLEAN))
.build()))
.put("processlist",
// ATTN, the column name should be compatible with MySQL
// See: https://dev.mysql.com/doc/refman/8.4/en/show-processlist.html
new SchemaTable(SystemIdGenerator.getNextId(), "processlist", TableType.SCHEMA,
builder().column("CURRENT_CONNECTED", ScalarType.createVarchar(16))
.column("ID", ScalarType.createType(PrimitiveType.LARGEINT))
.column("USER", ScalarType.createVarchar(32))
.column("HOST", ScalarType.createVarchar(261))
.column("LOGIN_TIME", ScalarType.createType(PrimitiveType.DATETIMEV2))
.column("CATALOG", ScalarType.createVarchar(64))
.column("DB", ScalarType.createVarchar(64))
.column("COMMAND", ScalarType.createVarchar(16))
.column("TIME", ScalarType.createType(PrimitiveType.INT))
.column("STATE", ScalarType.createVarchar(64))
.column("QUERY_ID", ScalarType.createVarchar(256))
.column("INFO", ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH))
.column("FE",
ScalarType.createVarchar(64))
.column("CLOUD_CLUSTER", ScalarType.createVarchar(64)).build(), true))
builder().column("CurrentConnected", ScalarType.createVarchar(16))
.column("Id", ScalarType.createType(PrimitiveType.LARGEINT))
.column("User", ScalarType.createVarchar(32))
.column("Host", ScalarType.createVarchar(261))
.column("LoginTime", ScalarType.createType(PrimitiveType.DATETIMEV2))
.column("Catalog", ScalarType.createVarchar(64))
.column("Db", ScalarType.createVarchar(64))
.column("Command", ScalarType.createVarchar(16))
.column("Time", ScalarType.createType(PrimitiveType.INT))
.column("State", ScalarType.createVarchar(64))
.column("QueryId", ScalarType.createVarchar(256))
.column("TraceId", ScalarType.createVarchar(256))
.column("Info", ScalarType.createVarchar(ScalarType.MAX_VARCHAR_LENGTH))
.column("FE", ScalarType.createVarchar(64))
.column("CloudCluster", ScalarType.createVarchar(64)).build(), true))
.put("workload_policy",
new SchemaTable(SystemIdGenerator.getNextId(), "workload_policy", TableType.SCHEMA,
builder().column("ID", ScalarType.createType(PrimitiveType.BIGINT))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@
import org.apache.doris.thrift.TGetRealtimeExecStatusRequest;
import org.apache.doris.thrift.TGetRealtimeExecStatusResponse;
import org.apache.doris.thrift.TNetworkAddress;
import org.apache.doris.thrift.TQueryStatistics;
import org.apache.doris.thrift.TStatusCode;
import org.apache.doris.thrift.TUniqueId;

import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -253,7 +255,7 @@ public List<List<String>> getQueryInfoByColumnNameList(List<String> columnNameLi
}

private static TGetRealtimeExecStatusResponse getRealtimeQueryProfile(
TUniqueId queryID, TNetworkAddress targetBackend) {
TUniqueId queryID, String reqType, TNetworkAddress targetBackend) {
TGetRealtimeExecStatusResponse resp = null;
BackendService.Client client = null;

Expand All @@ -268,6 +270,7 @@ private static TGetRealtimeExecStatusResponse getRealtimeQueryProfile(
try {
TGetRealtimeExecStatusRequest req = new TGetRealtimeExecStatusRequest();
req.setId(queryID);
req.setReqType(reqType);
resp = client.getRealtimeExecStatus(req);
} catch (TException e) {
LOG.warn("Got exception when getRealtimeExecStatus, query {} backend {}",
Expand All @@ -293,16 +296,16 @@ private static TGetRealtimeExecStatusResponse getRealtimeQueryProfile(
return null;
}

if (!resp.isSetReportExecStatusParams()) {
LOG.warn("Invalid GetRealtimeExecStatusResponse, query {}",
if (!resp.isSetReportExecStatusParams() && !resp.isSetQueryStats()) {
LOG.warn("Invalid GetRealtimeExecStatusResponse, missing both exec status and query stats. query {}",
DebugUtil.printId(queryID));
return null;
}

return resp;
}

private List<Future<TGetRealtimeExecStatusResponse>> createFetchRealTimeProfileTasks(String id) {
private List<Future<TGetRealtimeExecStatusResponse>> createFetchRealTimeProfileTasks(String id, String reqType) {
// For query, id is queryId, for load, id is LoadLoadingTaskId
class QueryIdAndAddress {
public TUniqueId id;
Expand Down Expand Up @@ -365,18 +368,66 @@ class QueryIdAndAddress {
}

for (QueryIdAndAddress idAndAddress : involvedBackends) {
Callable<TGetRealtimeExecStatusResponse> task = () -> {
return getRealtimeQueryProfile(idAndAddress.id, idAndAddress.beAddress);
};
Callable<TGetRealtimeExecStatusResponse> task = () -> getRealtimeQueryProfile(idAndAddress.id,
reqType, idAndAddress.beAddress);
Future<TGetRealtimeExecStatusResponse> future = fetchRealTimeProfileExecutor.submit(task);
futures.add(future);
}

return futures;
}

public TQueryStatistics getQueryStatistic(String queryId) throws Exception {
List<Future<TGetRealtimeExecStatusResponse>> futures = createFetchRealTimeProfileTasks(queryId,
"stats");
List<TQueryStatistics> queryStatisticsList = Lists.newArrayList();
for (Future<TGetRealtimeExecStatusResponse> future : futures) {
try {
TGetRealtimeExecStatusResponse resp = future.get(5, TimeUnit.SECONDS);
if (resp != null && resp.getStatus().status_code == TStatusCode.OK && resp.isSetQueryStats()) {
queryStatisticsList.add(resp.getQueryStats());
} else {
LOG.warn("Failed to get real-time query stats, id {}, resp is {}",
queryId, resp == null ? "null" : resp.toString());
throw new Exception("Failed to get realtime query stats: " + resp.toString());
}
} catch (Exception e) {
LOG.warn("Failed to get real-time query stats, id {}, error: {}", queryId, e.getMessage(), e);
throw new Exception("Failed to get realtime query stats: " + e.getMessage());
}
}
Preconditions.checkState(!queryStatisticsList.isEmpty() && queryStatisticsList.size() == futures.size(),
String.format("Failed to get real-time stats, id %s, "
+ "queryStatisticsList size %d != futures size %d",
queryId, queryStatisticsList.size(), futures.size()));

TQueryStatistics summary = new TQueryStatistics();
for (TQueryStatistics queryStats : queryStatisticsList) {
// sum all the statistics
summary.setScanRows(summary.getScanRows() + queryStats.getScanRows());
summary.setScanBytes(summary.getScanBytes() + queryStats.getScanBytes());
summary.setReturnedRows(summary.getReturnedRows() + queryStats.getReturnedRows());
summary.setCpuMs(summary.getCpuMs() + queryStats.getCpuMs());
summary.setMaxPeakMemoryBytes(Math.max(summary.getMaxPeakMemoryBytes(),
queryStats.getMaxPeakMemoryBytes()));
summary.setCurrentUsedMemoryBytes(Math.max(summary.getCurrentUsedMemoryBytes(),
queryStats.getCurrentUsedMemoryBytes()));
summary.setShuffleSendBytes(summary.getShuffleSendBytes() + queryStats.getShuffleSendBytes());
summary.setShuffleSendRows(summary.getShuffleSendRows() + queryStats.getShuffleSendRows());
summary.setScanBytesFromLocalStorage(
summary.getScanBytesFromLocalStorage() + queryStats.getScanBytesFromLocalStorage());
summary.setScanBytesFromRemoteStorage(
summary.getScanBytesFromRemoteStorage() + queryStats.getScanBytesFromRemoteStorage());
summary.setSpillWriteBytesToLocalStorage(
summary.getSpillWriteBytesToLocalStorage() + queryStats.getSpillWriteBytesToLocalStorage());
summary.setSpillReadBytesFromLocalStorage(
summary.getSpillReadBytesFromLocalStorage() + queryStats.getSpillReadBytesFromLocalStorage());
}
return summary;
}

public String getProfile(String id) {
List<Future<TGetRealtimeExecStatusResponse>> futures = createFetchRealTimeProfileTasks(id);
List<Future<TGetRealtimeExecStatusResponse>> futures = createFetchRealTimeProfileTasks(id, "profile");
// beAddr of reportExecStatus of QeProcessorImpl is meaningless, so assign a dummy address
// to avoid compile failing.
TNetworkAddress dummyAddr = new TNetworkAddress();
Expand Down Expand Up @@ -1057,3 +1108,4 @@ public void removeProfile(String profileId) {
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.doris.httpv2.controller;

import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.SchemaTable;
import org.apache.doris.catalog.Table;
import org.apache.doris.httpv2.entity.ResponseBody;
import org.apache.doris.httpv2.entity.ResponseEntityBuilder;
import org.apache.doris.httpv2.rest.RestBaseController;
Expand Down Expand Up @@ -56,20 +58,8 @@ public class SessionController extends RestBaseController {
private static final Logger LOG = LogManager.getLogger(SessionController.class);

static {
SESSION_TABLE_HEADER.add("CurrentConnected");
SESSION_TABLE_HEADER.add("Id");
SESSION_TABLE_HEADER.add("User");
SESSION_TABLE_HEADER.add("Host");
SESSION_TABLE_HEADER.add("LoginTime");
SESSION_TABLE_HEADER.add("Catalog");
SESSION_TABLE_HEADER.add("Db");
SESSION_TABLE_HEADER.add("Command");
SESSION_TABLE_HEADER.add("Time");
SESSION_TABLE_HEADER.add("State");
SESSION_TABLE_HEADER.add("QueryId");
SESSION_TABLE_HEADER.add("Info");
SESSION_TABLE_HEADER.add("FE");
SESSION_TABLE_HEADER.add("CloudCluster");
Table tbl = SchemaTable.TABLE_MAP.get("processlist");
tbl.getBaseSchema().stream().forEach(column -> SESSION_TABLE_HEADER.add(column.getName()));
}

@RequestMapping(path = "/session/all", method = RequestMethod.GET)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.httpv2.entity.ResponseBody;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.system.Frontend;
import org.apache.doris.system.SystemInfoService.HostInfo;

import com.google.common.base.Strings;
import com.google.gson.reflect.TypeToken;
Expand Down Expand Up @@ -57,6 +58,11 @@ static List<Pair<String, Integer>> getFeList() {
.collect(Collectors.toList());
}

static boolean isCurrentFe(String ip, int port) {
HostInfo hostInfo = Env.getCurrentEnv().getSelfNode();
return hostInfo.isSame(new HostInfo(ip, port));
}

static String concatUrl(Pair<String, Integer> ipPort, String path, Map<String, String> arguments) {
StringBuilder url = new StringBuilder("http://")
.append(ipPort.first).append(":").append(ipPort.second).append(path);
Expand Down
Loading
Loading