Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cherry pick v3.1.0 (0408-0411) #4130

Merged
merged 3 commits into from
Apr 11, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -846,9 +846,9 @@ Status MetaClient::handleResponse(const RESP& resp) {
case nebula::cpp2::ErrorCode::E_WRONGCLUSTER:
return Status::Error("Wrong cluster!");
case nebula::cpp2::ErrorCode::E_ZONE_NOT_ENOUGH:
return Status::Error("Zone not enough!");
return Status::Error("Host not enough!");
case nebula::cpp2::ErrorCode::E_ZONE_IS_EMPTY:
return Status::Error("Zone is empty!");
return Status::Error("Host not exist!");
case nebula::cpp2::ErrorCode::E_STORE_FAILURE:
return Status::Error("Store failure!");
case nebula::cpp2::ErrorCode::E_BAD_BALANCE_PLAN:
Expand Down
4 changes: 0 additions & 4 deletions src/graph/executor/admin/SpaceExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ folly::Future<Status> DescSpaceExecutor::execute() {
"Collate",
"Vid Type",
"Atomic Edge",
"Zones",
"Comment"};
Row row;
row.values.emplace_back(spaceId);
Expand All @@ -94,9 +93,6 @@ folly::Future<Status> DescSpaceExecutor::execute() {
}
row.values.emplace_back(sAtomicEdge);

auto zoneNames = folly::join(",", properties.get_zone_names());
row.values.emplace_back(zoneNames);

if (properties.comment_ref().has_value()) {
row.values.emplace_back(*properties.comment_ref());
} else {
Expand Down
6 changes: 5 additions & 1 deletion src/graph/validator/GroupByValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ Status GroupByValidator::validateImpl() {
NG_RETURN_IF_ERROR(validateYield(groupBySentence->yieldClause()));
NG_RETURN_IF_ERROR(groupClauseSemanticCheck());

for (auto* col : groupBySentence->yieldClause()->columns()) {
auto type = deduceExprType(col->expr());
outputs_.emplace_back(col->name(), std::move(type).value());
}

return Status::OK();
}

Expand Down Expand Up @@ -155,7 +160,6 @@ Status GroupByValidator::groupClauseSemanticCheck() {
for (auto i = 0u; i < groupItems_.size(); ++i) {
auto type = deduceExprType(groupItems_[i]);
NG_RETURN_IF_ERROR(type);
outputs_.emplace_back(aggOutputColNames_[i], std::move(type).value());
}
// check exprProps
if (!exprProps_.srcTagProps().empty() || !exprProps_.dstTagProps().empty()) {
Expand Down
18 changes: 8 additions & 10 deletions src/meta/processors/job/ZoneBalanceJobExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ nebula::cpp2::ErrorCode ZoneBalanceJobExecutor::prepare() {
for (size_t i = 0; i < paras_.size(); i++) {
lostZones_.emplace_back(paras_[i]);
}
std::vector<std::string> newZones;
newZones.reserve(lostZones_.size());
for (auto& name : lostZones_) {
auto host = HostAddr::fromString(name);
newZones.emplace_back(folly::stringPrintf("default_zone_%s_%d", host.host.c_str(), host.port));
}
lostZones_.swap(newZones);
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

Expand All @@ -49,16 +56,7 @@ folly::Future<Status> ZoneBalanceJobExecutor::executeInternal() {
return status;
}
}
plan_->setFinishCallBack([this](meta::cpp2::JobStatus status) {
if (status == meta::cpp2::JobStatus::FINISHED) {
folly::SharedMutex::WriteHolder holder(LockUtils::lock());
nebula::cpp2::ErrorCode ret = updateMeta();
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
status = meta::cpp2::JobStatus::FAILED;
}
}
executorOnFinished_(status);
});
plan_->setFinishCallBack([this](meta::cpp2::JobStatus status) { executorOnFinished_(status); });
plan_->invoke();
return Status::OK();
}
Expand Down
24 changes: 24 additions & 0 deletions src/meta/processors/zone/AddHostsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,20 @@ void AddHostsProcessor::process(const cpp2::AddHostsReq& req) {

std::vector<kvstore::KV> data;
nebula::cpp2::ErrorCode code = nebula::cpp2::ErrorCode::SUCCEEDED;
std::map<GraphSpaceID, meta::cpp2::SpaceDesc> spaceMap;
std::string spacePrefix = MetaKeyUtils::spacePrefix();
std::unique_ptr<kvstore::KVIterator> spaceIter;
auto spaceRet = kvstore_->prefix(kDefaultSpaceId, kDefaultPartId, spacePrefix, &spaceIter);
if (spaceRet != nebula::cpp2::ErrorCode::SUCCEEDED) {
handleErrorCode(spaceRet);
onFinished();
return;
}
while (spaceIter->valid()) {
spaceMap.emplace(MetaKeyUtils::spaceId(spaceIter->key()),
MetaKeyUtils::parseSpace(spaceIter->val()));
spaceIter->next();
}
for (auto& host : hosts) {
// Ensure that the node is not registered.
auto machineKey = MetaKeyUtils::machineKey(host.host, host.port);
Expand All @@ -56,6 +70,16 @@ void AddHostsProcessor::process(const cpp2::AddHostsReq& req) {
auto zoneVal = MetaKeyUtils::zoneVal({host});
data.emplace_back(std::move(machineKey), "");
data.emplace_back(std::move(zoneKey), std::move(zoneVal));
for (auto& [spaceId, properties] : spaceMap) {
const std::vector<std::string>& curZones = properties.get_zone_names();
std::set<std::string> zm(curZones.begin(), curZones.end());
zm.emplace(zoneName);
std::vector<std::string> newZones(zm.begin(), zm.end());
properties.zone_names_ref() = std::move(newZones);
}
}
for (auto& [spaceId, properties] : spaceMap) {
data.emplace_back(MetaKeyUtils::spaceKey(spaceId), MetaKeyUtils::spaceVal(properties));
}

if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
Expand Down
15 changes: 13 additions & 2 deletions src/meta/processors/zone/DropHostsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ void DropHostsProcessor::process(const cpp2::DropHostsReq& req) {
auto spaceIterRet = doPrefix(spacePrefix);
auto spaceIter = nebula::value(spaceIterRet).get();
nebula::cpp2::ErrorCode code = nebula::cpp2::ErrorCode::SUCCEEDED;
std::map<GraphSpaceID, meta::cpp2::SpaceDesc> spaceMap;
while (spaceIter->valid()) {
auto spaceId = MetaKeyUtils::spaceId(spaceIter->key());
auto spaceKey = MetaKeyUtils::spaceKey(spaceId);
Expand All @@ -44,7 +45,7 @@ void DropHostsProcessor::process(const cpp2::DropHostsReq& req) {
<< " error: " << apache::thrift::util::enumNameSafe(code);
break;
}

spaceMap.emplace(spaceId, MetaKeyUtils::parseSpace(spaceIter->val()));
const auto& partPrefix = MetaKeyUtils::partPrefix(spaceId);
auto partIterRet = doPrefix(partPrefix);
auto partIter = nebula::value(partIterRet).get();
Expand Down Expand Up @@ -96,6 +97,14 @@ void DropHostsProcessor::process(const cpp2::DropHostsReq& req) {
}

holder->remove(MetaKeyUtils::zoneKey(zoneName));

for (auto& [spaceId, properties] : spaceMap) {
const std::vector<std::string>& curZones = properties.get_zone_names();
std::set<std::string> zm(curZones.begin(), curZones.end());
zm.erase(zoneName);
std::vector<std::string> newZones(zm.begin(), zm.end());
properties.zone_names_ref() = std::move(newZones);
}
} else {
// Delete some hosts in the zone
for (auto& h : hosts) {
Expand All @@ -112,7 +121,9 @@ void DropHostsProcessor::process(const cpp2::DropHostsReq& req) {
CHECK_CODE_AND_BREAK();
iter->next();
}

for (auto& [spaceId, properties] : spaceMap) {
holder->put(MetaKeyUtils::spaceKey(spaceId), MetaKeyUtils::spaceVal(properties));
}
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
handleErrorCode(code);
onFinished();
Expand Down
1 change: 0 additions & 1 deletion src/meta/test/BalancerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,6 @@ TEST(BalanceTest, NormalZoneTest) {
ret = balancer.executeInternal();
baton.wait();
EXPECT_EQ(Status::OK(), ret.value());
verifyMetaZone(kv, balancer.spaceInfo_.spaceId_, {"1", "2", "3", "4"});
verifyBalanceTask(
kv, balancer.jobId_, BalanceTaskStatus::END, BalanceTaskResult::SUCCEEDED, partCount, 12);
}
Expand Down
61 changes: 14 additions & 47 deletions src/parser/parser.yy
Original file line number Diff line number Diff line change
Expand Up @@ -3326,36 +3326,20 @@ admin_job_sentence
meta::cpp2::JobType::LEADER_BALANCE);
$$ = sentence;
}
| KW_SUBMIT KW_JOB KW_BALANCE KW_IN KW_ZONE {
auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD,
meta::cpp2::JobType::DATA_BALANCE);
$$ = sentence;
}
| KW_SUBMIT KW_JOB KW_BALANCE KW_IN KW_ZONE KW_REMOVE host_list {
auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD,
meta::cpp2::JobType::DATA_BALANCE);
HostList* hl = $7;
std::vector<HostAddr> has = hl->hosts();
for (HostAddr& ha: has) {
sentence->addPara(ha.toString());
}
delete hl;
$$ = sentence;
}
| KW_SUBMIT KW_JOB KW_BALANCE KW_ACROSS KW_ZONE {
| KW_SUBMIT KW_JOB KW_BALANCE KW_DATA {
auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD,
meta::cpp2::JobType::ZONE_BALANCE);
$$ = sentence;
}
| KW_SUBMIT KW_JOB KW_BALANCE KW_ACROSS KW_ZONE KW_REMOVE zone_name_list {
| KW_SUBMIT KW_JOB KW_BALANCE KW_DATA KW_REMOVE host_list {
auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD,
meta::cpp2::JobType::ZONE_BALANCE);
ZoneNameList* nl = $7;
std::vector<std::string> names = nl->zoneNames();
for (std::string& name: names) {
sentence->addPara(name);
HostList* hl = $6;
std::vector<HostAddr> hosts = hl->hosts();
for (HostAddr& host: hosts) {
sentence->addPara(host.toString());
}
delete nl;
delete hl;
$$ = sentence;
}
;
Expand Down Expand Up @@ -3794,37 +3778,20 @@ balance_sentence
meta::cpp2::JobType::LEADER_BALANCE);
$$ = sentence;
}
|
KW_BALANCE KW_IN KW_ZONE {
auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD,
meta::cpp2::JobType::DATA_BALANCE);
$$ = sentence;
}
| KW_BALANCE KW_IN KW_ZONE KW_REMOVE host_list {
auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD,
meta::cpp2::JobType::DATA_BALANCE);
HostList* hl = $5;
std::vector<HostAddr> has = hl->hosts();
for (HostAddr& ha: has) {
sentence->addPara(ha.toString());
}
delete hl;
$$ = sentence;
}
| KW_BALANCE KW_ACROSS KW_ZONE {
| KW_BALANCE KW_DATA {
auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD,
meta::cpp2::JobType::ZONE_BALANCE);
$$ = sentence;
}
| KW_BALANCE KW_ACROSS KW_ZONE KW_REMOVE zone_name_list {
| KW_BALANCE KW_DATA KW_REMOVE host_list {
auto sentence = new AdminJobSentence(meta::cpp2::JobOp::ADD,
meta::cpp2::JobType::ZONE_BALANCE);
ZoneNameList* nl = $5;
std::vector<std::string> names = nl->zoneNames();
for (std::string& name: names) {
sentence->addPara(name);
HostList* hl = $4;
std::vector<HostAddr> hosts = hl->hosts();
for (HostAddr& host: hosts) {
sentence->addPara(host.toString());
}
delete nl;
delete hl;
$$ = sentence;
}
;
Expand Down
33 changes: 0 additions & 33 deletions src/parser/test/ParserTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2009,34 +2009,6 @@ TEST_F(ParserTest, ConfigOperation) {
}
}

TEST_F(ParserTest, BalanceOperation) {
{
std::string query = "BALANCE LEADER";
auto result = parse(query);
ASSERT_TRUE(result.ok()) << result.status();
}
{
std::string query = "BALANCE IN ZONE";
auto result = parse(query);
ASSERT_TRUE(result.ok()) << result.status();
}
{
std::string query = "BALANCE ACROSS ZONE";
auto result = parse(query);
ASSERT_TRUE(result.ok()) << result.status();
}
{
std::string query = "BALANCE IN ZONE REMOVE 192.168.0.1:50000,192.168.0.1:50001";
auto result = parse(query);
ASSERT_TRUE(result.ok()) << result.status();
}
{
std::string query = "BALANCE IN ZONE REMOVE 192.168.0.1:50000,\"localhost\":50001";
auto result = parse(query);
ASSERT_TRUE(result.ok()) << result.status();
}
}

TEST_F(ParserTest, CrashByFuzzer) {
{
std::string query = ";YIELD\nI41( ,1)GEGE.INGEST";
Expand Down Expand Up @@ -3227,11 +3199,6 @@ TEST_F(ParserTest, JobTest) {
checkTest("SUBMIT JOB INGEST", "SUBMIT JOB INGEST");

checkTest("SUBMIT JOB STATS", "SUBMIT JOB STATS");
checkTest("SUBMIT JOB BALANCE IN ZONE", "SUBMIT JOB BALANCE IN ZONE");
checkTest(
"SUBMIT JOB BALANCE IN ZONE REMOVE 192.168.0.1:50000, 192.168.0.1:50001, 192.168.0.1:50002",
"SUBMIT JOB BALANCE IN ZONE REMOVE \"192.168.0.1\":50000, \"192.168.0.1\":50001, "
"\"192.168.0.1\":50002");
checkTest("SUBMIT JOB BALANCE LEADER", "SUBMIT JOB BALANCE LEADER");
checkTest("SHOW JOBS", "SHOW JOBS");
checkTest("SHOW JOB 111", "SHOW JOB 111");
Expand Down
15 changes: 15 additions & 0 deletions tests/tck/features/aggregate/Agg.feature
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,21 @@ Feature: Basic Aggregate and GroupBy
| sum |
| 6 |

Scenario: Reference the output of group by
When executing query:
"""
GO FROM "Tim Duncan" OVER * YIELD dst(edge) as dst, $$.player.age as age
| GROUP BY $-.dst YIELD (sum($-.age)+3) as age
| ORDER BY $-.age
"""
Then the result should be, in order:
| age |
| 3 |
| 34 |
| 36 |
| 75 |
| 85 |

Scenario: Error Check
When executing query:
"""
Expand Down
12 changes: 6 additions & 6 deletions tests/tck/features/schema/Comment.feature
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ Feature: Schema Comment
DESC SPACE <space_name>;
"""
Then the result should be, in any order:
| ID | Name | Partition Number | Replica Factor | Charset | Collate | Vid Type | Atomic Edge | Zones | Comment |
| /\d+/ | "<space_name>" | 100 | 1 | "utf8" | "utf8_bin" | "FIXED_STRING(8)" | false | /^.+?\d$/ | "<space_comment>" |
| ID | Name | Partition Number | Replica Factor | Charset | Collate | Vid Type | Atomic Edge | Comment |
| /\d+/ | "<space_name>" | 100 | 1 | "utf8" | "utf8_bin" | "FIXED_STRING(8)" | false | "<space_comment>" |
When executing query:
"""
DROP SPACE <space_name>;
Expand Down Expand Up @@ -54,8 +54,8 @@ Feature: Schema Comment
DESC SPACE test_comment_not_set;
"""
Then the result should be, in any order:
| ID | Name | Partition Number | Replica Factor | Charset | Collate | Vid Type | Atomic Edge | Zones | Comment |
| /\d+/ | "test_comment_not_set" | 100 | 1 | "utf8" | "utf8_bin" | "FIXED_STRING(8)" | false | /^.+?\d$/ | EMPTY |
| ID | Name | Partition Number | Replica Factor | Charset | Collate | Vid Type | Atomic Edge | Comment |
| /\d+/ | "test_comment_not_set" | 100 | 1 | "utf8" | "utf8_bin" | "FIXED_STRING(8)" | false | EMPTY |
When executing query:
"""
DROP SPACE test_comment_not_set;
Expand All @@ -81,8 +81,8 @@ Feature: Schema Comment
DESC SPACE test_comment_empty;
"""
Then the result should be, in any order:
| ID | Name | Partition Number | Replica Factor | Charset | Collate | Vid Type | Atomic Edge | Zones | Comment |
| /\d+/ | "test_comment_empty" | 100 | 1 | "utf8" | "utf8_bin" | "FIXED_STRING(8)" | false | /^.+?\d$/ | "" |
| ID | Name | Partition Number | Replica Factor | Charset | Collate | Vid Type | Atomic Edge | Comment |
| /\d+/ | "test_comment_empty" | 100 | 1 | "utf8" | "utf8_bin" | "FIXED_STRING(8)" | false | "" |
When executing query:
"""
DROP SPACE test_comment_empty;
Expand Down