-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* segment * tmp * fix format * id * remove init in yy * dep * format * cmakelists * fix header * fix review * fix * constructor * add log * remove qctx * fix * fix format * remove * add test * fix * add bench * fix * fix according to review fix * add annotation * remove remove * fix * fix * fix conflict * change ci * remove init runner and client Co-authored-by: Sophie <84560950+Sophie-Xie@users.noreply.github.com>
- Loading branch information
1 parent
e8391ee
commit 64efef4
Showing
23 changed files
with
571 additions
and
18 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
/* Copyright (c) 2021 vesoft inc. All rights reserved. | ||
* | ||
* This source code is licensed under Apache 2.0 License. | ||
*/ | ||
|
||
#include "common/id/SegmentId.h" | ||
|
||
namespace nebula { | ||
|
||
StatusOr<int64_t> SegmentId::getId() { | ||
std::lock_guard<std::mutex> guard(mutex_); | ||
|
||
if (cur_ < segmentStart_ + step_ - 1) { | ||
// non-block prefetch next segment | ||
if (cur_ == segmentStart_ + (step_ / 2) - 1) { | ||
asyncFetchSegment(); | ||
} | ||
cur_ += 1; | ||
} else { // cur == segment end | ||
if (segmentStart_ >= nextSegmentStart_) { | ||
// indicate asyncFetchSegment() failed or fetchSegment() slow | ||
LOG(ERROR) | ||
<< "segmentId asyncFetchSegment() failed or slow(step is too small), segmentStart_: " | ||
<< segmentStart_ << ", nextSegmentStart_: " << nextSegmentStart_; | ||
auto xRet = fetchSegment(); | ||
NG_RETURN_IF_ERROR(xRet); | ||
nextSegmentStart_ = xRet.value(); | ||
} | ||
segmentStart_ = nextSegmentStart_; | ||
cur_ = segmentStart_; | ||
} | ||
|
||
return cur_; | ||
} | ||
|
||
void SegmentId::asyncFetchSegment() { | ||
auto future = client_->getSegmentId(step_); | ||
std::move(future).via(runner_).thenValue([this](StatusOr<int64_t> resp) { | ||
NG_RETURN_IF_ERROR(resp); | ||
if (!resp.value()) { | ||
return Status::Error("asyncFetchSegment failed!"); | ||
} | ||
this->nextSegmentStart_ = resp.value(); | ||
return Status::OK(); | ||
}); | ||
} | ||
|
||
StatusOr<int64_t> SegmentId::fetchSegment() { | ||
auto result = client_->getSegmentId(step_).get(); | ||
|
||
NG_RETURN_IF_ERROR(result); | ||
return result.value(); | ||
} | ||
|
||
Status SegmentId::init(int64_t step) { | ||
step_ = step; | ||
if (step < kMinStep_) { | ||
return Status::Error("Step is too small"); | ||
} | ||
|
||
auto xRet = fetchSegment(); | ||
NG_RETURN_IF_ERROR(xRet); | ||
|
||
segmentStart_ = xRet.value(); | ||
cur_ = segmentStart_ - 1; | ||
|
||
return Status::OK(); | ||
} | ||
|
||
} // namespace nebula |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,52 @@ | ||
/* Copyright (c) 2021 vesoft inc. All rights reserved. | ||
* | ||
* This source code is licensed under Apache 2.0 License. | ||
*/ | ||
|
||
#ifndef COMMON_ID_SEGMENTINCR_H_ | ||
#define COMMON_ID_SEGMENTINCR_H_ | ||
|
||
#include "clients/meta/MetaClient.h" | ||
|
||
namespace nebula { | ||
// Segment auto-increase id | ||
class SegmentId { | ||
public: | ||
SegmentId(meta::BaseMetaClient* client, folly::Executor* runner) | ||
: client_(client), runner_(runner) {} | ||
|
||
~SegmentId() = default; | ||
|
||
SegmentId(const SegmentId&) = delete; | ||
|
||
SegmentId& operator=(const SegmentId&) = delete; | ||
|
||
Status init(int64_t step); | ||
|
||
StatusOr<int64_t> getId(); | ||
|
||
private: | ||
// when get id fast or fetchSegment() slow or fail, getSegmentId() directly. | ||
// In this case, the new segment will overlap with the old one. | ||
void asyncFetchSegment(); | ||
|
||
StatusOr<int64_t> fetchSegment(); | ||
|
||
std::mutex mutex_; | ||
|
||
int64_t cur_{-1}; | ||
int64_t step_{-1}; | ||
|
||
int64_t segmentStart_{-2}; | ||
int64_t nextSegmentStart_{-2}; | ||
|
||
// ensure the segment can be use for 10 mins. | ||
// 2 segment = max insert/secs * 600. segment = 400000 * 600 / 2 = 120000000 | ||
static inline constexpr int64_t kMinStep_{120000000}; | ||
|
||
meta::BaseMetaClient* client_; | ||
folly::Executor* runner_; | ||
}; | ||
} // namespace nebula | ||
|
||
#endif // COMMON_ID_SEGMENTINCR_H_ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
/* Copyright (c) 2021 vesoft inc. All rights reserved. | ||
* | ||
* This source code is licensed under Apache 2.0 License. | ||
*/ | ||
|
||
#include "common/id/SegmentId.h" | ||
|
||
namespace nebula { | ||
class MockMetaClient : public meta::BaseMetaClient { | ||
public: | ||
folly::Future<StatusOr<int64_t>> getSegmentId(int64_t length) override { | ||
std::lock_guard<std::mutex> guard(mutex_); | ||
auto future = folly::makeFuture(cur_.load()); | ||
cur_.fetch_add(length); | ||
return future; | ||
} | ||
|
||
private: | ||
std::mutex mutex_; | ||
std::atomic_int64_t cur_{0}; | ||
}; | ||
} // namespace nebula |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
/* Copyright (c) 2021 vesoft inc. All rights reserved. | ||
* | ||
* This source code is licensed under Apache 2.0 License. | ||
*/ | ||
|
||
#include <folly/Benchmark.h> | ||
|
||
#include "MockMetaClient.h" | ||
|
||
class MockMetaClient : public nebula::meta::BaseMetaClient { | ||
public: | ||
folly::Future<nebula::StatusOr<int64_t>> getSegmentId(int64_t length) override { | ||
std::lock_guard<std::mutex> guard(mutex_); | ||
auto future = folly::makeFuture(cur_.load()); | ||
cur_.fetch_add(length); | ||
return future; | ||
} | ||
|
||
private: | ||
std::mutex mutex_; | ||
std::atomic_int64_t cur_{0}; | ||
}; | ||
|
||
size_t SegmentIdCurrencyTest(size_t iters, int threadNum) { | ||
constexpr size_t ops = 1000000UL; | ||
int step = 120000000; | ||
|
||
MockMetaClient metaClient = MockMetaClient(); | ||
std::shared_ptr<apache::thrift::concurrency::ThreadManager> threadManager( | ||
PriorityThreadManager::newPriorityThreadManager(32)); | ||
threadManager->setNamePrefix("executor"); | ||
threadManager->start(); | ||
|
||
nebula::SegmentId generator = nebula::SegmentId(&metaClient, threadManager.get()); | ||
nebula::Status status = generator.init(step); | ||
ASSERT(status.ok()); | ||
|
||
auto proc = [&]() { | ||
auto n = iters * ops; | ||
for (auto i = 0UL; i < n; i++) { | ||
nebula::StatusOr<int64_t> id = generator.getId(); | ||
folly::doNotOptimizeAway(id); | ||
} | ||
}; | ||
|
||
std::vector<std::thread> threads; | ||
threads.reserve(threadNum); | ||
|
||
for (int i = 0; i < threadNum; i++) { | ||
threads.emplace_back(std::thread(proc)); | ||
} | ||
|
||
for (int i = 0; i < threadNum; i++) { | ||
threads[i].join(); | ||
} | ||
|
||
return iters * ops * threadNum; | ||
} | ||
|
||
BENCHMARK_NAMED_PARAM_MULTI(SegmentIdCurrencyTest, 1_thread, 1) | ||
BENCHMARK_RELATIVE_NAMED_PARAM_MULTI(SegmentIdCurrencyTest, 2_thread, 2) | ||
BENCHMARK_RELATIVE_NAMED_PARAM_MULTI(SegmentIdCurrencyTest, 4_thread, 4) | ||
BENCHMARK_RELATIVE_NAMED_PARAM_MULTI(SegmentIdCurrencyTest, 8_thread, 8) | ||
BENCHMARK_RELATIVE_NAMED_PARAM_MULTI(SegmentIdCurrencyTest, 16_thread, 16) | ||
BENCHMARK_RELATIVE_NAMED_PARAM_MULTI(SegmentIdCurrencyTest, 32_thread, 32) | ||
BENCHMARK_RELATIVE_NAMED_PARAM_MULTI(SegmentIdCurrencyTest, 64_thread, 64) | ||
|
||
int main(int argc, char** argv) { | ||
gflags::ParseCommandLineFlags(&argc, &argv, true); | ||
folly::runBenchmarks(); | ||
return 0; | ||
} | ||
|
||
/* Intel(R) Xeon(R) CPU E5-2673 v3 @ 2.40GHz | ||
============================================================================ | ||
nebula/src/common/id/test/SegmentIdBenchmark.cpprelative time/iter iters/s | ||
============================================================================ | ||
SegmentIdCurrencyTest(1_thread) 64.23ns 15.57M | ||
SegmentIdCurrencyTest(2_thread) 55.35% 116.03ns 8.62M | ||
SegmentIdCurrencyTest(4_thread) 49.64% 129.39ns 7.73M | ||
SegmentIdCurrencyTest(8_thread) 37.61% 170.76ns 5.86M | ||
SegmentIdCurrencyTest(16_thread) 34.91% 183.98ns 5.44M | ||
SegmentIdCurrencyTest(32_thread) 27.57% 232.98ns 4.29M | ||
SegmentIdCurrencyTest(64_thread) 21.77% 295.00ns 3.39M | ||
============================================================================ | ||
*/ |
Oops, something went wrong.