Skip to content

Commit

Permalink
Merge pull request apache#12 from Intel-bigdata/wip_chendi
Browse files Browse the repository at this point in the history
[C++] Add a new function to sort items in arrays
  • Loading branch information
xuechendi authored Dec 10, 2019
2 parents b9ce3ee + 00dbf57 commit ec5747c
Show file tree
Hide file tree
Showing 5 changed files with 422 additions and 0 deletions.
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,7 @@ if(ARROW_COMPUTE)
compute/kernels/mean.cc
compute/kernels/minmax.cc
compute/kernels/sort_to_indices.cc
compute/kernels/sort_arrays_to_indices.cc
compute/kernels/sum.cc
compute/kernels/add.cc
compute/kernels/take.cc
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/compute/kernels/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ add_arrow_test(boolean_test PREFIX "arrow-compute")
add_arrow_test(cast_test PREFIX "arrow-compute")
add_arrow_test(hash_test PREFIX "arrow-compute")
add_arrow_test(isin_test PREFIX "arrow-compute")
add_arrow_test(sort_arrays_to_indices_test PREFIX "arrow-compute")
add_arrow_test(sort_to_indices_test PREFIX "arrow-compute")
add_arrow_test(util_internal_test PREFIX "arrow-compute")
add_arrow_test(add-test PREFIX "arrow-compute")
Expand Down
219 changes: 219 additions & 0 deletions cpp/src/arrow/compute/kernels/sort_arrays_to_indices.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,219 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/compute/kernels/sort_arrays_to_indices.h"
#include <arrow/util/checked_cast.h>

#include <algorithm>
#include <memory>
#include <numeric>
#include <vector>

#include "arrow/builder.h"
#include "arrow/compute/context.h"
#include "arrow/compute/expression.h"
#include "arrow/compute/logical_type.h"
#include "arrow/type_traits.h"

namespace arrow {

class Array;

namespace compute {

/// \brief UnaryKernel implementing SortArraysToIndices operation
class ARROW_EXPORT SortArraysToIndicesKernel {
protected:
std::shared_ptr<DataType> type_;

public:
/// \brief UnaryKernel interface
///
/// delegates to subclasses via SortArraysToIndices()
virtual Status Call(FunctionContext* ctx, std::vector<std::shared_ptr<Array>> values,
std::shared_ptr<Array>* offsets) = 0;

/// \brief output type of this kernel
std::shared_ptr<DataType> out_type() const { return uint64(); }

/// \brief single-array implementation
virtual Status SortArraysToIndices(FunctionContext* ctx,
std::vector<std::shared_ptr<Array>> values,
std::shared_ptr<Array>* offsets) = 0;

/// \brief factory for SortArraysToIndicesKernel
///
/// \param[in] value_type constructed SortArraysToIndicesKernel will support sorting
/// values of this type
/// \param[out] out created kernel
static Status Make(const std::shared_ptr<DataType>& value_type,
std::unique_ptr<SortArraysToIndicesKernel>* out);
};

template <typename ArrayType>
bool CompareValues(std::vector<std::shared_ptr<ArrayType>> arrays, ArrayItemIndex lhs,
ArrayItemIndex rhs) {
return arrays[lhs.array_id]->Value(lhs.id) < arrays[rhs.array_id]->Value(rhs.id);
}

template <typename ArrayType>
bool CompareViews(std::vector<std::shared_ptr<ArrayType>> arrays, ArrayItemIndex lhs,
ArrayItemIndex rhs) {
return arrays[lhs.array_id]->GetView(lhs.id) < arrays[rhs.array_id]->GetView(rhs.id);
}

template <typename ArrowType, typename Comparator>
class SortArraysToIndicesKernelImpl : public SortArraysToIndicesKernel {
using ArrayType = typename TypeTraits<ArrowType>::ArrayType;

public:
explicit SortArraysToIndicesKernelImpl(Comparator compare) : compare_(compare) {}

Status SortArraysToIndices(FunctionContext* ctx,
std::vector<std::shared_ptr<Array>> values,
std::shared_ptr<Array>* offsets) {
return SortArraysToIndicesImpl(ctx, values, offsets);
}

Status Call(FunctionContext* ctx, std::vector<std::shared_ptr<Array>> values,
std::shared_ptr<Array>* offsets) override {
std::shared_ptr<Array> offsets_array;
RETURN_NOT_OK(this->SortArraysToIndices(ctx, values, &offsets_array));
*offsets = offsets_array;
return Status::OK();
}

std::shared_ptr<DataType> out_type() const { return type_; }

private:
Comparator compare_;

Status SortArraysToIndicesImpl(FunctionContext* ctx,
std::vector<std::shared_ptr<Array>> values,
std::shared_ptr<Array>* offsets) {
// initiate buffer for all arrays
std::shared_ptr<Buffer> indices_buf;
int64_t items_total = 0;
for (auto array : values) {
items_total += array->length();
}
int64_t buf_size = items_total * sizeof(ArrayItemIndex);
RETURN_NOT_OK(AllocateBuffer(ctx->memory_pool(), sizeof(ArrayItemIndex) * buf_size,
&indices_buf));

// start to partition not_null with null
ArrayItemIndex* indices_begin =
reinterpret_cast<ArrayItemIndex*>(indices_buf->mutable_data());
ArrayItemIndex* indices_end = indices_begin + items_total;
std::vector<std::shared_ptr<ArrayType>> typed_arrays;
int64_t array_id = 0;
int64_t null_count_total = 0;
int64_t indices_i = 0;
for (auto array : values) {
for (int64_t i = 0; i < array->length(); i++) {
if (!array->IsNull(i)) {
(indices_begin + indices_i)->array_id = array_id;
(indices_begin + indices_i)->id = i;
indices_i++;
} else {
(indices_end - null_count_total - 1)->array_id = array_id;
(indices_end - null_count_total - 1)->id = i;
null_count_total++;
}
}
typed_arrays.push_back(std::dynamic_pointer_cast<ArrayType>(array));
array_id++;
}
auto nulls_begin = indices_begin + items_total - null_count_total;
std::stable_sort(indices_begin, nulls_begin,
[typed_arrays, this](ArrayItemIndex left, ArrayItemIndex right) {
return compare_(typed_arrays, left, right);
});

*offsets = std::make_shared<FixedSizeBinaryArray>(
std::make_shared<FixedSizeBinaryType>(sizeof(ArrayItemIndex) / sizeof(int32_t)),
items_total, indices_buf);
return Status::OK();
}
};

template <typename ArrowType, typename Comparator>
SortArraysToIndicesKernelImpl<ArrowType, Comparator>* MakeSortArraysToIndicesKernelImpl(
Comparator comparator) {
return new SortArraysToIndicesKernelImpl<ArrowType, Comparator>(comparator);
}

Status SortArraysToIndicesKernel::Make(const std::shared_ptr<DataType>& value_type,
std::unique_ptr<SortArraysToIndicesKernel>* out) {
SortArraysToIndicesKernel* kernel;
switch (value_type->id()) {
case Type::UINT8:
kernel = MakeSortArraysToIndicesKernelImpl<UInt8Type>(CompareValues<UInt8Array>);
break;
case Type::INT8:
kernel = MakeSortArraysToIndicesKernelImpl<Int8Type>(CompareValues<Int8Array>);
break;
case Type::UINT16:
kernel = MakeSortArraysToIndicesKernelImpl<UInt16Type>(CompareValues<UInt16Array>);
break;
case Type::INT16:
kernel = MakeSortArraysToIndicesKernelImpl<Int16Type>(CompareValues<Int16Array>);
break;
case Type::UINT32:
kernel = MakeSortArraysToIndicesKernelImpl<UInt32Type>(CompareValues<UInt32Array>);
break;
case Type::INT32:
kernel = MakeSortArraysToIndicesKernelImpl<Int32Type>(CompareValues<Int32Array>);
break;
case Type::UINT64:
kernel = MakeSortArraysToIndicesKernelImpl<UInt64Type>(CompareValues<UInt64Array>);
break;
case Type::INT64:
kernel = MakeSortArraysToIndicesKernelImpl<Int64Type>(CompareValues<Int64Array>);
break;
case Type::FLOAT:
kernel = MakeSortArraysToIndicesKernelImpl<FloatType>(CompareValues<FloatArray>);
break;
case Type::DOUBLE:
kernel = MakeSortArraysToIndicesKernelImpl<DoubleType>(CompareValues<DoubleArray>);
break;
case Type::BINARY:
kernel = MakeSortArraysToIndicesKernelImpl<BinaryType>(CompareViews<BinaryArray>);
break;
case Type::STRING:
kernel = MakeSortArraysToIndicesKernelImpl<StringType>(CompareViews<StringArray>);
break;
default:
return Status::NotImplemented("Sorting of ", *value_type, " arrays");
}
out->reset(kernel);
return Status::OK();
}

Status SortArraysToIndices(FunctionContext* ctx,
std::vector<std::shared_ptr<Array>> values,
std::shared_ptr<Array>* offsets) {
if (values.size() == 0) {
return Status::Invalid("Input ArrayList is empty");
}
std::unique_ptr<SortArraysToIndicesKernel> kernel;
RETURN_NOT_OK(SortArraysToIndicesKernel::Make(values[0]->type(), &kernel));
return kernel->Call(ctx, values, offsets);
}

} // namespace compute
} // namespace arrow
57 changes: 57 additions & 0 deletions cpp/src/arrow/compute/kernels/sort_arrays_to_indices.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <memory>

#include "arrow/compute/kernel.h"
#include "arrow/status.h"
#include "arrow/util/visibility.h"

namespace arrow {

class Array;

namespace compute {

class FunctionContext;

struct ArrayItemIndex {
uint64_t id = 0;
uint64_t array_id = 0;
};

/// \brief Returns the indices that would sort an array.
///
/// Perform an indirect sort of array. The output array will contain
/// indices that would sort an array, which would be the same length
/// as input. Nulls will be stably partitioned to the end of the output.
///
/// For example given values = [null, 1, 3.3, null, 2, 5.3], the output
/// will be [1, 4, 2, 5, 0, 3]
///
/// \param[in] ctx the FunctionContext
/// \param[in] values arrays to sort
/// \param[out] offsets indices that would sort an array
ARROW_EXPORT
Status SortArraysToIndices(FunctionContext* ctx,
std::vector<std::shared_ptr<Array>> values,
std::shared_ptr<Array>* offsets);

} // namespace compute
} // namespace arrow
Loading

0 comments on commit ec5747c

Please sign in to comment.