Skip to content

Commit

Permalink
Merge branch 'main' into level-decoding-benchmark
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed Feb 1, 2024
2 parents fc0c1a5 + 2a87693 commit 24b9677
Show file tree
Hide file tree
Showing 60 changed files with 2,194 additions and 418 deletions.
7 changes: 1 addition & 6 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
Expand Up @@ -30,15 +30,10 @@
# /cpp/
/cpp/src/arrow/acero @westonpace
/cpp/src/arrow/adapters/orc @wgtmac
/cpp/src/arrow/dataset @westonpace
/cpp/src/arrow/engine @westonpace
/cpp/src/arrow/flight/ @lidavidm
/cpp/src/arrow/util/async* @westonpace
/cpp/src/arrow/util/future* @westonpace
/cpp/src/arrow/util/thread* @westonpace
/cpp/src/parquet @wgtmac
/cpp/src/skyhook @westonpace
/csharp/ @westonpace
/csharp/ @curthagenlocher
/go/ @zeroshade
/java/ @lidavidm
/js/ @domoritz @trxcllnt
Expand Down
5 changes: 4 additions & 1 deletion c_glib/arrow-glib/basic-data-type.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1212,7 +1212,8 @@ garrow_timestamp_data_type_class_init(GArrowTimestampDataTypeClass *klass)
/**
* garrow_timestamp_data_type_new:
* @unit: The unit of the timestamp data.
* @time_zone: (nullable): The time zone of the timestamp data.
* @time_zone: (nullable): The time zone of the timestamp data. If based GLib
* is less than 2.58, this is ignored.
*
* Returns: A newly created the number of
* seconds/milliseconds/microseconds/nanoseconds since UNIX epoch in
Expand All @@ -1226,9 +1227,11 @@ garrow_timestamp_data_type_new(GArrowTimeUnit unit,
{
auto arrow_unit = garrow_time_unit_to_raw(unit);
std::string arrow_timezone;
#if GLIB_CHECK_VERSION(2, 58, 0)
if (time_zone) {
arrow_timezone = g_time_zone_get_identifier(time_zone);
}
#endif
auto arrow_data_type = arrow::timestamp(arrow_unit, arrow_timezone);
auto data_type =
GARROW_TIMESTAMP_DATA_TYPE(g_object_new(GARROW_TYPE_TIMESTAMP_DATA_TYPE,
Expand Down
2 changes: 1 addition & 1 deletion ci/conda_env_python.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ cloudpickle
fsspec
hypothesis
numpy>=1.16.6
pytest
pytest<8 # pytest-lazy-fixture broken on pytest 8.0.0
pytest-faulthandler
pytest-lazy-fixture
s3fs>=2023.10.0
Expand Down
2 changes: 1 addition & 1 deletion ci/conda_env_sphinx.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ breathe
doxygen
ipython
numpydoc
pydata-sphinx-theme=0.14.1
pydata-sphinx-theme=0.14
sphinx-autobuild
sphinx-design
sphinx-copybutton
Expand Down
156 changes: 110 additions & 46 deletions cpp/src/arrow/chunk_resolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,87 +18,151 @@
#pragma once

#include <atomic>
#include <cassert>
#include <cstdint>
#include <vector>

#include "arrow/type_fwd.h"
#include "arrow/util/macros.h"

namespace arrow {
namespace internal {
namespace arrow::internal {

struct ChunkLocation {
int64_t chunk_index, index_in_chunk;
/// \brief Index of the chunk in the array of chunks
///
/// The value is always in the range `[0, chunks.size()]`. `chunks.size()` is used
/// to represent out-of-bounds locations.
int64_t chunk_index;

/// \brief Index of the value in the chunk
///
/// The value is undefined if chunk_index >= chunks.size()
int64_t index_in_chunk;
};

// An object that resolves an array chunk depending on a logical index
/// \brief An utility that incrementally resolves logical indices into
/// physical indices in a chunked array.
struct ARROW_EXPORT ChunkResolver {
explicit ChunkResolver(const ArrayVector& chunks);
private:
/// \brief Array containing `chunks.size() + 1` offsets.
///
/// `offsets_[i]` is the starting logical index of chunk `i`. `offsets_[0]` is always 0
/// and `offsets_[chunks.size()]` is the logical length of the chunked array.
std::vector<int64_t> offsets_;

explicit ChunkResolver(const std::vector<const Array*>& chunks);
/// \brief Cache of the index of the last resolved chunk.
///
/// \invariant `cached_chunk_ in [0, chunks.size()]`
mutable std::atomic<int64_t> cached_chunk_;

public:
explicit ChunkResolver(const ArrayVector& chunks);
explicit ChunkResolver(const std::vector<const Array*>& chunks);
explicit ChunkResolver(const RecordBatchVector& batches);

ChunkResolver(ChunkResolver&& other) noexcept
: offsets_(std::move(other.offsets_)), cached_chunk_(other.cached_chunk_.load()) {}
: offsets_(std::move(other.offsets_)),
cached_chunk_(other.cached_chunk_.load(std::memory_order_relaxed)) {}

ChunkResolver& operator=(ChunkResolver&& other) {
offsets_ = std::move(other.offsets_);
cached_chunk_.store(other.cached_chunk_.load());
cached_chunk_.store(other.cached_chunk_.load(std::memory_order_relaxed));
return *this;
}

/// \brief Return a ChunkLocation containing the chunk index and in-chunk value index of
/// the chunked array at logical index
inline ChunkLocation Resolve(const int64_t index) const {
// It is common for the algorithms below to make consecutive accesses at
// a relatively small distance from each other, hence often falling in
// the same chunk.
// This is trivial when merging (assuming each side of the merge uses
// its own resolver), but also in the inner recursive invocations of
/// \brief Resolve a logical index to a ChunkLocation.
///
/// The returned ChunkLocation contains the chunk index and the within-chunk index
/// equivalent to the logical index.
///
/// \pre index >= 0
/// \post location.chunk_index in [0, chunks.size()]
/// \param index The logical index to resolve
/// \return ChunkLocation with a valid chunk_index if index is within
/// bounds, or with chunk_index == chunks.size() if logical index is
/// `>= chunked_array.length()`.
inline ChunkLocation Resolve(int64_t index) const {
const auto cached_chunk = cached_chunk_.load(std::memory_order_relaxed);
const auto chunk_index =
ResolveChunkIndex</*StoreCachedChunk=*/true>(index, cached_chunk);
return {chunk_index, index - offsets_[chunk_index]};
}

/// \brief Resolve a logical index to a ChunkLocation.
///
/// The returned ChunkLocation contains the chunk index and the within-chunk index
/// equivalent to the logical index.
///
/// \pre index >= 0
/// \post location.chunk_index in [0, chunks.size()]
/// \param index The logical index to resolve
/// \param cached_chunk_index 0 or the chunk_index of the last ChunkLocation
/// returned by this ChunkResolver.
/// \return ChunkLocation with a valid chunk_index if index is within
/// bounds, or with chunk_index == chunks.size() if logical index is
/// `>= chunked_array.length()`.
inline ChunkLocation ResolveWithChunkIndexHint(int64_t index,
int64_t cached_chunk_index) const {
assert(cached_chunk_index < static_cast<int64_t>(offsets_.size()));
const auto chunk_index =
ResolveChunkIndex</*StoreCachedChunk=*/false>(index, cached_chunk_index);
return {chunk_index, index - offsets_[chunk_index]};
}

private:
template <bool StoreCachedChunk>
inline int64_t ResolveChunkIndex(int64_t index, int64_t cached_chunk) const {
// It is common for algorithms sequentially processing arrays to make consecutive
// accesses at a relatively small distance from each other, hence often falling in the
// same chunk.
//
// This is guaranteed when merging (assuming each side of the merge uses its
// own resolver), and is the most common case in recursive invocations of
// partitioning.
if (offsets_.size() <= 1) {
return {0, index};
const auto num_offsets = static_cast<int64_t>(offsets_.size());
const int64_t* offsets = offsets_.data();
if (ARROW_PREDICT_TRUE(index >= offsets[cached_chunk]) &&
(cached_chunk + 1 == num_offsets || index < offsets[cached_chunk + 1])) {
return cached_chunk;
}
const auto cached_chunk = cached_chunk_.load();
const bool cache_hit =
(index >= offsets_[cached_chunk] && index < offsets_[cached_chunk + 1]);
if (ARROW_PREDICT_TRUE(cache_hit)) {
return {cached_chunk, index - offsets_[cached_chunk]};
// lo < hi is guaranteed by `num_offsets = chunks.size() + 1`
const auto chunk_index = Bisect(index, offsets, /*lo=*/0, /*hi=*/num_offsets);
if constexpr (StoreCachedChunk) {
assert(chunk_index < static_cast<int64_t>(offsets_.size()));
cached_chunk_.store(chunk_index, std::memory_order_relaxed);
}
auto chunk_index = Bisect(index);
cached_chunk_.store(chunk_index);
return {chunk_index, index - offsets_[chunk_index]};
return chunk_index;
}

protected:
// Find the chunk index corresponding to a value index using binary search
inline int64_t Bisect(const int64_t index) const {
// Like std::upper_bound(), but hand-written as it can help the compiler.
// Search [lo, lo + n)
int64_t lo = 0;
auto n = static_cast<int64_t>(offsets_.size());
while (n > 1) {
/// \brief Find the index of the chunk that contains the logical index.
///
/// Any non-negative index is accepted. When `hi=num_offsets`, the largest
/// possible return value is `num_offsets-1` which is equal to
/// `chunks.size()`. The is returned when the logical index is out-of-bounds.
///
/// \pre index >= 0
/// \pre lo < hi
/// \pre lo >= 0 && hi <= offsets_.size()
static inline int64_t Bisect(int64_t index, const int64_t* offsets, int64_t lo,
int64_t hi) {
// Similar to std::upper_bound(), but slightly different as our offsets
// array always starts with 0.
auto n = hi - lo;
// First iteration does not need to check for n > 1
// (lo < hi is guaranteed by the precondition).
assert(n > 1 && "lo < hi is a precondition of Bisect");
do {
const int64_t m = n >> 1;
const int64_t mid = lo + m;
if (static_cast<int64_t>(index) >= offsets_[mid]) {
if (index >= offsets[mid]) {
lo = mid;
n -= m;
} else {
n = m;
}
}
} while (n > 1);
return lo;
}

private:
// Collection of starting offsets used for binary search
std::vector<int64_t> offsets_;

// Tracks the most recently used chunk index to allow fast
// access for consecutive indices corresponding to the same chunk
mutable std::atomic<int64_t> cached_chunk_;
};

} // namespace internal
} // namespace arrow
} // namespace arrow::internal
Loading

0 comments on commit 24b9677

Please sign in to comment.