diff --git a/be/src/exec/schema_scanner.cpp b/be/src/exec/schema_scanner.cpp
index 32197e37e1e21b..908bb7b6f09f03 100644
--- a/be/src/exec/schema_scanner.cpp
+++ b/be/src/exec/schema_scanner.cpp
@@ -37,6 +37,7 @@
 #include "exec/schema_scanner/schema_partitions_scanner.h"
 #include "exec/schema_scanner/schema_processlist_scanner.h"
 #include "exec/schema_scanner/schema_profiling_scanner.h"
+#include "exec/schema_scanner/schema_routine_scanner.h"
 #include "exec/schema_scanner/schema_rowsets_scanner.h"
 #include "exec/schema_scanner/schema_schema_privileges_scanner.h"
 #include "exec/schema_scanner/schema_schemata_scanner.h"
@@ -161,6 +162,8 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
         return SchemaWorkloadGroupsScanner::create_unique();
     case TSchemaTableType::SCH_PROCESSLIST:
         return SchemaProcessListScanner::create_unique();
+    case TSchemaTableType::SCH_PROCEDURES:
+        return SchemaRoutinesScanner::create_unique();
     default:
         return SchemaDummyScanner::create_unique();
         break;
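Editor's note: the factory change above is the only BE-side registration this patch needs; once `SCH_PROCEDURES` maps to `SchemaRoutinesScanner`, the existing schema scan node drives the new scanner through the common `SchemaScanner` interface. The following is a minimal sketch of that call sequence, not code from the patch; the `init`/`start` signatures and the surrounding parameter objects are assumptions for illustration.

```cpp
// Illustrative sketch only: how a schema scan node might exercise the new
// SCH_PROCEDURES factory case. "params", "pool", and "block" stand in for the
// caller's real objects; init/start signatures are assumed from typical usage.
#include <memory>

Status scan_procedures(RuntimeState* state, SchemaScannerParam* params, ObjectPool* pool,
                       vectorized::Block* block) {
    std::unique_ptr<SchemaScanner> scanner =
            SchemaScanner::create(TSchemaTableType::SCH_PROCEDURES);
    RETURN_IF_ERROR(scanner->init(params, pool)); // assumed signature
    RETURN_IF_ERROR(scanner->start(state));       // caches batch size and RPC timeout
    bool eos = false;
    while (!eos) {
        RETURN_IF_ERROR(scanner->get_next_block(block, &eos));
        // hand the filled block to the consumer between iterations
    }
    return Status::OK();
}
```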
+ +#include "exec/schema_scanner/schema_routine_scanner.h" + +#include "runtime/client_cache.h" +#include "runtime/exec_env.h" +#include "runtime/runtime_state.h" +#include "util/thrift_rpc_helper.h" +#include "vec/common/string_ref.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type_factory.hpp" + +namespace doris { +std::vector SchemaRoutinesScanner::_s_tbls_columns = { + {"SPECIFIC_NAME", TYPE_VARCHAR, sizeof(StringRef), true}, + {"ROUTINE_CATALOG", TYPE_VARCHAR, sizeof(StringRef), true}, + {"ROUTINE_SCHEMA", TYPE_VARCHAR, sizeof(StringRef), true}, + {"ROUTINE_NAME", TYPE_VARCHAR, sizeof(StringRef), true}, + {"ROUTINE_TYPE", TYPE_VARCHAR, sizeof(StringRef), true}, + {"DTD_IDENTIFIER", TYPE_VARCHAR, sizeof(StringRef), true}, + {"ROUTINE_BODY", TYPE_VARCHAR, sizeof(StringRef), true}, + {"ROUTINE_DEFINITION", TYPE_VARCHAR, sizeof(StringRef), true}, + {"EXTERNAL_NAME", TYPE_VARCHAR, sizeof(StringRef), true}, + {"EXTERNAL_LANGUAGE", TYPE_VARCHAR, sizeof(StringRef), true}, + {"PARAMETER_STYLE", TYPE_VARCHAR, sizeof(StringRef), true}, + {"IS_DETERMINISTIC", TYPE_VARCHAR, sizeof(StringRef), true}, + {"SQL_DATA_ACCESS", TYPE_VARCHAR, sizeof(StringRef), true}, + {"SQL_PATH", TYPE_VARCHAR, sizeof(StringRef), true}, + {"SECURITY_TYPE", TYPE_VARCHAR, sizeof(StringRef), true}, + {"CREATED", TYPE_DATETIME, sizeof(int64_t), true}, + {"LAST_ALTERED", TYPE_DATETIME, sizeof(int64_t), true}, + {"SQL_MODE", TYPE_VARCHAR, sizeof(StringRef), true}, + {"ROUTINE_COMMENT", TYPE_VARCHAR, sizeof(StringRef), true}, + {"DEFINER", TYPE_VARCHAR, sizeof(StringRef), true}, + {"CHARACTER_SET_CLIENT", TYPE_VARCHAR, sizeof(StringRef), true}, + {"COLLATION_CONNECTION", TYPE_VARCHAR, sizeof(StringRef), true}, + {"DATABASE_COLLATION", TYPE_VARCHAR, sizeof(StringRef), true}, +}; + +SchemaRoutinesScanner::SchemaRoutinesScanner() + : SchemaScanner(_s_tbls_columns, TSchemaTableType::SCH_PROCEDURES) {} + +Status SchemaRoutinesScanner::start(RuntimeState* state) { + _block_rows_limit = state->batch_size(); + _rpc_timeout = state->execution_timeout() * 1000; + return Status::OK(); +} + +Status SchemaRoutinesScanner::get_block_from_fe() { + TNetworkAddress master_addr = ExecEnv::GetInstance()->master_info()->network_address; + TSchemaTableRequestParams schema_table_request_params; + for (int i = 0; i < _s_tbls_columns.size(); i++) { + schema_table_request_params.__isset.columns_name = true; + schema_table_request_params.columns_name.emplace_back(_s_tbls_columns[i].name); + } + schema_table_request_params.__set_current_user_ident(*_param->common_param->current_user_ident); + TFetchSchemaTableDataRequest request; + request.__set_schema_table_name(TSchemaTableName::ROUTINES_INFO); + request.__set_schema_table_params(schema_table_request_params); + TFetchSchemaTableDataResult result; + RETURN_IF_ERROR(ThriftRpcHelper::rpc( + master_addr.hostname, master_addr.port, + [&request, &result](FrontendServiceConnection& client) { + client->fetchSchemaTableData(result, request); + }, + _rpc_timeout)); + Status status(Status::create(result.status)); + if (!status.ok()) { + LOG(WARNING) << "fetch routines from FE failed, errmsg=" << status; + return status; + } + std::vector result_data = result.data_batch; + _routines_block = vectorized::Block::create_unique(); + for (int i = 0; i < _s_tbls_columns.size(); ++i) { + TypeDescriptor descriptor(_s_tbls_columns[i].type); + auto data_type = vectorized::DataTypeFactory::instance().create_data_type(descriptor, true); + _routines_block->insert(vectorized::ColumnWithTypeAndName( + 
data_type->create_column(), data_type, _s_tbls_columns[i].name)); + } + _routines_block->reserve(_block_rows_limit); + if (result_data.size() > 0) { + int col_size = result_data[0].column_value.size(); + if (col_size != _s_tbls_columns.size()) { + return Status::InternalError("routine table schema is not match for FE and BE"); + } + } + auto insert_string_value = [&](int col_index, std::string str_val, vectorized::Block* block) { + vectorized::MutableColumnPtr mutable_col_ptr; + mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); + auto* nullable_column = + reinterpret_cast(mutable_col_ptr.get()); + vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); + reinterpret_cast(col_ptr)->insert_data(str_val.data(), + str_val.size()); + nullable_column->get_null_map_data().emplace_back(0); + }; + auto insert_datetime_value = [&](int col_index, const std::vector& datas, + vectorized::Block* block) { + vectorized::MutableColumnPtr mutable_col_ptr; + mutable_col_ptr = std::move(*block->get_by_position(col_index).column).assume_mutable(); + auto* nullable_column = + reinterpret_cast(mutable_col_ptr.get()); + vectorized::IColumn* col_ptr = &nullable_column->get_nested_column(); + auto data = datas[0]; + reinterpret_cast*>(col_ptr)->insert_data( + reinterpret_cast(data), 0); + nullable_column->get_null_map_data().emplace_back(0); + }; + + for (int i = 0; i < result_data.size(); i++) { + TRow row = result_data[i]; + + for (int j = 0; j < _s_tbls_columns.size(); j++) { + if (_s_tbls_columns[j].type == TYPE_DATETIME) { + std::vector datas(1); + VecDateTimeValue src[1]; + src[0].from_date_str(row.column_value[j].stringVal.data(), + row.column_value[j].stringVal.size()); + datas[0] = src; + insert_datetime_value(j, datas, _routines_block.get()); + } else { + insert_string_value(j, row.column_value[j].stringVal, _routines_block.get()); + } + } + } + return Status::OK(); +} + +Status SchemaRoutinesScanner::get_next_block(vectorized::Block* block, bool* eos) { + if (!_is_init) { + return Status::InternalError("Used before initialized."); + } + + if (nullptr == block || nullptr == eos) { + return Status::InternalError("input pointer is nullptr."); + } + + if (_routines_block == nullptr) { + RETURN_IF_ERROR(get_block_from_fe()); + _total_rows = _routines_block->rows(); + } + + if (_row_idx == _total_rows) { + *eos = true; + return Status::OK(); + } + + int current_batch_rows = std::min(_block_rows_limit, _total_rows - _row_idx); + vectorized::MutableBlock mblock = vectorized::MutableBlock::build_mutable_block(block); + mblock.add_rows(_routines_block.get(), _row_idx, current_batch_rows); + _row_idx += current_batch_rows; + + *eos = _row_idx == _total_rows; + return Status::OK(); +} + +} // namespace doris \ No newline at end of file diff --git a/be/src/exec/schema_scanner/schema_routine_scanner.h b/be/src/exec/schema_scanner/schema_routine_scanner.h new file mode 100644 index 00000000000000..543f9e8e8f684a --- /dev/null +++ b/be/src/exec/schema_scanner/schema_routine_scanner.h @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
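Editor's note: the scanner fetches the full ROUTINES result from the FE once (`get_block_from_fe`) and then pages it out in `get_next_block`, copying at most `_block_rows_limit` rows per call and setting `eos` when the cached block is exhausted. The paging arithmetic is small enough to show standalone; this is a self-contained sketch of the same logic, not code from the patch.

```cpp
// Standalone sketch of the paging logic used by get_next_block(): rows fetched
// once from the FE are handed out in batches of at most "limit" rows per call.
#include <algorithm>
#include <cassert>

struct RowPager {
    int limit;      // mirrors _block_rows_limit (batch_size from RuntimeState)
    int total = 0;  // mirrors _total_rows (rows cached from the FE)
    int idx = 0;    // mirrors _row_idx

    // Returns how many rows to copy on this call; sets *eos once all rows are handed out.
    int next_batch(bool* eos) {
        int n = std::min(limit, total - idx);
        idx += n;
        *eos = (idx == total);
        return n;
    }
};

int main() {
    RowPager pager {4096};
    pager.total = 10000; // pretend the FE returned 10000 routines
    bool eos = false;
    int copied = 0;
    while (!eos) {
        copied += pager.next_batch(&eos); // 4096 + 4096 + 1808
    }
    assert(copied == pager.total);
    return 0;
}
```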
diff --git a/be/src/exec/schema_scanner/schema_routine_scanner.h b/be/src/exec/schema_scanner/schema_routine_scanner.h
new file mode 100644
index 00000000000000..543f9e8e8f684a
--- /dev/null
+++ b/be/src/exec/schema_scanner/schema_routine_scanner.h
@@ -0,0 +1,52 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <vector>
+
+#include "common/status.h"
+#include "exec/schema_scanner.h"
+
+namespace doris {
+class RuntimeState;
+namespace vectorized {
+class Block;
+} // namespace vectorized
+
+class SchemaRoutinesScanner : public SchemaScanner {
+    ENABLE_FACTORY_CREATOR(SchemaRoutinesScanner);
+
+public:
+    SchemaRoutinesScanner();
+    ~SchemaRoutinesScanner() override = default;
+
+    Status start(RuntimeState* state) override;
+    Status get_next_block(vectorized::Block* block, bool* eos) override;
+
+    static std::vector<SchemaScanner::ColumnDesc> _s_tbls_columns;
+
+private:
+    Status get_block_from_fe();
+
+    int _block_rows_limit = 4096;
+    int _row_idx = 0;
+    int _total_rows = 0;
+    std::unique_ptr<vectorized::Block> _routines_block = nullptr;
+    int _rpc_timeout = 3000;
+};
+} // namespace doris
diff --git a/fe/fe-core/src/main/java/org/apache/doris/plsql/metastore/PlsqlManager.java b/fe/fe-core/src/main/java/org/apache/doris/plsql/metastore/PlsqlManager.java
index 87cbd7b58a33f2..dc91d344f63421 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/plsql/metastore/PlsqlManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/plsql/metastore/PlsqlManager.java
@@ -22,6 +22,7 @@
 import org.apache.doris.common.io.Writable;
 import org.apache.doris.persist.gson.GsonUtils;
 
+import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
 import com.google.gson.annotations.SerializedName;
 import org.apache.logging.log4j.LogManager;
@@ -34,7 +35,20 @@
 public class PlsqlManager implements Writable {
     private static final Logger LOG = LogManager.getLogger(PlsqlManager.class);
-
+    public static final ImmutableList<String> ROUTINE_INFO_TITLE_NAMES = new ImmutableList.Builder<String>()
+            .add("SPECIFIC_NAME").add("ROUTINE_CATALOG").add("ROUTINE_SCHEMA").add("ROUTINE_NAME")
+            .add("ROUTINE_TYPE")
+            .add("DTD_IDENTIFIER").add("ROUTINE_BODY")
+            .add("ROUTINE_DEFINITION").add("EXTERNAL_NAME")
+            .add("EXTERNAL_LANGUAGE").add("PARAMETER_STYLE")
+            .add("IS_DETERMINISTIC")
+            .add("SQL_DATA_ACCESS").add("SQL_PATH")
+            .add("SECURITY_TYPE").add("CREATED")
+            .add("LAST_ALTERED").add("SQL_MODE")
+            .add("ROUTINE_COMMENT")
+            .add("DEFINER").add("CHARACTER_SET_CLIENT")
+            .add("COLLATION_CONNECTION").add("DATABASE_COLLATION")
+            .build();
 
     @SerializedName(value = "nameToStoredProcedures")
     Map<PlsqlProcedureKey, PlsqlStoredProcedure> nameToStoredProcedures = Maps.newConcurrentMap();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
index 992c512e4026b0..2954cadf6d4919 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java
@@ -285,9 +285,9 @@ public class FrontendServiceImpl implements FrontendService.Iface {
     private MasterImpl masterImpl;
     private ExecuteEnv exeEnv;
-    // key is txn id,value is index of plan fragment instance, it's used by multi table request plan
-    private ConcurrentHashMap<Long, Integer> multiTableFragmentInstanceIdIndexMap =
-            new ConcurrentHashMap<>(64);
+    // key is txn id,value is index of plan fragment instance, it's used by multi
+    // table request plan
+    private ConcurrentHashMap<Long, Integer> multiTableFragmentInstanceIdIndexMap = new ConcurrentHashMap<>(64);
 
     private final Map<TUniqueId, ConnectContext> proxyQueryIdToConnCtx = new ConcurrentHashMap<>(64);
@@ -352,8 +352,9 @@ public TConfirmUnusedRemoteFilesResult confirmUnusedRemoteFiles(TConfirmUnusedRe
             }
             // check cooldownMetaId of all replicas are the same
             List<Replica> replicas = Env.getCurrentEnv().getTabletInvertedIndex().getReplicas(info.tablet_id);
-            // FIXME(plat1ko): We only delete remote files when tablet is under a stable state: enough replicas and
-            // all replicas are alive. Are these conditions really sufficient or necessary?
+            // FIXME(plat1ko): We only delete remote files when tablet is under a stable
+            // state: enough replicas and
+            // all replicas are alive. Are these conditions really sufficient or necessary?
             if (replicas.size() < replicaNum) {
                 LOG.info("num replicas are not enough, tablet={}", info.tablet_id);
                 return;
             }
@@ -669,7 +670,6 @@ public TListTableMetadataNameIdsResult listTableMetadataNameIds(TGetTablesParams
         }
         PatternMatcher finalMatcher = matcher;
-
         ExecutorService executor = Executors.newSingleThreadExecutor();
         Future<TListTableMetadataNameIdsResult> future = executor.submit(() -> {
@@ -1013,7 +1013,8 @@ public TMasterOpResult forward(TMasterOpRequest params) throws TException {
             LOG.debug("receive forwarded stmt {} from FE: {}", params.getStmtId(), params.getClientNodeHost());
         }
         ConnectContext context = new ConnectContext(null, true);
-        // Set current connected FE to the client address, so that we can know where this request come from.
+        // Set current connected FE to the client address, so that we can know where
+        // this request come from.
         context.setCurrentConnectedFEIp(params.getClientNodeHost());
         if (Config.isCloudMode() && !Strings.isNullOrEmpty(params.getCloudCluster())) {
             context.setCloudCluster(params.getCloudCluster());
@@ -1359,7 +1360,8 @@ private List<Table> queryLoadCommitTables(TLoadTxnCommitRequest request, Databas
             OlapTable table = (OlapTable) db.getTableOrMetaException(tbl, TableType.OLAP);
             tables.add(table);
         }
-        // if it has multi table, use multi table and update multi table running transaction table ids
+        // if it has multi table, use multi table and update multi table running
+        // transaction table ids
         if (CollectionUtils.isNotEmpty(request.getTbls())) {
             List<Long> multiTableIds = tables.stream().map(Table::getId).collect(Collectors.toList());
             Env.getCurrentGlobalTransactionMgr()
@@ -1555,7 +1557,8 @@ public TLoadTxnCommitResult loadTxnCommit(TLoadTxnCommitRequest request) throws
         return result;
     }
 
-    // return true if commit success and publish success, return false if publish timeout
+    // return true if commit success and publish success, return false if publish
+    // timeout
     private boolean loadTxnCommitImpl(TLoadTxnCommitRequest request) throws UserException {
         if (request.isSetAuthCode()) {
             // TODO(cmy): find a way to check
@@ -1633,7 +1636,8 @@ public TCommitTxnResult commitTxn(TCommitTxnRequest request) throws TException {
         return result;
     }
 
-    // return true if commit success and publish success, return false if publish timeout
+    // return true if commit success and publish success, return false if publish
+    // timeout
    private boolean commitTxnImpl(TCommitTxnRequest request) throws UserException {
         /// Check required arg: user, passwd, db, txn_id, commit_infos
         if (!request.isSetUser()) {
             throw new UserException("user is not set");
         }
@@ -1698,7 +1702,6 @@ private boolean commitTxnImpl(TCommitTxnRequest request) throws UserException {
         // Step 4: get timeout
         long timeoutMs = request.isSetThriftRpcTimeoutMs() ? request.getThriftRpcTimeoutMs() / 2 : 5000;
-
         // Step 5: commit and publish
         return Env.getCurrentGlobalTransactionMgr()
                 .commitAndPublishTransaction(db, tableList,
@@ -1949,7 +1952,8 @@ public TStreamLoadPutResult streamLoadPut(TStreamLoadPutRequest request) {
     }
 
     /**
-     * For first-class multi-table scenarios, we should store the mapping between Txn and data source type in a common
+     * For first-class multi-table scenarios, we should store the mapping between
+     * Txn and data source type in a common
      * place. Since there is only Kafka now, we should do this first.
      */
     private void buildMultiTableStreamLoadTask(StreamLoadTask baseTaskInfo, long txnId) {
@@ -2002,7 +2006,7 @@ public TStreamLoadMultiTablePutResult streamLoadMultiTablePut(TStreamLoadPutRequ
             ctx.setCurrentUserIdentity(currentUser.get(0));
         }
         LOG.info("one stream multi table load use cloud cluster {}", request.getCloudCluster());
-        //ctx.setCloudCluster();
+        // ctx.setCloudCluster();
         if (!Strings.isNullOrEmpty(request.getCloudCluster())) {
             if (Strings.isNullOrEmpty(request.getUser())) {
                 ctx.setCloudCluster(request.getCloudCluster());
@@ -2055,7 +2059,7 @@ public TStreamLoadMultiTablePutResult streamLoadMultiTablePut(TStreamLoadPutRequ
                 RoutineLoadJob routineLoadJob = Env.getCurrentEnv().getRoutineLoadManager()
                         .getRoutineLoadJobByMultiLoadTaskTxnId(request.getTxnId());
                 routineLoadJob.updateState(JobState.PAUSED, new ErrorReason(InternalErrorCode.CANNOT_RESUME_ERR,
-                        "failed to get stream load plan, " + exception.getMessage()), false);
+                                "failed to get stream load plan, " + exception.getMessage()), false);
             } catch (UserException e) {
                 LOG.warn("catch update routine load job error.", e);
             }
@@ -2112,9 +2116,9 @@ private HttpStreamParams initHttpStreamPlan(TStreamLoadPutRequest request, Conne
         coord.setQueryType(TQueryType.LOAD);
         TableIf table = httpStreamParams.getTable();
         if (table instanceof OlapTable) {
-            boolean isEnableMemtableOnSinkNode =
-                    ((OlapTable) table).getTableProperty().getUseSchemaLightChange()
-                    ? coord.getQueryOptions().isEnableMemtableOnSinkNode() : false;
+            boolean isEnableMemtableOnSinkNode = ((OlapTable) table).getTableProperty().getUseSchemaLightChange()
+                    ? coord.getQueryOptions().isEnableMemtableOnSinkNode()
+                    : false;
             coord.getQueryOptions().setEnableMemtableOnSinkNode(isEnableMemtableOnSinkNode);
         }
         httpStreamParams.setParams(coord.getStreamLoadPlan());
@@ -2239,7 +2243,7 @@ private TExecPlanFragmentParams streamLoadPutImpl(TStreamLoadPutRequest request,
         try {
             if (!((OlapTable) table).getTableProperty().getUseSchemaLightChange()
                     && (request.getGroupCommitMode() != null
-                    && !request.getGroupCommitMode().equals("off_mode"))) {
+                            && !request.getGroupCommitMode().equals("off_mode"))) {
                 throw new UserException(
                         "table light_schema_change is false, can't do stream load with group commit mode");
             }
@@ -2906,7 +2910,6 @@ private TGetBinlogResult getBinlogImpl(TGetBinlogRequest request, String clientI
             throw new UserException("prev_commit_seq is not set");
         }
-
         // step 1: check auth
         if (Strings.isNullOrEmpty(request.getToken())) {
             checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(),
@@ -2999,7 +3002,8 @@ public TGetSnapshotResult getSnapshot(TGetSnapshotRequest request) throws TExcep
     // getSnapshotImpl
     private TGetSnapshotResult getSnapshotImpl(TGetSnapshotRequest request, String clientIp)
             throws UserException {
-        // Step 1: Check all required arg: user, passwd, db, label_name, snapshot_name, snapshot_type
+        // Step 1: Check all required arg: user, passwd, db, label_name, snapshot_name,
+        // snapshot_type
         if (!request.isSetUser()) {
             throw new UserException("user is not set");
         }
@@ -3080,7 +3084,8 @@ public TRestoreSnapshotResult restoreSnapshot(TRestoreSnapshotRequest request) t
     // restoreSnapshotImpl
     private TRestoreSnapshotResult restoreSnapshotImpl(TRestoreSnapshotRequest request, String clientIp)
             throws UserException {
-        // Step 1: Check all required arg: user, passwd, db, label_name, repo_name, meta, info
+        // Step 1: Check all required arg: user, passwd, db, label_name, repo_name,
+        // meta, info
         if (!request.isSetUser()) {
             throw new UserException("user is not set");
         }
@@ -3347,7 +3352,6 @@ private TGetBinlogLagResult getBinlogLagImpl(TGetBinlogRequest request, String c
             throw new UserException("prev_commit_seq is not set");
         }
-
         // step 1: check auth
         if (Strings.isNullOrEmpty(request.getToken())) {
             checkSingleTablePasswordAndPrivs(request.getUser(), request.getPasswd(), request.getDb(),
@@ -3642,8 +3646,10 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request
         List<String> allReqPartNames; // all request partitions
         try {
             taskLock.lock();
-            // we dont lock the table. other thread in this txn will be controled by taskLock.
-            // if we have already replaced. dont do it again, but acquire the recorded new partition directly.
+            // we dont lock the table. other thread in this txn will be controled by
+            // taskLock.
+            // if we have already replaced. dont do it again, but acquire the recorded new
+            // partition directly.
             // if not by this txn, just let it fail naturally is ok.
             List<Long> replacedPartIds = overwriteManager.tryReplacePartitionIds(taskGroupId, partitionIds);
             // here if replacedPartIds still have null. this will throw exception.
@@ -3653,7 +3659,8 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request
                     .filter(i -> partitionIds.get(i) == replacedPartIds.get(i)) // equal means not replaced
                     .mapToObj(partitionIds::get)
                     .collect(Collectors.toList());
-            // from here we ONLY deal the pending partitions. not include the dealed(by others).
+            // from here we ONLY deal the pending partitions. not include the dealed(by
+            // others).
             if (!pendingPartitionIds.isEmpty()) {
                 // below two must have same order inner.
                 List<String> pendingPartitionNames = olapTable.uncheckedGetPartNamesById(pendingPartitionIds);
@@ -3664,7 +3671,8 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request
                 overwriteManager.registerTaskInGroup(taskGroupId, taskId);
                 InsertOverwriteUtil.addTempPartitions(olapTable, pendingPartitionNames, tempPartitionNames);
                 InsertOverwriteUtil.replacePartition(olapTable, pendingPartitionNames, tempPartitionNames);
-                // now temp partitions are bumped up and use new names. we get their ids and record them.
+                // now temp partitions are bumped up and use new names. we get their ids and
+                // record them.
                 List<Long> newPartitionIds = new ArrayList<Long>();
                 for (String newPartName : pendingPartitionNames) {
                     newPartitionIds.add(olapTable.getPartition(newPartName).getId());
@@ -3686,8 +3694,10 @@ public TReplacePartitionResult replacePartition(TReplacePartitionRequest request
             taskLock.unlock();
         }
 
-        // build partition & tablets. now all partitions in allReqPartNames are replaced an recorded.
-        // so they won't be changed again. if other transaction changing it. just let it fail.
+        // build partition & tablets. now all partitions in allReqPartNames are replaced
+        // an recorded.
+        // so they won't be changed again. if other transaction changing it. just let it
+        // fail.
         List<TOlapTablePartition> partitions = Lists.newArrayList();
         List<TTabletLocation> tablets = Lists.newArrayList();
         PartitionInfo partitionInfo = olapTable.getPartitionInfo();
@@ -3796,7 +3806,7 @@ public TGetMetaResult getMeta(TGetMetaRequest request) throws TException {
 
     private TGetMetaResult getMetaImpl(TGetMetaRequest request, String clientIp)
             throws Exception {
-        // Step 1:  check fields
+        // Step 1: check fields
         if (!request.isSetUser()) {
             throw new UserException("user is not set");
         }
@@ -3869,7 +3879,6 @@ private TGetMetaResult getMetaImpl(TGetMetaRequest request, String clientIp)
             }
         }
 
-
     @Override
     public TGetColumnInfoResult getColumnInfo(TGetColumnInfoRequest request) {
         TGetColumnInfoResult result = new TGetColumnInfoResult();
@@ -3990,12 +3999,12 @@ public TStatus reportCommitTxnResult(TReportCommitTxnResultRequest request) thro
         // FE only has one master, this should not be a problem
         if (!Env.getCurrentEnv().isMaster()) {
             LOG.error("failed to handle load stats report: not master, backend:{}",
-                        clientAddr);
+                    clientAddr);
             return new TStatus(TStatusCode.NOT_MASTER);
         }
 
         LOG.info("receive load stats report request: {}, backend: {}, dbId: {}, txnId: {}, label: {}",
-                    request, clientAddr, request.getDbId(), request.getTxnId(), request.getLabel());
+                request, clientAddr, request.getDbId(), request.getTxnId(), request.getLabel());
         try {
             byte[] receivedProtobufBytes = request.getPayload();
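Editor's note: in the MetadataGenerator changes below, each schema table keeps a lowercased column-name → index map (`ROUTINE_INFO_COLUMN_TO_INDEX` for routines), built from `PlsqlManager.ROUTINE_INFO_TITLE_NAMES`. The diff shows the map being selected per table; the step that applies it to project rows onto the columns the BE actually requested is not shown here, so the following is only a standalone sketch of that projection idea, with hypothetical names, not the FE's actual code.

```cpp
// Standalone sketch (hypothetical, not FE code): project a full row into the
// column order a client requested, using a lowercased name -> index map in the
// spirit of ROUTINE_INFO_COLUMN_TO_INDEX.
#include <algorithm>
#include <cassert>
#include <string>
#include <unordered_map>
#include <vector>

std::vector<std::string> project_row(const std::vector<std::string>& full_row,
                                     const std::vector<std::string>& requested,
                                     const std::unordered_map<std::string, int>& name_to_index) {
    std::vector<std::string> out;
    out.reserve(requested.size());
    for (std::string name : requested) {
        // lowercase the requested name before lookup, mirroring toLowerCase() on the FE side
        std::transform(name.begin(), name.end(), name.begin(), ::tolower);
        auto it = name_to_index.find(name);
        if (it != name_to_index.end()) {
            out.push_back(full_row[it->second]);
        }
    }
    return out;
}

int main() {
    std::unordered_map<std::string, int> idx = {{"specific_name", 0}, {"routine_type", 1}};
    std::vector<std::string> row = {"my_proc", "PROCEDURE"};
    auto projected = project_row(row, {"ROUTINE_TYPE"}, idx);
    assert(projected.size() == 1 && projected[0] == "PROCEDURE");
    return 0;
}
```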
diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
index c84939ffbe63ae..7908bb1bb4b890 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataGenerator.java
@@ -19,6 +19,7 @@
 import org.apache.doris.analysis.UserIdentity;
 import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.MTMV;
 import org.apache.doris.catalog.SchemaTable;
@@ -38,6 +39,9 @@
 import org.apache.doris.job.task.AbstractTask;
 import org.apache.doris.mtmv.MTMVPartitionUtil;
 import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.plsql.metastore.PlsqlManager;
+import org.apache.doris.plsql.metastore.PlsqlProcedureKey;
+import org.apache.doris.plsql.metastore.PlsqlStoredProcedure;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.QeProcessorImpl;
 import org.apache.doris.qe.QeProcessorImpl.QueryInfo;
@@ -92,6 +96,8 @@ public class MetadataGenerator {
 
     private static final ImmutableMap<String, Integer> WORKLOAD_GROUPS_COLUMN_TO_INDEX;
 
+    private static final ImmutableMap<String, Integer> ROUTINE_INFO_COLUMN_TO_INDEX;
+
     static {
         ImmutableMap.Builder<String, Integer> activeQueriesbuilder = new ImmutableMap.Builder();
         List<Column> activeQueriesColList = SchemaTable.TABLE_MAP.get("active_queries").getFullSchema();
@@ -105,6 +111,12 @@ public class MetadataGenerator {
             workloadGroupBuilder.put(WorkloadGroupMgr.WORKLOAD_GROUP_PROC_NODE_TITLE_NAMES.get(i).toLowerCase(), i);
         }
         WORKLOAD_GROUPS_COLUMN_TO_INDEX = workloadGroupBuilder.build();
+
+        ImmutableMap.Builder<String, Integer> routineInfoBuilder = new ImmutableMap.Builder();
+        for (int i = 0; i < PlsqlManager.ROUTINE_INFO_TITLE_NAMES.size(); i++) {
+            routineInfoBuilder.put(PlsqlManager.ROUTINE_INFO_TITLE_NAMES.get(i).toLowerCase(), i);
+        }
+        ROUTINE_INFO_COLUMN_TO_INDEX = routineInfoBuilder.build();
     }
 
     public static TFetchSchemaTableDataResult getMetadataTable(TFetchSchemaTableDataRequest request) throws TException {
@@ -167,6 +179,10 @@ public static TFetchSchemaTableDataResult getSchemaTableData(TFetchSchemaTableDa
                 result = workloadGroupsMetadataResult(schemaTableParams);
                 columnIndex = WORKLOAD_GROUPS_COLUMN_TO_INDEX;
                 break;
+            case ROUTINES_INFO:
+                result = routineInfoMetadataResult(schemaTableParams);
+                columnIndex = ROUTINE_INFO_COLUMN_TO_INDEX;
+                break;
             default:
                 return errorResult("invalid schema table name.");
         }
@@ -189,7 +205,7 @@ private static TFetchSchemaTableDataResult icebergMetadataResult(TMetadataTableR
             return errorResult("Iceberg metadata params is not set.");
         }
 
-        TIcebergMetadataParams icebergMetadataParams = params.getIcebergMetadataParams();
+        TIcebergMetadataParams icebergMetadataParams = params.getIcebergMetadataParams();
         TIcebergQueryType icebergQueryType = icebergMetadataParams.getIcebergQueryType();
         IcebergMetadataCache icebergMetadataCache = Env.getCurrentEnv().getExtMetaCacheMgr().getIcebergMetadataCache();
         List<TRow> dataBatch = Lists.newArrayList();
@@ -376,7 +392,7 @@ private static TFetchSchemaTableDataResult frontendsDisksMetadataResult(TMetadat
 
     private static TFetchSchemaTableDataResult catalogsMetadataResult(TMetadataTableRequestParams params) {
         TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
-        List<CatalogIf> info = Env.getCurrentEnv().getCatalogMgr().listCatalogs();
+        List<CatalogIf> info = Env.getCurrentEnv().getCatalogMgr().listCatalogs();
         List<TRow> dataBatch = Lists.newArrayList();
 
         for (CatalogIf catalog : info) {
@@ -417,15 +433,15 @@ private static TFetchSchemaTableDataResult workloadGroupsMetadataResult(TSchemaT
         List<TRow> dataBatch = Lists.newArrayList();
         for (List<String> rGroupsInfo : workloadGroupsInfo) {
             TRow trow = new TRow();
-            trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(0)))); // id
-            trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(1))); // name
+            trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(0)))); // id
+            trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(1))); // name
             trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(2)))); // cpu_share
-            trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(3))); // mem_limit
-            trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(4))); // mem overcommit
+            trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(3))); // mem_limit
+            trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(4))); // mem overcommit
             trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(5)))); // max concurrent
             trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(6)))); // max queue size
             trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(7)))); // queue timeout
-            trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(8))); // cpu hard limit
+            trow.addToColumnValue(new TCell().setStringVal(rGroupsInfo.get(8))); // cpu hard limit
             trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(9)))); // scan thread num
             // max remote scan thread num
             trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(rGroupsInfo.get(10))));
@@ -455,11 +471,11 @@ private static TFetchSchemaTableDataResult workloadSchedPolicyMetadataResult(TMe
         List<TRow> dataBatch = Lists.newArrayList();
         for (List<String> policyRow : workloadPolicyList) {
             TRow trow = new TRow();
-            trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(policyRow.get(0)))); // id
-            trow.addToColumnValue(new TCell().setStringVal(policyRow.get(1))); // name
-            trow.addToColumnValue(new TCell().setStringVal(policyRow.get(2))); // condition
-            trow.addToColumnValue(new TCell().setStringVal(policyRow.get(3))); // action
-            trow.addToColumnValue(new TCell().setIntVal(Integer.valueOf(policyRow.get(4)))); // priority
+            trow.addToColumnValue(new TCell().setLongVal(Long.valueOf(policyRow.get(0)))); // id
+            trow.addToColumnValue(new TCell().setStringVal(policyRow.get(1))); // name
+            trow.addToColumnValue(new TCell().setStringVal(policyRow.get(2))); // condition
+            trow.addToColumnValue(new TCell().setStringVal(policyRow.get(3))); // action
+            trow.addToColumnValue(new TCell().setIntVal(Integer.valueOf(policyRow.get(4)))); // priority
             trow.addToColumnValue(new TCell().setBoolVal(Boolean.valueOf(policyRow.get(5)))); // enabled
             trow.addToColumnValue(new TCell().setIntVal(Integer.valueOf(policyRow.get(6)))); // version
             dataBatch.add(trow);
@@ -552,7 +568,7 @@ private static TFetchSchemaTableDataResult queriesMetadataResult(TSchemaTableReq
             List<TFetchSchemaTableDataResult> relayResults = forwardToOtherFrontends(replayFetchSchemaTableReq);
             relayResults
                     .forEach(rs -> rs.getDataBatch()
-                    .forEach(row -> dataBatch.add(row)));
+                            .forEach(row -> dataBatch.add(row)));
         }
 
         result.setDataBatch(dataBatch);
@@ -753,4 +769,58 @@ private static TFetchSchemaTableDataResult taskMetadataResult(TMetadataTableRequ
         result.setStatus(new TStatus(TStatusCode.OK));
         return result;
     }
+
+    private static TFetchSchemaTableDataResult routineInfoMetadataResult(TSchemaTableRequestParams params) {
+        if (!params.isSetCurrentUserIdent()) {
+            return errorResult("current user ident is not set.");
+        }
+
+        PlsqlManager plSqlClient = Env.getCurrentEnv().getPlsqlManager();
+
+        TFetchSchemaTableDataResult result = new TFetchSchemaTableDataResult();
+        List<TRow> dataBatch = Lists.newArrayList();
+
+        Map<PlsqlProcedureKey, PlsqlStoredProcedure> allProc = plSqlClient.getAllPlsqlStoredProcedures();
+        for (Map.Entry<PlsqlProcedureKey, PlsqlStoredProcedure> entry : allProc.entrySet()) {
+            PlsqlStoredProcedure proc = entry.getValue();
+            TRow trow = new TRow();
+            trow.addToColumnValue(new TCell().setStringVal(proc.getName())); // SPECIFIC_NAME
+            trow.addToColumnValue(new TCell().setStringVal(Long.toString(proc.getCatalogId()))); // ROUTINE_CATALOG
+            CatalogIf catalog = Env.getCurrentEnv().getCatalogMgr().getCatalog(proc.getCatalogId());
+            if (catalog != null) {
+                DatabaseIf db = catalog.getDbNullable(proc.getDbId());
+                if (db != null) {
+                    trow.addToColumnValue(new TCell().setStringVal(db.getFullName())); // ROUTINE_SCHEMA
+                } else {
+                    trow.addToColumnValue(new TCell().setStringVal("")); // ROUTINE_SCHEMA
+                }
+            } else {
+                trow.addToColumnValue(new TCell().setStringVal("")); // ROUTINE_SCHEMA
+            }
+            trow.addToColumnValue(new TCell().setStringVal(proc.getName())); // ROUTINE_NAME
+            trow.addToColumnValue(new TCell().setStringVal("PROCEDURE")); // ROUTINE_TYPE
+            trow.addToColumnValue(new TCell().setStringVal("")); // DTD_IDENTIFIER
+            trow.addToColumnValue(new TCell().setStringVal(proc.getSource())); // ROUTINE_BODY
+            trow.addToColumnValue(new TCell().setStringVal("")); // ROUTINE_DEFINITION
+            trow.addToColumnValue(new TCell().setStringVal("NULL")); // EXTERNAL_NAME
+            trow.addToColumnValue(new TCell().setStringVal("")); // EXTERNAL_LANGUAGE
+            trow.addToColumnValue(new TCell().setStringVal("SQL")); // PARAMETER_STYLE
+            trow.addToColumnValue(new TCell().setStringVal("")); // IS_DETERMINISTIC
+            trow.addToColumnValue(new TCell().setStringVal("")); // SQL_DATA_ACCESS
+            trow.addToColumnValue(new TCell().setStringVal("NULL")); // SQL_PATH
+            trow.addToColumnValue(new TCell().setStringVal("DEFINER")); // SECURITY_TYPE
+            trow.addToColumnValue(new TCell().setStringVal(proc.getCreateTime())); // CREATED
+            trow.addToColumnValue(new TCell().setStringVal(proc.getModifyTime())); // LAST_ALTERED
+            trow.addToColumnValue(new TCell().setStringVal("")); // SQL_MODE
+            trow.addToColumnValue(new TCell().setStringVal("")); // ROUTINE_COMMENT
+            trow.addToColumnValue(new TCell().setStringVal(proc.getOwnerName())); // DEFINER
+            trow.addToColumnValue(new TCell().setStringVal("")); // CHARACTER_SET_CLIENT
+            trow.addToColumnValue(new TCell().setStringVal("")); // COLLATION_CONNECTION
+            trow.addToColumnValue(new TCell().setStringVal("")); // DATABASE_COLLATION
+            dataBatch.add(trow);
+        }
+        result.setDataBatch(dataBatch);
+        result.setStatus(new TStatus(TStatusCode.OK));
+        return result;
+    }
 }
diff --git a/gensrc/thrift/FrontendService.thrift b/gensrc/thrift/FrontendService.thrift
index 568b174c11c496..ef90e97b41b5ec 100644
--- a/gensrc/thrift/FrontendService.thrift
+++ b/gensrc/thrift/FrontendService.thrift
@@ -925,6 +925,7 @@ enum TSchemaTableName {
   METADATA_TABLE = 1, // tvf
   ACTIVE_QUERIES = 2, // db information_schema's table
   WORKLOAD_GROUPS = 3, // db information_schema's table
+  ROUTINES_INFO = 4, // db information_schema's table
 }
 
 struct TMetadataTableRequestParams {
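Editor's note: the thrift enum value above is the piece that ties FE and BE together, and with it comes an implicit contract: the FE must emit cells in exactly the 23-column order declared both in `PlsqlManager.ROUTINE_INFO_TITLE_NAMES` and in the BE's `_s_tbls_columns`, and the BE rejects a batch whose width differs (the `col_size` check in `get_block_from_fe`). The following is a tiny standalone sketch of that guard with simplified stand-in types, not code from the patch.

```cpp
// Standalone sketch of the FE/BE width check performed in get_block_from_fe():
// a fetched batch is rejected when its first row does not carry one cell per
// declared column. Row/cell types are simplified stand-ins for TRow/TCell.
#include <cassert>
#include <string>
#include <vector>

constexpr size_t kRoutineColumns = 23; // SPECIFIC_NAME ... DATABASE_COLLATION

bool batch_matches_schema(const std::vector<std::vector<std::string>>& rows) {
    if (rows.empty()) {
        return true; // an empty result has nothing to validate
    }
    return rows.front().size() == kRoutineColumns;
}

int main() {
    std::vector<std::vector<std::string>> ok(1, std::vector<std::string>(23, ""));
    std::vector<std::vector<std::string>> bad(1, std::vector<std::string>(22, ""));
    assert(batch_matches_schema(ok));
    assert(!batch_matches_schema(bad)); // would trigger the InternalError path
    return 0;
}
```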