Skip to content

Commit 54cc3eb

Browse files
author
Rafał Hibner
committed
Merge remote-tracking branch 'mroz45/types' into combined3
2 parents aa3e25f + e8f7ccf commit 54cc3eb

File tree

3 files changed

+137
-52
lines changed

3 files changed

+137
-52
lines changed

cpp/src/arrow/acero/asof_join_node.cc

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1225,6 +1225,10 @@ class AsofJoinNode : public ExecNode {
12251225
case Type::LARGE_STRING:
12261226
case Type::BINARY:
12271227
case Type::LARGE_BINARY:
1228+
case Type::LIST:
1229+
case Type::FIXED_SIZE_LIST:
1230+
case Type::STRUCT:
1231+
case Type::MAP:
12281232
return Status::OK();
12291233
default:
12301234
return Status::Invalid("Unsupported type for data field ", field->name(), " : ",

cpp/src/arrow/acero/asof_join_node_test.cc

Lines changed: 126 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -1295,14 +1295,6 @@ TRACED_TEST(AsofJoinTest, TestUnsupportedByType, {
12951295
field("r0_v0", float32())}));
12961296
})
12971297

1298-
TRACED_TEST(AsofJoinTest, TestUnsupportedDatatype, {
1299-
// List is unsupported
1300-
DoRunInvalidTypeTest(
1301-
schema({field("time", int64()), field("key", int32()), field("l_v0", float64())}),
1302-
schema({field("time", int64()), field("key", int32()),
1303-
field("r0_v0", list(int32()))}));
1304-
})
1305-
13061298
TRACED_TEST(AsofJoinTest, TestMissingKeys, {
13071299
DoRunMissingKeysTest(
13081300
schema({field("time1", int64()), field("key", int32()), field("l_v0", float64())}),
@@ -1910,5 +1902,131 @@ TEST(AsofJoinTest, OneSideTsAllGreaterThanTheOther) {
19101902
}
19111903
}
19121904

1905+
// GH-44729: Testing nested data type for non-key fields
1906+
TEST(AsofJoinTest, FixedListDataType) {
1907+
const int32_t list_size = 3;
1908+
auto list_type = arrow::fixed_size_list(arrow::int32(), list_size);
1909+
1910+
auto left_batch = ExecBatchFromJSON({int64()}, R"([[1], [2], [3]])");
1911+
auto right_batch = ExecBatchFromJSON({list_type, int64()}, R"([
1912+
[[0, 1, 2], 2],
1913+
[[3, 4, 5], 3],
1914+
[[6, 7, 8], 4]
1915+
])");
1916+
1917+
Declaration left{"exec_batch_source",
1918+
ExecBatchSourceNodeOptions(schema({field("on", int64())}),
1919+
{std::move(left_batch)})};
1920+
Declaration right{"exec_batch_source",
1921+
ExecBatchSourceNodeOptions(
1922+
schema({field("colVals", list_type), field("on", int64())}),
1923+
{std::move(right_batch)})};
1924+
1925+
AsofJoinNodeOptions asof_join_opts({{{"on"}, {}}, {{"on"}, {}}}, 1);
1926+
Declaration asof_join{
1927+
"asofjoin", {std::move(left), std::move(right)}, std::move(asof_join_opts)};
1928+
1929+
ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(asof_join)));
1930+
1931+
auto exp_batch = ExecBatchFromJSON({int64(), list_type}, R"([
1932+
[1, [0, 1, 2]],
1933+
[2, [0, 1, 2]],
1934+
[3, [3, 4, 5]]
1935+
])");
1936+
1937+
AssertExecBatchesEqual(result.schema, {exp_batch}, result.batches);
1938+
}
1939+
1940+
TEST(AsofJoinTest, ListDataType) {
1941+
auto list_type = list(int32());
1942+
1943+
auto left_batch = ExecBatchFromJSON({int64()}, R"([[1], [2], [3]])");
1944+
auto right_batch = ExecBatchFromJSON({list_type, int64()}, R"([
1945+
[[0, 1, 2, 9], 2],
1946+
[[3, 4, 5, 7], 3],
1947+
[[6, 7, 8], 4]
1948+
])");
1949+
1950+
Declaration left{"exec_batch_source",
1951+
ExecBatchSourceNodeOptions(schema({field("on", int64())}),
1952+
{std::move(left_batch)})};
1953+
Declaration right{"exec_batch_source",
1954+
ExecBatchSourceNodeOptions(
1955+
schema({field("colVals", list_type), field("on", int64())}),
1956+
{std::move(right_batch)})};
1957+
1958+
AsofJoinNodeOptions asof_join_opts({{{"on"}, {}}, {{"on"}, {}}}, 1);
1959+
Declaration asof_join{
1960+
"asofjoin", {std::move(left), std::move(right)}, std::move(asof_join_opts)};
1961+
1962+
ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(asof_join)));
1963+
auto exp_batch = ExecBatchFromJSON({int64(), list_type}, R"([
1964+
[1, [0, 1, 2, 9]],
1965+
[2, [0, 1, 2, 9]],
1966+
[3, [3, 4, 5, 7]]
1967+
])");
1968+
1969+
AssertExecBatchesEqual(result.schema, {exp_batch}, result.batches);
1970+
}
1971+
1972+
TEST(AsofJoinTest, StructTestDataType) {
1973+
auto struct_type = struct_({field("key", utf8()), field("value", int64())});
1974+
1975+
auto left_batch = ExecBatchFromJSON({int64()}, R"([[1], [2], [3]])");
1976+
auto right_batch = ExecBatchFromJSON({struct_type, int64()}, R"([
1977+
[{"key": "a", "value": 1}, 2],
1978+
[{"key": "b", "value": 3}, 3],
1979+
[{"key": "c", "value": 5}, 4]
1980+
])");
1981+
1982+
Declaration left{"exec_batch_source",
1983+
ExecBatchSourceNodeOptions(schema({field("on", int64())}),
1984+
{std::move(left_batch)})};
1985+
Declaration right{"exec_batch_source",
1986+
ExecBatchSourceNodeOptions(
1987+
schema({field("col", struct_type), field("on", int64())}),
1988+
{std::move(right_batch)})};
1989+
AsofJoinNodeOptions asof_join_opts({{{"on"}, {}}, {{"on"}, {}}}, 1);
1990+
Declaration asof_join{
1991+
"asofjoin", {std::move(left), std::move(right)}, std::move(asof_join_opts)};
1992+
ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(asof_join)));
1993+
1994+
auto exp_batch = ExecBatchFromJSON({int64(), struct_type}, R"([
1995+
[1, {"key": "a", "value": 1}],
1996+
[2, {"key": "a", "value": 1}],
1997+
[3, {"key": "b", "value": 3}]
1998+
])");
1999+
AssertExecBatchesEqual(result.schema, {exp_batch}, result.batches);
2000+
}
2001+
2002+
TEST(AsofJoinTest, MapTestDataType) {
2003+
auto map_type = map(int64(), int64());
2004+
2005+
auto left_batch = ExecBatchFromJSON({int64()}, R"([[1], [2], [3]])");
2006+
auto right_batch = ExecBatchFromJSON({map_type, int64()}, R"([
2007+
[[[11, 111], [22, 222]], 2],
2008+
[[[33, 333], [44, 444], [77, 777]], 3],
2009+
[[[55, 555], [66, 666]], 4]
2010+
])");
2011+
2012+
Declaration left{"exec_batch_source",
2013+
ExecBatchSourceNodeOptions(schema({field("on", int64())}),
2014+
{std::move(left_batch)})};
2015+
Declaration right{
2016+
"exec_batch_source",
2017+
ExecBatchSourceNodeOptions(schema({field("col", map_type), field("on", int64())}),
2018+
{std::move(right_batch)})};
2019+
AsofJoinNodeOptions asof_join_opts({{{"on"}, {}}, {{"on"}, {}}}, 1);
2020+
Declaration asof_join{
2021+
"asofjoin", {std::move(left), std::move(right)}, std::move(asof_join_opts)};
2022+
2023+
ASSERT_OK_AND_ASSIGN(auto result, DeclarationToExecBatches(std::move(asof_join)));
2024+
auto exp_batch = ExecBatchFromJSON({int64(), map_type}, R"([
2025+
[1, [[11, 111], [22, 222]]],
2026+
[2, [[11, 111], [22, 222]]],
2027+
[3, [[33, 333], [44, 444], [77, 777]]]
2028+
])");
2029+
AssertExecBatchesEqual(result.schema, {exp_batch}, result.batches);
2030+
}
19132031
} // namespace acero
19142032
} // namespace arrow

cpp/src/arrow/acero/unmaterialized_table_internal.h

Lines changed: 7 additions & 44 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@
2121
#include <vector>
2222
#include "arrow/array/builder_base.h"
2323
#include "arrow/array/builder_binary.h"
24+
#include "arrow/array/builder_nested.h"
2425
#include "arrow/array/builder_primitive.h"
2526
#include "arrow/memory_pool.h"
2627
#include "arrow/record_batch.h"
@@ -112,6 +113,10 @@ class UnmaterializedCompositeTable {
112113
MATERIALIZE_CASE(LARGE_STRING)
113114
MATERIALIZE_CASE(BINARY)
114115
MATERIALIZE_CASE(LARGE_BINARY)
116+
MATERIALIZE_CASE(FIXED_SIZE_LIST)
117+
MATERIALIZE_CASE(LIST)
118+
MATERIALIZE_CASE(STRUCT)
119+
MATERIALIZE_CASE(MAP)
115120
default:
116121
return arrow::Status::Invalid("Unsupported data type ",
117122
field->type()->ToString(), " for field ",
@@ -165,45 +170,6 @@ class UnmaterializedCompositeTable {
165170
num_rows += slice.Size();
166171
}
167172

168-
template <class Type, class Builder = typename TypeTraits<Type>::BuilderType>
169-
enable_if_boolean<Type, Status> static BuilderAppend(
170-
Builder& builder, const std::shared_ptr<ArrayData>& source, uint64_t row) {
171-
if (source->IsNull(row)) {
172-
builder.UnsafeAppendNull();
173-
return Status::OK();
174-
}
175-
builder.UnsafeAppend(bit_util::GetBit(source->template GetValues<uint8_t>(1), row));
176-
return Status::OK();
177-
}
178-
179-
template <class Type, class Builder = typename TypeTraits<Type>::BuilderType>
180-
enable_if_t<is_fixed_width_type<Type>::value && !is_boolean_type<Type>::value,
181-
Status> static BuilderAppend(Builder& builder,
182-
const std::shared_ptr<ArrayData>& source,
183-
uint64_t row) {
184-
if (source->IsNull(row)) {
185-
builder.UnsafeAppendNull();
186-
return Status::OK();
187-
}
188-
using CType = typename TypeTraits<Type>::CType;
189-
builder.UnsafeAppend(source->template GetValues<CType>(1)[row]);
190-
return Status::OK();
191-
}
192-
193-
template <class Type, class Builder = typename TypeTraits<Type>::BuilderType>
194-
enable_if_base_binary<Type, Status> static BuilderAppend(
195-
Builder& builder, const std::shared_ptr<ArrayData>& source, uint64_t row) {
196-
if (source->IsNull(row)) {
197-
return builder.AppendNull();
198-
}
199-
using offset_type = typename Type::offset_type;
200-
const uint8_t* data = source->buffers[2]->data();
201-
const offset_type* offsets = source->GetValues<offset_type>(1);
202-
const offset_type offset0 = offsets[row];
203-
const offset_type offset1 = offsets[row + 1];
204-
return builder.Append(data + offset0, offset1 - offset0);
205-
}
206-
207173
template <class Type, class Builder = typename arrow::TypeTraits<Type>::BuilderType>
208174
arrow::Result<std::shared_ptr<arrow::Array>> materializeColumn(
209175
const std::shared_ptr<arrow::DataType>& type, int i_col) {
@@ -216,11 +182,8 @@ class UnmaterializedCompositeTable {
216182
for (const auto& unmaterialized_slice : slices) {
217183
const auto& [batch, start, end] = unmaterialized_slice.components[table_index];
218184
if (batch) {
219-
for (uint64_t rowNum = start; rowNum < end; ++rowNum) {
220-
arrow::Status st = BuilderAppend<Type, Builder>(
221-
builder, batch->column_data(column_index), rowNum);
222-
ARROW_RETURN_NOT_OK(st);
223-
}
185+
ARROW_RETURN_NOT_OK(builder.AppendArraySlice(*batch->column_data(column_index),
186+
start, end - start));
224187
} else {
225188
for (uint64_t rowNum = start; rowNum < end; ++rowNum) {
226189
ARROW_RETURN_NOT_OK(builder.AppendNull());

0 commit comments

Comments
 (0)