GH-40069: [C++] Make scalar scratch space immutable after initialization (#40237)

### Rationale for this change

As #40069 shows, TSAN reports a data race caused by concurrently filling the scratch space of a scalar instance. The concurrent use of the same scalar can happen when executing an Acero plan in parallel that contains a literal (a "constant" simply represented by an underlying scalar), and this is totally legitimate. The problem lies in the fact that the scratch space of the scalar is filled "lazily", at the time the scalar is involved in the computation and transformed into an array span, by *every* thread.

After piloting several approaches (relaxed atomics in an earlier version of this PR, locking in #40260), @pitrou and @bkietz suggested an immutable-after-initialization approach, which the latest version of this PR implements.
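
To make the failure mode concrete, here is a minimal, self-contained sketch (hypothetical types, not Arrow's actual classes) contrasting the old lazy-fill pattern, which races under concurrent readers, with the immutable-after-initialization pattern adopted here:

```cpp
#include <cstdint>
#include <cstring>
#include <thread>

// Hypothetical scalar whose scratch space backs an array-span view of its value.
struct RacyScalar {
  int32_t value = 42;
  mutable uint8_t scratch[8] = {};  // filled lazily by every reader -> data race

  const uint8_t* AsSpanData() const {
    // Every concurrent caller writes into the shared scratch before reading it;
    // this is the kind of race TSAN reports in GH-40069.
    std::memcpy(scratch, &value, sizeof(value));
    return scratch;
  }
};

// The approach taken by this PR: fill the scratch space once, in the constructor,
// and never write to it again.
struct ImmutableScalar {
  int32_t value;
  uint8_t scratch[8] = {};

  explicit ImmutableScalar(int32_t v) : value(v) {
    std::memcpy(scratch, &value, sizeof(value));  // initialized exactly once
  }

  const uint8_t* AsSpanData() const { return scratch; }  // read-only afterwards
};

int main() {
  ImmutableScalar scalar(42);
  auto reader = [&] { (void)scalar.AsSpanData(); };
  std::thread t1(reader), t2(reader);  // safe: no writes after construction
  t1.join();
  t2.join();
  return 0;
}
```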

### What changes are included in this PR?

There are generally two parts in this PR:
1. Mandate the initialization of the scratch space in the constructor of the concrete subclass of `Scalar`.
2. In order to keep the content of the scratch space consistent with the underlying `value` of the scalar, make `value` constant. This effectively invalidates legacy code that directly assigns to `value`, which is refactored accordingly (a minimal sketch of the resulting pattern follows this list):
  2.1 `BoxScalar` in https://github.com/apache/arrow/pull/40237/files#diff-08d11e02c001c82b1aa89565e16760a8bcca4a608c22619fb45da42fd0ebebac
  2.2 `Scalar::CastTo` in https://github.com/apache/arrow/pull/40237/files#diff-b4b83682450006616fa7e4f6f2ea3031cf1a22d734f4bee42a99af313e808f9e
  2.3 `ScalarMinMax` in https://github.com/apache/arrow/pull/40237/files#diff-368ab7e748bd4432c92d9fdc26b51e131742b968e3eb32a6fcea4b9f02fa36aa
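
As a minimal illustration of the refactoring pattern in point 2 (sketched with hypothetical types, not the actual kernel code): functions that used to write into an existing scalar's `value` member now build and return a fresh scalar instead.

```cpp
#include <memory>
#include <string>

// Hypothetical stand-in for a binary-like scalar whose value is now immutable.
struct BinaryScalarLike {
  const std::string value;  // const: assignment after construction no longer compiles
  explicit BinaryScalarLike(std::string v) : value(std::move(v)) {}
};

// Before (BoxScalar-style): void Box(std::string v, BinaryScalarLike* out) { out->value = v; }
// After: construct and return the result, leaving existing scalars untouched.
std::shared_ptr<BinaryScalarLike> Box(std::string v) {
  return std::make_shared<BinaryScalarLike>(std::move(v));
}

int main() {
  auto result = Box("hello");
  return result->value == "hello" ? 0 : 1;
}
```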

Besides, when refactoring 2.2, I found that the current `Scalar::CastTo` is not fully covered by the existing tests, so I also added some missing ones.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

**This PR includes breaking changes to public APIs.**
The `value` member of `BaseBinaryScalar` and its subclasses, `BaseListScalar` and its subclasses, `SparseUnionScalar`, `DenseUnionScalar`, and `RunEndEncodedScalar` is made constant, so code that directly assigns to this member will no longer compile.

Also, the `Scalar::mutable_data()` member function is removed because it goes against the immutable nature of `Scalar`.

However, the impact of these changes seems limited. I don't think much user code depends on these two old pieces of code.
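
For user code that is affected, a hedged migration sketch (assuming the Arrow C++ headers are available; verify the exact constructors against your Arrow version) is to supply the value at construction time instead of assigning it afterwards:

```cpp
#include <arrow/api.h>

#include <iostream>

int main() {
  auto buf = arrow::Buffer::FromString("hello");

  // Before (no longer compiles once `value` is const):
  //   arrow::BinaryScalar scalar(arrow::Buffer::FromString(""));
  //   scalar.value = buf;

  // After: pass the value to the constructor instead of mutating it later.
  arrow::BinaryScalar scalar(buf);
  std::cout << scalar.ToString() << std::endl;
  return 0;
}
```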

Also, after a quick search, I didn't find any documentation that needs to be updated for this change. There may be none, but if there is, please point me to it so I can update it. Thanks.

* GitHub Issue: #40069

Lead-authored-by: Ruoxi Sun <zanmato1984@gmail.com>
Co-authored-by: Rossi Sun <zanmato1984@gmail.com>
Signed-off-by: Benjamin Kietzman <bengilgit@gmail.com>
zanmato1984 authored Apr 23, 2024
1 parent 68eeef5 commit d2f140d
Showing 8 changed files with 764 additions and 354 deletions.
6 changes: 4 additions & 2 deletions c_glib/arrow-glib/scalar.cpp
@@ -1063,7 +1063,8 @@ garrow_base_binary_scalar_get_value(GArrowBaseBinaryScalar *scalar)
if (!priv->value) {
const auto arrow_scalar = std::static_pointer_cast<arrow::BaseBinaryScalar>(
garrow_scalar_get_raw(GARROW_SCALAR(scalar)));
priv->value = garrow_buffer_new_raw(&(arrow_scalar->value));
priv->value = garrow_buffer_new_raw(
const_cast<std::shared_ptr<arrow::Buffer> *>(&(arrow_scalar->value)));
}
return priv->value;
}
@@ -1983,7 +1984,8 @@ garrow_base_list_scalar_get_value(GArrowBaseListScalar *scalar)
if (!priv->value) {
const auto arrow_scalar = std::static_pointer_cast<arrow::BaseListScalar>(
garrow_scalar_get_raw(GARROW_SCALAR(scalar)));
priv->value = garrow_array_new_raw(&(arrow_scalar->value));
priv->value = garrow_array_new_raw(
const_cast<std::shared_ptr<arrow::Array> *>(&(arrow_scalar->value)));
}
return priv->value;
}
36 changes: 36 additions & 0 deletions cpp/src/arrow/array/array_test.cc
@@ -22,6 +22,7 @@
#include <cmath>
#include <cstdint>
#include <cstring>
#include <future>
#include <limits>
#include <memory>
#include <numeric>
@@ -823,6 +824,41 @@ TEST_F(TestArray, TestFillFromScalar) {
}
}

// GH-40069: Data race when concurrently calling ArraySpan::FillFromScalar on the same
// scalar instance.
TEST_F(TestArray, TestConcurrentFillFromScalar) {
for (auto type : TestArrayUtilitiesAgainstTheseTypes()) {
ARROW_SCOPED_TRACE("type = ", type->ToString());
for (auto seed : {0u, 0xdeadbeef, 42u}) {
ARROW_SCOPED_TRACE("seed = ", seed);

Field field("", type, /*nullable=*/true,
key_value_metadata({{"extension_allow_random_storage", "true"}}));
auto array = random::GenerateArray(field, 1, seed);

ASSERT_OK_AND_ASSIGN(auto scalar, array->GetScalar(0));

// Lambda to fill an ArraySpan with the scalar and use the ArraySpan a bit.
auto array_span_from_scalar = [&]() {
ArraySpan span(*scalar);
auto roundtripped_array = span.ToArray();
ASSERT_OK(roundtripped_array->ValidateFull());

AssertArraysEqual(*array, *roundtripped_array);
ASSERT_OK_AND_ASSIGN(auto roundtripped_scalar, roundtripped_array->GetScalar(0));
AssertScalarsEqual(*scalar, *roundtripped_scalar);
};

// Two concurrent calls to the lambda are just enough for TSAN to detect a race
// condition.
auto fut1 = std::async(std::launch::async, array_span_from_scalar);
auto fut2 = std::async(std::launch::async, array_span_from_scalar);
fut1.get();
fut2.get();
}
}
}

TEST_F(TestArray, ExtensionSpanRoundTrip) {
// Other types are checked in MakeEmptyArray but MakeEmptyArray doesn't
// work for extension types so we check that here
90 changes: 40 additions & 50 deletions cpp/src/arrow/array/data.cc
@@ -283,25 +283,15 @@ void ArraySpan::SetMembers(const ArrayData& data) {

namespace {

template <typename offset_type>
BufferSpan OffsetsForScalar(uint8_t* scratch_space, offset_type value_size) {
auto* offsets = reinterpret_cast<offset_type*>(scratch_space);
offsets[0] = 0;
offsets[1] = static_cast<offset_type>(value_size);
static_assert(2 * sizeof(offset_type) <= 16);
return {scratch_space, sizeof(offset_type) * 2};
BufferSpan OffsetsForScalar(uint8_t* scratch_space, int64_t offset_width) {
return {scratch_space, offset_width * 2};
}

template <typename offset_type>
std::pair<BufferSpan, BufferSpan> OffsetsAndSizesForScalar(uint8_t* scratch_space,
offset_type value_size) {
int64_t offset_width) {
auto* offsets = scratch_space;
auto* sizes = scratch_space + sizeof(offset_type);
reinterpret_cast<offset_type*>(offsets)[0] = 0;
reinterpret_cast<offset_type*>(sizes)[0] = value_size;
static_assert(2 * sizeof(offset_type) <= 16);
return {BufferSpan{offsets, sizeof(offset_type)},
BufferSpan{sizes, sizeof(offset_type)}};
auto* sizes = scratch_space + offset_width;
return {BufferSpan{offsets, offset_width}, BufferSpan{sizes, offset_width}};
}

int GetNumBuffers(const DataType& type) {
@@ -415,26 +405,23 @@ void ArraySpan::FillFromScalar(const Scalar& value) {
data_size = scalar.value->size();
}
if (is_binary_like(type_id)) {
this->buffers[1] =
OffsetsForScalar(scalar.scratch_space_, static_cast<int32_t>(data_size));
const auto& binary_scalar = checked_cast<const BinaryScalar&>(value);
this->buffers[1] = OffsetsForScalar(binary_scalar.scratch_space_, sizeof(int32_t));
} else {
// is_large_binary_like
this->buffers[1] = OffsetsForScalar(scalar.scratch_space_, data_size);
const auto& large_binary_scalar = checked_cast<const LargeBinaryScalar&>(value);
this->buffers[1] =
OffsetsForScalar(large_binary_scalar.scratch_space_, sizeof(int64_t));
}
this->buffers[2].data = const_cast<uint8_t*>(data_buffer);
this->buffers[2].size = data_size;
} else if (type_id == Type::BINARY_VIEW || type_id == Type::STRING_VIEW) {
const auto& scalar = checked_cast<const BaseBinaryScalar&>(value);
const auto& scalar = checked_cast<const BinaryViewScalar&>(value);

this->buffers[1].size = BinaryViewType::kSize;
this->buffers[1].data = scalar.scratch_space_;
static_assert(sizeof(BinaryViewType::c_type) <= sizeof(scalar.scratch_space_));
auto* view = new (&scalar.scratch_space_) BinaryViewType::c_type;
if (scalar.is_valid) {
*view = util::ToBinaryView(std::string_view{*scalar.value}, 0, 0);
this->buffers[2] = internal::PackVariadicBuffers({&scalar.value, 1});
} else {
*view = {};
}
} else if (type_id == Type::FIXED_SIZE_BINARY) {
const auto& scalar = checked_cast<const BaseBinaryScalar&>(value);
@@ -443,30 +430,36 @@ void ArraySpan::FillFromScalar(const Scalar& value) {
} else if (is_var_length_list_like(type_id) || type_id == Type::FIXED_SIZE_LIST) {
const auto& scalar = checked_cast<const BaseListScalar&>(value);

int64_t value_length = 0;
this->child_data.resize(1);
if (scalar.value != nullptr) {
// When the scalar is null, scalar.value can also be null
this->child_data[0].SetMembers(*scalar.value->data());
value_length = scalar.value->length();
} else {
// Even when the value is null, we still must populate the
// child_data to yield a valid array. Tedious
internal::FillZeroLengthArray(this->type->field(0)->type().get(),
&this->child_data[0]);
}

if (type_id == Type::LIST || type_id == Type::MAP) {
this->buffers[1] =
OffsetsForScalar(scalar.scratch_space_, static_cast<int32_t>(value_length));
if (type_id == Type::LIST) {
const auto& list_scalar = checked_cast<const ListScalar&>(value);
this->buffers[1] = OffsetsForScalar(list_scalar.scratch_space_, sizeof(int32_t));
} else if (type_id == Type::MAP) {
const auto& map_scalar = checked_cast<const MapScalar&>(value);
this->buffers[1] = OffsetsForScalar(map_scalar.scratch_space_, sizeof(int32_t));
} else if (type_id == Type::LARGE_LIST) {
this->buffers[1] = OffsetsForScalar(scalar.scratch_space_, value_length);
const auto& large_list_scalar = checked_cast<const LargeListScalar&>(value);
this->buffers[1] =
OffsetsForScalar(large_list_scalar.scratch_space_, sizeof(int64_t));
} else if (type_id == Type::LIST_VIEW) {
std::tie(this->buffers[1], this->buffers[2]) = OffsetsAndSizesForScalar(
scalar.scratch_space_, static_cast<int32_t>(value_length));
} else if (type_id == Type::LARGE_LIST_VIEW) {
const auto& list_view_scalar = checked_cast<const ListViewScalar&>(value);
std::tie(this->buffers[1], this->buffers[2]) =
OffsetsAndSizesForScalar(scalar.scratch_space_, value_length);
OffsetsAndSizesForScalar(list_view_scalar.scratch_space_, sizeof(int32_t));
} else if (type_id == Type::LARGE_LIST_VIEW) {
const auto& large_list_view_scalar =
checked_cast<const LargeListViewScalar&>(value);
std::tie(this->buffers[1], this->buffers[2]) = OffsetsAndSizesForScalar(
large_list_view_scalar.scratch_space_, sizeof(int64_t));
} else {
DCHECK_EQ(type_id, Type::FIXED_SIZE_LIST);
// FIXED_SIZE_LIST: does not have a second buffer
@@ -480,27 +473,19 @@ void ArraySpan::FillFromScalar(const Scalar& value) {
this->child_data[i].FillFromScalar(*scalar.value[i]);
}
} else if (is_union(type_id)) {
// Dense union needs scratch space to store both offsets and a type code
struct UnionScratchSpace {
alignas(int64_t) int8_t type_code;
alignas(int64_t) uint8_t offsets[sizeof(int32_t) * 2];
};
static_assert(sizeof(UnionScratchSpace) <= sizeof(UnionScalar::scratch_space_));
auto* union_scratch_space = reinterpret_cast<UnionScratchSpace*>(
&checked_cast<const UnionScalar&>(value).scratch_space_);

// First buffer is kept null since unions have no validity vector
this->buffers[0] = {};

union_scratch_space->type_code = checked_cast<const UnionScalar&>(value).type_code;
this->buffers[1].data = reinterpret_cast<uint8_t*>(&union_scratch_space->type_code);
this->buffers[1].size = 1;

this->child_data.resize(this->type->num_fields());
if (type_id == Type::DENSE_UNION) {
const auto& scalar = checked_cast<const DenseUnionScalar&>(value);
this->buffers[2] =
OffsetsForScalar(union_scratch_space->offsets, static_cast<int32_t>(1));
auto* union_scratch_space =
reinterpret_cast<UnionScalar::UnionScratchSpace*>(&scalar.scratch_space_);

this->buffers[1].data = reinterpret_cast<uint8_t*>(&union_scratch_space->type_code);
this->buffers[1].size = 1;

this->buffers[2] = OffsetsForScalar(union_scratch_space->offsets, sizeof(int32_t));
// We can't "see" the other arrays in the union, but we put the "active"
// union array in the right place and fill zero-length arrays for the
// others
@@ -517,6 +502,12 @@ void ArraySpan::FillFromScalar(const Scalar& value) {
}
} else {
const auto& scalar = checked_cast<const SparseUnionScalar&>(value);
auto* union_scratch_space =
reinterpret_cast<UnionScalar::UnionScratchSpace*>(&scalar.scratch_space_);

this->buffers[1].data = reinterpret_cast<uint8_t*>(&union_scratch_space->type_code);
this->buffers[1].size = 1;

// Sparse union scalars have a full complement of child values even
// though only one of them is relevant, so we just fill them in here
for (int i = 0; i < static_cast<int>(this->child_data.size()); ++i) {
@@ -541,7 +532,6 @@ void ArraySpan::FillFromScalar(const Scalar& value) {
e.null_count = 0;
e.buffers[1].data = scalar.scratch_space_;
e.buffers[1].size = sizeof(run_end);
reinterpret_cast<decltype(run_end)*>(scalar.scratch_space_)[0] = run_end;
};

switch (scalar.run_end_type()->id()) {
37 changes: 0 additions & 37 deletions cpp/src/arrow/compute/kernels/codegen_internal.h
@@ -369,43 +369,6 @@ struct UnboxScalar<Decimal256Type> {
}
};

template <typename Type, typename Enable = void>
struct BoxScalar;

template <typename Type>
struct BoxScalar<Type, enable_if_has_c_type<Type>> {
using T = typename GetOutputType<Type>::T;
static void Box(T val, Scalar* out) {
// Enables BoxScalar<Int64Type> to work on a (for example) Time64Scalar
T* mutable_data = reinterpret_cast<T*>(
checked_cast<::arrow::internal::PrimitiveScalarBase*>(out)->mutable_data());
*mutable_data = val;
}
};

template <typename Type>
struct BoxScalar<Type, enable_if_base_binary<Type>> {
using T = typename GetOutputType<Type>::T;
using ScalarType = typename TypeTraits<Type>::ScalarType;
static void Box(T val, Scalar* out) {
checked_cast<ScalarType*>(out)->value = std::make_shared<Buffer>(val);
}
};

template <>
struct BoxScalar<Decimal128Type> {
using T = Decimal128;
using ScalarType = Decimal128Scalar;
static void Box(T val, Scalar* out) { checked_cast<ScalarType*>(out)->value = val; }
};

template <>
struct BoxScalar<Decimal256Type> {
using T = Decimal256;
using ScalarType = Decimal256Scalar;
static void Box(T val, Scalar* out) { checked_cast<ScalarType*>(out)->value = val; }
};

// A VisitArraySpanInline variant that calls its visitor function with logical
// values, such as Decimal128 rather than std::string_view.

17 changes: 9 additions & 8 deletions cpp/src/arrow/compute/kernels/scalar_compare.cc
@@ -491,8 +491,9 @@ template <typename OutType, typename Op>
struct ScalarMinMax {
using OutValue = typename GetOutputType<OutType>::T;

static void ExecScalar(const ExecSpan& batch,
const ElementWiseAggregateOptions& options, Scalar* out) {
static Result<std::shared_ptr<Scalar>> ExecScalar(
const ExecSpan& batch, const ElementWiseAggregateOptions& options,
std::shared_ptr<DataType> type) {
// All arguments are scalar
OutValue value{};
bool valid = false;
@@ -502,8 +503,8 @@ struct ScalarMinMax {
const Scalar& scalar = *arg.scalar;
if (!scalar.is_valid) {
if (options.skip_nulls) continue;
out->is_valid = false;
return;
valid = false;
break;
}
if (!valid) {
value = UnboxScalar<OutType>::Unbox(scalar);
@@ -513,9 +514,10 @@ struct ScalarMinMax {
value, UnboxScalar<OutType>::Unbox(scalar));
}
}
out->is_valid = valid;
if (valid) {
BoxScalar<OutType>::Box(value, out);
return MakeScalar(std::move(type), std::move(value));
} else {
return MakeNullScalar(std::move(type));
}
}

@@ -537,8 +539,7 @@ struct ScalarMinMax {
bool initialize_output = true;
if (scalar_count > 0) {
ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Scalar> temp_scalar,
MakeScalar(out->type()->GetSharedPtr(), 0));
ExecScalar(batch, options, temp_scalar.get());
ExecScalar(batch, options, out->type()->GetSharedPtr()));
if (temp_scalar->is_valid) {
const auto value = UnboxScalar<OutType>::Unbox(*temp_scalar);
initialize_output = false;
