Skip to content

Commit

Permalink
tiers reading optimization policy (#4045)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Apr 23, 2024
1 parent 5fe2fa9 commit 2695540
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 18 deletions.
20 changes: 3 additions & 17 deletions ydb/core/tx/columnshard/blobs_action/abstract/read.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,10 @@ void IBlobsReadingAction::StartReading(std::vector<TBlobRange>&& ranges) {
for (auto&& i : ranges) {
Counters->OnRequest(i.Size);
}
std::sort(ranges.begin(), ranges.end());
THashSet<TBlobRange> result;
std::optional<TBlobRange> currentRange;
std::vector<TBlobRange> currentList;
for (auto&& br : ranges) {
if (!currentRange) {
currentRange = br;
} else if (!currentRange->TryGlueWithNext(br)) {
result.emplace(*currentRange);
Groups.emplace(*currentRange, std::move(currentList));
currentRange = br;
currentList.clear();
}
currentList.emplace_back(br);
}
if (currentRange) {
result.emplace(*currentRange);
Groups.emplace(*currentRange, std::move(currentList));
Groups = GroupBlobsForOptimization(std::move(ranges));
for (auto&& [range, _] :Groups) {
result.emplace(range);
}
return DoStartReading(std::move(result));
}
Expand Down
49 changes: 49 additions & 0 deletions ydb/core/tx/columnshard/blobs_action/abstract/read.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,54 @@ class TActionReadBlobs {
}
};

// Helpers for collapsing many requested blob ranges into fewer, larger read requests.
// The decision whether two ranges may be merged is delegated to a policy object.
class TBlobsGlueing {
public:
    // Policy that forwards the merge decision to TBlobRange::TryGlueWithNext.
    class TSequentialGluePolicy {
    public:
        // Tries to extend `currentRange` with `addRange`; returns true when they were merged.
        bool Glue(TBlobRange& currentRange, const TBlobRange& addRange) const {
            const bool merged = currentRange.TryGlueWithNext(addRange);
            return merged;
        }
    };

    // Policy that forwards the merge decision to TBlobRange::TryGlueSameBlob,
    // passing the configured size limit along.
    class TBlobGluePolicy {
    private:
        const ui64 BlobLimitSize = 8LLU << 20;
    public:
        TBlobGluePolicy(const ui64 blobLimitSize)
            : BlobLimitSize(blobLimitSize)
        {
        }

        // Tries to widen `currentRange` so that it also covers `addRange`; returns true on success.
        bool Glue(TBlobRange& currentRange, const TBlobRange& addRange) const {
            const bool merged = currentRange.TryGlueSameBlob(addRange, BlobLimitSize);
            return merged;
        }
    };

    // Sorts `ranges` and walks them once, greedily merging each range into the group
    // being built for as long as `policy.Glue` accepts it.
    // Returns a map: merged range -> the original ranges (in sorted order) that formed it.
    template <class TGluePolicy>
    static THashMap<TBlobRange, std::vector<TBlobRange>> GroupRanges(std::vector<TBlobRange>&& ranges, const TGluePolicy& policy) {
        THashMap<TBlobRange, std::vector<TBlobRange>> groups;
        if (ranges.empty()) {
            return groups;
        }
        std::sort(ranges.begin(), ranges.end());

        // The first sorted range always opens the first group.
        auto it = ranges.begin();
        TBlobRange mergedRange = *it;
        std::vector<TBlobRange> members;
        members.emplace_back(*it);

        for (++it; it != ranges.end(); ++it) {
            const TBlobRange& candidate = *it;
            if (!policy.Glue(mergedRange, candidate)) {
                // The candidate cannot join the current group: flush it and open a new one.
                groups.emplace(mergedRange, std::move(members));
                mergedRange = candidate;
                members.clear();
            }
            members.emplace_back(candidate);
        }

        // Flush the group that was still being built when the input ended.
        groups.emplace(mergedRange, std::move(members));
        return groups;
    }

};

class IBlobsReadingAction: public ICommonBlobsAction {
public:
using TErrorStatus = TConclusionSpecialStatus<NKikimrProto::EReplyStatus, NKikimrProto::EReplyStatus::OK, NKikimrProto::EReplyStatus::ERROR>;
Expand All @@ -91,6 +139,7 @@ class IBlobsReadingAction: public ICommonBlobsAction {
protected:
virtual void DoStartReading(THashSet<TBlobRange>&& range) = 0;
void StartReading(std::vector<TBlobRange>&& ranges);
// Storage-specific grouping hook: maps each range that will actually be requested
// to the list of originally requested ranges it was built from.
// StartReading stores the returned map in Groups and passes its keys to DoStartReading.
virtual THashMap<TBlobRange, std::vector<TBlobRange>> GroupBlobsForOptimization(std::vector<TBlobRange>&& ranges) const = 0;
public:

const THashMap<TBlobRange, std::vector<TBlobRange>>& GetGroups() const {
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/columnshard/blobs_action/bs/read.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ class TReadingAction: public IBlobsReadingAction {
const TActorId BlobCacheActorId;
protected:
virtual void DoStartReading(THashSet<TBlobRange>&& ranges) override;
// Groups the requested ranges using TBlobsGlueing::TSequentialGluePolicy.
virtual THashMap<TBlobRange, std::vector<TBlobRange>> GroupBlobsForOptimization(std::vector<TBlobRange>&& ranges) const override {
    const TBlobsGlueing::TSequentialGluePolicy gluePolicy{};
    return TBlobsGlueing::GroupRanges(std::move(ranges), gluePolicy);
}
public:

TReadingAction(const TString& storageId, const TActorId& blobCacheActorId)
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/columnshard/blobs_action/tier/read.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ class TReadingAction: public IBlobsReadingAction {
const NWrappers::NExternalStorage::IExternalStorageOperator::TPtr ExternalStorageOperator;
protected:
virtual void DoStartReading(THashSet<TBlobRange>&& ranges) override;
// Groups the requested ranges using TBlobsGlueing::TBlobGluePolicy with an 8 MiB limit.
virtual THashMap<TBlobRange, std::vector<TBlobRange>> GroupBlobsForOptimization(std::vector<TBlobRange>&& ranges) const override {
    // 8LLU << 20 == 8 * 2^20 bytes, the limit handed to the glue policy.
    const ui64 gluedSizeLimit = 8LLU << 20;
    const TBlobsGlueing::TBlobGluePolicy gluePolicy(gluedSizeLimit);
    return TBlobsGlueing::GroupRanges(std::move(ranges), gluePolicy);
}
public:

TReadingAction(const TString& storageId, const NWrappers::NExternalStorage::IExternalStorageOperator::TPtr& storageOperator)
Expand Down
17 changes: 16 additions & 1 deletion ydb/core/tx/columnshard/common/blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ struct TBlobRange {

bool operator<(const TBlobRange& br) const {
if (BlobId != br.BlobId) {
return BlobId.Hash() < br.BlobId.Hash();
return BlobId.GetLogoBlobId().Compare(br.BlobId.GetLogoBlobId()) < 0;
} else if (Offset != br.Offset) {
return Offset < br.Offset;
} else {
Expand All @@ -209,6 +209,21 @@ struct TBlobRange {
return BlobId == br.BlobId && br.Offset + br.Size == Offset;
}

// Widens this range to the smallest span covering both itself and `br`, provided that
// both ranges belong to the same blob and the resulting span is at most `limit` bytes.
// The ranges do not have to be adjacent: any gap between them becomes part of the span.
// Returns true and updates Offset/Size on success; returns false and leaves the
// range untouched otherwise.
bool TryGlueSameBlob(const TBlobRange& br, const ui64 limit) {
    if (GetBlobId() != br.GetBlobId()) {
        return false;
    }
    // Do the arithmetic in 64 bits: adding offset and size in a narrower type could wrap
    // around and make an oversized span look small enough to pass the limit check.
    const ui64 right = std::max<ui64>(static_cast<ui64>(Offset) + Size, static_cast<ui64>(br.Offset) + br.Size);
    const ui64 offset = std::min<ui64>(Offset, br.Offset);
    const ui64 size = right - offset;
    if (size > limit) {
        return false;
    }
    // Refuse the merge if the span does not fit into the Size field, rather than
    // storing a silently truncated value.
    const auto narrowedSize = static_cast<decltype(Size)>(size);
    if (static_cast<ui64>(narrowedSize) != size) {
        return false;
    }
    Size = narrowedSize;
    // `offset` is the smaller of two existing Offset values, so it always fits.
    Offset = static_cast<decltype(Offset)>(offset);
    return true;
}

bool TryGlueWithNext(const TBlobRange& br) {
if (!br.IsNextRangeFor(*this)) {
return false;
Expand Down

0 comments on commit 2695540

Please sign in to comment.