Skip to content

Commit

Permalink
[OPPRO-170] Filter validation for Parquet reader at runtime (facebook…
Browse files Browse the repository at this point in the history
…incubator#27)

* Filter validation for Parquet reader at runtime

* Style

* Style

* Format

Removed special handling for avg (facebookincubator#31)

[OPPRO-173] Make batch size configurable (facebookincubator#32)

support dwrf format
  • Loading branch information
zhztheplayer authored and zhejiangxiaomai committed Nov 8, 2022
1 parent a0023a6 commit e6c2045
Show file tree
Hide file tree
Showing 27 changed files with 172 additions and 1,350 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.
cmake_minimum_required(VERSION 3.11)
set(CMAKE_MODULE_PATH "${CMAKE_CURRENT_SOURCE_DIR}/CMake" ${CMAKE_MODULE_PATH})

set(CMAKE_EXPORT_COMPILE_COMMANDS ON)
# set the project name
project(velox)

Expand Down
4 changes: 3 additions & 1 deletion third_party/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,9 @@ if(VELOX_ENABLE_ARROW)
-DCMAKE_INSTALL_PREFIX=${ARROW_PREFIX}/install
-DCMAKE_BUILD_TYPE=${CMAKE_BUILD_TYPE}
-DARROW_BUILD_STATIC=ON
-DThrift_SOURCE=${THRIFT_SOURCE})
-DThrift_SOURCE=BUNDLED
-DARROW_DEPENDENCY_SOURCE=BUNDLED
-Dre2_SOURCE=AUTO)
set(ARROW_LIBDIR ${ARROW_PREFIX}/install/${CMAKE_INSTALL_LIBDIR})

add_library(thrift STATIC IMPORTED GLOBAL)
Expand Down
2 changes: 1 addition & 1 deletion velox/connectors/hive/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
add_library(velox_hive_connector OBJECT HiveConnector.cpp FileHandle.cpp
HiveWriteProtocol.cpp)

target_link_libraries(velox_hive_connector velox_connector
target_link_libraries(velox_hive_connector velox_connector velox_dwio_common_exception
velox_dwio_dwrf_reader velox_dwio_dwrf_writer velox_file)

add_library(velox_hive_partition_function HivePartitionFunction.cpp)
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/dwrf/common/Checksum.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

#include <boost/crc.hpp>
#define XXH_INLINE_ALL
#include <xxhash.h>
#include "velox/external/xxhash/xxhash.h"

namespace facebook::velox::dwrf {

Expand Down
4 changes: 2 additions & 2 deletions velox/dwio/parquet/writer/Writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
*/

#include "velox/dwio/parquet/writer/Writer.h"
#include <arrow/c/bridge.h> // @manual
#include <arrow/c/bridge.h>
#include <arrow/table.h> // @manual
#include "velox/vector/arrow/Bridge.h"
#include "velox/vector/arrow/c/Bridge.h"

namespace facebook::velox::parquet {

Expand Down
8 changes: 7 additions & 1 deletion velox/exec/TableScan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ TableScan::TableScan(
"TableScan"),
tableHandle_(tableScanNode->tableHandle()),
columnHandles_(tableScanNode->assignments()),
driverCtx_(driverCtx) {
driverCtx_(driverCtx),
preferredBatchSize_(driverCtx->queryConfig().preferredOutputBatchSize()) {
connector_ = connector::getConnector(tableHandle_->connectorId());
}

Expand Down Expand Up @@ -139,6 +140,11 @@ bool TableScan::isFinished() {
}

void TableScan::setBatchSize() {
if (preferredBatchSize_ != 1024) {
// Not the default value.
readBatchSize_ = preferredBatchSize_;
return;
}
constexpr int64_t kMB = 1 << 20;
auto estimate = dataSource_->estimatedRowSize();
if (estimate == connector::DataSource::kUnknownRowSize) {
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/TableScan.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ class TableScan : public SourceOperator {
std::unordered_map<column_index_t, std::shared_ptr<common::Filter>>
pendingDynamicFilters_;
int32_t readBatchSize_{kDefaultBatchSize};
// A preferred batch size from configuration.
uint32_t preferredBatchSize_;

// String shown in ExceptionContext inside DataSource and LazyVector loading.
std::string debugString_;
Expand Down
2 changes: 1 addition & 1 deletion velox/external/duckdb/duckdb.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,7 @@ namespace duckdb {
DUCKDB_API void DuckDBAssertInternal(bool condition, const char *condition_name, const char *file, int linenr);
}

#define D_ASSERT(condition) duckdb::DuckDBAssertInternal(bool(condition), #condition, __FILE__, __LINE__)
#define D_ASSERT(condition) ::duckdb::DuckDBAssertInternal(bool(condition), #condition, __FILE__, __LINE__)

#endif

Expand Down
2 changes: 1 addition & 1 deletion velox/substrait/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ set(SRCS
SubstraitParser.cpp
SubstraitToVeloxExpr.cpp
SubstraitToVeloxPlan.cpp
TypeUtils.cpp
TypeUtils.cpp_out
SubstraitExtensionCollector.cpp
VeloxToSubstraitExpr.cpp
VeloxToSubstraitPlan.cpp
Expand Down
41 changes: 8 additions & 33 deletions velox/substrait/SubstraitParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ std::shared_ptr<SubstraitParser::SubstraitType> SubstraitParser::parseType(
switch (substraitType.kind_case()) {
case ::substrait::Type::KindCase::kBool: {
typeName = "BOOLEAN";
nullability = sType.bool_().nullability();
nullability = substraitType.bool_().nullability();
break;
}
case ::substrait::Type::KindCase::kI8: {
Expand Down Expand Up @@ -115,7 +115,7 @@ std::shared_ptr<SubstraitParser::SubstraitType> SubstraitParser::parseType(
}
case ::substrait::Type::KindCase::kDate: {
typeName = "DATE";
nullability = sType.date().nullability();
nullability = substraitType.date().nullability();
break;
}
default:
Expand Down Expand Up @@ -217,39 +217,14 @@ std::string SubstraitParser::findSubstraitFuncSpec(
return map[id];
}

std::string SubstraitParser::getFunctionName(
const std::string& functionSpec) const {
std::string SubstraitParser::getSubFunctionName(
const std::string& subFuncSpec) const {
// Get the position of ":" in the function name.
std::size_t pos = functionSpec.find(":");
std::size_t pos = subFuncSpec.find(":");
if (pos == std::string::npos) {
return functionSpec;
return subFuncSpec;
}
return functionSpec.substr(0, pos);
}

void SubstraitParser::getFunctionTypes(
const std::string& functionSpec,
std::vector<std::string>& types) const {
types.clear();
// Get the position of ":" in the function name.
std::size_t pos = functionSpec.find(":");
// Get the parameter types.
std::string funcTypes;
if (pos == std::string::npos) {
return;
} else {
if (pos == functionSpec.size() - 1) {
return;
}
funcTypes = functionSpec.substr(pos + 1);
}
// Split the types with delimiter.
std::string delimiter = "_";
while ((pos = funcTypes.find(delimiter)) != std::string::npos) {
types.emplace_back(funcTypes.substr(0, pos));
funcTypes.erase(0, pos + delimiter.length());
}
types.emplace_back(funcTypes);
return subFuncSpec.substr(0, pos);
}

void SubstraitParser::getSubFunctionTypes(
Expand Down Expand Up @@ -293,7 +268,7 @@ std::string SubstraitParser::mapToVeloxFunction(

// If no mapping from the Substrait function name to a Velox function name is
// found, the original Substrait function name is used.
return substraitFunction;
return subFunc;
}

} // namespace facebook::velox::substrait
9 changes: 2 additions & 7 deletions velox/substrait/SubstraitParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,13 @@ class SubstraitParser {
/// Currently, the input types in the function specification are not used. But
/// in the future, they should be used for validation according to the
/// specifications in the Substrait YAML files.
const std::string& findFunctionSpec(
std::string findSubstraitFuncSpec(
const std::unordered_map<uint64_t, std::string>& functionMap,
uint64_t id) const;

/// Extracts the function name for a function from specified compound name.
/// When the input is a simple name, it will be returned.
std::string getFunctionName(const std::string& functionSpec) const;

/// Extracts argument types for a function from specified compound name.
void getFunctionTypes(
const std::string& functionSpec,
std::vector<std::string>& types) const;
std::string getSubFunctionName(const std::string& functionSpec) const;

/// This function is used get the types from the compound name.
void getSubFunctionTypes(
Expand Down
64 changes: 39 additions & 25 deletions velox/substrait/SubstraitToVeloxExpr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ SubstraitVeloxExprConverter::toVeloxExpr(
switch (typeCase) {
case ::substrait::Expression::FieldReference::ReferenceTypeCase::
kDirectReference: {
const auto& dRef = sField.direct_reference();
const auto& dRef = substraitField.direct_reference();
int32_t colIdx = subParser_->parseReferenceSegment(dRef);
const auto& inputNames = inputType->names();
const int64_t inputSize = inputNames.size();
Expand Down Expand Up @@ -101,9 +101,33 @@ SubstraitVeloxExprConverter::toExtractExpr(
VELOX_FAIL("Constant is expected to be the first parameter in extract.");
}

/// Builds a "row_constructor" call expression whose output type is a ROW
/// assembled from the sub-type names encoded in 'typeName'.
///
/// The sub-type names are extracted from 'typeName' with
/// getSubFunctionTypes(); each one is converted with toVeloxType() and becomes
/// one field of the resulting ROW type. Fields are named "col_<index>" in the
/// order the sub-types appear.
///
/// 'params' are passed through unchanged as the inputs of the call. Fails the
/// VELOX_CHECK when no sub-type name can be extracted from 'typeName'.
std::shared_ptr<const core::ITypedExpr>
SubstraitVeloxExprConverter::toRowConstructorExpr(
    const std::vector<std::shared_ptr<const core::ITypedExpr>>& params,
    const std::string& typeName) {
  std::vector<std::string> structTypeNames;
  subParser_->getSubFunctionTypes(typeName, structTypeNames);
  VELOX_CHECK(
      !structTypeNames.empty(), "At least one type name is expected.");

  // Preparation for the conversion from struct types to RowType.
  const size_t numFields = structTypeNames.size();
  std::vector<TypePtr> rowTypes;
  std::vector<std::string> names;
  rowTypes.reserve(numFields);
  names.reserve(numFields);
  for (size_t idx = 0; idx < numFields; ++idx) {
    names.emplace_back("col_" + std::to_string(idx));
    rowTypes.emplace_back(toVeloxType(structTypeNames[idx]));
  }

  // 'params' is a const reference, so it can only be copied into the call
  // expression; wrapping it in std::move would not change that.
  return std::make_shared<const core::CallTypedExpr>(
      ROW(std::move(names), std::move(rowTypes)), params, "row_constructor");
}

std::shared_ptr<const core::ITypedExpr>
SubstraitVeloxExprConverter::toVeloxExpr(
const ::substrait::Expression::ScalarFunction& substraitFunc,
const ::substrait::Expression::ScalarFunction& sFunc,
const RowTypePtr& inputType) {
std::vector<core::TypedExprPtr> params;
params.reserve(substraitFunc.arguments().size());
Expand All @@ -112,17 +136,19 @@ SubstraitVeloxExprConverter::toVeloxExpr(
}
const auto& veloxFunction =
subParser_->findVeloxFunction(functionMap_, sFunc.function_reference());
const auto& veloxType =
toVeloxType(subParser_->parseType(sFunc.output_type())->type);
std::string typeName = subParser_->parseType(sFunc.output_type())->type;

if (veloxFunction == "extract") {
return toExtractExpr(params, veloxType);
return toExtractExpr(std::move(params), toVeloxType(typeName));
}
if (veloxFunction == "alias") {
return toAliasExpr(params);
return toAliasExpr(std::move(params));
}
if (veloxFunction == "is_not_null") {
return toIsNotNullExpr(params, veloxType);
return toIsNotNullExpr(std::move(params), toVeloxType(typeName));
}
if (veloxFunction == "row_constructor") {
return toRowConstructorExpr(std::move(params), typeName);
}

return std::make_shared<const core::CallTypedExpr>(
Expand Down Expand Up @@ -152,7 +178,8 @@ SubstraitVeloxExprConverter::toVeloxExpr(
return std::make_shared<core::ConstantTypedExpr>(
variant(substraitLit.fp32()));
case ::substrait::Expression_Literal::LiteralTypeCase::kI64:
return std::make_shared<core::ConstantTypedExpr>(variant(sLit.i64()));
return std::make_shared<core::ConstantTypedExpr>(
variant(substraitLit.i64()));
case ::substrait::Expression_Literal::LiteralTypeCase::kFp64:
return std::make_shared<core::ConstantTypedExpr>(
variant(substraitLit.fp64()));
Expand All @@ -161,20 +188,20 @@ SubstraitVeloxExprConverter::toVeloxExpr(
variant(substraitLit.string()));
case ::substrait::Expression_Literal::LiteralTypeCase::kNull: {
auto veloxType =
toVeloxType(substraitParser_.parseType(substraitLit.null())->type);
toVeloxType(subParser_->parseType(substraitLit.null())->type);
return std::make_shared<core::ConstantTypedExpr>(
veloxType, variant::null(veloxType->kind()));
}
case ::substrait::Expression_Literal::LiteralTypeCase::kList: {
// List is used in 'in' expression. Will wrap a constant
// vector with an array vector inside to create the constant expression.
std::vector<variant> variants;
variants.reserve(sLit.list().values().size());
variants.reserve(substraitLit.list().values().size());
VELOX_CHECK(
sLit.list().values().size() > 0,
substraitLit.list().values().size() > 0,
"List should have at least one item.");
std::optional<TypePtr> literalType = std::nullopt;
for (const auto& literal : sLit.list().values()) {
for (const auto& literal : substraitLit.list().values()) {
auto typedVariant = toTypedVariant(literal);
if (!literalType.has_value()) {
literalType = typedVariant->variantType;
Expand Down Expand Up @@ -302,19 +329,6 @@ SubstraitVeloxExprConverter::toVeloxExpr(
resultType = thenClauseExpr->type();
}
}

if (substraitIfThen.has_else_()) {
auto elseClauseExpr = toVeloxExpr(substraitIfThen.else_(), inputType);
inputs.emplace_back(elseClauseExpr);
if (!resultType && !elseClauseExpr->type()->containsUnknown()) {
resultType = elseClauseExpr->type();
}
}

VELOX_CHECK_NOT_NULL(resultType, "Result type not found");

return std::make_shared<const core::CallTypedExpr>(
resultType, std::move(inputs), "if");
}

} // namespace facebook::velox::substrait
7 changes: 6 additions & 1 deletion velox/substrait/SubstraitToVeloxExpr.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class SubstraitVeloxExprConverter {

/// Convert Substrait ScalarFunction into Velox Expression.
std::shared_ptr<const core::ITypedExpr> toVeloxExpr(
const ::substrait::Expression::ScalarFunction& substraitFunc,
const ::substrait::Expression::ScalarFunction& sFunc,
const RowTypePtr& inputType);

/// Convert Substrait CastExpression to Velox Expression.
Expand All @@ -73,6 +73,11 @@ class SubstraitVeloxExprConverter {
const std::vector<std::shared_ptr<const core::ITypedExpr>>& params,
const TypePtr& outputType);

/// Create expression for row_constructor.
/// 'params' are used as the inputs of the resulting "row_constructor" call.
/// 'typeName' is a compound type name: its sub-type names are extracted with
/// getSubFunctionTypes() and converted into the fields of the output ROW type,
/// named "col_<index>" in order. At least one sub-type name is expected.
std::shared_ptr<const core::ITypedExpr> toRowConstructorExpr(
const std::vector<std::shared_ptr<const core::ITypedExpr>>& params,
const std::string& typeName);

/// Used to convert Substrait Literal into Velox Expression.
std::shared_ptr<const core::ConstantTypedExpr> toVeloxExpr(
const ::substrait::Expression::Literal& substraitLit);
Expand Down
Loading

0 comments on commit e6c2045

Please sign in to comment.