Skip to content

Commit

Permalink
enhance: add Compact, CreateResourceGroup, DropResourceGroup, Describ…
Browse files Browse the repository at this point in the history
…eResourceGroup, ListResourceGroups and UpdateResourceGroup

Signed-off-by: Ruichen Bao <ruichen.bao@zju.edu.cn>
  • Loading branch information
brcarry committed Jan 20, 2025
1 parent ed639db commit a88cdf9
Show file tree
Hide file tree
Showing 11 changed files with 473 additions and 0 deletions.
182 changes: 182 additions & 0 deletions src/impl/MilvusClientImplV2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1580,6 +1580,165 @@ Status MilvusClientImplV2::RevokePrivilegeV2(const std::string& role_name, const
pre, &MilvusConnection::OperatePrivilegeV2, GrpcOpts{timeout});
}

Status
MilvusClientImplV2::CreateResourceGroup(const std::string& resource_group, const ResourceGroupConfig& config, int timeout) {
auto pre = [&resource_group, &config]() {
proto::milvus::CreateResourceGroupRequest rpc_request;
rpc_request.set_resource_group(resource_group);

auto* rg_config = rpc_request.mutable_config();
rg_config->mutable_requests()->set_node_num(config.GetRequestsNodeNum());
rg_config->mutable_limits()->set_node_num(config.GetLimitsNodeNum());

for (const auto& transfer : config.GetTransferFrom()) {
auto* transfer_from = rg_config->add_transfer_from();
transfer_from->set_resource_group(transfer);
}

for (const auto& transfer : config.GetTransferTo()) {
auto* transfer_to = rg_config->add_transfer_to();
transfer_to->set_resource_group(transfer);
}

auto* node_filter = rg_config->mutable_node_filter();
for (const auto& label : config.GetNodeLabels()) {
auto* kv_pair = node_filter->add_node_labels();
kv_pair->set_key(label.first);
kv_pair->set_value(label.second);
}

return rpc_request;
};

return apiHandler<proto::milvus::CreateResourceGroupRequest, proto::common::Status>(
pre, &MilvusConnection::CreateResourceGroup, GrpcOpts{timeout});
}

Status
MilvusClientImplV2::DropResourceGroup(const std::string& resource_group, int timeout) {
auto pre = [&resource_group]() {
proto::milvus::DropResourceGroupRequest rpc_request;
rpc_request.set_resource_group(resource_group);
return rpc_request;
};

return apiHandler<proto::milvus::DropResourceGroupRequest, proto::common::Status>(
pre, &MilvusConnection::DropResourceGroup, GrpcOpts{timeout});
}

Status
MilvusClientImplV2::DescribeResourceGroup(const std::string& resource_group, ResourceGroupDesc& resource_group_desc, int timeout) {
auto pre = [&resource_group]() {
proto::milvus::DescribeResourceGroupRequest rpc_request;
rpc_request.set_resource_group(resource_group);
return rpc_request;
};

auto post = [&resource_group_desc](const proto::milvus::DescribeResourceGroupResponse& response) {
if (response.status().code() != 0) {
return;
}
const auto& rg = response.resource_group();

ResourceGroupConfig config;
config.SetRequestsNodeNum(rg.config().requests().node_num());
config.SetLimitsNodeNum(rg.config().limits().node_num());

std::vector<std::string> transfer_from;
for (const auto& transfer : rg.config().transfer_from()) {
transfer_from.push_back(transfer.resource_group());
}
config.SetTransferFrom(transfer_from);

std::vector<std::string> transfer_to;
for (const auto& transfer : rg.config().transfer_to()) {
transfer_to.push_back(transfer.resource_group());
}
config.SetTransferTo(transfer_to);

std::vector<std::pair<std::string, std::string>> node_labels;
for (const auto& label : rg.config().node_filter().node_labels()) {
node_labels.emplace_back(label.key(), label.value());
}
config.SetNodeLabels(node_labels);

std::vector<NodeInfo> nodes;
for (const auto& node : rg.nodes()) {
nodes.emplace_back(node.node_id(), node.address(), node.hostname());
}

resource_group_desc = ResourceGroupDesc(
rg.name(),
rg.capacity(),
rg.num_available_node(),
std::map<std::string, int32_t>(rg.num_loaded_replica().begin(), rg.num_loaded_replica().end()),
std::map<std::string, int32_t>(rg.num_outgoing_node().begin(), rg.num_outgoing_node().end()),
std::map<std::string, int32_t>(rg.num_incoming_node().begin(), rg.num_incoming_node().end()),
config,
nodes
);
};

return apiHandler<proto::milvus::DescribeResourceGroupRequest, proto::milvus::DescribeResourceGroupResponse>(
pre, &MilvusConnection::DescribeResourceGroup, post, GrpcOpts{timeout});
}

Status
MilvusClientImplV2::ListResourceGroups(std::vector<std::string>& resource_groups, int timeout) {
auto pre = []() {
proto::milvus::ListResourceGroupsRequest rpc_request;
return rpc_request;
};

auto post = [&resource_groups](const proto::milvus::ListResourceGroupsResponse& response) {
resource_groups.clear();
if (response.status().code() != 0) {
return;
}
for (const auto& group : response.resource_groups()) {
resource_groups.push_back(group);
}
};

return apiHandler<proto::milvus::ListResourceGroupsRequest, proto::milvus::ListResourceGroupsResponse>(
pre, &MilvusConnection::ListResourceGroups, post, GrpcOpts{timeout});
}

Status
MilvusClientImplV2::UpdateResourceGroup(const std::string& resource_group, const ResourceGroupConfig& config, int timeout) {
auto pre = [&resource_group, &config]() {
proto::milvus::UpdateResourceGroupsRequest rpc_request;

auto& config_map = *rpc_request.mutable_resource_groups();
auto* rg_config = &config_map[resource_group];

rg_config->mutable_requests()->set_node_num(config.GetRequestsNodeNum());
rg_config->mutable_limits()->set_node_num(config.GetLimitsNodeNum());

for (const auto& transfer : config.GetTransferFrom()) {
auto* transfer_from = rg_config->add_transfer_from();
transfer_from->set_resource_group(transfer);
}

for (const auto& transfer : config.GetTransferTo()) {
auto* transfer_to = rg_config->add_transfer_to();
transfer_to->set_resource_group(transfer);
}

auto* node_filter = rg_config->mutable_node_filter();
for (const auto& label : config.GetNodeLabels()) {
auto* kv_pair = node_filter->add_node_labels();
kv_pair->set_key(label.first);
kv_pair->set_value(label.second);
}

return rpc_request;
};

return apiHandler<proto::milvus::UpdateResourceGroupsRequest, proto::common::Status>(
pre, &MilvusConnection::UpdateResourceGroups, GrpcOpts{timeout});
}

Status
MilvusClientImplV2::CalcDistance(const CalcDistanceArguments& arguments, DistanceArray& results) {
auto validate = [&arguments]() { return arguments.Validate(); };
Expand Down Expand Up @@ -1838,6 +1997,29 @@ MilvusClientImplV2::LoadBalance(int64_t src_node, const std::vector<int64_t>& ds
nullptr);
}

Status MilvusClientImplV2::Compact(const std::string& collection_name, int64_t& compaction_id, bool is_clustering, int timeout) {
CollectionDesc collection_desc;
auto status = DescribeCollection(collection_name, collection_desc);
if (!status.IsOk()) {
return status;
}

auto pre = [&collection_desc, &collection_name, is_clustering]() {
proto::milvus::ManualCompactionRequest rpc_request;
rpc_request.set_collectionid(collection_desc.ID());
rpc_request.set_collection_name(collection_name);
rpc_request.set_majorcompaction(is_clustering);
return rpc_request;
};

auto post = [&compaction_id](const proto::milvus::ManualCompactionResponse& response) {
compaction_id = response.compactionid();
};

return apiHandler<proto::milvus::ManualCompactionRequest, proto::milvus::ManualCompactionResponse>(
pre, &MilvusConnection::ManualCompaction, post, GrpcOpts{timeout});
}

Status
MilvusClientImplV2::GetCompactionState(int64_t compaction_id, CompactionState& compaction_state) {
auto pre = [&compaction_id]() {
Expand Down
18 changes: 18 additions & 0 deletions src/impl/MilvusClientImplV2.h
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,21 @@ class MilvusClientImplV2 : public MilvusClientV2 {
Status
RevokePrivilegeV2(const std::string& role_name, const std::string& privilege, const std::string& collection_name, const std::string& db_name, int timeout) final;

Status
CreateResourceGroup(const std::string& resource_group, const ResourceGroupConfig& config, int timeout) final;

Status
DropResourceGroup(const std::string& resource_group, int timeout) final;

Status
DescribeResourceGroup(const std::string& resource_group, ResourceGroupDesc& resource_group_desc, int timeout = 0) final;

Status
ListResourceGroups(std::vector<std::string>& resource_groups, int timeout) final;

Status
UpdateResourceGroup(const std::string& resource_group, const ResourceGroupConfig& config, int timeout) final;

Status
CalcDistance(const CalcDistanceArguments& arguments, DistanceArray& results) final;

Expand All @@ -276,6 +291,9 @@ class MilvusClientImplV2 : public MilvusClientV2 {
Status
LoadBalance(int64_t src_node, const std::vector<int64_t>& dst_nodes, const std::vector<int64_t>& segments) final;

Status
Compact(const std::string& collection_name, int64_t& compaction_id, bool is_clustering, int timeout) final;

Status
GetCompactionState(int64_t compaction_id, CompactionState& compaction_state) final;

Expand Down
25 changes: 25 additions & 0 deletions src/impl/MilvusConnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,31 @@ MilvusConnection::OperatePrivilegeV2(const proto::milvus::OperatePrivilegeV2Requ
return grpcCall("OperatePrivilegeV2", &Stub::OperatePrivilegeV2, request, response, options);
}

Status
MilvusConnection::CreateResourceGroup(const proto::milvus::CreateResourceGroupRequest& request, proto::common::Status& response, const GrpcContextOptions& options) {
return grpcCall("CreateResourceGroup", &Stub::CreateResourceGroup, request, response, options);
}

Status
MilvusConnection::DropResourceGroup(const proto::milvus::DropResourceGroupRequest& request, proto::common::Status& response, const GrpcContextOptions& options) {
return grpcCall("DropResourceGroup", &Stub::DropResourceGroup, request, response, options);
}

Status
MilvusConnection::DescribeResourceGroup(const proto::milvus::DescribeResourceGroupRequest& request, proto::milvus::DescribeResourceGroupResponse& response, const GrpcContextOptions& options) {
return grpcCall("DescribeResourceGroup", &Stub::DescribeResourceGroup, request, response, options);
}

Status
MilvusConnection::ListResourceGroups(const proto::milvus::ListResourceGroupsRequest& request, proto::milvus::ListResourceGroupsResponse& response, const GrpcContextOptions& options) {
return grpcCall("ListResourceGroups", &Stub::ListResourceGroups, request, response, options);
}

Status
MilvusConnection::UpdateResourceGroups(const proto::milvus::UpdateResourceGroupsRequest& request, proto::common::Status& response, const GrpcContextOptions& options) {
return grpcCall("UpdateResourceGroups", &Stub::UpdateResourceGroups, request, response, options);
}

void
MilvusConnection::SetHeader(const std::string& key, const std::string& value) {
headers_[key] = value;
Expand Down
15 changes: 15 additions & 0 deletions src/impl/MilvusConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,21 @@ class MilvusConnection {
Status
OperatePrivilegeV2(const proto::milvus::OperatePrivilegeV2Request& request, proto::common::Status& response, const GrpcContextOptions& options);

Status
CreateResourceGroup(const proto::milvus::CreateResourceGroupRequest& request, proto::common::Status& response, const GrpcContextOptions& options);

Status
DropResourceGroup(const proto::milvus::DropResourceGroupRequest& request, proto::common::Status& response, const GrpcContextOptions& options);

Status
DescribeResourceGroup(const proto::milvus::DescribeResourceGroupRequest& request, proto::milvus::DescribeResourceGroupResponse& response,const GrpcContextOptions& options);

Status
ListResourceGroups(const proto::milvus::ListResourceGroupsRequest& request, proto::milvus::ListResourceGroupsResponse& response, const GrpcContextOptions& options);

Status
UpdateResourceGroups(const proto::milvus::UpdateResourceGroupsRequest& request, proto::common::Status& response, const GrpcContextOptions& options);

void
SetHeader(const std::string& key, const std::string& value);

Expand Down
16 changes: 16 additions & 0 deletions src/impl/types/NodeInfo.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#include "milvus/types/NodeInfo.h"

NodeInfo::NodeInfo(int64_t id, const std::string& addr, const std::string& host)
: node_id(id), address(addr), hostname(host) {}

int64_t NodeInfo::GetNodeId() const {
return node_id;
}

const std::string& NodeInfo::GetAddress() const {
return address;
}

const std::string& NodeInfo::GetHostname() const {
return hostname;
}
52 changes: 52 additions & 0 deletions src/impl/types/ResourceGroupConfig.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#include "milvus/types/ResourceGroupConfig.h"

namespace milvus {

ResourceGroupConfig::ResourceGroupConfig(int req_node_num, int lim_node_num,
const std::vector<std::string>& from,
const std::vector<std::string>& to,
const std::vector<std::pair<std::string, std::string>>& labels)
: requests_node_num(req_node_num), limits_node_num(lim_node_num),
transfer_from(from), transfer_to(to), node_labels(labels) {}

int ResourceGroupConfig::GetRequestsNodeNum() const {
return requests_node_num;
}

void ResourceGroupConfig::SetRequestsNodeNum(int num) {
requests_node_num = num;
}

int ResourceGroupConfig::GetLimitsNodeNum() const {
return limits_node_num;
}

void ResourceGroupConfig::SetLimitsNodeNum(int num) {
limits_node_num = num;
}

const std::vector<std::string>& ResourceGroupConfig::GetTransferFrom() const {
return transfer_from;
}

void ResourceGroupConfig::SetTransferFrom(const std::vector<std::string>& from) {
transfer_from = from;
}

const std::vector<std::string>& ResourceGroupConfig::GetTransferTo() const {
return transfer_to;
}

void ResourceGroupConfig::SetTransferTo(const std::vector<std::string>& to) {
transfer_to = to;
}

const std::vector<std::pair<std::string, std::string>>& ResourceGroupConfig::GetNodeLabels() const {
return node_labels;
}

void ResourceGroupConfig::SetNodeLabels(const std::vector<std::pair<std::string, std::string>>& labels) {
node_labels = labels;
}

} // namespace milvus
47 changes: 47 additions & 0 deletions src/impl/types/ResourceGroupDesc.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
#include "milvus/types/ResourceGroupDesc.h"

namespace milvus {

ResourceGroupDesc::ResourceGroupDesc(const std::string& name, int32_t capacity, int32_t available_nodes,
const std::map<std::string, int32_t>& loaded_replicas,
const std::map<std::string, int32_t>& outgoing_nodes,
const std::map<std::string, int32_t>& incoming_nodes,
const ResourceGroupConfig& config,
const std::vector<NodeInfo>& nodes)
: name(name), capacity(capacity), num_available_node(available_nodes),
num_loaded_replica(loaded_replicas), num_outgoing_node(outgoing_nodes),
num_incoming_node(incoming_nodes), config(config), nodes(nodes) {}

const std::string& ResourceGroupDesc::GetName() const {
return name;
}

int32_t ResourceGroupDesc::GetCapacity() const {
return capacity;
}

int32_t ResourceGroupDesc::GetNumAvailableNode() const {
return num_available_node;
}

const std::map<std::string, int32_t>& ResourceGroupDesc::GetNumLoadedReplica() const {
return num_loaded_replica;
}

const std::map<std::string, int32_t>& ResourceGroupDesc::GetNumOutgoingNode() const {
return num_outgoing_node;
}

const std::map<std::string, int32_t>& ResourceGroupDesc::GetNumIncomingNode() const {
return num_incoming_node;
}

const ResourceGroupConfig& ResourceGroupDesc::GetConfig() const {
return config;
}

const std::vector<NodeInfo>& ResourceGroupDesc::GetNodes() const {
return nodes;
}

} // namespace milvus
Loading

0 comments on commit a88cdf9

Please sign in to comment.