Skip to content

Commit

Permalink
[OPPRO-104] Support date parsing and extract year (facebookincubator#18)
Browse files Browse the repository at this point in the history
* add date parsing

* support extract year

* support int32 return type

* apply pr_19

use the pool that was set

Parse and store the file format var into SubstraitVeloxPlanConverter
  • Loading branch information
rui-mo authored and zhejiangxiaomai committed Oct 20, 2022
1 parent ebf45e1 commit 857da1c
Show file tree
Hide file tree
Showing 11 changed files with 426 additions and 371 deletions.
8 changes: 8 additions & 0 deletions velox/core/PlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,10 +220,12 @@ class ArrowStreamNode : public PlanNode {
const PlanNodeId& id,
const RowTypePtr& outputType,
std::shared_ptr<ArrowArrayStream> arrowStream,
memory::MemoryPool* pool,
bool parallelizable = false)
: PlanNode(id),
outputType_(outputType),
arrowStream_(arrowStream),
pool_(pool),
parallelizable_(parallelizable) {
VELOX_CHECK(arrowStream != nullptr);
}
Expand All @@ -247,6 +249,11 @@ class ArrowStreamNode : public PlanNode {
return arrowStream_;
}


/// Returns the memory pool pointer stored in pool_, i.e. the `pool` argument
/// this node was constructed with.
/// NOTE(review): the pointer is held raw, so the node presumably does not own
/// the pool; confirm the lifetime guarantee with callers.
memory::MemoryPool* memoryPool() const {
return pool_;
}

/// Returns the name of this plan node, "ArrowStream".
std::string_view name() const override {
return "ArrowStream";
}
Expand All @@ -256,6 +263,7 @@ class ArrowStreamNode : public PlanNode {

const RowTypePtr outputType_;
std::shared_ptr<ArrowArrayStream> arrowStream_;
memory::MemoryPool* pool_;
const bool parallelizable_;
};

Expand Down
3 changes: 2 additions & 1 deletion velox/exec/ArrowStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ ArrowStream::ArrowStream(
arrowStream->id(),
"Arrow Stream") {
arrowStream_ = arrowStream->arrowStream();
pool_ = arrowStream->memoryPool();
}

ArrowStream::~ArrowStream() {
Expand All @@ -54,7 +55,7 @@ RowVectorPtr ArrowStream::getOutput() {
}
// Convert Arrow data into RowVector.
rowVector_ = std::dynamic_pointer_cast<RowVector>(
facebook::velox::importFromArrowAsViewer(arrowSchema, arrowArray));
facebook::velox::importFromArrowAsOwner(arrowSchema, arrowArray, pool_));
return rowVector_;
}

Expand Down
1 change: 1 addition & 0 deletions velox/exec/ArrowStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class ArrowStream : public SourceOperator {
bool closed_ = false;
RowVectorPtr rowVector_;
std::shared_ptr<ArrowArrayStream> arrowStream_;
memory::MemoryPool* pool_;

// For calls from destructor
bool isFinished0();
Expand Down
18 changes: 12 additions & 6 deletions velox/functions/prestosql/DateTimeFunctions.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,22 +139,28 @@ struct YearFunction : public InitSessionTimezone<T>,
public TimestampWithTimezoneSupport<T> {
VELOX_DEFINE_FUNCTION_TYPES(T);

FOLLY_ALWAYS_INLINE int64_t getYear(const std::tm& time) {
/// Converts a broken-down time into a calendar year. std::tm::tm_year counts
/// years since 1900, so the offset is added back here.
FOLLY_ALWAYS_INLINE int32_t getYear(const std::tm& time) {
  const int32_t yearsSince1900 = time.tm_year;
  return yearsSince1900 + 1900;
}

FOLLY_ALWAYS_INLINE void call(
int64_t& result,
/// Writes the year of `timestamp` into `result`, using this->timeZone_ when
/// breaking the timestamp down into date-time fields.
/// The result type is a template parameter so the same function can be
/// registered with more than one integer return type.
template <typename TResult>
FOLLY_ALWAYS_INLINE void call(
    TResult& result,
    const arg_type<Timestamp>& timestamp) {
  const auto dateTime = getDateTime(timestamp, this->timeZone_);
  result = getYear(dateTime);
}

FOLLY_ALWAYS_INLINE void call(int64_t& result, const arg_type<Date>& date) {
/// Writes the year of `date` into `result`.
/// The result type is a template parameter so the same function can be
/// registered with more than one integer return type.
template <typename TResult>
FOLLY_ALWAYS_INLINE void call(TResult& result, const arg_type<Date>& date) {
  const auto dateTime = getDateTime(date);
  result = getYear(dateTime);
}

FOLLY_ALWAYS_INLINE void call(
int64_t& result,
template <typename TInput>
FOLLY_ALWAYS_INLINE
void call(
TInput& result,
const arg_type<TimestampWithTimezone>& timestampWithTimezone) {
auto timestamp = this->toTimestamp(timestampWithTimezone);
result = getYear(getDateTime(timestamp, nullptr));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ void registerSimpleFunctions() {
{"to_unixtime"});
registerFunction<FromUnixtimeFunction, Timestamp, double>({"from_unixtime"});

registerFunction<YearFunction, int32_t, Timestamp>({"year"});
registerFunction<YearFunction, int32_t, Date>({"year"});
registerFunction<YearFunction, int32_t, TimestampWithTimezone>({"year"});
registerFunction<YearFunction, int64_t, Timestamp>({"year"});
registerFunction<YearFunction, int64_t, Date>({"year"});
registerFunction<YearFunction, int64_t, TimestampWithTimezone>({"year"});
Expand Down
5 changes: 5 additions & 0 deletions velox/substrait/SubstraitParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,11 @@ std::shared_ptr<SubstraitParser::SubstraitType> SubstraitParser::parseType(
nullability = substraitType.string().nullability();
break;
}
case ::substrait::Type::KindCase::kDate: {
  // Substrait DATE maps to the "DATE" type name.
  typeName = "DATE";
  // Read nullability from `substraitType`, the same type object the
  // neighbouring string case reads from.
  nullability = substraitType.date().nullability();
  break;
}
default:
VELOX_NYI(
"Parsing for Substrait type not supported: {}",
Expand Down
34 changes: 34 additions & 0 deletions velox/substrait/SubstraitToVeloxExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,36 @@ SubstraitVeloxExprConverter::toIsNotNullExpr(
outputType, std::move(notParams), "not");
}

/// Builds the Velox expression for an "extract" call.
///
/// @param params Exactly two arguments: a constant naming the field to
/// extract, followed by the expression to extract it from.
/// @param outputType Result type given to the generated call expression.
/// @return A "year" CallTypedExpr over params[1] when the field is "YEAR".
/// Fails via VELOX_FAIL when the first argument is not a constant or carries
/// no value, and via VELOX_NYI for any field other than "YEAR".
std::shared_ptr<const core::ITypedExpr>
SubstraitVeloxExprConverter::toExtractExpr(
    const std::vector<std::shared_ptr<const core::ITypedExpr>>& params,
    const TypePtr& outputType) {
  VELOX_CHECK_EQ(params.size(), 2);

  // The first parameter selects the field and must be a constant expression.
  const auto fieldSelector =
      std::dynamic_pointer_cast<const core::ConstantTypedExpr>(params[0]);
  if (fieldSelector == nullptr) {
    VELOX_FAIL("Constant is expected to be the first parameter in extract.");
  }

  auto fieldValue = fieldSelector->value();
  if (!fieldValue.hasValue()) {
    VELOX_FAIL("Value expected in variant.");
  }

  // Only year is supported currently.
  const std::string fieldName = fieldValue.value<std::string>();
  if (fieldName != "YEAR") {
    VELOX_NYI("Extract from {} not supported.", fieldName);
  }

  // The second parameter becomes the sole argument of the "year" call.
  std::vector<std::shared_ptr<const core::ITypedExpr>> yearArgs;
  yearArgs.reserve(1);
  yearArgs.emplace_back(params[1]);
  return std::make_shared<const core::CallTypedExpr>(
      outputType, std::move(yearArgs), "year");
}

std::shared_ptr<const core::ITypedExpr>
SubstraitVeloxExprConverter::toVeloxExpr(
const ::substrait::Expression::ScalarFunction& substraitFunc,
Expand All @@ -85,12 +115,16 @@ SubstraitVeloxExprConverter::toVeloxExpr(
const auto& veloxType =
toVeloxType(subParser_->parseType(sFunc.output_type())->type);

if (veloxFunction == "extract") {
return toExtractExpr(params, veloxType);
}
if (veloxFunction == "alias") {
return toAliasExpr(params);
}
if (veloxFunction == "is_not_null") {
return toIsNotNullExpr(params, veloxType);
}

return std::make_shared<const core::CallTypedExpr>(
toVeloxType(typeName), std::move(params), veloxFunction);
}
Expand Down
5 changes: 5 additions & 0 deletions velox/substrait/SubstraitToVeloxExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,11 @@ class SubstraitVeloxExprConverter {
const std::vector<std::shared_ptr<const core::ITypedExpr>>& params,
const TypePtr& outputType);

/// Create expression for extract. The first parameter must be a constant
/// naming the field to extract (only "YEAR" is handled, producing a "year"
/// call); the second parameter is the expression to extract from.
/// `outputType` is the type given to the resulting call expression.
std::shared_ptr<const core::ITypedExpr> toExtractExpr(
const std::vector<std::shared_ptr<const core::ITypedExpr>>& params,
const TypePtr& outputType);

/// Used to convert Substrait Literal into Velox Expression.
std::shared_ptr<const core::ConstantTypedExpr> toVeloxExpr(
const ::substrait::Expression::Literal& substraitLit);
Expand Down
9 changes: 5 additions & 4 deletions velox/substrait/SubstraitToVeloxPlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,11 +295,12 @@ core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan(

const auto& inputType = childNode->outputType();
const auto& sExpr = filterRel.condition();
std::shared_ptr<const core::PlanNode>

return std::make_shared<core::FilterNode>(
nextPlanNodeId(),
exprConverter_->toVeloxExpr(sExpr, inputType),
childNode);
return std::make_shared<core::FilterNode>(
nextPlanNodeId(),
exprConverter_->toVeloxExpr(sExpr, inputType),
childNode);
}

core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan(
Expand Down
Loading

0 comments on commit 857da1c

Please sign in to comment.