Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 86 additions & 28 deletions ydb/core/formats/arrow/accessor/abstract/accessor.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#include "accessor.h"

#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/formats/arrow/permutations.h>
#include <ydb/core/formats/arrow/save_load/saver.h>
#include <ydb/core/formats/arrow/size_calcer.h>
#include <ydb/core/formats/arrow/splitter/simple.h>
#include <ydb/core/formats/arrow/switch/compare.h>
#include <ydb/core/formats/arrow/switch/switch_type.h>

#include <ydb/library/actors/core/log.h>
#include <ydb/core/formats/arrow/permutations.h>
#include <ydb/core/formats/arrow/arrow_helpers.h>
#include <ydb/core/formats/arrow/splitter/simple.h>
#include <ydb/core/formats/arrow/save_load/saver.h>

namespace NKikimr::NArrow::NAccessor {

Expand All @@ -17,18 +19,18 @@ void IChunkedArray::TReader::AppendPositionTo(arrow::ArrayBuilder& builder, cons

// Copies the single record at `recordIndex` into a new array.
// The record is located through GetReadChunk(), which yields the array holding it
// and the record's position inside that array.
std::shared_ptr<arrow::Array> IChunkedArray::TReader::CopyRecord(const ui64 recordIndex) const {
    auto address = GetReadChunk(recordIndex);
    return NArrow::CopyRecords(address.GetArray(), { address.GetPosition() });
}

std::shared_ptr<arrow::ChunkedArray> IChunkedArray::Slice(const ui32 offset, const ui32 count) const {
AFL_VERIFY(offset + count <= (ui64)GetRecordsCount())("offset", offset)("count", count)("length", GetRecordsCount());
ui32 currentOffset = offset;
ui32 countLeast = count;
std::vector<std::shared_ptr<arrow::Array>> chunks;
auto address = GetChunk({}, offset);
auto address = GetChunkSlow(offset);
while (countLeast) {
address = GetChunk(address, currentOffset);
const ui64 internalPos = currentOffset - address.GetStartPosition();
address = GetChunk(address.GetAddress(), currentOffset);
const ui64 internalPos = address.GetAddress().GetLocalIndex(currentOffset);
if (internalPos + countLeast <= (ui64)address.GetArray()->length()) {
chunks.emplace_back(address.GetArray()->Slice(internalPos, countLeast));
break;
Expand All @@ -43,12 +45,73 @@ std::shared_ptr<arrow::ChunkedArray> IChunkedArray::Slice(const ui32 offset, con
return std::make_shared<arrow::ChunkedArray>(chunks, DataType);
}

NKikimr::NArrow::NAccessor::IChunkedArray::TFullDataAddress IChunkedArray::GetChunk(
const std::optional<TAddressChain>& chunkCurrent, const ui64 position) const {
AFL_VERIFY(position < GetRecordsCount());
std::optional<TCommonChunkAddress> address;

if (IsDataOwner()) {
if (chunkCurrent) {
AFL_VERIFY(chunkCurrent->GetSize() == 1)("size", chunkCurrent->GetSize());
}
auto localAddress = GetLocalData(address, position);
TAddressChain addressChain;
addressChain.Add(localAddress.GetAddress());
AFL_VERIFY(addressChain.Contains(position));
return TFullDataAddress(localAddress.GetArray(), std::move(addressChain));
} else {
auto chunkedArrayAddress = GetArray(chunkCurrent, position, nullptr);
if (chunkCurrent) {
AFL_VERIFY(chunkCurrent->GetSize() == 1 + chunkedArrayAddress.GetAddress().GetSize())("current", chunkCurrent->GetSize())(
"chunked", chunkedArrayAddress.GetAddress().GetSize());
}
auto localAddress = chunkedArrayAddress.GetArray()->GetLocalData(address, chunkedArrayAddress.GetAddress().GetLocalIndex(position));
auto fullAddress = std::move(chunkedArrayAddress.MutableAddress());
fullAddress.Add(localAddress.GetAddress());
AFL_VERIFY(fullAddress.Contains(position));
return TFullDataAddress(localAddress.GetArray(), std::move(fullAddress));
}
}

// Finds the data-owning chunked array that contains `position` and returns it together
// with the chain of addresses of the nested arrays visited on the way down.
//
// `chunkCurrent`, when set, supplies one previously resolved address per nesting level;
// each is passed to GetLocalChunkedArray() for the matching level.
// `selfPtr` is only used when this array owns its data; in that case it must be non-null
// and is returned as the resulting array with a single address covering all records.
// `position` must be below GetRecordsCount() (checked with AFL_VERIFY).
IChunkedArray::TFullChunkedArrayAddress IChunkedArray::GetArray(
    const std::optional<TAddressChain>& chunkCurrent, const ui64 position, const std::shared_ptr<IChunkedArray>& selfPtr) const {
    AFL_VERIFY(position < GetRecordsCount());
    if (IsDataOwner()) {
        AFL_VERIFY(selfPtr);
        TAddressChain chain;
        chain.Add(TCommonChunkAddress(0, GetRecordsCount(), 0));
        return IChunkedArray::TFullChunkedArrayAddress(selfPtr, std::move(chain));
    }
    TAddressChain addressChain;

    auto* currentLevel = this;
    // Kept 64-bit like `position`, so that large positions are not silently truncated.
    ui64 currentPosition = position;
    ui32 idx = 0;
    // Holds the intermediate arrays so that the raw `currentLevel` pointer stays valid
    // for the duration of the walk.
    std::vector<std::shared_ptr<IChunkedArray>> chainForTemporarySave;
    while (!currentLevel->IsDataOwner()) {
        std::optional<TCommonChunkAddress> currentAddress;
        if (chunkCurrent) {
            currentAddress = chunkCurrent->GetAddress(idx);
        }
        auto nextChunkedArray = currentLevel->GetLocalChunkedArray(currentAddress, currentPosition);
        chainForTemporarySave.emplace_back(nextChunkedArray.GetArray());
        currentLevel = chainForTemporarySave.back().get();
        addressChain.Add(nextChunkedArray.GetAddress());
        // Re-base the position so that it is relative to the start of the nested array.
        AFL_VERIFY(nextChunkedArray.GetAddress().GetStartPosition() <= currentPosition);
        currentPosition -= nextChunkedArray.GetAddress().GetStartPosition();
        ++idx;
    }
    // The supplied chain may be at most one level deeper than the levels walked here.
    AFL_VERIFY(!chunkCurrent || chunkCurrent->GetSize() - idx <= 1)("idx", idx)("size", chunkCurrent->GetSize());
    return TFullChunkedArrayAddress(chainForTemporarySave.back(), std::move(addressChain));
}

// Renders the record at `position` as text: the record is located through GetReadChunk()
// and the resulting array/position pair is formatted by NArrow::DebugString().
TString IChunkedArray::TReader::DebugString(const ui32 position) const {
    auto readChunk = GetReadChunk(position);
    const auto inChunkPosition = readChunk.GetPosition();
    return NArrow::DebugString(readChunk.GetArray(), inChunkPosition);
}

std::partial_ordering IChunkedArray::TReader::CompareColumns(const std::vector<TReader>& l, const ui64 lPosition, const std::vector<TReader>& r, const ui64 rPosition) {
std::partial_ordering IChunkedArray::TReader::CompareColumns(
const std::vector<TReader>& l, const ui64 lPosition, const std::vector<TReader>& r, const ui64 rPosition) {
AFL_VERIFY(l.size() == r.size());
for (ui32 i = 0; i < l.size(); ++i) {
const TAddress lAddress = l[i].GetReadChunk(lPosition);
Expand All @@ -63,43 +126,38 @@ std::partial_ordering IChunkedArray::TReader::CompareColumns(const std::vector<T

// Returns the array that holds `position` together with the index of `position` inside
// that array.
//
// The last resolved chunk address is cached in CurrentChunkAddress and reused while it
// still contains the requested position; otherwise it is re-resolved through
// ChunkedArray->GetChunk(), with the stale address passed along.
// `position` must be below ChunkedArray->GetRecordsCount() (checked with AFL_VERIFY).
IChunkedArray::TAddress IChunkedArray::TReader::GetReadChunk(const ui64 position) const {
    AFL_VERIFY(position < ChunkedArray->GetRecordsCount());
    if (!CurrentChunkAddress || !CurrentChunkAddress->GetAddress().Contains(position)) {
        CurrentChunkAddress = ChunkedArray->GetChunk(CurrentChunkAddress, position);
    }
    return IChunkedArray::TAddress(CurrentChunkAddress->GetArray(), CurrentChunkAddress->GetAddress().GetLocalIndex(position));
}

// Compares the record this address points at with the record `item` points at,
// delegating to TComparator::TypedCompare<true>.
const std::partial_ordering IChunkedArray::TAddress::Compare(const TAddress& item) const {
    auto& ownArray = *Array;
    auto& otherArray = *item.Array;
    return TComparator::TypedCompare<true>(ownArray, Position, otherArray, item.Position);
}

// Pairs a chunked array with its serialized representation.
// Both arguments are checked with AFL_VERIFY: `serializedData` must be non-empty,
// `array` must be non-null, and the array must contain at least one record.
TChunkedArraySerialized::TChunkedArraySerialized(const std::shared_ptr<IChunkedArray>& array, const TString& serializedData)
    : Array(array)
    , SerializedData(serializedData) {
    AFL_VERIFY(serializedData);
    AFL_VERIFY(Array);
    AFL_VERIFY(Array->GetRecordsCount());
}

std::partial_ordering IChunkedArray::TCurrentChunkAddress::Compare(const ui64 position, const TCurrentChunkAddress& item, const ui64 itemPosition) const {
AFL_VERIFY(GetStartPosition() <= position)("pos", position)("start", GetStartPosition());
AFL_VERIFY(position < GetFinishPosition())("pos", position)("finish", GetFinishPosition());
AFL_VERIFY(item.GetStartPosition() <= itemPosition)("start", item.GetStartPosition())("item", itemPosition);
AFL_VERIFY(itemPosition < item.GetFinishPosition())("item", itemPosition)("finish", item.GetFinishPosition());
return TComparator::TypedCompare<true>(*Array, position - GetStartPosition(), *item.Array, itemPosition - item.GetStartPosition());
// Compares the record at `position` of this chunk with the record at `itemPosition` of `item`.
// Each position must lie inside its own address chain (checked with AFL_VERIFY);
// both are then translated to array-local indexes before the typed comparison.
std::partial_ordering IChunkedArray::TFullDataAddress::Compare(
    const ui64 position, const TFullDataAddress& item, const ui64 itemPosition) const {
    AFL_VERIFY(Address.Contains(position))("pos", position)("start", Address.DebugString());
    AFL_VERIFY(item.Address.Contains(itemPosition))("pos", itemPosition)("start", item.Address.DebugString());
    const auto ownIndex = Address.GetLocalIndex(position);
    const auto otherIndex = item.Address.GetLocalIndex(itemPosition);
    return TComparator::TypedCompare<true>(*Array, ownIndex, *item.Array, otherIndex);
}

std::shared_ptr<arrow::Array> IChunkedArray::TCurrentChunkAddress::CopyRecord(const ui64 recordIndex) const {
AFL_VERIFY(GetStartPosition() <= recordIndex);
AFL_VERIFY(recordIndex < GetFinishPosition());
return NArrow::CopyRecords(Array, { recordIndex - GetStartPosition() });
// Copies the single record at `recordIndex` out of this chunk's array.
// The index is first translated to an array-local index through the address chain.
std::shared_ptr<arrow::Array> IChunkedArray::TFullDataAddress::CopyRecord(const ui64 recordIndex) const {
    const auto localIndex = Address.GetLocalIndex(recordIndex);
    return NArrow::CopyRecords(Array, { localIndex });
}

TString IChunkedArray::TCurrentChunkAddress::DebugString(const ui64 position) const {
AFL_VERIFY(position < GetFinishPosition());
AFL_VERIFY(GetStartPosition() <= position);
return NArrow::DebugString(Array, position - GetStartPosition());
// Renders the record at `position` as text.
// The position is first translated to an array-local index through the address chain.
TString IChunkedArray::TFullDataAddress::DebugString(const ui64 position) const {
    const auto localIndex = Address.GetLocalIndex(position);
    return NArrow::DebugString(Array, localIndex);
}

}
} // namespace NKikimr::NArrow::NAccessor
Loading