-
Notifications
You must be signed in to change notification settings - Fork 1.2k
/
ShortestPathBase.cpp
197 lines (186 loc) · 8.09 KB
/
ShortestPathBase.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
// Copyright (c) 2022 vesoft inc. All rights reserved.
//
// This source code is licensed under Apache 2.0 License.
#include "graph/executor/algo/ShortestPathBase.h"
#include "graph/service/GraphFlags.h"
#include "graph/util/SchemaUtil.h"
#include "graph/util/Utils.h"
using apache::thrift::optional_field_ref;
using nebula::graph::util::collectRespProfileData;
using nebula::storage::StorageClient;
namespace nebula {
namespace graph {
folly::Future<std::vector<Value>> ShortestPathBase::getMeetVidsProps(
const std::vector<Value>& meetVids) {
nebula::DataSet vertices({kVid});
vertices.rows.reserve(meetVids.size());
for (auto& vid : meetVids) {
vertices.emplace_back(Row({vid}));
}
time::Duration getPropsTime;
StorageClient* storageClient = qctx_->getStorageClient();
StorageClient::CommonRequestParam param(pathNode_->space(),
qctx_->rctx()->session()->id(),
qctx_->plan()->id(),
qctx_->plan()->isProfileEnabled());
return DCHECK_NOTNULL(storageClient)
->getProps(param,
std::move(vertices),
pathNode_->vertexProps(),
nullptr,
nullptr,
false,
{},
-1,
nullptr)
.via(qctx_->rctx()->runner())
.thenValue([this, getPropsTime](PropRpcResponse&& resp) {
addStats(resp, getPropsTime.elapsedInUSec());
return handlePropResp(std::move(resp));
});
}
std::vector<Value> ShortestPathBase::handlePropResp(PropRpcResponse&& resps) {
std::vector<Value> vertices;
auto result = handleCompleteness(resps, FLAGS_accept_partial_success);
if (!result.ok()) {
LOG(WARNING) << "GetProp partial fail";
return vertices;
}
nebula::DataSet v;
for (auto& resp : resps.responses()) {
if (resp.props_ref().has_value()) {
if (UNLIKELY(!v.append(std::move(*resp.props_ref())))) {
// it's impossible according to the interface
LOG(WARNING) << "Heterogeneous props dataset";
}
} else {
LOG(WARNING) << "GetProp partial success";
}
}
auto val = std::make_shared<Value>(std::move(v));
auto iter = std::make_unique<PropIter>(val);
vertices.reserve(iter->size());
for (; iter->valid(); iter->next()) {
vertices.emplace_back(iter->getVertex());
}
return vertices;
}
Status ShortestPathBase::handleErrorCode(nebula::cpp2::ErrorCode code, PartitionID partId) const {
switch (code) {
case nebula::cpp2::ErrorCode::E_KEY_NOT_FOUND:
return Status::Error("Storage Error: Vertex or edge not found.");
case nebula::cpp2::ErrorCode::E_DATA_TYPE_MISMATCH: {
std::string error =
"Storage Error: The data type does not meet the requirements. "
"Use the correct type of data.";
return Status::Error(std::move(error));
}
case nebula::cpp2::ErrorCode::E_INVALID_VID: {
std::string error =
"Storage Error: The VID must be a 64-bit integer"
" or a string fitting space vertex id length limit.";
return Status::Error(std::move(error));
}
case nebula::cpp2::ErrorCode::E_INVALID_FIELD_VALUE: {
std::string error =
"Storage Error: Invalid field value: "
"may be the filed is not NULL "
"or without default value or wrong schema.";
return Status::Error(std::move(error));
}
case nebula::cpp2::ErrorCode::E_LEADER_CHANGED:
return Status::Error(
folly::sformat("Storage Error: Not the leader of {}. Please retry later.", partId));
case nebula::cpp2::ErrorCode::E_INVALID_FILTER:
return Status::Error("Storage Error: Invalid filter.");
case nebula::cpp2::ErrorCode::E_INVALID_UPDATER:
return Status::Error("Storage Error: Invalid Update col or yield col.");
case nebula::cpp2::ErrorCode::E_INVALID_SPACEVIDLEN:
return Status::Error("Storage Error: Invalid space vid len.");
case nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND:
return Status::Error("Storage Error: Space not found.");
case nebula::cpp2::ErrorCode::E_PART_NOT_FOUND:
return Status::Error(folly::sformat("Storage Error: Part {} not found.", partId));
case nebula::cpp2::ErrorCode::E_TAG_NOT_FOUND:
return Status::Error("Storage Error: Tag not found.");
case nebula::cpp2::ErrorCode::E_TAG_PROP_NOT_FOUND:
return Status::Error("Storage Error: Tag prop not found.");
case nebula::cpp2::ErrorCode::E_EDGE_NOT_FOUND:
return Status::Error("Storage Error: Edge not found.");
case nebula::cpp2::ErrorCode::E_EDGE_PROP_NOT_FOUND:
return Status::Error("Storage Error: Edge prop not found.");
case nebula::cpp2::ErrorCode::E_INDEX_NOT_FOUND:
return Status::Error("Storage Error: Index not found.");
case nebula::cpp2::ErrorCode::E_INVALID_DATA:
return Status::Error("Storage Error: Invalid data, may be wrong value type.");
case nebula::cpp2::ErrorCode::E_NOT_NULLABLE:
return Status::Error("Storage Error: The not null field cannot be null.");
case nebula::cpp2::ErrorCode::E_FIELD_UNSET:
return Status::Error(
"Storage Error: "
"The not null field doesn't have a default value.");
case nebula::cpp2::ErrorCode::E_OUT_OF_RANGE:
return Status::Error("Storage Error: Out of range value.");
case nebula::cpp2::ErrorCode::E_DATA_CONFLICT_ERROR:
return Status::Error(
"Storage Error: More than one request trying to "
"add/update/delete one edge/vertex at the same time.");
case nebula::cpp2::ErrorCode::E_FILTER_OUT:
return Status::OK();
case nebula::cpp2::ErrorCode::E_RAFT_TERM_OUT_OF_DATE:
return Status::Error(folly::sformat(
"Storage Error: Term of part {} is out of date. Please retry later.", partId));
case nebula::cpp2::ErrorCode::E_RAFT_WAL_FAIL:
return Status::Error("Storage Error: Write wal failed. Probably disk is almost full.");
case nebula::cpp2::ErrorCode::E_RAFT_WRITE_BLOCKED:
return Status::Error(
"Storage Error: Write is blocked when creating snapshot. Please retry later.");
case nebula::cpp2::ErrorCode::E_RAFT_BUFFER_OVERFLOW:
return Status::Error(folly::sformat(
"Storage Error: Part {} raft buffer is full. Please retry later.", partId));
case nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED:
return Status::Error("Storage Error: Atomic operation failed.");
default:
auto status = Status::Error("Storage Error: part: %d, error: %s(%d).",
partId,
apache::thrift::util::enumNameSafe(code).c_str(),
static_cast<int32_t>(code));
LOG(ERROR) << status;
return status;
}
return Status::OK();
}
void ShortestPathBase::addStats(RpcResponse& resp,
size_t stepNum,
int64_t timeInUSec,
bool reverse) const {
folly::dynamic stats = folly::dynamic::array();
auto& hostLatency = resp.hostLatency();
for (size_t i = 0; i < hostLatency.size(); ++i) {
size_t size = 0u;
auto& result = resp.responses()[i];
if (result.vertices_ref().has_value()) {
size = (*result.vertices_ref()).size();
}
auto info = util::collectRespProfileData(result.result, hostLatency[i], size, timeInUSec);
stats.push_back(std::move(info));
}
auto key = folly::sformat("{}step[{}]", reverse ? "reverse " : "", stepNum);
statsLock_.lock();
stats_->emplace(key, folly::toPrettyJson(stats));
statsLock_.unlock();
}
void ShortestPathBase::addStats(PropRpcResponse& resp, int64_t timeInUSec) const {
folly::dynamic stats = folly::dynamic::array();
auto& hostLatency = resp.hostLatency();
for (size_t i = 0; i < hostLatency.size(); ++i) {
const auto& result = resp.responses()[i].get_result();
auto info = util::collectRespProfileData(result, hostLatency[i], 0, timeInUSec);
stats.push_back(std::move(info));
}
statsLock_.lock();
stats_->emplace("get_prop", folly::toPrettyJson(stats));
statsLock_.unlock();
}
} // namespace graph
} // namespace nebula