Skip to content

Commit

Permalink
[OPPRO-104] Support date parsing and extract year (facebookincubator#18)
Browse files Browse the repository at this point in the history
* add date parsing

* support extract year

* support int32 return type

* apply pr_19

use setted pool

Parse and store the file format var into SubstraitVeloxPlanConverter
  • Loading branch information
rui-mo authored and zhejiangxiaomai committed Feb 27, 2023
1 parent b938063 commit 0745672
Show file tree
Hide file tree
Showing 9 changed files with 69 additions and 28 deletions.
1 change: 1 addition & 0 deletions velox/exec/ArrowStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class ArrowStream : public SourceOperator {

bool finished_ = false;
std::shared_ptr<ArrowArrayStream> arrowStream_;
memory::MemoryPool* pool_;

// For calls from destructor
bool isFinished0();
Expand Down
18 changes: 12 additions & 6 deletions velox/functions/prestosql/DateTimeFunctions.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,22 +150,28 @@ struct YearFunction : public InitSessionTimezone<T>,
public TimestampWithTimezoneSupport<T> {
VELOX_DEFINE_FUNCTION_TYPES(T);

FOLLY_ALWAYS_INLINE int64_t getYear(const std::tm& time) {
FOLLY_ALWAYS_INLINE int32_t getYear(const std::tm& time) {
return 1900 + time.tm_year;
}

FOLLY_ALWAYS_INLINE void call(
int64_t& result,
template <typename TInput>
FOLLY_ALWAYS_INLINE
void call(
TInput& result,
const arg_type<Timestamp>& timestamp) {
result = getYear(getDateTime(timestamp, this->timeZone_));
}

FOLLY_ALWAYS_INLINE void call(int64_t& result, const arg_type<Date>& date) {
template <typename TInput>
FOLLY_ALWAYS_INLINE
void call(TInput& result, const arg_type<Date>& date) {
result = getYear(getDateTime(date));
}

FOLLY_ALWAYS_INLINE void call(
int64_t& result,
template <typename TInput>
FOLLY_ALWAYS_INLINE
void call(
TInput& result,
const arg_type<TimestampWithTimezone>& timestampWithTimezone) {
auto timestamp = this->toTimestamp(timestampWithTimezone);
result = getYear(getDateTime(timestamp, nullptr));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ void registerSimpleFunctions() {
{"to_unixtime"});
registerFunction<FromUnixtimeFunction, Timestamp, double>({"from_unixtime"});

registerFunction<YearFunction, int32_t, Timestamp>({"year"});
registerFunction<YearFunction, int32_t, Date>({"year"});
registerFunction<YearFunction, int32_t, TimestampWithTimezone>({"year"});
registerFunction<YearFunction, int64_t, Timestamp>({"year"});
registerFunction<YearFunction, int64_t, Date>({"year"});
registerFunction<YearFunction, int64_t, TimestampWithTimezone>({"year"});
Expand Down
2 changes: 1 addition & 1 deletion velox/substrait/SubstraitParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ std::shared_ptr<SubstraitParser::SubstraitType> SubstraitParser::parseType(
switch (substraitType.kind_case()) {
case ::substrait::Type::KindCase::kBool: {
typeName = "BOOLEAN";
nullability = sType.bool_().nullability();
nullability = substraitType.bool_().nullability();
break;
}
case ::substrait::Type::KindCase::kI8: {
Expand Down
34 changes: 34 additions & 0 deletions velox/substrait/SubstraitToVeloxExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,36 @@ SubstraitVeloxExprConverter::toIsNotNullExpr(
outputType, std::move(notParams), "not");
}

std::shared_ptr<const core::ITypedExpr>
SubstraitVeloxExprConverter::toExtractExpr(
const std::vector<std::shared_ptr<const core::ITypedExpr>>& params,
const TypePtr& outputType) {
VELOX_CHECK_EQ(params.size(), 2);
auto functionArg =
std::dynamic_pointer_cast<const core::ConstantTypedExpr>(params[0]);
if (functionArg) {
// Get the function argument.
auto variant = functionArg->value();
if (!variant.hasValue()) {
VELOX_FAIL("Value expected in variant.");
}
// The first parameter specifies extracting from which field.
// Only year is supported currently.
std::string from = variant.value<std::string>();

// The second parameter is the function parameter.
std::vector<std::shared_ptr<const core::ITypedExpr>> exprParams;
exprParams.reserve(1);
exprParams.emplace_back(params[1]);
if (from == "YEAR") {
return std::make_shared<const core::CallTypedExpr>(
outputType, std::move(exprParams), "year");
}
VELOX_NYI("Extract from {} not supported.", from);
}
VELOX_FAIL("Constant is expected to be the first parameter in extract.");
}

std::shared_ptr<const core::ITypedExpr>
SubstraitVeloxExprConverter::toVeloxExpr(
const ::substrait::Expression::ScalarFunction& substraitFunc,
Expand All @@ -215,12 +245,16 @@ SubstraitVeloxExprConverter::toVeloxExpr(
const auto& veloxType =
toVeloxType(subParser_->parseType(sFunc.output_type())->type);

if (veloxFunction == "extract") {
return toExtractExpr(params, veloxType);
}
if (veloxFunction == "alias") {
return toAliasExpr(params);
}
if (veloxFunction == "is_not_null") {
return toIsNotNullExpr(params, veloxType);
}

return std::make_shared<const core::CallTypedExpr>(
toVeloxType(typeName), std::move(params), veloxFunction);
}
Expand Down
5 changes: 5 additions & 0 deletions velox/substrait/SubstraitToVeloxExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ class SubstraitVeloxExprConverter {
const std::vector<std::shared_ptr<const core::ITypedExpr>>& params,
const TypePtr& outputType);

/// Create expression for extract.
std::shared_ptr<const core::ITypedExpr> toExtractExpr(
const std::vector<std::shared_ptr<const core::ITypedExpr>>& params,
const TypePtr& outputType);

/// Used to convert Substrait Literal into Velox Expression.
std::shared_ptr<const core::ConstantTypedExpr> toVeloxExpr(
const ::substrait::Expression::Literal& substraitLit);
Expand Down
1 change: 0 additions & 1 deletion velox/substrait/SubstraitToVeloxPlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,6 @@ SubstraitVeloxPlanConverter::processSortField(
core::PlanNodePtr SubstraitVeloxPlanConverter::toVeloxPlan(
const ::substrait::FilterRel& filterRel) {
auto childNode = convertSingleInput<::substrait::FilterRel>(filterRel);

auto filterNode = std::make_shared<core::FilterNode>(
nextPlanNodeId(),
exprConverter_->toVeloxExpr(
Expand Down
31 changes: 11 additions & 20 deletions velox/substrait/SubstraitToVeloxPlan.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ class SubstraitVeloxPlanConverter {
bool validationMode = false)
: pool_(pool), validationMode_(validationMode) {}

/// This class is used to convert the Substrait plan into Velox plan.
/// Used to convert Substrait JoinRel into Velox PlanNode.
core::PlanNodePtr toVeloxPlan(const ::substrait::JoinRel& sJoin);

Expand Down Expand Up @@ -116,7 +117,8 @@ class SubstraitVeloxPlanConverter {
const ::substrait::Plan& sPlan);

/// Used to construct the function map between the index
/// and the Substrait function name.
/// and the Substrait function name. Initialize the expression
/// converter based on the constructed function map.
void constructFuncMap(const ::substrait::Plan& sPlan);

/// Will return the function map used by this plan converter.
Expand Down Expand Up @@ -401,25 +403,17 @@ class SubstraitVeloxPlanConverter {
return toVeloxPlan(rel.input());
}

/// The Partition index.
u_int32_t partitionIndex_;

/// The file paths to be scanned.
std::vector<std::string> paths_;

/// The file starts in the scan.
std::vector<u_int64_t> starts_;

/// The lengths to be scanned.
std::vector<u_int64_t> lengths_;

/// The unique identification for each PlanNode.
int planNodeId_ = 0;

/// The map storing the relations between the function id and the function
/// name. Will be constructed based on the Substrait representation.
std::unordered_map<uint64_t, std::string> functionMap_;

/// The map storing the split stats for each PlanNode.
std::unordered_map<core::PlanNodeId, std::shared_ptr<SplitInfo>>
splitInfoMap_;

/// The map storing the pre-built plan nodes which can be accessed through
/// index. This map is only used when the computation of a Substrait plan
/// depends on other input nodes.
Expand All @@ -430,19 +424,16 @@ class SubstraitVeloxPlanConverter {
/// recognizable representations.
std::shared_ptr<SubstraitParser> subParser_{
std::make_shared<SubstraitParser>()};
/// Mapping from leaf plan node ID to splits.
std::unordered_map<core::PlanNodeId, std::shared_ptr<SplitInfo>>
splitInfoMap_;

/// The Expression converter used to convert Substrait representations into
/// Velox expressions.
std::shared_ptr<SubstraitVeloxExprConverter> exprConverter_;

/// Memory pool.
memory::MemoryPool* pool_;

/// A flag used to specify validation.
bool validationMode_ = false;

/// The Expression converter used to convert Substrait representations into
/// Velox expressions.
std::shared_ptr<SubstraitVeloxExprConverter> exprConverter_;
};

} // namespace facebook::velox::substrait
2 changes: 2 additions & 0 deletions velox/substrait/TypeUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ TypePtr toVeloxType(const std::string& typeName) {
}
case TypeKind::UNKNOWN:
return UNKNOWN();
case TypeKind::DATE:
return DATE();
default:
VELOX_NYI("Velox type conversion not supported for type {}.", typeName);
}
Expand Down

0 comments on commit 0745672

Please sign in to comment.