Skip to content

Commit

Permalink
Merge pull request apache#50 from Intel-bigdata/native-sql-engine-clean
Browse files Browse the repository at this point in the history
Merge native sql engine
  • Loading branch information
zhouyuan authored May 27, 2020
2 parents 401d283 + 03c502e commit fe10d2a
Show file tree
Hide file tree
Showing 60 changed files with 2,929 additions and 27 deletions.
3 changes: 2 additions & 1 deletion ci/docker/linux-apt-jni.dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,14 @@ RUN wget -nv -O - https://github.com/Kitware/CMake/releases/download/v${cmake}/c
ENV PATH=/opt/cmake-${cmake}-Linux-x86_64/bin:$PATH

ENV ARROW_BUILD_TESTS=OFF \
ARROW_DATASET=ON \
ARROW_FLIGHT=OFF \
ARROW_GANDIVA_JAVA=ON \
ARROW_GANDIVA=ON \
ARROW_HOME=/usr/local \
ARROW_JNI=ON \
ARROW_ORC=ON \
ARROW_PARQUET=OFF \
ARROW_PARQUET=ON \
ARROW_PLASMA_JAVA_CLIENT=ON \
ARROW_PLASMA=ON \
ARROW_USE_CCACHE=ON \
Expand Down
4 changes: 2 additions & 2 deletions ci/scripts/java_test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ pushd ${source_dir}

${mvn} test

if [ "${ARROW_GANDIVA_JAVA}" = "ON" ]; then
${mvn} test -Parrow-jni -pl gandiva -Darrow.cpp.build.dir=${cpp_build_dir}
if [ "${ARROW_JNI}" = "ON" ]; then
${mvn} test -Parrow-jni -pl gandiva,dataset -Darrow.cpp.build.dir=${cpp_build_dir}
fi

if [ "${ARROW_PLASMA}" = "ON" ]; then
Expand Down
2 changes: 2 additions & 0 deletions cpp/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ cmake_install.cmake
build/
*-build/
Testing/
cmake-build-debug/
cmake-build-release/

#########################################
# Editor temporary/working/backup files #
Expand Down
23 changes: 23 additions & 0 deletions cpp/src/arrow/dataset/discovery.cc
Original file line number Diff line number Diff line change
Expand Up @@ -274,5 +274,28 @@ Result<std::shared_ptr<Dataset>> FileSystemDatasetFactory::Finish(FinishOptions
std::move(partitions));
}

// Creates a factory for the single file located at `path` on filesystem `fs`,
// to be read with `format`.
Result<std::shared_ptr<DatasetFactory>> SingleFileDatasetFactory::Make(
    std::string path, std::shared_ptr<fs::FileSystem> fs,
    std::shared_ptr<FileFormat> format) {
  // The FileSource is handed the filesystem as a raw pointer, so it must be
  // built before `fs` is moved into the factory below.
  auto source = std::make_shared<FileSource>(path, fs.get());

  // The constructor is protected, hence `new` instead of std::make_shared.
  return std::shared_ptr<DatasetFactory>(
      new SingleFileDatasetFactory(std::move(source), std::move(fs), std::move(format)));
}

// Returns the schema of the one file this factory wraps, as reported by the
// file format. `options` is not consulted by this implementation.
Result<std::vector<std::shared_ptr<Schema>>> SingleFileDatasetFactory::InspectSchemas(
    InspectOptions options) {
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Schema> inspected,
                        format_->Inspect(*file_));
  // There is exactly one file, hence exactly one schema.
  return std::vector<std::shared_ptr<Schema>>{std::move(inspected)};
}

// Builds the SingleFileDataset for the wrapped file.
//
// If `options.schema` is set it is used as the dataset schema. Otherwise the
// schema is inspected from the file through the format, so that the resulting
// dataset never carries a null schema (which would later be dereferenced, e.g.
// by SingleFileDataset::ReplaceSchema). Inspection errors are propagated.
Result<std::shared_ptr<Dataset>> SingleFileDatasetFactory::Finish(FinishOptions options) {
  std::shared_ptr<Schema> schema = options.schema;
  if (schema == nullptr) {
    ARROW_ASSIGN_OR_RAISE(schema, format_->Inspect(*file_));
  }
  // A single file has no partitioning: the root partition is always true.
  return SingleFileDataset::Make(std::move(schema), scalar(true), file_, fs_, format_);
}

SingleFileDatasetFactory::SingleFileDatasetFactory(std::shared_ptr<FileSource> file,
std::shared_ptr<fs::FileSystem> fs,
std::shared_ptr<FileFormat> format)
: file_(std::move(file)), fs_(std::move(fs)), format_(std::move(format)) {}

} // namespace dataset
} // namespace arrow
22 changes: 22 additions & 0 deletions cpp/src/arrow/dataset/discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,5 +232,27 @@ class ARROW_DS_EXPORT FileSystemDatasetFactory : public DatasetFactory {
FileSystemFactoryOptions options_;
};

/// \brief DatasetFactory for a dataset backed by exactly one file.
///
/// InspectSchemas() reports the schema the format reads from the file, and
/// Finish() creates a SingleFileDataset over that same file.
class ARROW_DS_EXPORT SingleFileDatasetFactory : public DatasetFactory {
public:
/// \brief Create a factory for a single file.
///
/// \param[in] path path of the file; combined with `fs` to build the FileSource
/// \param[in] fs filesystem the file lives on
/// \param[in] format format used to inspect and read the file
static Result<std::shared_ptr<DatasetFactory>> Make(std::string path,
std::shared_ptr<fs::FileSystem> fs,
std::shared_ptr<FileFormat> format);

/// \brief Return a one-element vector holding the schema inspected from the file.
Result<std::vector<std::shared_ptr<Schema>>> InspectSchemas(
InspectOptions options) override;

/// \brief Create the SingleFileDataset, using the schema given in `options`.
Result<std::shared_ptr<Dataset>> Finish(FinishOptions options) override;

protected:
// Protected: instances are created through Make().
SingleFileDatasetFactory(std::shared_ptr<FileSource> file,
std::shared_ptr<fs::FileSystem> fs,
std::shared_ptr<FileFormat> format);

private:
// Source describing the single file.
std::shared_ptr<FileSource> file_;
// Filesystem the file belongs to; passed on to the created dataset.
std::shared_ptr<fs::FileSystem> fs_;
// Format used to inspect the file; passed on to the created dataset.
std::shared_ptr<FileFormat> format_;
};

} // namespace dataset
} // namespace arrow
46 changes: 46 additions & 0 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include "arrow/dataset/file_base.h"

#include <algorithm>
#include <memory>
#include <utility>
#include <vector>

#include "arrow/dataset/dataset_internal.h"
Expand Down Expand Up @@ -277,5 +279,49 @@ Status WriteTask::CreateDestinationParentDir() const {
return Status::OK();
}

// Convenience overload: wraps `path` on `fs` in a FileSource and forwards to
// the FileSource-based overload.
Result<std::shared_ptr<SingleFileDataset>> SingleFileDataset::Make(
    std::shared_ptr<Schema> schema, std::shared_ptr<Expression> root_partition,
    std::string path, std::shared_ptr<fs::FileSystem> fs,
    std::shared_ptr<FileFormat> format) {
  // Build the source first: it needs the raw filesystem pointer before `fs` is
  // moved away.
  auto source = std::make_shared<FileSource>(path, fs.get());
  return Make(std::move(schema), std::move(root_partition), std::move(source),
              std::move(fs), std::move(format));
}

// Creates a dataset over an already constructed FileSource.
Result<std::shared_ptr<SingleFileDataset>> SingleFileDataset::Make(
    std::shared_ptr<Schema> schema, std::shared_ptr<Expression> root_partition,
    std::shared_ptr<FileSource> file, std::shared_ptr<fs::FileSystem> fs,
    std::shared_ptr<FileFormat> format) {
  // All arguments are owned by value here, so hand them over without copying.
  return std::make_shared<SingleFileDataset>(std::move(schema), std::move(root_partition),
                                             std::move(file), std::move(fs),
                                             std::move(format));
}

SingleFileDataset::SingleFileDataset(std::shared_ptr<Schema> schema,
std::shared_ptr<Expression> root_partition,
std::shared_ptr<FileSource> file,
std::shared_ptr<fs::FileSystem> fs,
std::shared_ptr<FileFormat> format)
: Dataset(std::move(schema), std::move(root_partition)),
file_(std::move(file)),
fs_(std::move(fs)),
format_(std::move(format)) {}

// Returns an iterator that yields the one fragment of this dataset.
//
// The fragment is created lazily, when the iterator is advanced, by asking the
// file format to make a fragment for the file source with `options`. A failure
// from the format surfaces as the error of that iteration step.
FragmentIterator SingleFileDataset::GetFragmentsImpl(
    std::shared_ptr<ScanOptions> options) {
  std::vector<std::shared_ptr<FileSource>> files({file_});
  auto file_srcs_it = MakeVectorIterator(std::move(files));

  // Capture a copy of the format handle rather than `this`: the closure is
  // stored inside the returned iterator, which callers may keep after this
  // dataset has been destroyed. The file source is already kept alive by the
  // vector iterator above.
  std::shared_ptr<FileFormat> format = format_;
  auto file_src_to_fragment =
      [options, format](const std::shared_ptr<FileSource>& source)
      -> Result<std::shared_ptr<Fragment>> {
    ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Fragment> fragment,
                          format->MakeFragment(*source, options));
    return fragment;
  };
  return MakeMaybeMapIterator(file_src_to_fragment, std::move(file_srcs_it));
}

// Returns a new dataset over the same file, filesystem, format and partition
// expression, but with `schema` as its schema. Fails if CheckProjectable
// rejects the pair (current schema, `schema`).
Result<std::shared_ptr<Dataset>> SingleFileDataset::ReplaceSchema(
    std::shared_ptr<Schema> schema) const {
  RETURN_NOT_OK(CheckProjectable(*schema_, *schema));

  // The constructor is public, so the replacement can be built with make_shared.
  return std::shared_ptr<Dataset>(std::make_shared<SingleFileDataset>(
      std::move(schema), partition_expression_, file_, fs_, format_));
}
} // namespace dataset
} // namespace arrow
30 changes: 30 additions & 0 deletions cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -316,5 +316,35 @@ class ARROW_DS_EXPORT WritePlan {
std::vector<std::string> paths;
};

/// \brief A Dataset made of a single file read with a given FileFormat.
///
/// Scanning yields one fragment, produced by the format from the file source.
class ARROW_DS_EXPORT SingleFileDataset : public Dataset {
public:
/// \brief Construct a dataset over `file`.
///
/// \param[in] schema schema of the dataset
/// \param[in] root_partition partition expression forwarded to the Dataset base
/// \param[in] file source describing the file
/// \param[in] fs filesystem the file belongs to
/// \param[in] format format used to create the fragment
SingleFileDataset(std::shared_ptr<Schema> schema,
std::shared_ptr<Expression> root_partition,
std::shared_ptr<FileSource> file, std::shared_ptr<fs::FileSystem> fs,
std::shared_ptr<FileFormat> format);

/// \brief Create a dataset for the file at `path` on `fs`.
///
/// A FileSource is built from `path` and `fs` before constructing the dataset.
static Result<std::shared_ptr<SingleFileDataset>> Make(
std::shared_ptr<Schema> schema, std::shared_ptr<Expression> root_partition,
std::string path, std::shared_ptr<fs::FileSystem> fs,
std::shared_ptr<FileFormat> format);

/// \brief Create a dataset from an existing FileSource.
static Result<std::shared_ptr<SingleFileDataset>> Make(
std::shared_ptr<Schema> schema, std::shared_ptr<Expression> root_partition,
std::shared_ptr<FileSource> file, std::shared_ptr<fs::FileSystem> fs,
std::shared_ptr<FileFormat> format);

/// \brief Identifier of this dataset kind; always "single_file".
std::string type_name() const override { return "single_file"; }
/// \brief Return a copy of this dataset using `schema`, after checking that the
/// current schema can be projected to it.
Result<std::shared_ptr<Dataset>> ReplaceSchema(
std::shared_ptr<Schema> schema) const override;

protected:
/// \brief Return an iterator over the single fragment of this dataset.
FragmentIterator GetFragmentsImpl(std::shared_ptr<ScanOptions> options) override;

private:
// Source describing the single file.
std::shared_ptr<FileSource> file_;
// Filesystem the file belongs to.
std::shared_ptr<fs::FileSystem> fs_;
// Format used to turn the file source into a fragment.
std::shared_ptr<FileFormat> format_;
};

} // namespace dataset
} // namespace arrow
5 changes: 5 additions & 0 deletions cpp/src/arrow/dataset/file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,5 +211,10 @@ TEST_F(TestFileSystemDataset, FragmentPartitions) {
});
}

// The fragments of a single-file dataset must come from the path the dataset
// was created with.
TEST_F(TestSingleFileDataset, FragmentPartitions) {
  const std::string path = "A/a";
  MakeSingleFileDataset(path);

  auto fragments = dataset_->GetFragments(options_);
  AssertFragmentsAreFromPath(std::move(fragments), {path});
}

} // namespace dataset
} // namespace arrow
1 change: 1 addition & 0 deletions cpp/src/arrow/dataset/scanner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ std::shared_ptr<ScanOptions> ScanOptions::ReplaceSchema(
auto copy = ScanOptions::Make(std::move(schema));
copy->filter = filter;
copy->evaluator = evaluator;
copy->batch_size = batch_size;
return copy;
}

Expand Down
15 changes: 15 additions & 0 deletions cpp/src/arrow/dataset/test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,21 @@ static const std::string& PathOf(const std::shared_ptr<Fragment>& fragment) {
class TestFileSystemDataset : public ::testing::Test,
public MakeFileSystemDatasetMixin {};

/// Test fixture providing a SingleFileDataset built on a DummyFileFormat.
class TestSingleFileDataset : public ::testing::Test {
public:
// Populates `dataset_` with a SingleFileDataset for `path`, using an empty
// schema, an always-true root partition and a DummyFileFormat.
// NOTE(review): nothing visible here assigns `fs_`, so `fs_.get()` passes a
// null filesystem pointer to FileSource — confirm the dummy format never
// dereferences it.
void MakeSingleFileDataset(const std::string& path) {
auto format = std::make_shared<DummyFileFormat>();
ASSERT_OK_AND_ASSIGN(
dataset_, SingleFileDataset::Make(schema({}), scalar(true),
std::make_shared<FileSource>(path, fs_.get()),
fs_, format));
}

// Filesystem handed to the dataset; default-constructed (empty) in this fixture.
std::shared_ptr<fs::FileSystem> fs_;
// Dataset under test, set by MakeSingleFileDataset().
std::shared_ptr<Dataset> dataset_;
// Scan options over an empty schema, passed to GetFragments() by the tests.
std::shared_ptr<ScanOptions> options_ = ScanOptions::Make(schema({}));
};

static std::vector<std::string> PathsOf(const FragmentVector& fragments) {
std::vector<std::string> paths(fragments.size());
std::transform(fragments.begin(), fragments.end(), paths.begin(), PathOf);
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/gandiva/function_registry_arithmetic.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,13 @@ std::vector<NativeFunction> GetArithmeticFunctionRegistry() {
static std::vector<NativeFunction> arithmetic_fn_registry_ = {
UNARY_SAFE_NULL_IF_NULL(not, {}, boolean, boolean),
UNARY_SAFE_NULL_IF_NULL(castBIGINT, {}, int32, int64),
UNARY_SAFE_NULL_IF_NULL(castBIGINT, {}, float32, int64),
UNARY_SAFE_NULL_IF_NULL(castBIGINT, {}, float64, int64),
UNARY_SAFE_NULL_IF_NULL(castBIGINT, {}, date64, int64),
UNARY_SAFE_NULL_IF_NULL(castINT, {}, int64, int32),
UNARY_SAFE_NULL_IF_NULL(castINT, {}, date32, int32),
UNARY_SAFE_NULL_IF_NULL(castINT, {}, float32, int32),
UNARY_SAFE_NULL_IF_NULL(castINT, {}, float64, int32),
UNARY_SAFE_NULL_IF_NULL(castBIGINT, {}, decimal128, int64),

// cast to float32
Expand All @@ -62,6 +68,7 @@ std::vector<NativeFunction> GetArithmeticFunctionRegistry() {

UNARY_SAFE_NULL_IF_NULL(castDATE, {}, int64, date64),
UNARY_SAFE_NULL_IF_NULL(castDATE, {}, int32, date32),
UNARY_SAFE_NULL_IF_NULL(castDATE, {}, date32, date64),

// add/sub/multiply/divide/mod
BINARY_SYMMETRIC_FN(add, {}), BINARY_SYMMETRIC_FN(subtract, {}),
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/gandiva/function_registry_math_ops.cc
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ std::vector<NativeFunction> GetMathOpsFunctionRegistry() {
BINARY_SAFE_NULL_NEVER_BOOL_FN(is_distinct_from, {}),
BINARY_SAFE_NULL_NEVER_BOOL_FN(is_not_distinct_from, {}),

UNARY_SAFE_NULL_IF_NULL(abs, {}, int32, uint32),
UNARY_SAFE_NULL_IF_NULL(abs, {}, int64, uint64),
UNARY_SAFE_NULL_IF_NULL(abs, {}, float32, float32),
UNARY_SAFE_NULL_IF_NULL(abs, {}, float64, float64),

// decimal functions
UNARY_SAFE_NULL_IF_NULL(abs, {}, decimal128, decimal128),
UNARY_SAFE_NULL_IF_NULL(ceil, {}, decimal128, decimal128),
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/gandiva/jni/jni_common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ NodePtr ProtoTypeToOrNode(const types::OrNode& node) {
}

NodePtr ProtoTypeToInNode(const types::InNode& node) {
NodePtr field = ProtoTypeToFieldNode(node.field());
NodePtr field = ProtoTypeToNode(node.node());

if (node.has_intvalues()) {
std::unordered_set<int32_t> int_values;
Expand Down
6 changes: 6 additions & 0 deletions cpp/src/gandiva/precompiled/arithmetic_ops.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,13 @@ NUMERIC_DATE_TYPES(BINARY_RELATIONAL, greater_than_or_equal_to, >=)
}

CAST_UNARY(castBIGINT, int32, int64)
CAST_UNARY(castBIGINT, date64, int64)
CAST_UNARY(castBIGINT, float32, int64)
CAST_UNARY(castBIGINT, float64, int64)
CAST_UNARY(castINT, int64, int32)
CAST_UNARY(castINT, date32, int32)
CAST_UNARY(castINT, float32, int32)
CAST_UNARY(castINT, float64, int32)
CAST_UNARY(castFLOAT4, int32, float32)
CAST_UNARY(castFLOAT4, int64, float32)
CAST_UNARY(castFLOAT8, int32, float64)
Expand Down
18 changes: 18 additions & 0 deletions cpp/src/gandiva/precompiled/extended_math_ops.cc
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,24 @@ ENUMERIC_TYPES_UNARY(LOG, float64)

ENUMERIC_TYPES_UNARY(LOG10, float64)

// abs
//
// Integer variant. OUT_TYPE must be the unsigned type of the same width as
// IN_TYPE. The magnitude is computed in the unsigned domain: converting to the
// unsigned type and subtracting from zero is well defined for every input,
// including the most negative value, whose magnitude is not representable in
// the signed type (calling abs() on it overflows). This also avoids depending
// on which abs() overload is visible for 64-bit arguments.
#define ABS_TYPES_UNARY(IN_TYPE, OUT_TYPE)                                  \
  FORCE_INLINE                                                              \
  gdv_##OUT_TYPE abs_##IN_TYPE(gdv_##IN_TYPE in) {                          \
    const gdv_##OUT_TYPE magnitude = static_cast<gdv_##OUT_TYPE>(in);       \
    return in < 0 ? static_cast<gdv_##OUT_TYPE>(0 - magnitude) : magnitude; \
  }

// Floating point variant: delegates to fabs() and converts the result back to
// OUT_TYPE.
#define ABS_FTYPES_UNARY(IN_TYPE, OUT_TYPE)           \
  FORCE_INLINE                                        \
  gdv_##OUT_TYPE abs_##IN_TYPE(gdv_##IN_TYPE in) {    \
    return static_cast<gdv_##OUT_TYPE>(fabs((in)));   \
  }

ABS_TYPES_UNARY(int32, uint32)
ABS_TYPES_UNARY(int64, uint64)
ABS_FTYPES_UNARY(float32, float32)
ABS_FTYPES_UNARY(float64, float64)

FORCE_INLINE
void set_error_for_logbase(int64_t execution_context, double base) {
char const* prefix = "divide by zero error with log of base";
Expand Down
5 changes: 4 additions & 1 deletion cpp/src/gandiva/precompiled/time.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ extern "C" {

// Expand inner macro for all date types.
#define DATE_TYPES(INNER) \
INNER(date32) \
INNER(date64) \
INNER(timestamp)

Expand Down Expand Up @@ -454,9 +453,13 @@ DATE_TRUNC_FUNCTIONS(timestamp)

// Reinterprets a 64-bit integer as a date64 value; the number is passed
// through unchanged.
FORCE_INLINE
gdv_date64 castDATE_int64(gdv_int64 in) {
  return static_cast<gdv_date64>(in);
}

// Reinterprets a 32-bit integer as a date32 value; the number is passed
// through unchanged.
FORCE_INLINE
gdv_date32 castDATE_int32(gdv_int32 in) {
  return static_cast<gdv_date32>(in);
}

// Converts a date32 value (a day count) to date64 by multiplying it with
// MILLIS_IN_DAY. The day count is widened to 64 bits first so the
// multiplication is not carried out in 32-bit arithmetic.
FORCE_INLINE
gdv_date64 castDATE_date32(gdv_date32 days) {
  const gdv_int64 widened_days = static_cast<gdv_int64>(days);
  return widened_days * MILLIS_IN_DAY;
}

static int days_in_month[] = {31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31};

bool IsLastDayOfMonth(const EpochTimePoint& tp) {
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/gandiva/precompiled/time_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ TEST(TestTime, TestCastDate) {
EXPECT_EQ(castDATE_utf8(context_ptr, "71-1-1", 6), 31536000000);
EXPECT_EQ(castDATE_utf8(context_ptr, "71-45-1", 7), 0);
EXPECT_EQ(castDATE_utf8(context_ptr, "71-12-XX", 8), 0);

EXPECT_EQ(castDATE_date32(1), 86400000);
}

TEST(TestTime, TestCastTimestamp) {
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/gandiva/precompiled/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,11 @@ gdv_float64 log10_int64(gdv_int64);
gdv_float64 log10_float32(gdv_float32);
gdv_float64 log10_float64(gdv_float64);

gdv_uint32 abs_int32(gdv_int32);
gdv_uint64 abs_int64(gdv_int64);
gdv_float32 abs_float32(gdv_float32);
gdv_float64 abs_float64(gdv_float64);

gdv_float64 power_float64_float64(gdv_float64, gdv_float64);

gdv_float64 log_int32_int32(gdv_int64 context, gdv_int32 base, gdv_int32 value);
Expand All @@ -169,6 +174,8 @@ gdv_date64 castDATE_utf8(int64_t execution_context, const char* input, gdv_int32

gdv_date64 castDATE_int64(gdv_int64 date);

gdv_date64 castDATE_date32(gdv_date32 date);

gdv_date32 castDATE_int32(gdv_int32 date);

gdv_timestamp castTIMESTAMP_utf8(int64_t execution_context, const char* input,
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/gandiva/proto/Types.proto
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ message FunctionSignature {
}

message InNode {
optional FieldNode field = 1;
optional TreeNode node = 1;
optional IntConstants intValues = 2;
optional LongConstants longValues = 3;
optional StringConstants stringValues = 4;
Expand Down
Loading

0 comments on commit fe10d2a

Please sign in to comment.