diff --git a/ci/docker/linux-apt-jni.dockerfile b/ci/docker/linux-apt-jni.dockerfile index b824ad6b32a9b..818dfcf793cf9 100644 --- a/ci/docker/linux-apt-jni.dockerfile +++ b/ci/docker/linux-apt-jni.dockerfile @@ -68,13 +68,14 @@ RUN wget -nv -O - https://github.com/Kitware/CMake/releases/download/v${cmake}/c ENV PATH=/opt/cmake-${cmake}-Linux-x86_64/bin:$PATH ENV ARROW_BUILD_TESTS=OFF \ + ARROW_DATASET=ON \ ARROW_FLIGHT=OFF \ ARROW_GANDIVA_JAVA=ON \ ARROW_GANDIVA=ON \ ARROW_HOME=/usr/local \ ARROW_JNI=ON \ ARROW_ORC=ON \ - ARROW_PARQUET=OFF \ + ARROW_PARQUET=ON \ ARROW_PLASMA_JAVA_CLIENT=ON \ ARROW_PLASMA=ON \ ARROW_USE_CCACHE=ON \ diff --git a/ci/scripts/java_test.sh b/ci/scripts/java_test.sh index 1383388ab41b6..819e307374a57 100755 --- a/ci/scripts/java_test.sh +++ b/ci/scripts/java_test.sh @@ -32,8 +32,8 @@ pushd ${source_dir} ${mvn} test -if [ "${ARROW_GANDIVA_JAVA}" = "ON" ]; then - ${mvn} test -Parrow-jni -pl gandiva -Darrow.cpp.build.dir=${cpp_build_dir} +if [ "${ARROW_JNI}" = "ON" ]; then + ${mvn} test -Parrow-jni -pl gandiva,dataset -Darrow.cpp.build.dir=${cpp_build_dir} fi if [ "${ARROW_PLASMA}" = "ON" ]; then diff --git a/cpp/.gitignore b/cpp/.gitignore index e0a9429a9b60d..b444c420554a5 100644 --- a/cpp/.gitignore +++ b/cpp/.gitignore @@ -24,6 +24,8 @@ cmake_install.cmake build/ *-build/ Testing/ +cmake-build-debug/ +cmake-build-release/ ######################################### # Editor temporary/working/backup files # diff --git a/cpp/src/arrow/dataset/discovery.cc b/cpp/src/arrow/dataset/discovery.cc index 8820e0ff499fc..b88907d2863ac 100644 --- a/cpp/src/arrow/dataset/discovery.cc +++ b/cpp/src/arrow/dataset/discovery.cc @@ -274,5 +274,28 @@ Result> FileSystemDatasetFactory::Finish(FinishOptions std::move(partitions)); } +Result> SingleFileDatasetFactory::Make( + std::string path, std::shared_ptr fs, + std::shared_ptr format) { + std::shared_ptr file_src = std::make_shared(path, fs.get()); + return std::shared_ptr(new SingleFileDatasetFactory( + std::move(file_src), std::move(fs), std::move(format))); +} + +Result>> SingleFileDatasetFactory::InspectSchemas( + InspectOptions options) { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr schema, format_->Inspect(*file_)) + return std::vector>{std::move(schema)}; +} + +Result> SingleFileDatasetFactory::Finish(FinishOptions options) { + return SingleFileDataset::Make(options.schema, scalar(true), file_, fs_, format_); +} + +SingleFileDatasetFactory::SingleFileDatasetFactory(std::shared_ptr file, + std::shared_ptr fs, + std::shared_ptr format) + : file_(std::move(file)), fs_(std::move(fs)), format_(std::move(format)) {} + } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/discovery.h b/cpp/src/arrow/dataset/discovery.h index ac009231eec90..d26f079e2ed58 100644 --- a/cpp/src/arrow/dataset/discovery.h +++ b/cpp/src/arrow/dataset/discovery.h @@ -232,5 +232,27 @@ class ARROW_DS_EXPORT FileSystemDatasetFactory : public DatasetFactory { FileSystemFactoryOptions options_; }; +class ARROW_DS_EXPORT SingleFileDatasetFactory : public DatasetFactory { + public: + static Result> Make(std::string path, + std::shared_ptr fs, + std::shared_ptr format); + + Result>> InspectSchemas( + InspectOptions options) override; + + Result> Finish(FinishOptions options) override; + + protected: + SingleFileDatasetFactory(std::shared_ptr file, + std::shared_ptr fs, + std::shared_ptr format); + + private: + std::shared_ptr file_; + std::shared_ptr fs_; + std::shared_ptr format_; +}; + } // namespace dataset } // namespace arrow diff --git 
a/cpp/src/arrow/dataset/file_base.cc b/cpp/src/arrow/dataset/file_base.cc index fee471d975f8b..239d28aec4049 100644 --- a/cpp/src/arrow/dataset/file_base.cc +++ b/cpp/src/arrow/dataset/file_base.cc @@ -18,6 +18,8 @@ #include "arrow/dataset/file_base.h" #include +#include +#include #include #include "arrow/dataset/dataset_internal.h" @@ -277,5 +279,49 @@ Status WriteTask::CreateDestinationParentDir() const { return Status::OK(); } +Result> SingleFileDataset::Make( + std::shared_ptr schema, std::shared_ptr root_partition, + std::string path, std::shared_ptr fs, + std::shared_ptr format) { + std::shared_ptr file = std::make_shared(path, fs.get()); + return std::make_shared(schema, root_partition, file, fs, format); +} + +Result> SingleFileDataset::Make( + std::shared_ptr schema, std::shared_ptr root_partition, + std::shared_ptr file, std::shared_ptr fs, + std::shared_ptr format) { + return std::make_shared(schema, root_partition, file, fs, format); +} + +SingleFileDataset::SingleFileDataset(std::shared_ptr schema, + std::shared_ptr root_partition, + std::shared_ptr file, + std::shared_ptr fs, + std::shared_ptr format) + : Dataset(std::move(schema), std::move(root_partition)), + file_(std::move(file)), + fs_(std::move(fs)), + format_(std::move(format)) {} + +FragmentIterator SingleFileDataset::GetFragmentsImpl( + std::shared_ptr options) { + std::vector> files({file_}); + auto file_srcs_it = MakeVectorIterator(std::move(files)); + auto file_src_to_fragment = [options, this](const std::shared_ptr& source) + -> Result> { + ARROW_ASSIGN_OR_RAISE(std::shared_ptr fragment, + format_->MakeFragment(*source, options)) + return fragment; + }; + return MakeMaybeMapIterator(file_src_to_fragment, std::move(file_srcs_it)); +} + +Result> SingleFileDataset::ReplaceSchema( + std::shared_ptr schema) const { + RETURN_NOT_OK(CheckProjectable(*schema_, *schema)); + return std::shared_ptr(new SingleFileDataset( + std::move(schema), partition_expression_, file_, fs_, format_)); +} } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/file_base.h b/cpp/src/arrow/dataset/file_base.h index e6d893193ff22..aba7d7228c39c 100644 --- a/cpp/src/arrow/dataset/file_base.h +++ b/cpp/src/arrow/dataset/file_base.h @@ -316,5 +316,35 @@ class ARROW_DS_EXPORT WritePlan { std::vector paths; }; +class ARROW_DS_EXPORT SingleFileDataset : public Dataset { + public: + SingleFileDataset(std::shared_ptr schema, + std::shared_ptr root_partition, + std::shared_ptr file, std::shared_ptr fs, + std::shared_ptr format); + + static Result> Make( + std::shared_ptr schema, std::shared_ptr root_partition, + std::string path, std::shared_ptr fs, + std::shared_ptr format); + + static Result> Make( + std::shared_ptr schema, std::shared_ptr root_partition, + std::shared_ptr file, std::shared_ptr fs, + std::shared_ptr format); + + std::string type_name() const override { return "single_file"; } + Result> ReplaceSchema( + std::shared_ptr schema) const override; + + protected: + FragmentIterator GetFragmentsImpl(std::shared_ptr options) override; + + private: + std::shared_ptr file_; + std::shared_ptr fs_; + std::shared_ptr format_; +}; + } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/file_test.cc b/cpp/src/arrow/dataset/file_test.cc index 699ba98958760..55c3d8cbe64f9 100644 --- a/cpp/src/arrow/dataset/file_test.cc +++ b/cpp/src/arrow/dataset/file_test.cc @@ -211,5 +211,10 @@ TEST_F(TestFileSystemDataset, FragmentPartitions) { }); } +TEST_F(TestSingleFileDataset, FragmentPartitions) { + 
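+  // a single-file dataset is expected to expose exactly one fragment,
+  // rooted at the file's own path: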
MakeSingleFileDataset("A/a"); + AssertFragmentsAreFromPath(dataset_->GetFragments(options_), {"A/a"}); +} + } // namespace dataset } // namespace arrow diff --git a/cpp/src/arrow/dataset/scanner.cc b/cpp/src/arrow/dataset/scanner.cc index 701d90042aec9..a78450ba219e0 100644 --- a/cpp/src/arrow/dataset/scanner.cc +++ b/cpp/src/arrow/dataset/scanner.cc @@ -43,6 +43,7 @@ std::shared_ptr ScanOptions::ReplaceSchema( auto copy = ScanOptions::Make(std::move(schema)); copy->filter = filter; copy->evaluator = evaluator; + copy->batch_size = batch_size; return copy; } diff --git a/cpp/src/arrow/dataset/test_util.h b/cpp/src/arrow/dataset/test_util.h index b7feaef2b46db..0b27b01cf2df7 100644 --- a/cpp/src/arrow/dataset/test_util.h +++ b/cpp/src/arrow/dataset/test_util.h @@ -323,6 +323,21 @@ static const std::string& PathOf(const std::shared_ptr& fragment) { class TestFileSystemDataset : public ::testing::Test, public MakeFileSystemDatasetMixin {}; +class TestSingleFileDataset : public ::testing::Test { + public: + void MakeSingleFileDataset(const std::string& path) { + auto format = std::make_shared(); + ASSERT_OK_AND_ASSIGN( + dataset_, SingleFileDataset::Make(schema({}), scalar(true), + std::make_shared(path, fs_.get()), + fs_, format)); + } + + std::shared_ptr fs_; + std::shared_ptr dataset_; + std::shared_ptr options_ = ScanOptions::Make(schema({})); +}; + static std::vector PathsOf(const FragmentVector& fragments) { std::vector paths(fragments.size()); std::transform(fragments.begin(), fragments.end(), paths.begin(), PathOf); diff --git a/cpp/src/gandiva/function_registry_arithmetic.cc b/cpp/src/gandiva/function_registry_arithmetic.cc index d24020fd0167a..3458c21d95004 100644 --- a/cpp/src/gandiva/function_registry_arithmetic.cc +++ b/cpp/src/gandiva/function_registry_arithmetic.cc @@ -37,7 +37,13 @@ std::vector GetArithmeticFunctionRegistry() { static std::vector arithmetic_fn_registry_ = { UNARY_SAFE_NULL_IF_NULL(not, {}, boolean, boolean), UNARY_SAFE_NULL_IF_NULL(castBIGINT, {}, int32, int64), + UNARY_SAFE_NULL_IF_NULL(castBIGINT, {}, float32, int64), + UNARY_SAFE_NULL_IF_NULL(castBIGINT, {}, float64, int64), + UNARY_SAFE_NULL_IF_NULL(castBIGINT, {}, date64, int64), UNARY_SAFE_NULL_IF_NULL(castINT, {}, int64, int32), + UNARY_SAFE_NULL_IF_NULL(castINT, {}, date32, int32), + UNARY_SAFE_NULL_IF_NULL(castINT, {}, float32, int32), + UNARY_SAFE_NULL_IF_NULL(castINT, {}, float64, int32), UNARY_SAFE_NULL_IF_NULL(castBIGINT, {}, decimal128, int64), // cast to float32 @@ -62,6 +68,7 @@ std::vector GetArithmeticFunctionRegistry() { UNARY_SAFE_NULL_IF_NULL(castDATE, {}, int64, date64), UNARY_SAFE_NULL_IF_NULL(castDATE, {}, int32, date32), + UNARY_SAFE_NULL_IF_NULL(castDATE, {}, date32, date64), // add/sub/multiply/divide/mod BINARY_SYMMETRIC_FN(add, {}), BINARY_SYMMETRIC_FN(subtract, {}), diff --git a/cpp/src/gandiva/function_registry_math_ops.cc b/cpp/src/gandiva/function_registry_math_ops.cc index 771e363e3048f..722b908f2eb7a 100644 --- a/cpp/src/gandiva/function_registry_math_ops.cc +++ b/cpp/src/gandiva/function_registry_math_ops.cc @@ -59,6 +59,11 @@ std::vector GetMathOpsFunctionRegistry() { BINARY_SAFE_NULL_NEVER_BOOL_FN(is_distinct_from, {}), BINARY_SAFE_NULL_NEVER_BOOL_FN(is_not_distinct_from, {}), + UNARY_SAFE_NULL_IF_NULL(abs, {}, int32, uint32), + UNARY_SAFE_NULL_IF_NULL(abs, {}, int64, uint64), + UNARY_SAFE_NULL_IF_NULL(abs, {}, float32, float32), + UNARY_SAFE_NULL_IF_NULL(abs, {}, float64, float64), + // decimal functions UNARY_SAFE_NULL_IF_NULL(abs, {}, decimal128, decimal128), 
UNARY_SAFE_NULL_IF_NULL(ceil, {}, decimal128, decimal128), diff --git a/cpp/src/gandiva/jni/jni_common.cc b/cpp/src/gandiva/jni/jni_common.cc index 453c0a487d3f0..e09daf6a48f1a 100644 --- a/cpp/src/gandiva/jni/jni_common.cc +++ b/cpp/src/gandiva/jni/jni_common.cc @@ -350,7 +350,7 @@ NodePtr ProtoTypeToOrNode(const types::OrNode& node) { } NodePtr ProtoTypeToInNode(const types::InNode& node) { - NodePtr field = ProtoTypeToFieldNode(node.field()); + NodePtr field = ProtoTypeToNode(node.node()); if (node.has_intvalues()) { std::unordered_set int_values; diff --git a/cpp/src/gandiva/precompiled/arithmetic_ops.cc b/cpp/src/gandiva/precompiled/arithmetic_ops.cc index afeb2f9466bf2..b590d5d5b6adb 100644 --- a/cpp/src/gandiva/precompiled/arithmetic_ops.cc +++ b/cpp/src/gandiva/precompiled/arithmetic_ops.cc @@ -106,7 +106,13 @@ NUMERIC_DATE_TYPES(BINARY_RELATIONAL, greater_than_or_equal_to, >=) } CAST_UNARY(castBIGINT, int32, int64) +CAST_UNARY(castBIGINT, date64, int64) +CAST_UNARY(castBIGINT, float32, int64) +CAST_UNARY(castBIGINT, float64, int64) CAST_UNARY(castINT, int64, int32) +CAST_UNARY(castINT, date32, int32) +CAST_UNARY(castINT, float32, int32) +CAST_UNARY(castINT, float64, int32) CAST_UNARY(castFLOAT4, int32, float32) CAST_UNARY(castFLOAT4, int64, float32) CAST_UNARY(castFLOAT8, int32, float64) diff --git a/cpp/src/gandiva/precompiled/extended_math_ops.cc b/cpp/src/gandiva/precompiled/extended_math_ops.cc index e6b6a6e3af111..7f482f95b26b6 100644 --- a/cpp/src/gandiva/precompiled/extended_math_ops.cc +++ b/cpp/src/gandiva/precompiled/extended_math_ops.cc @@ -72,6 +72,24 @@ ENUMERIC_TYPES_UNARY(LOG, float64) ENUMERIC_TYPES_UNARY(LOG10, float64) +// abs +#define ABS_TYPES_UNARY(IN_TYPE, OUT_TYPE) \ + FORCE_INLINE \ + gdv_##OUT_TYPE abs_##IN_TYPE(gdv_##IN_TYPE in) { \ + return static_cast(abs((in))); \ + } + +#define ABS_FTYPES_UNARY(IN_TYPE, OUT_TYPE) \ + FORCE_INLINE \ + gdv_##OUT_TYPE abs_##IN_TYPE(gdv_##IN_TYPE in) { \ + return static_cast(fabs((in))); \ + } + +ABS_TYPES_UNARY(int32, uint32) +ABS_TYPES_UNARY(int64, uint64) +ABS_FTYPES_UNARY(float32, float32) +ABS_FTYPES_UNARY(float64, float64) + FORCE_INLINE void set_error_for_logbase(int64_t execution_context, double base) { char const* prefix = "divide by zero error with log of base"; diff --git a/cpp/src/gandiva/precompiled/time.cc b/cpp/src/gandiva/precompiled/time.cc index fa38e134e9847..8374c155b810b 100644 --- a/cpp/src/gandiva/precompiled/time.cc +++ b/cpp/src/gandiva/precompiled/time.cc @@ -37,7 +37,6 @@ extern "C" { // Expand inner macro for all date types. 
#define DATE_TYPES(INNER) \ - INNER(date32) \ INNER(date64) \ INNER(timestamp) @@ -454,9 +453,13 @@ DATE_TRUNC_FUNCTIONS(timestamp) FORCE_INLINE gdv_date64 castDATE_int64(gdv_int64 in) { return in; } + FORCE_INLINE gdv_date32 castDATE_int32(gdv_int32 in) { return in; } +FORCE_INLINE +gdv_date64 castDATE_date32(gdv_date32 days) { return (gdv_int64)days * MILLIS_IN_DAY; } + static int days_in_month[] = {31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31}; bool IsLastDayOfMonth(const EpochTimePoint& tp) { diff --git a/cpp/src/gandiva/precompiled/time_test.cc b/cpp/src/gandiva/precompiled/time_test.cc index 9771b7ed31dfc..bb734d9f01988 100644 --- a/cpp/src/gandiva/precompiled/time_test.cc +++ b/cpp/src/gandiva/precompiled/time_test.cc @@ -52,6 +52,8 @@ TEST(TestTime, TestCastDate) { EXPECT_EQ(castDATE_utf8(context_ptr, "71-1-1", 6), 31536000000); EXPECT_EQ(castDATE_utf8(context_ptr, "71-45-1", 7), 0); EXPECT_EQ(castDATE_utf8(context_ptr, "71-12-XX", 8), 0); + + EXPECT_EQ(castDATE_date32(1), 86400000); } TEST(TestTime, TestCastTimestamp) { diff --git a/cpp/src/gandiva/precompiled/types.h b/cpp/src/gandiva/precompiled/types.h index 45bc72b1b1d40..8b2823621aa9c 100644 --- a/cpp/src/gandiva/precompiled/types.h +++ b/cpp/src/gandiva/precompiled/types.h @@ -152,6 +152,11 @@ gdv_float64 log10_int64(gdv_int64); gdv_float64 log10_float32(gdv_float32); gdv_float64 log10_float64(gdv_float64); +gdv_uint32 abs_int32(gdv_int32); +gdv_uint64 abs_int64(gdv_int64); +gdv_float32 abs_float32(gdv_float32); +gdv_float64 abs_float64(gdv_float64); + gdv_float64 power_float64_float64(gdv_float64, gdv_float64); gdv_float64 log_int32_int32(gdv_int64 context, gdv_int32 base, gdv_int32 value); @@ -169,6 +174,8 @@ gdv_date64 castDATE_utf8(int64_t execution_context, const char* input, gdv_int32 gdv_date64 castDATE_int64(gdv_int64 date); +gdv_date64 castDATE_date32(gdv_date32 date); + gdv_date32 castDATE_int32(gdv_int32 date); gdv_timestamp castTIMESTAMP_utf8(int64_t execution_context, const char* input, diff --git a/cpp/src/gandiva/proto/Types.proto b/cpp/src/gandiva/proto/Types.proto index 02ba2142c4b80..9020ccdc5a0b3 100644 --- a/cpp/src/gandiva/proto/Types.proto +++ b/cpp/src/gandiva/proto/Types.proto @@ -216,7 +216,7 @@ message FunctionSignature { } message InNode { - optional FieldNode field = 1; + optional TreeNode node = 1; optional IntConstants intValues = 2; optional LongConstants longValues = 3; optional StringConstants stringValues = 4; diff --git a/cpp/src/gandiva/tests/date_time_test.cc b/cpp/src/gandiva/tests/date_time_test.cc index 79b6109b2d186..11371b083f7a0 100644 --- a/cpp/src/gandiva/tests/date_time_test.cc +++ b/cpp/src/gandiva/tests/date_time_test.cc @@ -19,6 +19,7 @@ #include #include #include "arrow/memory_pool.h" +#include "gandiva/precompiled/time_constants.h" #include "gandiva/projector.h" #include "gandiva/tests/test_util.h" #include "gandiva/tree_expr_builder.h" @@ -88,6 +89,26 @@ int64_t MillisSince(time_t base_line, int32_t yy, int32_t mm, int32_t dd, int32_ return static_cast(ts - base_line) * 1000 + millis; } +int32_t DaysSince(time_t base_line, int32_t yy, int32_t mm, int32_t dd, int32_t hr, + int32_t min, int32_t sec, int32_t millis) { + struct tm given_ts; + memset(&given_ts, 0, sizeof(struct tm)); + given_ts.tm_year = (yy - 1900); + given_ts.tm_mon = (mm - 1); + given_ts.tm_mday = dd; + given_ts.tm_hour = hr; + given_ts.tm_min = min; + given_ts.tm_sec = sec; + + time_t ts = mktime(&given_ts); + if (ts == static_cast(-1)) { + ARROW_LOG(FATAL) << "mktime() failed"; + } + // time_t is an 
arithmetic type on both POSIX and Windows, we can simply + // subtract to get a duration in seconds. + return static_cast(((ts - base_line) * 1000 + millis) / MILLIS_IN_DAY); +} + TEST_F(TestProjector, TestIsNull) { auto d0 = field("d0", date64()); auto t0 = field("t0", time32(arrow::TimeUnit::MILLI)); @@ -170,14 +191,16 @@ TEST_F(TestProjector, TestDate32IsNull) { TEST_F(TestProjector, TestDateTime) { auto field0 = field("f0", date64()); + auto field1 = field("f1", date32()); auto field2 = field("f2", timestamp(arrow::TimeUnit::MILLI)); - auto schema = arrow::schema({field0, field2}); + auto schema = arrow::schema({field0, field1, field2}); // output fields auto field_year = field("yy", int64()); auto field_month = field("mm", int64()); auto field_day = field("dd", int64()); auto field_hour = field("hh", int64()); + auto field_date64 = field("date64", date64()); // extract year and month from date auto date2year_expr = @@ -185,15 +208,30 @@ TEST_F(TestProjector, TestDateTime) { auto date2month_expr = TreeExprBuilder::MakeExpression("extractMonth", {field0}, field_month); + // extract year and month from date32, cast to date64 first + auto node_f1 = TreeExprBuilder::MakeField(field1); + auto date32_to_date64_func = + TreeExprBuilder::MakeFunction("castDATE", {node_f1}, date64()); + + auto date64_2year_func = + TreeExprBuilder::MakeFunction("extractYear", {date32_to_date64_func}, int64()); + auto date64_2year_expr = TreeExprBuilder::MakeExpression(date64_2year_func, field_year); + + auto date64_2month_func = + TreeExprBuilder::MakeFunction("extractMonth", {date32_to_date64_func}, int64()); + auto date64_2month_expr = + TreeExprBuilder::MakeExpression(date64_2month_func, field_month); + // extract month and day from timestamp auto ts2month_expr = TreeExprBuilder::MakeExpression("extractMonth", {field2}, field_month); auto ts2day_expr = TreeExprBuilder::MakeExpression("extractDay", {field2}, field_day); std::shared_ptr projector; - auto status = Projector::Make( - schema, {date2year_expr, date2month_expr, ts2month_expr, ts2day_expr}, - TestConfiguration(), &projector); + auto status = Projector::Make(schema, + {date2year_expr, date2month_expr, date64_2year_expr, + date64_2month_expr, ts2month_expr, ts2day_expr}, + TestConfiguration(), &projector); ASSERT_TRUE(status.ok()); // Create a row-batch with some sample data @@ -207,6 +245,13 @@ TEST_F(TestProjector, TestDateTime) { auto array0 = MakeArrowTypeArray(date64(), field0_data, validity); + std::vector field1_data = {DaysSince(epoch, 2000, 1, 1, 5, 0, 0, 0), + DaysSince(epoch, 1999, 12, 31, 5, 0, 0, 0), + DaysSince(epoch, 2015, 6, 30, 20, 0, 0, 0), + DaysSince(epoch, 2015, 7, 1, 20, 0, 0, 0)}; + auto array1 = + MakeArrowTypeArray(date32(), field1_data, validity); + std::vector field2_data = {MillisSince(epoch, 1999, 12, 31, 5, 0, 0, 0), MillisSince(epoch, 2000, 1, 2, 5, 0, 0, 0), MillisSince(epoch, 2015, 7, 1, 1, 0, 0, 0), @@ -216,16 +261,20 @@ TEST_F(TestProjector, TestDateTime) { arrow::timestamp(arrow::TimeUnit::MILLI), field2_data, validity); // expected output - // date 2 year and date 2 month - auto exp_yy_from_date = MakeArrowArrayInt64({2000, 1999, 2015, 2015}, validity); - auto exp_mm_from_date = MakeArrowArrayInt64({1, 12, 6, 7}, validity); + // date 2 year and date 2 month for date64 + auto exp_yy_from_date64 = MakeArrowArrayInt64({2000, 1999, 2015, 2015}, validity); + auto exp_mm_from_date64 = MakeArrowArrayInt64({1, 12, 6, 7}, validity); + + // date 2 year and date 2 month for date32 + auto exp_yy_from_date32 = 
MakeArrowArrayInt64({2000, 1999, 2015, 2015}, validity);
+  auto exp_mm_from_date32 = MakeArrowArrayInt64({1, 12, 6, 7}, validity);
 
   // ts 2 month and ts 2 day
   auto exp_mm_from_ts = MakeArrowArrayInt64({12, 1, 7, 6}, validity);
   auto exp_dd_from_ts = MakeArrowArrayInt64({31, 2, 1, 29}, validity);
 
   // prepare input record batch
-  auto in_batch = arrow::RecordBatch::Make(schema, num_records, {array0, array2});
+  auto in_batch = arrow::RecordBatch::Make(schema, num_records, {array0, array1, array2});
 
   // Evaluate expression
   arrow::ArrayVector outputs;
@@ -233,10 +282,12 @@ TEST_F(TestProjector, TestDateTime) {
   EXPECT_TRUE(status.ok());
 
   // Validate results
-  EXPECT_ARROW_ARRAY_EQUALS(exp_yy_from_date, outputs.at(0));
-  EXPECT_ARROW_ARRAY_EQUALS(exp_mm_from_date, outputs.at(1));
-  EXPECT_ARROW_ARRAY_EQUALS(exp_mm_from_ts, outputs.at(2));
-  EXPECT_ARROW_ARRAY_EQUALS(exp_dd_from_ts, outputs.at(3));
+  EXPECT_ARROW_ARRAY_EQUALS(exp_yy_from_date64, outputs.at(0));
+  EXPECT_ARROW_ARRAY_EQUALS(exp_mm_from_date64, outputs.at(1));
+  EXPECT_ARROW_ARRAY_EQUALS(exp_yy_from_date32, outputs.at(2));
+  EXPECT_ARROW_ARRAY_EQUALS(exp_mm_from_date32, outputs.at(3));
+  EXPECT_ARROW_ARRAY_EQUALS(exp_mm_from_ts, outputs.at(4));
+  EXPECT_ARROW_ARRAY_EQUALS(exp_dd_from_ts, outputs.at(5));
 }
 
 TEST_F(TestProjector, TestTime) {
diff --git a/cpp/src/jni/CMakeLists.txt b/cpp/src/jni/CMakeLists.txt
index 3872d671934da..570b13f280100 100644
--- a/cpp/src/jni/CMakeLists.txt
+++ b/cpp/src/jni/CMakeLists.txt
@@ -22,3 +22,7 @@
 if(ARROW_ORC)
   add_subdirectory(orc)
 endif()
+
+if(ARROW_DATASET)
+  add_subdirectory(dataset)
+endif()
diff --git a/cpp/src/jni/dataset/CMakeLists.txt b/cpp/src/jni/dataset/CMakeLists.txt
new file mode 100644
index 0000000000000..84a630ea35257
--- /dev/null
+++ b/cpp/src/jni/dataset/CMakeLists.txt
@@ -0,0 +1,84 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
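+
+# Note: this file is entered through cpp/src/jni/CMakeLists.txt, so it can
+# reuse add_arrow_lib() and the ${ARROW_PROTOBUF_PROTOC} /
+# ${ARROW_PROTOBUF_LIBPROTOBUF} targets already resolved by the parent
+# C++ build.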
+
+#
+# arrow_dataset_jni
+#
+
+project(arrow_dataset_jni)
+
+cmake_minimum_required(VERSION 3.11)
+
+find_package(JNI REQUIRED)
+
+add_custom_target(arrow_dataset_jni)
+
+set(JNI_HEADERS_DIR "${CMAKE_CURRENT_BINARY_DIR}/generated")
+
+add_subdirectory(../../../../java/dataset ./java)
+
+set(ARROW_BUILD_STATIC OFF)
+
+set(ARROW_DATASET_JNI_LIBS arrow_dataset_static ${ARROW_PROTOBUF_LIBPROTOBUF})
+set(PROTO_OUTPUT_DIR ${CMAKE_CURRENT_BINARY_DIR})
+set(PROTO_OUTPUT_FILES "${PROTO_OUTPUT_DIR}/DTypes.pb.cc")
+set(PROTO_OUTPUT_FILES ${PROTO_OUTPUT_FILES} "${PROTO_OUTPUT_DIR}/DTypes.pb.h")
+
+set_source_files_properties(${PROTO_OUTPUT_FILES} PROPERTIES GENERATED TRUE)
+
+get_filename_component(ABS_ARROW_DATASET_PROTO
+                       ${CMAKE_SOURCE_DIR}/src/jni/dataset/proto/DTypes.proto ABSOLUTE)
+
+add_custom_command(OUTPUT ${PROTO_OUTPUT_FILES}
+                   COMMAND ${ARROW_PROTOBUF_PROTOC}
+                           --proto_path
+                           ${CMAKE_SOURCE_DIR}/src/jni/dataset/proto
+                           --cpp_out
+                           ${PROTO_OUTPUT_DIR}
+                           ${CMAKE_SOURCE_DIR}/src/jni/dataset/proto/DTypes.proto
+                   DEPENDS ${ABS_ARROW_DATASET_PROTO} ${ARROW_PROTOBUF_LIBPROTOBUF}
+                   COMMENT "Running PROTO compiler on DTypes.proto"
+                   VERBATIM)
+
+add_custom_target(arrow_dataset_jni_proto ALL DEPENDS ${PROTO_OUTPUT_FILES})
+
+set(PROTO_SRCS "${PROTO_OUTPUT_DIR}/DTypes.pb.cc")
+
+set(PROTO_HDRS "${PROTO_OUTPUT_DIR}/DTypes.pb.h")
+
+set(ARROW_DATASET_JNI_SOURCES jni_wrapper.cpp ${PROTO_SRCS})
+
+add_arrow_lib(arrow_dataset_jni
+              BUILD_SHARED
+              SOURCES
+              ${ARROW_DATASET_JNI_SOURCES}
+              OUTPUTS
+              ARROW_DATASET_JNI_LIBRARIES
+              SHARED_PRIVATE_LINK_LIBS
+              ${ARROW_DATASET_JNI_LIBS}
+              STATIC_LINK_LIBS
+              ${ARROW_DATASET_JNI_LIBS}
+              EXTRA_INCLUDES
+              ${JNI_HEADERS_DIR}
+              PRIVATE_INCLUDES
+              ${JNI_INCLUDE_DIRS}
+              DEPENDENCIES
+              arrow_static
+              arrow_dataset_java
+              arrow_dataset_jni_proto)
+
+add_dependencies(arrow_dataset_jni ${ARROW_DATASET_JNI_LIBRARIES})
diff --git a/cpp/src/jni/dataset/concurrent_map.h b/cpp/src/jni/dataset/concurrent_map.h
new file mode 100644
index 0000000000000..61d3ca29a3cbd
--- /dev/null
+++ b/cpp/src/jni/dataset/concurrent_map.h
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef JNI_ID_TO_MODULE_MAP_H
+#define JNI_ID_TO_MODULE_MAP_H
+
+#include <jni.h>
+
+#include <memory>
+#include <mutex>
+#include <unordered_map>
+#include <utility>
+
+#include "arrow/util/macros.h"
+
+namespace arrow {
+namespace jni {
+
+/**
+ * A utility class that maps module ids to module pointers.
+ * @tparam Holder class of the object to hold.
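+ *
+ * All operations serialize on an internal mutex, so a single instance can be
+ * shared by JNI calls coming from multiple Java threads; Lookup() of an
+ * unknown id returns NULLPTR rather than throwing.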
+ */
+template <typename Holder>
+class ConcurrentMap {
+ public:
+  ConcurrentMap() : module_id_(init_module_id_) {}
+
+  jlong Insert(Holder holder) {
+    std::lock_guard<std::mutex> lock(mtx_);
+    jlong result = module_id_++;
+    map_.insert(std::pair<jlong, Holder>(result, holder));
+    return result;
+  }
+
+  void Erase(jlong module_id) {
+    std::lock_guard<std::mutex> lock(mtx_);
+    map_.erase(module_id);
+  }
+
+  Holder Lookup(jlong module_id) {
+    std::lock_guard<std::mutex> lock(mtx_);
+    auto it = map_.find(module_id);
+    if (it != map_.end()) {
+      return it->second;
+    }
+    return NULLPTR;
+  }
+
+  void Clear() {
+    std::lock_guard<std::mutex> lock(mtx_);
+    map_.clear();
+  }
+
+ private:
+  // Initialize the module id starting value to a number greater than zero
+  // to allow for easier debugging of uninitialized java variables.
+  static constexpr int init_module_id_ = 4;
+
+  int64_t module_id_;
+  std::mutex mtx_;
+  // map from module ids returned to Java and module pointers
+  std::unordered_map<jlong, Holder> map_;
+};
+
+}  // namespace jni
+}  // namespace arrow
+
+#endif  // JNI_ID_TO_MODULE_MAP_H
diff --git a/cpp/src/jni/dataset/jni_wrapper.cpp b/cpp/src/jni/dataset/jni_wrapper.cpp
new file mode 100644
index 0000000000000..2f816361fff45
--- /dev/null
+++ b/cpp/src/jni/dataset/jni_wrapper.cpp
@@ -0,0 +1,607 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
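+
+// How the pieces below fit together (a sketch of the intended Java-side call
+// sequence; variable names are illustrative only): every native object created
+// here is parked in a ConcurrentMap and handed to Java as an opaque jlong id,
+// e.g.
+//
+//   long factoryId = JniWrapper.get().makeSingleFileDatasetFactory(path, 0, 0);
+//   long datasetId = createDataset(factoryId, inspectSchema(factoryId));
+//   long scannerId = createScanner(datasetId, columns, filter, batchSize);
+//   // ... getScanTasksFromScanner / scan / nextRecordBatch ...
+//
+// Each id must eventually be released through its matching close*/release*
+// method, otherwise the mapped C++ object stays alive until JNI_OnUnload.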
+ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "arrow/compute/kernel.h" +#include "arrow/compute/kernels/cast.h" +#include "arrow/compute/kernels/compare.h" +#include "jni/dataset/DTypes.pb.h" +#include "jni/dataset/concurrent_map.h" + +#include "org_apache_arrow_dataset_file_JniWrapper.h" +#include "org_apache_arrow_dataset_jni_JniWrapper.h" + +static jclass illegal_access_exception_class; +static jclass illegal_argument_exception_class; +static jclass runtime_exception_class; + +static jclass record_batch_handle_class; +static jclass record_batch_handle_field_class; +static jclass record_batch_handle_buffer_class; + +static jmethodID record_batch_handle_constructor; +static jmethodID record_batch_handle_field_constructor; +static jmethodID record_batch_handle_buffer_constructor; + +static jint JNI_VERSION = JNI_VERSION_1_6; + +using arrow::jni::ConcurrentMap; + +static ConcurrentMap> + dataset_factory_holder_; +static ConcurrentMap> dataset_holder_; +static ConcurrentMap> scan_task_holder_; +static ConcurrentMap> scanner_holder_; +static ConcurrentMap> iterator_holder_; +static ConcurrentMap> buffer_holder_; + +#define JNI_ASSIGN_OR_THROW_NAME(x, y) ARROW_CONCAT(x, y) + +#define JNI_ASSIGN_OR_THROW_IMPL(t, lhs, rexpr) \ + auto t = (rexpr); \ + if (!t.status().ok()) { \ + env->ThrowNew(runtime_exception_class, t.status().message().c_str()); \ + } \ + lhs = std::move(t).ValueOrDie(); + +#define JNI_ASSIGN_OR_THROW(lhs, rexpr) \ + JNI_ASSIGN_OR_THROW_IMPL(JNI_ASSIGN_OR_THROW_NAME(_tmp_var, __COUNTER__), lhs, rexpr) + +#define JNI_ASSERT_OK_OR_THROW(expr) \ + do { \ + auto _res = (expr); \ + arrow::Status _st = ::arrow::internal::GenericToStatus(_res); \ + if (!_st.ok()) { \ + env->ThrowNew(runtime_exception_class, _st.message().c_str()); \ + } \ + } while (false); + +jclass CreateGlobalClassReference(JNIEnv* env, const char* class_name) { + jclass local_class = env->FindClass(class_name); + jclass global_class = (jclass)env->NewGlobalRef(local_class); + env->DeleteLocalRef(local_class); + return global_class; +} + +jmethodID GetMethodID(JNIEnv* env, jclass this_class, const char* name, const char* sig) { + jmethodID ret = env->GetMethodID(this_class, name, sig); + if (ret == nullptr) { + std::string error_message = "Unable to find method " + std::string(name) + + " within signature" + std::string(sig); + env->ThrowNew(illegal_access_exception_class, error_message.c_str()); + } + return ret; +} + +jint JNI_OnLoad(JavaVM* vm, void* reserved) { + JNIEnv* env; + if (vm->GetEnv(reinterpret_cast(&env), JNI_VERSION) != JNI_OK) { + return JNI_ERR; + } + + illegal_access_exception_class = + CreateGlobalClassReference(env, "Ljava/lang/IllegalAccessException;"); + illegal_argument_exception_class = + CreateGlobalClassReference(env, "Ljava/lang/IllegalArgumentException;"); + runtime_exception_class = + CreateGlobalClassReference(env, "Ljava/lang/RuntimeException;"); + + record_batch_handle_class = CreateGlobalClassReference( + env, "Lorg/apache/arrow/dataset/jni/NativeRecordBatchHandle;"); + record_batch_handle_field_class = CreateGlobalClassReference( + env, "Lorg/apache/arrow/dataset/jni/NativeRecordBatchHandle$Field;"); + record_batch_handle_buffer_class = CreateGlobalClassReference( + env, "Lorg/apache/arrow/dataset/jni/NativeRecordBatchHandle$Buffer;"); + + record_batch_handle_constructor = + GetMethodID(env, record_batch_handle_class, "", + "(J[Lorg/apache/arrow/dataset/jni/NativeRecordBatchHandle$Field;" + 
"[Lorg/apache/arrow/dataset/jni/NativeRecordBatchHandle$Buffer;)V"); + record_batch_handle_field_constructor = + GetMethodID(env, record_batch_handle_field_class, "", "(JJ)V"); + record_batch_handle_buffer_constructor = + GetMethodID(env, record_batch_handle_buffer_class, "", "(JJJJ)V"); + + env->ExceptionDescribe(); + + return JNI_VERSION; +} + +void JNI_OnUnload(JavaVM* vm, void* reserved) { + JNIEnv* env; + vm->GetEnv(reinterpret_cast(&env), JNI_VERSION); + env->DeleteGlobalRef(illegal_access_exception_class); + env->DeleteGlobalRef(illegal_argument_exception_class); + env->DeleteGlobalRef(runtime_exception_class); + env->DeleteGlobalRef(record_batch_handle_class); + env->DeleteGlobalRef(record_batch_handle_field_class); + env->DeleteGlobalRef(record_batch_handle_buffer_class); + + dataset_factory_holder_.Clear(); + dataset_holder_.Clear(); + scan_task_holder_.Clear(); + scanner_holder_.Clear(); + iterator_holder_.Clear(); + buffer_holder_.Clear(); +} + +std::shared_ptr SchemaFromColumnNames( + const std::shared_ptr& input, + const std::vector& column_names) { + std::vector> columns; + for (const auto& name : column_names) { + columns.push_back(input->GetFieldByName(name)); + } + return std::make_shared(columns); +} + +std::shared_ptr GetFileFormat(JNIEnv* env, jint id) { + switch (id) { + case 0: + return std::make_shared(); + default: + std::string error_message = "illegal file format id: " + std::to_string(id); + env->ThrowNew(illegal_argument_exception_class, error_message.c_str()); + return nullptr; // unreachable + } +} + +std::shared_ptr GetFileSystem(JNIEnv* env, jint id, + std::string path, + std::string* out_path) { + switch (id) { + case 0: + *out_path = path; + return std::make_shared(); + case 1: { + JNI_ASSIGN_OR_THROW(std::shared_ptr ret, + arrow::fs::FileSystemFromUri(path, out_path)) + return ret; + } + default: + std::string error_message = "illegal filesystem id: " + std::to_string(id); + env->ThrowNew(illegal_argument_exception_class, error_message.c_str()); + return nullptr; // unreachable + } +} + +std::string JStringToCString(JNIEnv* env, jstring string) { + jboolean copied; + int32_t length = env->GetStringUTFLength(string); + const char* chars = env->GetStringUTFChars(string, &copied); + std::string str = std::string(chars, length); + // fixme calling ReleaseStringUTFChars if memory leak faced + return str; +} + +std::vector ToStringVector(JNIEnv* env, jobjectArray& str_array) { + int length = env->GetArrayLength(str_array); + std::vector vector; + for (int i = 0; i < length; i++) { + auto string = (jstring)(env->GetObjectArrayElement(str_array, i)); + vector.push_back(JStringToCString(env, string)); + } + return vector; +} + +template +std::vector collect(JNIEnv* env, arrow::Iterator itr) { + std::vector vector; + while (true) { + JNI_ASSIGN_OR_THROW(T t, itr.Next()) + if (!t) { + break; + } + vector.push_back(t); + } + return vector; +} + +// FIXME: COPIED FROM intel/master on which this branch is not rebased yet +// FIXME: +// https://github.com/Intel-bigdata/arrow/blob/02502a4eb59834c2471dd629e77dbeed19559f68/cpp/src/jni/jni_common.h#L239-L254 +jbyteArray ToSchemaByteArray(JNIEnv* env, std::shared_ptr schema) { + JNI_ASSIGN_OR_THROW( + std::shared_ptr buffer, + arrow::ipc::SerializeSchema(*schema.get(), nullptr, arrow::default_memory_pool())); + + jbyteArray out = env->NewByteArray(buffer->size()); + auto src = reinterpret_cast(buffer->data()); + env->SetByteArrayRegion(out, 0, buffer->size(), src); + return out; +} + +// FIXME: COPIED FROM intel/master on 
which this branch is not rebased yet +// FIXME: +// https://github.com/Intel-bigdata/arrow/blob/02502a4eb59834c2471dd629e77dbeed19559f68/cpp/src/jni/jni_common.h#L256-L272 +std::shared_ptr FromSchemaByteArray(JNIEnv* env, jbyteArray schemaBytes) { + arrow::ipc::DictionaryMemo in_memo; + int schemaBytes_len = env->GetArrayLength(schemaBytes); + jbyte* schemaBytes_data = env->GetByteArrayElements(schemaBytes, 0); + auto serialized_schema = + std::make_shared((uint8_t*)schemaBytes_data, schemaBytes_len); + arrow::io::BufferReader buf_reader(serialized_schema); + JNI_ASSIGN_OR_THROW(std::shared_ptr schema, + arrow::ipc::ReadSchema(&buf_reader, &in_memo)) + env->ReleaseByteArrayElements(schemaBytes, schemaBytes_data, JNI_ABORT); + return schema; +} + +bool ParseProtobuf(uint8_t* buf, int bufLen, google::protobuf::Message* msg) { + google::protobuf::io::CodedInputStream cis(buf, bufLen); + cis.SetRecursionLimit(1000); + return msg->ParseFromCodedStream(&cis); +} + +void releaseFilterInput(jbyteArray condition_arr, jbyte* condition_bytes, JNIEnv* env) { + env->ReleaseByteArrayElements(condition_arr, condition_bytes, JNI_ABORT); +} + +// fixme in development. Not all node types considered. +std::shared_ptr translateNode( + arrow::dataset::types::TreeNode node, JNIEnv* env) { + if (node.has_fieldnode()) { + const arrow::dataset::types::FieldNode& f_node = node.fieldnode(); + const std::string& name = f_node.name(); + return std::make_shared(name); + } + if (node.has_intnode()) { + const arrow::dataset::types::IntNode& int_node = node.intnode(); + int32_t val = int_node.value(); + return std::make_shared( + std::make_shared(val)); + } + if (node.has_longnode()) { + const arrow::dataset::types::LongNode& long_node = node.longnode(); + int64_t val = long_node.value(); + return std::make_shared( + std::make_shared(val)); + } + if (node.has_floatnode()) { + const arrow::dataset::types::FloatNode& float_node = node.floatnode(); + float_t val = float_node.value(); + return std::make_shared( + std::make_shared(val)); + } + if (node.has_doublenode()) { + const arrow::dataset::types::DoubleNode& double_node = node.doublenode(); + double_t val = double_node.value(); + return std::make_shared( + std::make_shared(val)); + } + if (node.has_booleannode()) { + const arrow::dataset::types::BooleanNode& boolean_node = node.booleannode(); + bool val = boolean_node.value(); + return std::make_shared( + std::make_shared(val)); + } + if (node.has_andnode()) { + const arrow::dataset::types::AndNode& and_node = node.andnode(); + const arrow::dataset::types::TreeNode& left_arg = and_node.leftarg(); + const arrow::dataset::types::TreeNode& right_arg = and_node.rightarg(); + return std::make_shared(translateNode(left_arg, env), + translateNode(right_arg, env)); + } + if (node.has_ornode()) { + const arrow::dataset::types::OrNode& or_node = node.ornode(); + const arrow::dataset::types::TreeNode& left_arg = or_node.leftarg(); + const arrow::dataset::types::TreeNode& right_arg = or_node.rightarg(); + return std::make_shared(translateNode(left_arg, env), + translateNode(right_arg, env)); + } + if (node.has_cpnode()) { + const arrow::dataset::types::ComparisonNode& cp_node = node.cpnode(); + const std::string& op_name = cp_node.opname(); + arrow::compute::CompareOperator op; + if (op_name == "equal") { + op = arrow::compute::CompareOperator::EQUAL; + } else if (op_name == "greaterThan") { + op = arrow::compute::CompareOperator::GREATER; + } else if (op_name == "greaterThanOrEqual") { + op = 
arrow::compute::CompareOperator::GREATER_EQUAL; + } else if (op_name == "lessThan") { + op = arrow::compute::CompareOperator::LESS; + } else if (op_name == "lessThanOrEqual") { + op = arrow::compute::CompareOperator::LESS_EQUAL; + } else { + std::string error_message = "Unknown operation name in comparison node"; + env->ThrowNew(illegal_argument_exception_class, error_message.c_str()); + return nullptr; // unreachable + } + const arrow::dataset::types::TreeNode& left_arg = cp_node.leftarg(); + const arrow::dataset::types::TreeNode& right_arg = cp_node.rightarg(); + return std::make_shared( + op, translateNode(left_arg, env), translateNode(right_arg, env)); + } + if (node.has_notnode()) { + const arrow::dataset::types::NotNode& not_node = node.notnode(); + const ::arrow::dataset::types::TreeNode& child = not_node.args(); + std::shared_ptr translatedChild = + translateNode(child, env); + return std::make_shared(translatedChild); + } + if (node.has_isvalidnode()) { + const arrow::dataset::types::IsValidNode& is_valid_node = node.isvalidnode(); + const ::arrow::dataset::types::TreeNode& child = is_valid_node.args(); + std::shared_ptr translatedChild = + translateNode(child, env); + return std::make_shared(translatedChild); + } + std::string error_message = "Unknown node type"; + env->ThrowNew(illegal_argument_exception_class, error_message.c_str()); + return nullptr; // unreachable +} + +std::shared_ptr translateFilter( + arrow::dataset::types::Condition condition, JNIEnv* env) { + const arrow::dataset::types::TreeNode& tree_node = condition.root(); + return translateNode(tree_node, env); +} + +/* + * Class: org_apache_arrow_dataset_jni_JniWrapper + * Method: closeDatasetFactory + * Signature: (J)V + */ +JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_closeDatasetFactory( + JNIEnv*, jobject, jlong id) { + dataset_factory_holder_.Erase(id); +} + +/* + * Class: org_apache_arrow_dataset_jni_JniWrapper + * Method: inspectSchema + * Signature: (J)[B + */ +JNIEXPORT jbyteArray JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_inspectSchema( + JNIEnv* env, jobject, jlong dataset_factor_id) { + std::shared_ptr d = + dataset_factory_holder_.Lookup(dataset_factor_id); + JNI_ASSIGN_OR_THROW(std::shared_ptr schema, d->Inspect()) + return ToSchemaByteArray(env, schema); +} + +/* + * Class: org_apache_arrow_dataset_jni_JniWrapper + * Method: createDataset + * Signature: (J[B)J + */ +JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createDataset( + JNIEnv* env, jobject, jlong dataset_factory_id, jbyteArray schema_bytes) { + std::shared_ptr d = + dataset_factory_holder_.Lookup(dataset_factory_id); + std::shared_ptr schema; + schema = FromSchemaByteArray(env, schema_bytes); + JNI_ASSIGN_OR_THROW(std::shared_ptr dataset, d->Finish(schema)) + return dataset_holder_.Insert(dataset); +} + +/* + * Class: org_apache_arrow_dataset_jni_JniWrapper + * Method: closeDataset + * Signature: (J)V + */ +JNIEXPORT void JNICALL +Java_org_apache_arrow_dataset_jni_JniWrapper_closeDataset(JNIEnv*, jobject, jlong id) { + dataset_holder_.Erase(id); +} + +/* + * Class: org_apache_arrow_dataset_jni_JniWrapper + * Method: createScanner + * Signature: (J[Ljava/lang/String;[BJ)J + */ +JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createScanner( + JNIEnv* env, jobject, jlong dataset_id, jobjectArray columns, jbyteArray filter, + jlong batch_size) { + std::shared_ptr context = + std::make_shared(); + std::shared_ptr dataset = dataset_holder_.Lookup(dataset_id); + 
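+  // NewScan() hands back a ScannerBuilder; the projected columns, the batch
+  // size and (when the protobuf Condition below has a root) the decoded
+  // filter are all applied to it before Finish() produces the Scanner.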
JNI_ASSIGN_OR_THROW(std::shared_ptr scanner_builder, + dataset->NewScan()) + + std::vector column_vector = ToStringVector(env, columns); + JNI_ASSERT_OK_OR_THROW(scanner_builder->Project(column_vector)); + JNI_ASSERT_OK_OR_THROW(scanner_builder->BatchSize(batch_size)); + + // initialize filters + jsize exprs_len = env->GetArrayLength(filter); + jbyte* exprs_bytes = env->GetByteArrayElements(filter, 0); + arrow::dataset::types::Condition condition; + if (!ParseProtobuf(reinterpret_cast(exprs_bytes), exprs_len, &condition)) { + releaseFilterInput(filter, exprs_bytes, env); + std::string error_message = "bad protobuf message"; + env->ThrowNew(illegal_argument_exception_class, error_message.c_str()); + } + if (condition.has_root()) { + JNI_ASSERT_OK_OR_THROW(scanner_builder->Filter(translateFilter(condition, env))); + } + JNI_ASSIGN_OR_THROW(auto scanner, scanner_builder->Finish()) + jlong id = scanner_holder_.Insert(scanner); + releaseFilterInput(filter, exprs_bytes, env); + return id; +} + +/* + * Class: org_apache_arrow_dataset_jni_JniWrapper + * Method: getSchemaFromScanner + * Signature: (J)[B + */ +JNIEXPORT jbyteArray JNICALL +Java_org_apache_arrow_dataset_jni_JniWrapper_getSchemaFromScanner(JNIEnv* env, jobject, + jlong scanner_id) { + std::shared_ptr schema = scanner_holder_.Lookup(scanner_id)->schema(); + return ToSchemaByteArray(env, schema); +} + +/* + * Class: org_apache_arrow_dataset_jni_JniWrapper + * Method: getScanTasksFromScanner + * Signature: (J)[J + */ +JNIEXPORT jlongArray JNICALL +Java_org_apache_arrow_dataset_jni_JniWrapper_getScanTasksFromScanner(JNIEnv* env, jobject, + jlong scanner_id) { + std::shared_ptr scanner = scanner_holder_.Lookup(scanner_id); + JNI_ASSIGN_OR_THROW(arrow::dataset::ScanTaskIterator itr, scanner->Scan()) + std::vector> vector = + collect(env, std::move(itr)); + jlongArray ret = env->NewLongArray(vector.size()); + for (size_t i = 0; i < vector.size(); i++) { + std::shared_ptr scan_task = vector.at(i); + jlong id[] = {scan_task_holder_.Insert(scan_task)}; + env->SetLongArrayRegion(ret, i, 1, id); + } + return ret; +} + +/* + * Class: org_apache_arrow_dataset_jni_JniWrapper + * Method: closeScanner + * Signature: (J)V + */ +JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_closeScanner( + JNIEnv*, jobject, jlong scanner_id) { + scanner_holder_.Erase(scanner_id); +} + +/* + * Class: org_apache_arrow_dataset_jni_JniWrapper + * Method: closeScanTask + * Signature: (J)V + */ +JNIEXPORT void JNICALL +Java_org_apache_arrow_dataset_jni_JniWrapper_closeScanTask(JNIEnv*, jobject, jlong id) { + scan_task_holder_.Erase(id); +} + +/* + * Class: org_apache_arrow_dataset_jni_JniWrapper + * Method: scan + * Signature: (J)J + */ +JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_scan( + JNIEnv* env, jobject, jlong scan_task_id) { + std::shared_ptr scan_task = + scan_task_holder_.Lookup(scan_task_id); + JNI_ASSIGN_OR_THROW(arrow::RecordBatchIterator record_batch_iterator, + scan_task->Execute()) + return iterator_holder_.Insert(std::make_shared( + std::move(record_batch_iterator))); // move and propagate +} + +/* + * Class: org_apache_arrow_dataset_jni_JniWrapper + * Method: nextRecordBatch + * Signature: (J)Lorg/apache/arrow/dataset/jni/NativeRecordBatchHandle; + */ +JNIEXPORT jobject JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_nextRecordBatch( + JNIEnv* env, jobject, jlong iterator_id) { + std::shared_ptr itr = iterator_holder_.Lookup(iterator_id); + + JNI_ASSIGN_OR_THROW(std::shared_ptr record_batch, itr->Next()) + 
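+  // Arrow iterators signal end-of-stream by yielding a null element rather
+  // than an error status, hence the null check instead of status handling: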
if (record_batch == nullptr) { + return nullptr; // stream ended + } + std::shared_ptr schema = record_batch->schema(); + jobjectArray field_array = + env->NewObjectArray(schema->num_fields(), record_batch_handle_field_class, nullptr); + + std::vector> buffers; + for (int i = 0; i < schema->num_fields(); ++i) { + auto column = record_batch->column(i); + auto dataArray = column->data(); + jobject field = env->NewObject(record_batch_handle_field_class, + record_batch_handle_field_constructor, + column->length(), column->null_count()); + env->SetObjectArrayElement(field_array, i, field); + + for (auto& buffer : dataArray->buffers) { + buffers.push_back(buffer); + } + } + + jobjectArray buffer_array = + env->NewObjectArray(buffers.size(), record_batch_handle_buffer_class, nullptr); + + for (size_t j = 0; j < buffers.size(); ++j) { + auto buffer = buffers[j]; + uint8_t* data = nullptr; + int64_t size = 0; + int64_t capacity = 0; + if (buffer != nullptr) { + data = (uint8_t*)buffer->data(); + size = buffer->size(); + capacity = buffer->capacity(); + } + jobject buffer_handle = env->NewObject( + record_batch_handle_buffer_class, record_batch_handle_buffer_constructor, + buffer_holder_.Insert(buffer), data, size, capacity); + env->SetObjectArrayElement(buffer_array, j, buffer_handle); + } + + jobject ret = env->NewObject(record_batch_handle_class, record_batch_handle_constructor, + record_batch->num_rows(), field_array, buffer_array); + return ret; +} + +/* + * Class: org_apache_arrow_dataset_jni_JniWrapper + * Method: closeIterator + * Signature: (J)V + */ +JNIEXPORT void JNICALL +Java_org_apache_arrow_dataset_jni_JniWrapper_closeIterator(JNIEnv*, jobject, jlong id) { + iterator_holder_.Erase(id); +} + +/* + * Class: org_apache_arrow_dataset_jni_JniWrapper + * Method: releaseBuffer + * Signature: (J)V + */ +JNIEXPORT void JNICALL +Java_org_apache_arrow_dataset_jni_JniWrapper_releaseBuffer(JNIEnv*, jobject, jlong id) { + buffer_holder_.Erase(id); +} + +/* + * Class: org_apache_arrow_dataset_file_JniWrapper + * Method: makeSingleFileDatasetFactory + * Signature: (Ljava/lang/String;II)J + */ +JNIEXPORT jlong JNICALL +Java_org_apache_arrow_dataset_file_JniWrapper_makeSingleFileDatasetFactory( + JNIEnv* env, jobject, jstring path, jint file_format_id, jint file_system_id) { + std::shared_ptr file_format = + GetFileFormat(env, file_format_id); + std::string out_path; + std::shared_ptr fs = + GetFileSystem(env, file_system_id, JStringToCString(env, path), &out_path); + JNI_ASSIGN_OR_THROW( + std::shared_ptr d, + arrow::dataset::SingleFileDatasetFactory::Make(out_path, fs, file_format)) + return dataset_factory_holder_.Insert(d); +} diff --git a/cpp/src/jni/dataset/proto/DTypes.proto b/cpp/src/jni/dataset/proto/DTypes.proto new file mode 100644 index 0000000000000..e5fdf6e9a699b --- /dev/null +++ b/cpp/src/jni/dataset/proto/DTypes.proto @@ -0,0 +1,149 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +syntax = "proto2"; +package arrow.dataset.types; + +option java_package = "org.apache.arrow.dataset"; +option java_outer_classname = "DatasetTypes"; +option optimize_for = SPEED; + +message FieldNode { + // name of the field + optional string name = 1; +} + +message FunctionNode { + optional string functionName = 1; + repeated TreeNode inArgs = 2; +} + +message ComparisonNode { + optional string opName = 1; + optional TreeNode leftArg = 2; + optional TreeNode rightArg = 3; +} + +message AndNode { + optional TreeNode leftArg = 2; + optional TreeNode rightArg = 3; +} + +message OrNode { + optional TreeNode leftArg = 2; + optional TreeNode rightArg = 3; +} + +message NotNode { + optional TreeNode args = 1; +} + +message IsValidNode { + optional TreeNode args = 1; +} + +message NullNode { +} + +message IntNode { + optional int32 value = 1; +} + +message FloatNode { + optional float value = 1; +} + +message DoubleNode { + optional double value = 1; +} + +message BooleanNode { + optional bool value = 1; +} + +message LongNode { + optional int64 value = 1; +} + +message StringNode { + optional bytes value = 1; +} + +message BinaryNode { + optional bytes value = 1; +} + +message DecimalNode { + optional string value = 1; + optional int32 precision = 2; + optional int32 scale = 3; +} + +message InNode { + optional FieldNode field = 1; + optional IntConstants intValues = 2; + optional LongConstants longValues = 3; + optional StringConstants stringValues = 4; + optional BinaryConstants binaryValues = 5; +} + +message IntConstants { + repeated IntNode intValues = 1; +} + +message LongConstants { + repeated LongNode longValues = 1; +} + +message StringConstants { + repeated StringNode stringValues = 1; +} + +message BinaryConstants { + repeated BinaryNode binaryValues = 1; +} + +message TreeNode { + optional FieldNode fieldNode = 1; + optional FunctionNode fnNode = 2; + optional ComparisonNode cpNode = 3; + + // control expressions + optional AndNode andNode = 7; + optional OrNode orNode = 8; + + optional NotNode notNode = 9; + optional IsValidNode isValidNode = 10; + + // literals + optional NullNode nullNode = 11; + optional IntNode intNode = 12; + optional FloatNode floatNode = 13; + optional LongNode longNode = 14; + optional BooleanNode booleanNode = 15; + optional DoubleNode doubleNode = 16; + optional StringNode stringNode = 17; + optional BinaryNode binaryNode = 18; + optional DecimalNode decimalNode = 19; + + // in expr + optional InNode inNode = 21; +} + +message Condition { + optional TreeNode root = 1; +} \ No newline at end of file diff --git a/cpp/src/jni/orc/concurrent_map.h b/cpp/src/jni/orc/concurrent_map.h index b560886628bb2..24f19d65ee7f8 100644 --- a/cpp/src/jni/orc/concurrent_map.h +++ b/cpp/src/jni/orc/concurrent_map.h @@ -17,6 +17,7 @@ #pragma once +#include #include #include #include diff --git a/cpp/src/jni/orc/jni_wrapper.cpp b/cpp/src/jni/orc/jni_wrapper.cpp index ce0546732c19c..ef747c7d24d87 100644 --- a/cpp/src/jni/orc/jni_wrapper.cpp +++ b/cpp/src/jni/orc/jni_wrapper.cpp @@ -31,7 +31,7 @@ #include "org_apache_arrow_adapter_orc_OrcReaderJniWrapper.h" 
#include "org_apache_arrow_adapter_orc_OrcStripeReaderJniWrapper.h" -#include "./concurrent_map.h" +#include "jni/orc/concurrent_map.h" using ORCFileReader = arrow::adapters::orc::ORCFileReader; using RecordBatchReader = arrow::RecordBatchReader; diff --git a/java/dataset/CMakeLists.txt b/java/dataset/CMakeLists.txt new file mode 100644 index 0000000000000..a0ec921943101 --- /dev/null +++ b/java/dataset/CMakeLists.txt @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# +# arrow_dataset_java +# + +# Headers: top level + +project(arrow_dataset_java) + +# Find java/jni +include(FindJava) +include(UseJava) +include(FindJNI) + +message("generating headers to ${JNI_HEADERS_DIR}") + +add_jar( + arrow_dataset_java + src/main/java/org/apache/arrow/dataset/jni/JniLoader.java + src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java + src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java + src/main/java/org/apache/arrow/dataset/file/JniWrapper.java + GENERATE_NATIVE_HEADERS arrow_dataset_java-native + DESTINATION ${JNI_HEADERS_DIR} +) diff --git a/java/dataset/pom.xml b/java/dataset/pom.xml new file mode 100644 index 0000000000000..a6bc7e9921fa7 --- /dev/null +++ b/java/dataset/pom.xml @@ -0,0 +1,83 @@ + + + + + arrow-java-root + org.apache.arrow + 0.17.0 + + 4.0.0 + + arrow-dataset + Arrow Java Datasets + Java implementation of Arrow Datasets API/Framework + jar + + ../../../cpp/release-build/ + 2.5.0 + + + + + org.apache.arrow + arrow-vector + ${project.version} + compile + ${arrow.vector.classifier} + + + org.apache.arrow + arrow-memory + ${project.version} + compile + + + com.google.protobuf + protobuf-java + ${protobuf.version} + + + + + + ${arrow.cpp.build.dir} + + **/libarrow_dataset_jni.* + + + + + + + org.xolstice.maven.plugins + protobuf-maven-plugin + 0.5.1 + + com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier} + + ../../cpp/src/jni/dataset/proto + + + + + compile + test-compile + + + + + + + + \ No newline at end of file diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java new file mode 100644 index 0000000000000..ed5b3d4b184e4 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileFormat.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.file; + +/** + * File format definitions. + */ +public enum FileFormat { + PARQUET(0); + + private int id; + + FileFormat(int id) { + this.id = id; + } + + public int id() { + return id; + } +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystem.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystem.java new file mode 100644 index 0000000000000..807d23b1c769e --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystem.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.file; + +/** + * Filesystem definitions. + */ +public enum FileSystem { + LOCAL(0), + HDFS(1); + + private int id; + + FileSystem(int id) { + this.id = id; + } + + public int id() { + return id; + } +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java new file mode 100644 index 0000000000000..7a0e1830af7a7 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.file; + +import org.apache.arrow.dataset.jni.JniLoader; + +/** + * JniWrapper for filesystem based {@link org.apache.arrow.dataset.source.Dataset} implementations. 
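+ *
+ * <p>The returned value is an opaque native handle (an id into the JNI-side
+ * object registry), not a raw pointer; it is expected to be passed on to a
+ * {@link org.apache.arrow.dataset.jni.NativeDatasetFactory}, which manages
+ * its life cycle and releases it through the jni-side close call.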
+ */ +public class JniWrapper { + + private static final JniWrapper INSTANCE = new JniWrapper(); + + public static JniWrapper get() { + return INSTANCE; + } + + private JniWrapper() { + JniLoader.get().ensureLoaded(); + } + + public native long makeSingleFileDatasetFactory(String path, int fileFormat, int fileSystem); + +}
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/SingleFileDatasetFactory.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/SingleFileDatasetFactory.java new file mode 100644 index 0000000000000..a339a0623d801 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/SingleFileDatasetFactory.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.file; + +import org.apache.arrow.dataset.jni.NativeDatasetFactory; +import org.apache.arrow.memory.BufferAllocator; + +/** + * Java binding of the C++ SingleFileDatasetFactory. + */ +public class SingleFileDatasetFactory extends NativeDatasetFactory { + + public SingleFileDatasetFactory(BufferAllocator allocator, FileFormat format, FileSystem fs, String path) { + super(allocator, createNative(format, fs, path)); + } + + private static long createNative(FileFormat format, FileSystem fs, String path) { + return JniWrapper.get().makeSingleFileDatasetFactory(path, format.id(), fs.id()); + } + +}
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/filter/Filter.java b/java/dataset/src/main/java/org/apache/arrow/dataset/filter/Filter.java new file mode 100644 index 0000000000000..67f9a8d3198fb --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/filter/Filter.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.filter; + +// todo filter tree implementation +// todo see also https://issues.apache.org/jira/browse/ARROW-6953 + +/** + * Datasets filter.
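
Putting the pieces together, a usage sketch assembled from NativeDatasetTest later in this diff; only the file path is invented, and scanning is shown separately below:

    RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
    NativeDatasetFactory factory = new SingleFileDatasetFactory(
        allocator, FileFormat.PARQUET, FileSystem.LOCAL, "/path/to/userdata1.parquet");
    Schema schema = factory.inspect();   // peek the schema without materializing the source
    NativeDataset dataset = factory.finish(schema);
    // dataset.newScan(...) is demonstrated with ScanOptions further below
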
+ */ +public interface Filter { + + Filter EMPTY = new Filter() { + @Override + public byte[] toByteArray() { + return new byte[0]; + } + }; + + byte[] toByteArray(); + +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/filter/FilterImpl.java b/java/dataset/src/main/java/org/apache/arrow/dataset/filter/FilterImpl.java new file mode 100644 index 0000000000000..9749fa2dba69f --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/filter/FilterImpl.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.filter; + +import org.apache.arrow.dataset.DatasetTypes; + +/** + * Provided implementation of {@link Filter}. + * + * @see org.apache.arrow.dataset.DatasetTypes + */ +public class FilterImpl implements Filter { + private final DatasetTypes.Condition condition; + + public FilterImpl(DatasetTypes.Condition condition) { + this.condition = condition; + } + + @Override + public byte[] toByteArray() { + return condition.toByteArray(); + } +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/fragment/DataFragment.java b/java/dataset/src/main/java/org/apache/arrow/dataset/fragment/DataFragment.java new file mode 100644 index 0000000000000..d5582483b23f2 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/fragment/DataFragment.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.fragment; + +import org.apache.arrow.dataset.scanner.ScanTask; + +/** + * A granular piece of a Dataset, such as an individual file, + * which can be read/scanned separately from other fragments. 
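
Filter.EMPTY serializes to zero bytes; a non-trivial Filter goes through FilterImpl, whose protobuf condition tree is built as in NativeDatasetTest at the end of this diff (sketch for the predicate id = 500):

    DatasetTypes.Condition condition = DatasetTypes.Condition.newBuilder()
        .setRoot(DatasetTypes.TreeNode.newBuilder()
            .setCpNode(DatasetTypes.ComparisonNode.newBuilder()
                .setOpName("equal")
                .setLeftArg(DatasetTypes.TreeNode.newBuilder().setFieldNode(
                    DatasetTypes.FieldNode.newBuilder().setName("id").build()).build())
                .setRightArg(DatasetTypes.TreeNode.newBuilder().setIntNode(
                    DatasetTypes.IntNode.newBuilder().setValue(500).build()).build())
                .build())
            .build())
        .build();
    Filter filter = new FilterImpl(condition); // serialized via toByteArray() when the scanner is created
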
+ */ +public interface DataFragment { + Iterable scan(); +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniLoader.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniLoader.java new file mode 100644 index 0000000000000..eb5bee10d8d65 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniLoader.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.jni; + +import java.io.File; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.StandardCopyOption; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * The JniLoader for Datasets API's native implementation. + */ +public final class JniLoader { + private static final JniLoader INSTANCE = new JniLoader(); + private static final List LIBRARY_NAMES = Collections.singletonList("arrow_dataset_jni"); + + private AtomicBoolean loaded = new AtomicBoolean(false); + + public static JniLoader get() { + return INSTANCE; + } + + private JniLoader() { + } + + /** + * If required JNI libraries are not loaded, then load them. + */ + public void ensureLoaded() { + if (loaded.compareAndSet(false, true)) { + LIBRARY_NAMES.forEach(this::load); + } + } + + private void load(String name) { + final String libraryToLoad = System.mapLibraryName(name); + try { + File temp = File.createTempFile("jnilib-", ".tmp", new File(System.getProperty("java.io.tmpdir"))); + try (final InputStream is + = JniWrapper.class.getClassLoader().getResourceAsStream(libraryToLoad)) { + if (is == null) { + throw new FileNotFoundException(libraryToLoad); + } + Files.copy(is, temp.toPath(), StandardCopyOption.REPLACE_EXISTING); + System.load(temp.getAbsolutePath()); + } + } catch (IOException e) { + throw new IllegalStateException("error loading native libraries: " + e); + } + } +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java new file mode 100644 index 0000000000000..66a5f8f9f8dde --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
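
JniLoader locates the library on the classpath (the pom above bundles **/libarrow_dataset_jni.* out of the C++ build directory as a resource), copies it to a temp file, and System.load()s it. The resource name it looks up is platform-derived:

    // "libarrow_dataset_jni.so" on Linux, "libarrow_dataset_jni.dylib" on macOS.
    String libraryToLoad = System.mapLibraryName("arrow_dataset_jni");
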
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.jni; + +/** + * JNI wrapper for Datasets API's native implementation. + */ +public class JniWrapper { + + private static final JniWrapper INSTANCE = new JniWrapper(); + + public static JniWrapper get() { + return INSTANCE; + } + + private JniWrapper() { + JniLoader.get().ensureLoaded(); + } + + public native void closeDatasetFactory(long datasetFactoryId); + + public native byte[] inspectSchema(long datasetFactoryId); + + public native long createDataset(long datasetFactoryId, byte[] schema); + + public native void closeDataset(long datasetId); + + public native long createScanner(long datasetId, String[] columns, byte[] filter, long batchSize); + + public native byte[] getSchemaFromScanner(long scannerId); + + public native long[] getScanTasksFromScanner(long scannerId); + + public native void closeScanner(long scannerId); + + public native void closeScanTask(long scanTaskId); + + public native long scan(long scanTaskId); + + public native NativeRecordBatchHandle nextRecordBatch(long recordBatchIteratorId); + + public native void closeIterator(long id); + + public native void releaseBuffer(long bufferId); + +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeContext.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeContext.java new file mode 100644 index 0000000000000..7e893e4e93ce8 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeContext.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.jni; + +import org.apache.arrow.memory.BaseAllocator; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.Preconditions; + +/** + * Context for relevant classes of NativeDataSource. + */ +public class NativeContext { + private final BufferAllocator allocator; + + /** + * Constructor. + * + * @param allocator The allocator in use. + */ + public NativeContext(BufferAllocator allocator) { + Preconditions.checkArgument(allocator instanceof BaseAllocator, + "currently only instance of BaseAllocator supported"); + this.allocator = allocator; + } + + /** + * Returns the allocator which is in use. 
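
The native methods above imply a handle lifecycle that the typed wrappers below (NativeDatasetFactory, NativeDataset, NativeScanner, NativeScanTask) encapsulate. A hedged sketch of the raw sequence, error handling omitted; the factory id would come from a factory-specific call such as the file-level makeSingleFileDatasetFactory:

    JniWrapper jni = JniWrapper.get();
    long factoryId = 0L; // placeholder: e.g. file.JniWrapper#makeSingleFileDatasetFactory(...)
    byte[] schema = jni.inspectSchema(factoryId);
    long datasetId = jni.createDataset(factoryId, schema);
    long scannerId = jni.createScanner(datasetId, new String[]{"id"}, new byte[0], 100L);
    for (long taskId : jni.getScanTasksFromScanner(scannerId)) {
      long itrId = jni.scan(taskId);
      NativeRecordBatchHandle handle;
      while ((handle = jni.nextRecordBatch(itrId)) != null) {
        // wrap handle's buffers into ArrowBufs (see NativeScanTask below),
        // then hand each one back through jni.releaseBuffer(...)
      }
      jni.closeIterator(itrId);
      jni.closeScanTask(taskId);
    }
    jni.closeScanner(scannerId);
    jni.closeDataset(datasetId);
    jni.closeDatasetFactory(factoryId);
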
+ */ + public BaseAllocator getAllocator() { + return (BaseAllocator) allocator; + } +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java new file mode 100644 index 0000000000000..b89ef89e04ce7 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.jni; + +import org.apache.arrow.dataset.fragment.DataFragment; +import org.apache.arrow.dataset.scanner.ScanOptions; +import org.apache.arrow.dataset.source.Dataset; + +/** + * Native implementation of {@link Dataset}. + */ +public class NativeDataset implements Dataset, AutoCloseable { + + private final NativeContext context; + private final long datasetId; + + public NativeDataset(NativeContext context, long datasetId) { + this.context = context; + this.datasetId = datasetId; + } + + @Override + public Iterable getFragments(ScanOptions options) { + throw new UnsupportedOperationException("Use of getFragments() on NativeDataset is currently forbidden. " + + "Try creating scanners instead"); + } + + @Override + public NativeScanner newScan(ScanOptions options) { + long scannerId = JniWrapper.get().createScanner(datasetId, options.getColumns(), + options.getFilter().toByteArray(), options.getBatchSize()); + return new NativeScanner(context, scannerId); + } + + public long getDatasetId() { + return datasetId; + } + + @Override + public void close() { + JniWrapper.get().closeDataset(datasetId); + } +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDatasetFactory.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDatasetFactory.java new file mode 100644 index 0000000000000..0ba5af4ad0109 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDatasetFactory.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.arrow.dataset.jni; + +import java.io.IOException; + +import org.apache.arrow.dataset.source.DatasetFactory; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.util.SchemaUtils; +import org.apache.arrow.vector.types.pojo.Schema; + +/** + * Native implementation of {@link DatasetFactory}. + */ +public class NativeDatasetFactory implements DatasetFactory, AutoCloseable { + private final long dataSourceDiscoveryId; + private final BufferAllocator allocator; + + public NativeDatasetFactory(BufferAllocator allocator, long dataSourceDiscoveryId) { + this.allocator = allocator; + this.dataSourceDiscoveryId = dataSourceDiscoveryId; + } + + @Override + public Schema inspect() { + byte[] buffer = JniWrapper.get().inspectSchema(dataSourceDiscoveryId); + try { + return SchemaUtils.get().deserialize(buffer, allocator); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public NativeDataset finish() { + return finish(inspect()); + } + + @Override + public NativeDataset finish(Schema schema) { + try { + byte[] serialized = SchemaUtils.get().serialize(schema); + return new NativeDataset(new NativeContext(allocator), + JniWrapper.get().createDataset(dataSourceDiscoveryId, serialized)); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + JniWrapper.get().closeDatasetFactory(dataSourceDiscoveryId); + } +}
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java new file mode 100644 index 0000000000000..dd90fd1c1ddb7 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeRecordBatchHandle.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.jni; + +import java.util.Arrays; +import java.util.List; + +/** + * Holds pointers to an Arrow C++ RecordBatch. + */ +public class NativeRecordBatchHandle { + + private final long numRows; + private final List fields; + private final List buffers; + + /** + * Constructor. + * + * @param numRows Total row number of the associated RecordBatch + * @param fields Metadata of fields + * @param buffers Retained Arrow buffers + */ + public NativeRecordBatchHandle(long numRows, Field[] fields, Buffer[] buffers) { + this.numRows = numRows; + this.fields = Arrays.asList(fields); + this.buffers = Arrays.asList(buffers); + } + + /** + * Returns the total row number of the associated RecordBatch. + * @return Total row number of the associated RecordBatch. + */ + public long getNumRows() { + return numRows; + } + + /** + * Returns the metadata of fields. + * @return Metadata of fields.
+ */ + public List getFields() { + return fields; + } + + /** + * Returns the buffers. + * @return Retained Arrow buffers. + */ + public List getBuffers() { + return buffers; + } + + /** + * Field metadata. + */ + public static class Field { + public final long length; + public final long nullCount; + + public Field(long length, long nullCount) { + this.length = length; + this.nullCount = nullCount; + } + } + + /** + * Pointers and metadata of the targeted Arrow buffer. + */ + public static class Buffer { + public final long nativeInstanceId; + public final long memoryAddress; + public final long size; + public final long capacity; + + /** + * Constructor. + * + * @param nativeInstanceId Native instance's id + * @param memoryAddress Memory address of the first byte + * @param size Size (in bytes) + * @param capacity Capacity (in bytes) + */ + public Buffer(long nativeInstanceId, long memoryAddress, long size, long capacity) { + this.nativeInstanceId = nativeInstanceId; + this.memoryAddress = memoryAddress; + this.size = size; + this.capacity = capacity; + } + } +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanTask.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanTask.java new file mode 100644 index 0000000000000..17f7464e4777a --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanTask.java @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.jni; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.NoSuchElementException; +import java.util.stream.Collectors; + +import org.apache.arrow.dataset.scanner.ScanTask; +import org.apache.arrow.memory.BaseAllocator; +import org.apache.arrow.memory.BufferLedger; +import org.apache.arrow.memory.NativeUnderlingMemory; +import org.apache.arrow.memory.Ownerships; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.ipc.message.ArrowFieldNode; +import org.apache.arrow.vector.ipc.message.ArrowRecordBatch; +import org.apache.arrow.vector.types.pojo.Schema; + +import io.netty.buffer.ArrowBuf; + +/** + * Native implementation of {@link NativeScanTask}. + */ +public class NativeScanTask implements ScanTask, AutoCloseable { + private final NativeContext context; + private final Schema schema; + private final long scanTaskId; + + /** + * Constructor. 
+ */ + public NativeScanTask(NativeContext context, Schema schema, long scanTaskId) { + this.context = context; + this.schema = schema; + this.scanTaskId = scanTaskId; + } + + @Override + public Itr scan() { + + return new Itr() { + + private final Reader in = new Reader(JniWrapper.get().scan(scanTaskId)); + private VectorSchemaRoot peek = null; + + @Override + public void close() throws Exception { + in.close(); + } + + @Override + public boolean hasNext() { + try { + if (peek != null) { + return true; + } + if (!in.loadNextBatch()) { + return false; + } + peek = in.getVectorSchemaRoot(); + return true; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public VectorSchemaRoot next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + try { + return peek; + } finally { + peek = null; + } + } + }; + } + + @Override + public void close() { + JniWrapper.get().closeScanTask(scanTaskId); + } + + private class Reader extends ArrowReader { + + private final long recordBatchIteratorId; + + Reader(long recordBatchIteratorId) { + super(context.getAllocator()); + this.recordBatchIteratorId = recordBatchIteratorId; + } + + @Override + public boolean loadNextBatch() throws IOException { + // fixme it seems that the initialization is not thread-safe. Does caller already make it safe? + ensureInitialized(); + NativeRecordBatchHandle handle = JniWrapper.get().nextRecordBatch(recordBatchIteratorId); + if (handle == null) { + return false; + } + final ArrayList buffers = new ArrayList<>(); + for (NativeRecordBatchHandle.Buffer buffer : handle.getBuffers()) { + final BaseAllocator allocator = context.getAllocator(); + final NativeUnderlingMemory am = new NativeUnderlingMemory(allocator, + (int) buffer.size, buffer.nativeInstanceId, buffer.memoryAddress); + final BufferLedger ledger = Ownerships.get().takeOwnership(allocator, am); + ArrowBuf buf = new ArrowBuf(ledger, null, (int) buffer.size, buffer.memoryAddress, false); + buffers.add(buf); + } + try { + loadRecordBatch( + new ArrowRecordBatch((int) handle.getNumRows(), handle.getFields().stream() + .map(field -> new ArrowFieldNode((int) field.length, (int) field.nullCount)) + .collect(Collectors.toList()), buffers)); + } finally { + buffers.forEach(b -> b.getReferenceManager().release()); + } + return true; + } + + @Override + public long bytesRead() { + throw new UnsupportedOperationException(); + } + + @Override + protected void closeReadSource() throws IOException { + JniWrapper.get().closeIterator(recordBatchIteratorId); + } + + @Override + protected Schema readSchema() throws IOException { + return schema; + } + + + } +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java new file mode 100644 index 0000000000000..c44fd85583b45 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeScanner.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
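
With the reader convention above, a caller drains a NativeScanTask like this (sketch; Itr.close() is declared via AutoCloseable, hence the checked Exception):

    static void drain(NativeScanTask task, java.util.function.Consumer<VectorSchemaRoot> consumer)
        throws Exception {
      ScanTask.Itr itr = task.scan();
      try {
        while (itr.hasNext()) {
          consumer.accept(itr.next()); // at most ScanOptions.batchSize rows per root
        }
      } finally {
        itr.close();
        task.close();
      }
    }
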
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.jni; + +import java.io.IOException; +import java.util.Arrays; +import java.util.stream.Collectors; + +import org.apache.arrow.dataset.scanner.Scanner; +import org.apache.arrow.util.SchemaUtils; +import org.apache.arrow.vector.types.pojo.Schema; + +/** + * Native implementation of {@link Scanner}. + */ +public class NativeScanner implements Scanner, AutoCloseable { + + private final NativeContext context; + private final long scannerId; + + public NativeScanner(NativeContext context, long scannerId) { + this.context = context; + this.scannerId = scannerId; + } + + @Override + public Iterable scan() { + return Arrays.stream(JniWrapper.get().getScanTasksFromScanner(scannerId)) + .mapToObj(id -> new NativeScanTask(context, schema(), id)) + .collect(Collectors.toList()); + } + + @Override + public Schema schema() { + try { + return SchemaUtils.get().deserialize(JniWrapper.get().getSchemaFromScanner(scannerId), context.getAllocator()); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + JniWrapper.get().closeScanner(scannerId); + } +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ProjectAndFilterScanTask.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ProjectAndFilterScanTask.java new file mode 100644 index 0000000000000..4761a5932f4c7 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ProjectAndFilterScanTask.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.scanner; + +import org.apache.arrow.dataset.filter.Filter; + +/** + * Decorating an instance of {@link ScanTask} by having filters and projectors processed individually after scanning. + */ +public class ProjectAndFilterScanTask implements ScanTask { + private final ScanTask delegate; + private final String[] columns; + private final Filter filter; + + /** + * Constructor. 
+ * + * @param delegate Delegate {@link ScanTask} instance + * @param columns Projected columns + * @param filter Filter + */ + public ProjectAndFilterScanTask(ScanTask delegate, String[] columns, Filter filter) { + this.delegate = delegate; + this.columns = columns; + this.filter = filter; + } + + @Override + public Itr scan() { + // TODO UNIMPLEMENTED + return delegate.scan(); + } +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java new file mode 100644 index 0000000000000..b254b55889e13 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.scanner; + +import org.apache.arrow.dataset.filter.Filter; + +/** + * Options used during scanning. + */ +public class ScanOptions { + private final String[] columns; + private final Filter filter; + private final long batchSize; + + /** + * Constructor. + * @param columns Projected columns + * @param filter Filter + * @param batchSize Maximum row number of each returned {@link org.apache.arrow.vector.VectorSchemaRoot} + */ + public ScanOptions(String[] columns, Filter filter, long batchSize) { + this.columns = columns; + this.filter = filter; + this.batchSize = batchSize; + } + + public String[] getColumns() { + return columns; + } + + public Filter getFilter() { + return filter; + } + + public long getBatchSize() { + return batchSize; + } +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanTask.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanTask.java new file mode 100644 index 0000000000000..6167f8c7cc6ec --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanTask.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
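
For example, the options used throughout the tests in this diff project two columns, apply no filter, and cap batches at 100 rows:

    ScanOptions options = new ScanOptions(new String[]{"id", "title"}, Filter.EMPTY, 100);
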
+ */ + +package org.apache.arrow.dataset.scanner; + +import java.util.Iterator; + +import org.apache.arrow.vector.VectorSchemaRoot; + +/** + * Read record batches from a range of a single data fragment. A + * ScanTask is meant to be a unit of work to be dispatched. The implementation + * must be thread-safe. + */ +public interface ScanTask { + + /** + * Creates and returns an {@link Itr} instance. + */ + Itr scan(); + + /** + * The iterator implementation for {@link VectorSchemaRoot}s. + */ + interface Itr extends Iterator<VectorSchemaRoot>, AutoCloseable { + // FIXME VectorSchemaRoot is not actually something ITERABLE. Using a reader convention instead. + } +}
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/Scanner.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/Scanner.java new file mode 100644 index 0000000000000..bd0db2c10f06c --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/Scanner.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.scanner; + +import org.apache.arrow.vector.types.pojo.Schema; + +/** + * A high-level interface for scanning data over datasets. + */ +public interface Scanner { + + Iterable<? extends ScanTask> scan(); + + Schema schema(); + +}
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/source/Dataset.java b/java/dataset/src/main/java/org/apache/arrow/dataset/source/Dataset.java new file mode 100644 index 0000000000000..eb97b8c033f6e --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/source/Dataset.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.source; + +import org.apache.arrow.dataset.fragment.DataFragment; +import org.apache.arrow.dataset.scanner.ScanOptions; +import org.apache.arrow.dataset.scanner.Scanner; + +/** + * A container of Fragments which are the internal iterable units of read data.
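
Tying the scanner interfaces together (Dataset is declared next), the intended flow looks like this sketch, where dataset and options are assumed to exist:

    Scanner scanner = dataset.newScan(options);
    Schema projected = scanner.schema();  // schema after projection
    for (ScanTask task : scanner.scan()) {
      // each ScanTask is an independently dispatchable unit of work
    }
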
+ */ +public interface Dataset { + + Scanner newScan(ScanOptions options); + + Iterable<? extends DataFragment> getFragments(ScanOptions options); +}
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/source/DatasetFactory.java b/java/dataset/src/main/java/org/apache/arrow/dataset/source/DatasetFactory.java new file mode 100644 index 0000000000000..ef30bb9d1cdaa --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/source/DatasetFactory.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.dataset.source; + +import org.apache.arrow.vector.types.pojo.Schema; + +/** + * DatasetFactory provides a way to inspect a Dataset's potential + * schema before materializing it. Thus, the user can peek the schema for + * data sources and decide on a unified schema. + */ +public interface DatasetFactory { + + Schema inspect(); + + Dataset finish(); + + Dataset finish(Schema schema); +}
diff --git a/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlingMemory.java b/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlingMemory.java new file mode 100644 index 0000000000000..bed8a061a6a61 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/memory/NativeUnderlingMemory.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory; + +import org.apache.arrow.dataset.jni.JniWrapper; + +/** + * AllocationManager implementation for native-allocated memory. + */ +public class NativeUnderlingMemory extends AllocationManager { + + private final int size; + private final long nativeInstanceId; + private final long address; + + /** + * Constructor.
+ * + * @param accountingAllocator The accounting allocator instance + * @param size Size of underling memory (in bytes) + * @param nativeInstanceId ID of the native instance + */ + public NativeUnderlingMemory(BaseAllocator accountingAllocator, int size, long nativeInstanceId, long address) { + super(accountingAllocator); + this.size = size; + this.nativeInstanceId = nativeInstanceId; + this.address = address; + } + + @Override + public BufferLedger associate(BaseAllocator allocator) { + return super.associate(allocator); + } + + @Override + protected void release0() { + JniWrapper.get().releaseBuffer(nativeInstanceId); + } + + @Override + public long getSize() { + return size; + } + + @Override + protected long memoryAddress() { + return address; + } +} diff --git a/java/dataset/src/main/java/org/apache/arrow/memory/Ownerships.java b/java/dataset/src/main/java/org/apache/arrow/memory/Ownerships.java new file mode 100644 index 0000000000000..eee22edea63fd --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/memory/Ownerships.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.memory; + +/** + * Utility class managing ownership's transferring between Native Arrow buffers and Java Arrow buffers. + */ +public class Ownerships { + private static final Ownerships INSTANCE = new Ownerships(); + + private Ownerships() { + } + + public static Ownerships get() { + return INSTANCE; + } + + /** + * Returned ledger's ref count is initialized or increased by 1. + */ + public BufferLedger takeOwnership(BaseAllocator allocator, AllocationManager allocationManager) { + final BufferLedger ledger = allocationManager.associate(allocator); + boolean succeed = allocator.forceAllocate(allocationManager.getSize()); + if (!succeed) { + throw new OutOfMemoryException("Target allocator is full"); + } + allocationManager.setOwningLedger(ledger); + return ledger; + } +} diff --git a/java/dataset/src/main/java/org/apache/arrow/util/SchemaUtils.java b/java/dataset/src/main/java/org/apache/arrow/util/SchemaUtils.java new file mode 100644 index 0000000000000..e3c8ef8fca1b8 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/util/SchemaUtils.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
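
NativeUnderlingMemory plus Ownerships is what lets NativeScanTask.Reader (earlier in this diff) expose a C++-owned buffer as an ArrowBuf. The pattern, extracted for clarity; buffer is a NativeRecordBatchHandle.Buffer and allocator a BaseAllocator:

    NativeUnderlingMemory am = new NativeUnderlingMemory(
        allocator, (int) buffer.size, buffer.nativeInstanceId, buffer.memoryAddress);
    BufferLedger ledger = Ownerships.get().takeOwnership(allocator, am);
    ArrowBuf buf = new ArrowBuf(ledger, null, (int) buffer.size, buffer.memoryAddress, false);
    // releasing the last reference triggers release0(), i.e. JniWrapper.releaseBuffer(...)
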
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.arrow.util; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.channels.Channels; + +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.vector.ipc.ReadChannel; +import org.apache.arrow.vector.ipc.WriteChannel; +import org.apache.arrow.vector.ipc.message.MessageChannelReader; +import org.apache.arrow.vector.ipc.message.MessageResult; +import org.apache.arrow.vector.ipc.message.MessageSerializer; +import org.apache.arrow.vector.types.pojo.Schema; +import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel; + +/** + * Schema utility class including serialization and deserialization. + */ +public class SchemaUtils { + private static final SchemaUtils INSTANCE = new SchemaUtils(); + + public static SchemaUtils get() { + return INSTANCE; + } + + private SchemaUtils() { + + } + + /** + * Deserialize Arrow schema from byte array. + */ + public Schema deserialize(byte[] bytes, BufferAllocator allocator) throws IOException { + try (MessageChannelReader schemaReader = + new MessageChannelReader( + new ReadChannel( + new ByteArrayReadableSeekableByteChannel(bytes)), allocator)) { + + MessageResult result = schemaReader.readNext(); + if (result == null) { + throw new IOException("Unexpected end of input. Missing schema."); + } + return MessageSerializer.deserializeSchema(result.getMessage()); + } + } + + /** + * Serialize Arrow schema into byte array. + */ + public byte[] serialize(Schema schema) throws IOException { + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + MessageSerializer.serialize(new WriteChannel(Channels.newChannel(out)), schema); + return out.toByteArray(); + } +} diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/jni/NativeDatasetTest.java b/java/dataset/src/test/java/org/apache/arrow/dataset/jni/NativeDatasetTest.java new file mode 100644 index 0000000000000..880901f2f9b2d --- /dev/null +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/jni/NativeDatasetTest.java @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
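
A round-trip sketch for SchemaUtils; the field name and type are arbitrary:

    BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
    Schema schema = new Schema(java.util.Collections.singletonList(
        Field.nullable("id", new ArrowType.Int(32, true))));
    byte[] bytes = SchemaUtils.get().serialize(schema);
    Schema roundTripped = SchemaUtils.get().deserialize(bytes, allocator);
    // schema.equals(roundTripped) is expected to hold
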
+ */ + +package org.apache.arrow.dataset.jni; + +import java.io.File; +import java.util.Iterator; +import java.util.List; +import java.util.Spliterator; +import java.util.Spliterators; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import org.apache.arrow.dataset.DatasetTypes; +import org.apache.arrow.dataset.file.FileFormat; +import org.apache.arrow.dataset.file.FileSystem; +import org.apache.arrow.dataset.file.SingleFileDatasetFactory; +import org.apache.arrow.dataset.filter.Filter; +import org.apache.arrow.dataset.filter.FilterImpl; +import org.apache.arrow.dataset.fragment.DataFragment; +import org.apache.arrow.dataset.scanner.ScanOptions; +import org.apache.arrow.dataset.scanner.ScanTask; +import org.apache.arrow.dataset.scanner.Scanner; +import org.apache.arrow.dataset.source.Dataset; +import org.apache.arrow.dataset.source.DatasetFactory; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.VectorSchemaRoot; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.Assert; +import org.junit.Ignore; +import org.junit.Test; + +@Ignore +public class NativeDatasetTest { + + private String sampleParquet() { + return NativeDatasetTest.class.getResource(File.separator + "userdata1.parquet").getPath(); + } + + private void testDatasetFactoryEndToEnd(DatasetFactory factory) { + Schema schema = factory.inspect(); + + Assert.assertEquals("Schema", + schema.toString()); + + Dataset dataset = factory.finish(); + Assert.assertNotNull(dataset); + + List fragments = collect( + dataset.getFragments(new ScanOptions(new String[0], Filter.EMPTY, 100))); + Assert.assertEquals(1, fragments.size()); + + DataFragment fragment = fragments.get(0); + List scanTasks = collect(fragment.scan()); + Assert.assertEquals(1, scanTasks.size()); + + ScanTask scanTask = scanTasks.get(0); + List data = collect(scanTask.scan()); + Assert.assertNotNull(data); + // 1000 rows total in file userdata1.parquet + Assert.assertEquals(10, data.size()); + VectorSchemaRoot vsr = data.get(0); + Assert.assertEquals(100, vsr.getRowCount()); + + + // FIXME when using list field: + // FIXME it seems c++ parquet reader doesn't create buffers for list field it self, + // FIXME as a result Java side buffer pointer gets out of bound. + } + + @Ignore + public void testLocalFs() { + String path = sampleParquet(); + DatasetFactory discovery = new SingleFileDatasetFactory( + new RootAllocator(Long.MAX_VALUE), FileFormat.PARQUET, FileSystem.LOCAL, + path); + testDatasetFactoryEndToEnd(discovery); + } + + @Ignore + public void testHdfsWithFileProtocol() { + String path = "file:" + sampleParquet(); + DatasetFactory discovery = new SingleFileDatasetFactory( + new RootAllocator(Long.MAX_VALUE), FileFormat.PARQUET, FileSystem.HDFS, + path); + testDatasetFactoryEndToEnd(discovery); + } + + @Ignore + public void testHdfsWithHdfsProtocol() { + // If using libhdfs rather than libhdfs3: + // Set JAVA_HOME and HADOOP_HOME first. See hdfs_internal.cc:128 + // And libhdfs requires having hadoop java libraries set within CLASSPATH. See 1. https://hadoop.apache.org/docs/r2.7.7/hadoop-project-dist/hadoop-hdfs/LibHdfs.html, 2. https://arrow.apache.org/docs/python/filesystems.html#hadoop-file-system-hdfs + + // If using libhdfs3, make sure ARROW_LIBHDFS3_DIR is set. 
+ // Install libhdfs3: https://medium.com/@arush.xtremelife/connecting-hadoop-hdfs-with-python-267234bb68a2 + String path = "hdfs://localhost:9000/userdata1.parquet?use_hdfs3=1"; + DatasetFactory discovery = new SingleFileDatasetFactory( + new RootAllocator(Long.MAX_VALUE), FileFormat.PARQUET, FileSystem.HDFS, + path); + testDatasetFactoryEndToEnd(discovery); + } + + @Test + public void testScanner() { + String path = sampleParquet(); + RootAllocator allocator = new RootAllocator(Long.MAX_VALUE); + NativeDatasetFactory factory = new SingleFileDatasetFactory( + allocator, FileFormat.PARQUET, FileSystem.LOCAL, + path); + Schema schema = factory.inspect(); + NativeDataset dataset = factory.finish(schema); + ScanOptions scanOptions = new ScanOptions(new String[]{"id", "title"}, Filter.EMPTY, 100); + Scanner scanner = dataset.newScan(scanOptions); + List scanTasks = collect(scanner.scan()); + Assert.assertEquals(1, scanTasks.size()); + + ScanTask scanTask = scanTasks.get(0); + ScanTask.Itr itr = scanTask.scan(); + int vsrCount = 0; + VectorSchemaRoot vsr = null; + while (itr.hasNext()) { + // FIXME VectorSchemaRoot is not actually something ITERABLE.// Using a reader convention instead. + vsrCount++; + vsr = itr.next(); + Assert.assertEquals(100, vsr.getRowCount()); + + // check if projector is applied + Assert.assertEquals("Schema", + vsr.getSchema().toString()); + } + Assert.assertEquals(10, vsrCount); + + if (vsr != null) { + vsr.close(); + } + allocator.close(); + } + + @Test + public void testScannerWithFilter() { + String path = sampleParquet(); + RootAllocator allocator = new RootAllocator(Long.MAX_VALUE); + NativeDatasetFactory factory = new SingleFileDatasetFactory( + allocator, FileFormat.PARQUET, FileSystem.LOCAL, + path); + Schema schema = factory.inspect(); + NativeDataset dataset = factory.finish(schema); + // condition id = 500 + DatasetTypes.Condition condition = DatasetTypes.Condition.newBuilder() + .setRoot(DatasetTypes.TreeNode.newBuilder() + .setCpNode(DatasetTypes.ComparisonNode.newBuilder() + .setOpName("equal") // todo make op names enumerable + .setLeftArg( + DatasetTypes.TreeNode.newBuilder().setFieldNode( + DatasetTypes.FieldNode.newBuilder().setName("id").build()).build()) + .setRightArg( + DatasetTypes.TreeNode.newBuilder().setIntNode( + DatasetTypes.IntNode.newBuilder().setValue(500).build()).build()) + .build()) + .build()) + .build(); + Filter filter = new FilterImpl(condition); + ScanOptions scanOptions = new ScanOptions(new String[]{"id", "title"}, filter, 100); + Scanner scanner = dataset.newScan(scanOptions); + List scanTasks = collect(scanner.scan()); + Assert.assertEquals(1, scanTasks.size()); + + ScanTask scanTask = scanTasks.get(0); + ScanTask.Itr itr = scanTask.scan(); + VectorSchemaRoot vsr = null; + int rowCount = 0; + while (itr.hasNext()) { + // FIXME VectorSchemaRoot is not actually something ITERABLE. Using a reader convention instead. + vsr = itr.next(); + // only the line with id = 500 selected + rowCount += vsr.getRowCount(); + + // check if projector is applied + Assert.assertEquals("Schema", + vsr.getSchema().toString()); + } + Assert.assertEquals(1, rowCount); + + if (vsr != null) { + vsr.close(); + } + allocator.close(); + } + + // TODO fix for empty projector. Currently empty projector is treated as projection on all available columns. 
+ @Ignore + public void testScannerWithEmptyProjector() { + String path = sampleParquet(); + RootAllocator allocator = new RootAllocator(Long.MAX_VALUE); + NativeDatasetFactory factory = new SingleFileDatasetFactory( + allocator, FileFormat.PARQUET, FileSystem.LOCAL, + path); + Schema schema = factory.inspect(); + NativeDataset dataset = factory.finish(schema); + ScanOptions scanOptions = new ScanOptions(new String[]{}, Filter.EMPTY, 100); + Scanner scanner = dataset.newScan(scanOptions); + List scanTasks = collect(scanner.scan()); + Assert.assertEquals(1, scanTasks.size()); + + ScanTask scanTask = scanTasks.get(0); + ScanTask.Itr itr = scanTask.scan(); + VectorSchemaRoot vsr = null; + int rowCount = 0; + while (itr.hasNext()) { + // FIXME VectorSchemaRoot is not actually something ITERABLE. Using a reader convention instead. + vsr = itr.next(); + rowCount += vsr.getRowCount(); + + // check if projector is applied + Assert.assertEquals("Schema<>", + vsr.getSchema().toString()); + } + Assert.assertEquals(1, rowCount); + + if (vsr != null) { + vsr.close(); + } + allocator.close(); + } + + @Ignore + public void testFilter() { + // todo + } + + @Ignore + public void testProjector() { + // todo + } + + private List collect(Iterable iterable) { + return StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toList()); + } + + private List collect(Iterator iterator) { + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false) + .collect(Collectors.toList()); + } +} diff --git a/java/dataset/src/test/resources/userdata1.parquet b/java/dataset/src/test/resources/userdata1.parquet new file mode 100644 index 0000000000000..2ae23dac0ffe7 Binary files /dev/null and b/java/dataset/src/test/resources/userdata1.parquet differ diff --git a/java/dataset/src/test/resources/users.parquet b/java/dataset/src/test/resources/users.parquet new file mode 100644 index 0000000000000..aa527338c43a8 Binary files /dev/null and b/java/dataset/src/test/resources/users.parquet differ diff --git a/java/gandiva/src/main/java/org/apache/arrow/gandiva/expression/InNode.java b/java/gandiva/src/main/java/org/apache/arrow/gandiva/expression/InNode.java index 35dbcb5bb56b5..067e04b6215e4 100644 --- a/java/gandiva/src/main/java/org/apache/arrow/gandiva/expression/InNode.java +++ b/java/gandiva/src/main/java/org/apache/arrow/gandiva/expression/InNode.java @@ -37,6 +37,7 @@ public class InNode implements TreeNode { private final Set stringValues; private final Set binaryValues; private final Field field; + private final TreeNode input; private InNode(Set values, Set longValues, Set stringValues, Set binaryValues, Field field) { @@ -45,31 +46,56 @@ private InNode(Set values, Set longValues, Set stringValu this.stringValues = stringValues; this.binaryValues = binaryValues; this.field = field; + this.input = TreeBuilder.makeField(field); + } + + private InNode(Set values, Set longValues, Set stringValues, Set + binaryValues, TreeNode node) { + this.intValues = values; + this.longValues = longValues; + this.stringValues = stringValues; + this.binaryValues = binaryValues; + this.field = null; + this.input = node; } public static InNode makeIntInExpr(Field field, Set intValues) { - return new InNode(intValues, null, null, null ,field); + return new InNode(intValues, null, null, null, field); + } + + public static InNode makeIntInExpr(TreeNode node, Set intValues) { + return new InNode(intValues, null, null, null, node); } public static InNode makeLongInExpr(Field field, Set 
diff --git a/java/gandiva/src/main/java/org/apache/arrow/gandiva/expression/InNode.java b/java/gandiva/src/main/java/org/apache/arrow/gandiva/expression/InNode.java
index 35dbcb5bb56b5..067e04b6215e4 100644
--- a/java/gandiva/src/main/java/org/apache/arrow/gandiva/expression/InNode.java
+++ b/java/gandiva/src/main/java/org/apache/arrow/gandiva/expression/InNode.java
@@ -37,6 +37,7 @@ public class InNode implements TreeNode {
   private final Set<String> stringValues;
   private final Set<byte[]> binaryValues;
   private final Field field;
+  private final TreeNode input;
 
   private InNode(Set<Integer> values, Set<Long> longValues, Set<String> stringValues,
                  Set<byte[]> binaryValues, Field field) {
@@ -45,31 +46,56 @@ private InNode(Set values, Set longValues, Set stringValu
     this.stringValues = stringValues;
     this.binaryValues = binaryValues;
     this.field = field;
+    this.input = TreeBuilder.makeField(field);
+  }
+
+  private InNode(Set<Integer> values, Set<Long> longValues, Set<String> stringValues, Set<byte[]>
+      binaryValues, TreeNode node) {
+    this.intValues = values;
+    this.longValues = longValues;
+    this.stringValues = stringValues;
+    this.binaryValues = binaryValues;
+    this.field = null;
+    this.input = node;
   }
 
   public static InNode makeIntInExpr(Field field, Set<Integer> intValues) {
-    return new InNode(intValues, null, null, null ,field);
+    return new InNode(intValues, null, null, null, field);
+  }
+
+  public static InNode makeIntInExpr(TreeNode node, Set<Integer> intValues) {
+    return new InNode(intValues, null, null, null, node);
   }
 
   public static InNode makeLongInExpr(Field field, Set<Long> longValues) {
-    return new InNode(null, longValues, null, null ,field);
+    return new InNode(null, longValues, null, null, field);
+  }
+
+  public static InNode makeLongInExpr(TreeNode node, Set<Long> longValues) {
+    return new InNode(null, longValues, null, null, node);
   }
 
   public static InNode makeStringInExpr(Field field, Set<String> stringValues) {
-    return new InNode(null, null, stringValues, null ,field);
+    return new InNode(null, null, stringValues, null, field);
+  }
+
+  public static InNode makeStringInExpr(TreeNode node, Set<String> stringValues) {
+    return new InNode(null, null, stringValues, null, node);
  }
 
   public static InNode makeBinaryInExpr(Field field, Set<byte[]> binaryValues) {
-    return new InNode(null, null, null, binaryValues ,field);
+    return new InNode(null, null, null, binaryValues, field);
+  }
+
+  public static InNode makeBinaryInExpr(TreeNode node, Set<byte[]> binaryValues) {
+    return new InNode(null, null, null, binaryValues, node);
   }
 
   @Override
   public GandivaTypes.TreeNode toProtobuf() throws GandivaException {
     GandivaTypes.InNode.Builder inNode = GandivaTypes.InNode.newBuilder();
 
-    GandivaTypes.FieldNode.Builder fieldNode = GandivaTypes.FieldNode.newBuilder();
-    fieldNode.setField(ArrowTypeHelper.arrowFieldToProtobuf(field));
-    inNode.setField(fieldNode);
+    inNode.setNode(input.toProtobuf());
 
     if (intValues != null) {
       GandivaTypes.IntConstants.Builder intConstants = GandivaTypes.IntConstants.newBuilder();
diff --git a/java/gandiva/src/main/java/org/apache/arrow/gandiva/expression/TreeBuilder.java b/java/gandiva/src/main/java/org/apache/arrow/gandiva/expression/TreeBuilder.java
index c20795fca7b81..3a01582a35101 100644
--- a/java/gandiva/src/main/java/org/apache/arrow/gandiva/expression/TreeBuilder.java
+++ b/java/gandiva/src/main/java/org/apache/arrow/gandiva/expression/TreeBuilder.java
@@ -197,18 +197,38 @@ public static TreeNode makeInExpressionInt32(Field resultField,
     return InNode.makeIntInExpr(resultField, intValues);
   }
 
+  public static TreeNode makeInExpressionInt32(TreeNode node,
+                                               Set<Integer> intValues) {
+    return InNode.makeIntInExpr(node, intValues);
+  }
+
   public static TreeNode makeInExpressionBigInt(Field resultField,
                                                 Set<Long> longValues) {
     return InNode.makeLongInExpr(resultField, longValues);
   }
 
+  public static TreeNode makeInExpressionBigInt(TreeNode node,
+                                                Set<Long> longValues) {
+    return InNode.makeLongInExpr(node, longValues);
+  }
+
   public static TreeNode makeInExpressionString(Field resultField,
                                                 Set<String> stringValues) {
     return InNode.makeStringInExpr(resultField, stringValues);
   }
 
+  public static TreeNode makeInExpressionString(TreeNode resultNode,
+                                                Set<String> stringValues) {
+    return InNode.makeStringInExpr(resultNode, stringValues);
+  }
+
   public static TreeNode makeInExpressionBinary(Field resultField,
                                                 Set<byte[]> binaryValues) {
     return InNode.makeBinaryInExpr(resultField, binaryValues);
   }
+
+  public static TreeNode makeInExpressionBinary(TreeNode node,
+                                                Set<byte[]> binaryValues) {
+    return InNode.makeBinaryInExpr(node, binaryValues);
+  }
 }
diff --git a/java/pom.xml b/java/pom.xml
index be76d844db6c9..d57006daa32e8 100644
--- a/java/pom.xml
+++ b/java/pom.xml
@@ -705,6 +705,7 @@
     <profile>
       <id>arrow-jni</id>
       <modules>
+        <module>dataset</module>
         <module>adapter/orc</module>
         <module>gandiva</module>
       </modules>
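The point of the new TreeNode overloads in InNode and TreeBuilder is that an IN filter no longer has to reference a bare field: any Gandiva expression tree can feed it. A minimal sketch, assuming the "add" function is registered for int32 in Gandiva's function registry (standard, but not shown in this patch):

    Field x = Field.nullable("x", new ArrowType.Int(32, true));
    TreeNode xPlusOne = TreeBuilder.makeFunction("add",
        Arrays.asList(TreeBuilder.makeField(x), TreeBuilder.makeLiteral(1)),
        new ArrowType.Int(32, true));
    Set<Integer> values = new HashSet<>(Arrays.asList(10, 20, 30));
    // Previously only makeInExpressionInt32(Field, Set) existed; the IN input
    // can now be any TreeNode: (x + 1) IN (10, 20, 30)
    TreeNode in = TreeBuilder.makeInExpressionInt32(xPlusOne, values);
    Condition condition = TreeBuilder.makeCondition(in);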
diff --git a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
index f483c7db94e2b..7b3c01380a417 100644
--- a/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
+++ b/java/vector/src/main/java/org/apache/arrow/vector/VectorLoader.java
@@ -53,12 +53,12 @@ public VectorLoader(VectorSchemaRoot root) {
    * @param recordBatch the batch to load
    */
   public void load(ArrowRecordBatch recordBatch) {
+    root.setRowCount(recordBatch.getLength());
     Iterator<ArrowBuf> buffers = recordBatch.getBuffers().iterator();
     Iterator<ArrowFieldNode> nodes = recordBatch.getNodes().iterator();
     for (FieldVector fieldVector : root.getFieldVectors()) {
       loadBuffers(fieldVector, fieldVector.getField(), buffers, nodes);
     }
-    root.setRowCount(recordBatch.getLength());
     if (nodes.hasNext() || buffers.hasNext()) {
       throw new IllegalArgumentException("not all nodes and buffers were consumed. nodes: " +
           Collections2.toList(nodes).toString() + " buffers: " + Collections2.toList(buffers).toString());
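The VectorLoader change above only moves root.setRowCount(...) ahead of the buffer-loading loop, so the row count is already visible on the root while its vectors are being populated; callers are unaffected. A self-contained round-trip sketch with the standard VectorUnloader/VectorLoader pair from Arrow's Java library:

    import java.util.Collections;
    import org.apache.arrow.memory.RootAllocator;
    import org.apache.arrow.vector.IntVector;
    import org.apache.arrow.vector.VectorLoader;
    import org.apache.arrow.vector.VectorSchemaRoot;
    import org.apache.arrow.vector.VectorUnloader;
    import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
    import org.apache.arrow.vector.types.pojo.ArrowType;
    import org.apache.arrow.vector.types.pojo.Field;
    import org.apache.arrow.vector.types.pojo.Schema;

    public class LoaderRoundTrip {
      public static void main(String[] args) throws Exception {
        Schema schema = new Schema(Collections.singletonList(
            Field.nullable("id", new ArrowType.Int(32, true))));
        try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE);
             VectorSchemaRoot source = VectorSchemaRoot.create(schema, allocator);
             VectorSchemaRoot target = VectorSchemaRoot.create(schema, allocator)) {
          IntVector ids = (IntVector) source.getVector("id");
          ids.allocateNew(3);
          for (int i = 0; i < 3; i++) {
            ids.setSafe(i, i);
          }
          source.setRowCount(3);
          // Unload the populated root into an ArrowRecordBatch, then load it
          // into a fresh root; load() now sets the row count up front.
          try (ArrowRecordBatch batch = new VectorUnloader(source).getRecordBatch()) {
            new VectorLoader(target).load(batch);
            System.out.println(target.getRowCount()); // 3
          }
        }
      }
    }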