From a5bed334f37ffb2b303242f7426a86d239434464 Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Tue, 27 Sep 2022 14:01:22 +0800 Subject: [PATCH 1/4] only write vertex key when flag is set or explictly insert vertex (#4680) --- src/storage/mutate/AddVerticesProcessor.cpp | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/storage/mutate/AddVerticesProcessor.cpp b/src/storage/mutate/AddVerticesProcessor.cpp index 0709062fb8e..bd25e77ffc0 100644 --- a/src/storage/mutate/AddVerticesProcessor.cpp +++ b/src/storage/mutate/AddVerticesProcessor.cpp @@ -62,6 +62,7 @@ void AddVerticesProcessor::process(const cpp2::AddVerticesRequest& req) { void AddVerticesProcessor::doProcess(const cpp2::AddVerticesRequest& req) { const auto& partVertices = req.get_parts(); const auto& propNamesMap = req.get_prop_names(); + bool onlyVertex = propNamesMap.empty(); for (auto& part : partVertices) { auto partId = part.first; const auto& vertices = part.second; @@ -81,7 +82,7 @@ void AddVerticesProcessor::doProcess(const cpp2::AddVerticesRequest& req) { code = nebula::cpp2::ErrorCode::E_INVALID_VID; break; } - if (FLAGS_use_vertex_key) { + if (onlyVertex || FLAGS_use_vertex_key) { data.emplace_back(NebulaKeyUtils::vertexKey(spaceVidLen_, partId, vid), ""); } for (auto& newTag : newTags) { @@ -139,6 +140,7 @@ void AddVerticesProcessor::doProcess(const cpp2::AddVerticesRequest& req) { void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& req) { const auto& partVertices = req.get_parts(); const auto& propNamesMap = req.get_prop_names(); + bool onlyVertex = propNamesMap.empty(); for (const auto& part : partVertices) { auto partId = part.first; @@ -161,7 +163,7 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re code = nebula::cpp2::ErrorCode::E_INVALID_VID; break; } - if (FLAGS_use_vertex_key) { + if (onlyVertex || FLAGS_use_vertex_key) { verticeData.emplace_back(NebulaKeyUtils::vertexKey(spaceVidLen_, partId, vid)); } for (const auto& newTag : newTags) { From 69a6d87e423dc2a754af04b4e54ad91b95c0ca7b Mon Sep 17 00:00:00 2001 From: panda-sheep <59197347+panda-sheep@users.noreply.github.com> Date: Tue, 27 Sep 2022 15:04:43 +0800 Subject: [PATCH 2/4] fix job manager ut in asan (#4678) * fix job manager ut in asan * fix SuperYoko's comments * address comments Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com> --- src/meta/test/JobManagerTest.cpp | 65 ++++++++++++++++---------------- 1 file changed, 33 insertions(+), 32 deletions(-) diff --git a/src/meta/test/JobManagerTest.cpp b/src/meta/test/JobManagerTest.cpp index 022b0085d03..b80c6ad7ccd 100644 --- a/src/meta/test/JobManagerTest.cpp +++ b/src/meta/test/JobManagerTest.cpp @@ -543,14 +543,16 @@ TEST_F(JobManagerTest, ShowJobInOtherSpace) { } TEST_F(JobManagerTest, RecoverJob) { + std::unique_ptr> jobMgr = getJobManager(); + // set status to prevent running the job since AdminClient is a injector + jobMgr->status_.store(JobManager::JbmgrStatus::STOPPED, std::memory_order_release); + jobMgr->bgThread_.join(); + GraphSpaceID spaceId = 1; + int32_t nJob = 3; + int32_t base = 0; + // case 1,recover Queue status job { - std::unique_ptr> jobMgr = getJobManager(); - // set status to prevent running the job since AdminClient is a injector - jobMgr->status_ = JobManager::JbmgrStatus::STOPPED; - jobMgr->bgThread_.join(); - GraphSpaceID spaceId = 1; - int32_t nJob = 3; for (auto jobId = 1; jobId <= nJob; ++jobId) { JobDescription jd(spaceId, jobId, cpp2::JobType::FLUSH); jd.setStatus(cpp2::JobStatus::STOPPED); @@ -589,19 +591,18 @@ TEST_F(JobManagerTest, RecoverJob) { } auto nJobRecovered = jobMgr->recoverJob(spaceId, nullptr); ASSERT_EQ(nebula::value(nJobRecovered), 3); + std::tuple opJobId; + while (jobMgr->jobSize() != 0) { + jobMgr->tryDequeue(opJobId); + } } // case 2 // For the balance job, if there are stopped jobs and failed jobs in turn // only recover the last balance job + base = 10; { - std::unique_ptr> jobMgr = getJobManager(); - // set status to prevent running the job since AdminClient is a injector - jobMgr->status_ = JobManager::JbmgrStatus::STOPPED; - jobMgr->bgThread_.join(); - GraphSpaceID spaceId = 1; - int32_t nJob = 3; - for (auto jobId = 1; jobId <= nJob; ++jobId) { + for (auto jobId = base + 1; jobId <= base + nJob; ++jobId) { cpp2::JobStatus jobStatus = cpp2::JobStatus::STOPPED; JobDescription jd(spaceId, jobId, cpp2::JobType::ZONE_BALANCE, {}, jobStatus); auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId()); @@ -613,9 +614,9 @@ TEST_F(JobManagerTest, RecoverJob) { jd.getErrorCode()); jobMgr->save(jobKey, jobVal); } - for (auto jobId = nJob + 1; jobId <= nJob + 3; ++jobId) { + for (auto jobId = base + nJob + 1; jobId <= base + nJob + 3; ++jobId) { cpp2::JobStatus jobStatus = cpp2::JobStatus::FAILED; - JobDescription jd(spaceId, jobId, cpp2::JobType::FLUSH, {}, jobStatus); + JobDescription jd(spaceId, jobId, cpp2::JobType::STATS, {}, jobStatus); auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId()); auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(), jd.getParas(), @@ -627,7 +628,7 @@ TEST_F(JobManagerTest, RecoverJob) { } { cpp2::JobStatus jobStatus = cpp2::JobStatus::FAILED; - JobDescription jd(spaceId, 7, cpp2::JobType::ZONE_BALANCE, {}, jobStatus); + JobDescription jd(spaceId, base + nJob + 4, cpp2::JobType::ZONE_BALANCE, {}, jobStatus); auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId()); auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(), jd.getParas(), @@ -637,28 +638,28 @@ TEST_F(JobManagerTest, RecoverJob) { jd.getErrorCode()); jobMgr->save(jobKey, jobVal); } - auto nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {1}); + auto nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {base + 1}); ASSERT_EQ(nebula::value(nJobRecovered), 0); - nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {2}); + nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {base + 2}); ASSERT_EQ(nebula::value(nJobRecovered), 0); - nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {3}); + nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {base + 3}); ASSERT_EQ(nebula::value(nJobRecovered), 0); - nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {7}); + nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {base + nJob + 4}); ASSERT_EQ(nebula::value(nJobRecovered), 1); + + std::tuple opJobId; + while (jobMgr->jobSize() != 0) { + jobMgr->tryDequeue(opJobId); + } } // case 3 // For the balance jobs, if there is a newer balance job, the failed or stopped jobs can't be // recovered + base = 20; { - std::unique_ptr> jobMgr = getJobManager(); - // set status to prevent running the job since AdminClient is a injector - jobMgr->status_ = JobManager::JbmgrStatus::STOPPED; - jobMgr->bgThread_.join(); - GraphSpaceID spaceId = 1; - int32_t nJob = 4; - for (auto jobId = 1; jobId <= nJob; ++jobId) { + for (auto jobId = base + 1; jobId <= base + nJob + 1; ++jobId) { cpp2::JobStatus jobStatus; if (jobId / 2) { jobStatus = cpp2::JobStatus::FAILED; @@ -675,7 +676,7 @@ TEST_F(JobManagerTest, RecoverJob) { jd.getErrorCode()); jobMgr->save(jobKey, jobVal); } - for (auto jobId = nJob + 1; jobId <= nJob + 3; ++jobId) { + for (auto jobId = base + nJob + 2; jobId <= base + nJob + 4; ++jobId) { cpp2::JobStatus jobStatus = cpp2::JobStatus::FAILED; JobDescription jd(spaceId, jobId, cpp2::JobType::FLUSH, {}, jobStatus); auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId()); @@ -689,7 +690,7 @@ TEST_F(JobManagerTest, RecoverJob) { } { cpp2::JobStatus jobStatus = cpp2::JobStatus::FINISHED; - JobDescription jd(spaceId, 7, cpp2::JobType::ZONE_BALANCE, {}, jobStatus); + JobDescription jd(spaceId, base + nJob + 5, cpp2::JobType::ZONE_BALANCE, {}, jobStatus); auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId()); auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(), jd.getParas(), @@ -701,7 +702,7 @@ TEST_F(JobManagerTest, RecoverJob) { } { cpp2::JobStatus jobStatus = cpp2::JobStatus::FINISHED; - JobDescription jd(spaceId, 8, cpp2::JobType::COMPACT, {}, jobStatus); + JobDescription jd(spaceId, base + nJob + 6, cpp2::JobType::COMPACT, {}, jobStatus); auto jobKey = MetaKeyUtils::jobKey(jd.getSpace(), jd.getJobId()); auto jobVal = MetaKeyUtils::jobVal(jd.getJobType(), jd.getParas(), @@ -711,11 +712,11 @@ TEST_F(JobManagerTest, RecoverJob) { jd.getErrorCode()); jobMgr->save(jobKey, jobVal); } - auto nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {1}); + auto nJobRecovered = jobMgr->recoverJob(spaceId, nullptr, {base + 1}); ASSERT_EQ(nebula::value(nJobRecovered), 0); nJobRecovered = jobMgr->recoverJob(spaceId, nullptr); - ASSERT_EQ(nebula::value(nJobRecovered), 2); + ASSERT_EQ(nebula::value(nJobRecovered), 1); } } From f5b81ce23dbcd87e0275f36082684c609b2587cc Mon Sep 17 00:00:00 2001 From: canon <87342612+caton-hpg@users.noreply.github.com> Date: Tue, 27 Sep 2022 16:01:56 +0800 Subject: [PATCH 3/4] fix http port of storaged (#4673) Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com> --- src/graph/executor/admin/ShowHostsExecutor.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/graph/executor/admin/ShowHostsExecutor.cpp b/src/graph/executor/admin/ShowHostsExecutor.cpp index 999214c858c..02689f40fb4 100644 --- a/src/graph/executor/admin/ShowHostsExecutor.cpp +++ b/src/graph/executor/admin/ShowHostsExecutor.cpp @@ -38,7 +38,7 @@ folly::Future ShowHostsExecutor::showHosts() { for (const auto &host : hostVec) { nebula::Row r({host.get_hostAddr().host, host.get_hostAddr().port, - FLAGS_ws_http_port, + 19779, // FIXME: update real http port. apache::thrift::util::enumNameSafe(host.get_status())}); int64_t leaderCount = 0; for (const auto &spaceEntry : host.get_leader_parts()) { From fff82a637f68e4390418b8e6ca1f8d894eecef34 Mon Sep 17 00:00:00 2001 From: Doodle <13706157+critical27@users.noreply.github.com> Date: Wed, 28 Sep 2022 10:20:18 +0800 Subject: [PATCH 4/4] insert vertex key when only vertex and flag is set (#4685) --- src/storage/mutate/AddVerticesProcessor.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/storage/mutate/AddVerticesProcessor.cpp b/src/storage/mutate/AddVerticesProcessor.cpp index bd25e77ffc0..7900cd21220 100644 --- a/src/storage/mutate/AddVerticesProcessor.cpp +++ b/src/storage/mutate/AddVerticesProcessor.cpp @@ -82,7 +82,7 @@ void AddVerticesProcessor::doProcess(const cpp2::AddVerticesRequest& req) { code = nebula::cpp2::ErrorCode::E_INVALID_VID; break; } - if (onlyVertex || FLAGS_use_vertex_key) { + if (onlyVertex && FLAGS_use_vertex_key) { data.emplace_back(NebulaKeyUtils::vertexKey(spaceVidLen_, partId, vid), ""); } for (auto& newTag : newTags) { @@ -163,7 +163,7 @@ void AddVerticesProcessor::doProcessWithIndex(const cpp2::AddVerticesRequest& re code = nebula::cpp2::ErrorCode::E_INVALID_VID; break; } - if (onlyVertex || FLAGS_use_vertex_key) { + if (onlyVertex && FLAGS_use_vertex_key) { verticeData.emplace_back(NebulaKeyUtils::vertexKey(spaceVidLen_, partId, vid)); } for (const auto& newTag : newTags) {