Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DX-64328 Array types for Gandiva #58

Merged
merged 46 commits into from
Nov 20, 2023
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
dececfd
Fix merge
lriggs Oct 25, 2023
e56dba6
Test geo function return types
lriggs Jul 26, 2023
05fb64f
Fix unix build
lriggs Jul 26, 2023
0593acd
Struct testing
lriggs Aug 3, 2023
690f5ea
Merge in list/array stuff
Jan 1, 2021
41c5083
add two list type input gandiva function
Jan 2, 2021
aa0a15c
Fix leftover merge
lriggs Aug 3, 2023
0fdb09b
Fix merge
lriggs Aug 3, 2023
1976d7c
Somewhat working for int32
lriggs Aug 10, 2023
b2a8380
Almost working
lriggs Sep 1, 2023
0db2c7a
lists kind of working
lriggs Sep 8, 2023
89d9f2d
add stuff. 100 rows
lriggs Sep 8, 2023
4249251
Removed logging and testing bigger size
lriggs Sep 12, 2023
4832d63
Working version with 100 rows and correct data
lriggs Sep 13, 2023
bcfcd88
update scripts
lriggs Sep 13, 2023
a82ef6a
Using some dynamic sizes
lriggs Sep 14, 2023
5b38893
Working with 1million rows
lriggs Sep 19, 2023
2f33cda
Everything working.
lriggs Sep 27, 2023
e29a751
Remove some logging.
lriggs Sep 28, 2023
eb0c497
experiement 1
lriggs Oct 10, 2023
7f62ffb
Return validity basically working.
lriggs Oct 11, 2023
7a06e9e
Working input validiy, but large data is broken again.
lriggs Oct 16, 2023
a9a7f51
Everything working
lriggs Oct 20, 2023
a4ee4ae
Cleanup, tested
lriggs Oct 25, 2023
316b822
Cleanup and test
lriggs Oct 25, 2023
7d74608
Cleanup, tested
lriggs Oct 25, 2023
980972f
Working functions end to end.
lriggs Oct 31, 2023
ff0f9ab
More cleanup.
lriggs Nov 1, 2023
b727684
Restore pom.xml
lriggs Nov 1, 2023
cfb56a0
Cleanup
lriggs Nov 1, 2023
235e648
Update jni to assign validity vector after resize
lriggs Nov 1, 2023
112d540
cleanup
lriggs Nov 3, 2023
0e3bd9e
Disable tests for now.
lriggs Nov 3, 2023
662e9f1
Remove uneeded changes.
lriggs Nov 3, 2023
4bd53e2
Remove uneeded includes
lriggs Nov 3, 2023
b4af98a
Fix unit tests
lriggs Nov 6, 2023
a057f58
Cleanup.
lriggs Nov 6, 2023
be455ab
Cleanup
lriggs Nov 7, 2023
7c7a939
Tidy up function results with nulls present.
lriggs Nov 8, 2023
fb55515
Fix unit test
lriggs Nov 9, 2023
6639639
Make listreturn type optional. update unit test.
lriggs Nov 9, 2023
054ef11
Fix javadoc
lriggs Nov 10, 2023
2d9cdbf
GH-38511: [JAVA] added impl of getTransferPair(Field, BufferAllocator…
xxlaykxx Oct 30, 2023
f477cc8
GH-38511: [JAVA] added impl of getTransferPair(Field, BufferAllocator…
xxlaykxx Oct 31, 2023
b7c5288
Remove unneeded code.
lriggs Nov 14, 2023
b6593b3
Return list parameter type information through the function registry.
lriggs Nov 17, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions cpp/src/arrow/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -444,10 +444,27 @@ class ARROW_EXPORT ResizableBuffer : public MutableBuffer {
return Reserve(sizeof(T) * new_nb_elements);
}

public:
uint8_t* offsetBuffer;
int64_t offsetCapacity;
uint8_t* validityBuffer;
uint8_t* outerValidityBuffer;

protected:
ResizableBuffer(uint8_t* data, int64_t size) : MutableBuffer(data, size) {}
ResizableBuffer(uint8_t* data, int64_t size) : MutableBuffer(data, size) {
offsetBuffer = nullptr;
offsetCapacity = 0;
validityBuffer = nullptr;
outerValidityBuffer = nullptr;

}
ResizableBuffer(uint8_t* data, int64_t size, std::shared_ptr<MemoryManager> mm)
: MutableBuffer(data, size, std::move(mm)) {}
: MutableBuffer(data, size, std::move(mm)) {
offsetBuffer = nullptr;
offsetCapacity = 0;
validityBuffer = nullptr;
outerValidityBuffer = nullptr;
}
};

/// \defgroup buffer-allocation-functions Functions for allocating buffers
Expand Down
5 changes: 4 additions & 1 deletion cpp/src/gandiva/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,11 @@ set(SRC_FILES
expression_registry.cc
exported_funcs_registry.cc
filter.cc
array_ops.cc
function_ir_builder.cc
function_registry.cc
function_registry_arithmetic.cc
function_registry_array.cc
function_registry_datetime.cc
function_registry_hash.cc
function_registry_math_ops.cc
Expand Down Expand Up @@ -249,7 +251,8 @@ add_gandiva_test(internals-test
random_generator_holder_test.cc
hash_utils_test.cc
gdv_function_stubs_test.cc
interval_holder_test.cc)
interval_holder_test.cc
array_ops_test.cc)

add_subdirectory(precompiled)
add_subdirectory(tests)
81 changes: 73 additions & 8 deletions cpp/src/gandiva/annotator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,25 @@ FieldDescriptorPtr Annotator::MakeDesc(FieldPtr field, bool is_output) {
int data_idx = buffer_count_++;
int validity_idx = buffer_count_++;
int offsets_idx = FieldDescriptor::kInvalidIdx;
int child_offsets_idx = FieldDescriptor::kInvalidIdx;
if (arrow::is_binary_like(field->type()->id())) {
offsets_idx = buffer_count_++;
}

if (field->type()->id() == arrow::Type::LIST) {
offsets_idx = buffer_count_++;
if (arrow::is_binary_like(field->type()->field(0)->type()->id())) {
child_offsets_idx = buffer_count_++;
}
}
int data_buffer_ptr_idx = FieldDescriptor::kInvalidIdx;
if (is_output) {
data_buffer_ptr_idx = buffer_count_++;
}
int child_valid_buffer_ptr_idx = FieldDescriptor::kInvalidIdx;
child_valid_buffer_ptr_idx = buffer_count_++;
return std::make_shared<FieldDescriptor>(field, data_idx, validity_idx, offsets_idx,
data_buffer_ptr_idx);
data_buffer_ptr_idx, child_offsets_idx, child_valid_buffer_ptr_idx);
}

int Annotator::AddHolderPointer(void* holder) {
Expand All @@ -80,17 +90,73 @@ void Annotator::PrepareBuffersForField(const FieldDescriptor& desc,
if (desc.HasOffsetsIdx()) {
uint8_t* offsets_buf = const_cast<uint8_t*>(array_data.buffers[buffer_idx]->data());
eval_batch->SetBuffer(desc.offsets_idx(), offsets_buf, array_data.offset);
++buffer_idx;

if (desc.HasChildOffsetsIdx()) {
if (is_output) {
// if list field is output field, we should put buffer pointer into eval batch
// for resizing
uint8_t* child_offsets_buf = reinterpret_cast<uint8_t*>(
array_data.child_data.at(0)->buffers[buffer_idx].get());
eval_batch->SetBuffer(desc.child_data_offsets_idx(), child_offsets_buf,
array_data.child_data.at(0)->offset);

uint8_t* child_valid_buf = reinterpret_cast<uint8_t*>(
array_data.child_data.at(0)->buffers[0].get());
eval_batch->SetBuffer(desc.child_data_validity_idx(), child_valid_buf,
array_data.child_data.at(0)->offset);

} else {
// if list field is input field, just put buffer data into eval batch
uint8_t* child_offsets_buf = const_cast<uint8_t*>(
array_data.child_data.at(0)->buffers[buffer_idx]->data());
eval_batch->SetBuffer(desc.child_data_offsets_idx(), child_offsets_buf,
array_data.child_data.at(0)->offset);

uint8_t* child_valid_buf = const_cast<uint8_t*>(
array_data.child_data.at(0)->buffers[0]->data());
lriggs marked this conversation as resolved.
Show resolved Hide resolved
eval_batch->SetBuffer(desc.child_data_offsets_idx(), child_valid_buf,
array_data.child_data.at(0)->offset);
}
}
if (array_data.type->id() != arrow::Type::LIST ||
arrow::is_binary_like(array_data.type->field(0)->type()->id())) {
// primitive type list data buffer index is 1
// binary like type list data buffer index is 2
++buffer_idx;
}
}

if (array_data.type->id() != arrow::Type::LIST) {
uint8_t* data_buf = const_cast<uint8_t*>(array_data.buffers[buffer_idx]->data());
eval_batch->SetBuffer(desc.data_idx(), data_buf, array_data.offset);
} else {
uint8_t* data_buf =
const_cast<uint8_t*>(array_data.child_data.at(0)->buffers[buffer_idx]->data());
eval_batch->SetBuffer(desc.data_idx(), data_buf, array_data.child_data.at(0)->offset);
if (array_data.child_data.at(0)->buffers[0] ) {
uint8_t* child_valid_buf = const_cast<uint8_t*>(
array_data.child_data.at(0)->buffers[0]->data());
eval_batch->SetBuffer(desc.child_data_validity_idx(), child_valid_buf, 0);
}

}

uint8_t* data_buf = const_cast<uint8_t*>(array_data.buffers[buffer_idx]->data());
eval_batch->SetBuffer(desc.data_idx(), data_buf, array_data.offset);
if (is_output) {
// pass in the Buffer object for output data buffers. Can be used for resizing.
uint8_t* data_buf_ptr =
reinterpret_cast<uint8_t*>(array_data.buffers[buffer_idx].get());
eval_batch->SetBuffer(desc.data_buffer_ptr_idx(), data_buf_ptr, array_data.offset);

if (array_data.type->id() != arrow::Type::LIST) {
uint8_t* data_buf_ptr =
reinterpret_cast<uint8_t*>(array_data.buffers[buffer_idx].get());
eval_batch->SetBuffer(desc.data_buffer_ptr_idx(), data_buf_ptr, array_data.offset);
} else {
// list data buffer is in child data buffer
uint8_t* data_buf_ptr = reinterpret_cast<uint8_t*>(
array_data.child_data.at(0)->buffers[buffer_idx].get());
eval_batch->SetBuffer(desc.data_buffer_ptr_idx(), data_buf_ptr,
array_data.child_data.at(0)->offset);
}
}

}

EvalBatchPtr Annotator::PrepareEvalBatch(const arrow::RecordBatch& record_batch,
Expand All @@ -106,7 +172,6 @@ EvalBatchPtr Annotator::PrepareEvalBatch(const arrow::RecordBatch& record_batch,
// skip columns not involved in the expression.
continue;
}

PrepareBuffersForField(*(found->second), *(record_batch.column_data(i)),
eval_batch.get(), false /*is_output*/);
}
Expand Down
Loading