Skip to content
This repository has been archived by the owner on Jan 3, 2023. It is now read-only.

Commit

Permalink
[SQL-DS-CACHE-73][POAE7-1011]optimize aggregation return value (#74)
Browse files Browse the repository at this point in the history
* convert avg to sum and cnt, add a ut

* optimize return type

* fix bugs and remove some log

* code style

* fix build

* fix null value.
  • Loading branch information
jikunshang authored Apr 6, 2021
1 parent 80876fa commit 197bed0
Show file tree
Hide file tree
Showing 9 changed files with 130 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ private static JsonNode constructTree(Expression expr, JsonNode rootNode) {
assert (exprs.size() == 1);
((ObjectNode) tmpNode).put("exprName", expr.nodeName());
((ObjectNode) tmpNode).put("child", constructTree(exprs.get(0), null));
((ObjectNode) tmpNode).put("dataType", expr.dataType().toString());
return tmpNode;
} else if (expr instanceof BinaryArithmetic) { // Add sub Multiply ...
assert (exprs.size() == 2);
Expand Down
2 changes: 1 addition & 1 deletion oap-ape/ape-native/src/com_intel_ape_ParquetReaderJNI.cc
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ JNIEXPORT void JNICALL Java_com_intel_ape_ParquetReaderJNI_setAggStr(JNIEnv* env
jstring aggStr) {
ape::Reader* reader = reinterpret_cast<ape::Reader*>(readerPtr);
std::string aggStr_ = env->GetStringUTFChars(aggStr, nullptr);
ARROW_LOG(INFO) << "agg str is: " << aggStr_;
ARROW_LOG(DEBUG) << "agg str is: " << aggStr_;
reader->setAgg(aggStr_);
}

Expand Down
27 changes: 9 additions & 18 deletions oap-ape/ape-native/src/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ int Reader::readBatch(int32_t batchSize, int64_t* buffersPtr_, int64_t* nullsPtr
auto start = std::chrono::steady_clock::now();
std::vector<int8_t> tmp(0);
rowsRet = filterExpression->ExecuteWithParam(rowsToRead, buffersPtr, nullsPtr, tmp);
time += std::chrono::steady_clock::now() - start;
filterTime += std::chrono::steady_clock::now() - start;
}

if (rowsRet > 0 && aggExprs.size()) { // if rows after filter is 0, no need to do agg.
Expand All @@ -339,30 +339,18 @@ int Reader::readBatch(int32_t batchSize, int64_t* buffersPtr_, int64_t* nullsPtr
std::dynamic_pointer_cast<RootAggExpression>(agg)->getResult(result);
if (result.data.size() == 1) {
// TODO: for group by
dumpToJavaBuffer((uint8_t*)(buffersPtr_[index]), result);
*(uint8_t*)(nullsPtr_[index]) = 1;
index++;
} else {
ARROW_LOG(DEBUG) << "Oops... why return " << result.data.size() << " results";
}
// TODO: refactor. For 'avg' expression, it will return two elements, which are
// 'Sum' and 'Count' and 'Count' type should be int64. However, we didn't handle
// 'Count' expression here.
for (int j = 0; j < result.data.size(); j++) {
if (j == 1) { // for `count` in `avg`
*((int64_t*)(buffersPtr_[index])) =
static_cast<int64_t>(result.data[j].low_bits());
} else {
decimalToBytes(result.data[j], result.precision,
(uint8_t*)(buffersPtr_[index]));
}
*((uint8_t*)(nullsPtr_[index])) = (uint8_t)1;
index++;
}

} else if (typeid(*agg) == typeid(AttributeReferenceExpression)) {
// TODO
}
}
rowsRet = 1;
time += std::chrono::steady_clock::now() - start;
aggTime += std::chrono::steady_clock::now() - start;
} else {
}

Expand All @@ -382,7 +370,10 @@ bool Reader::skipNextRowGroup() {
}

void Reader::close() {
ARROW_LOG(INFO) << "Filter takes " << time.count() * 1000 << " ms.";
ARROW_LOG(INFO) << "Filter takes " << filterTime.count() * 1000 << " ms. "
<< "Agg takes " << aggTime.count() * 1000 << " ms";
filterTime = std::chrono::nanoseconds::zero();
aggTime = std::chrono::nanoseconds::zero();

// No need to call parquetReader->Close(). It will be done in destructor.

Expand Down
3 changes: 2 additions & 1 deletion oap-ape/ape-native/src/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ class Reader {
int64_t totalRowsLoadedSoFar = 0;

std::shared_ptr<RootFilterExpression> filterExpression;
std::chrono::duration<double> time;
std::chrono::duration<double> filterTime = std::chrono::nanoseconds::zero();
std::chrono::duration<double> aggTime = std::chrono::nanoseconds::zero();

std::vector<char*> extraByteArrayBuffers;

Expand Down
10 changes: 10 additions & 0 deletions oap-ape/ape-native/src/test/aggParserTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,13 @@ TEST(AggParserTest, SimpleCase) {
auto exprs = ape::JsonConvertor::parseToAggExpressions(json);
EXPECT_EQ(exprs.size(), 3);
}

TEST(AggParserTest, ConvertAvgToSumAndCnt) {
std::string json =
"{\"aggregateExprs\": [{\"aliasName\": \"sum_qty\",\"exprName\": "
"\"RootAgg\",\"isDistinct\": false,\"child\": {\"exprName\": "
"\"Average\",\"child\": {\"exprName\": \"AttributeReference\",\"dataType\": "
"\"DecimalType(12,2)\",\"columnName\": \"l_quantity\"}}}]}";
auto exprs = ape::JsonConvertor::parseToAggExpressions(json);
EXPECT_EQ(exprs.size(), 2);
}
40 changes: 16 additions & 24 deletions oap-ape/ape-native/src/utils/AggExpression.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,25 +27,6 @@

namespace ape {

static inline bool isDecimalType(std::string& dataType) {
bool isDecimal = false;
std::string decimalType("DecimalType");
if (dataType.compare(0, decimalType.length(), decimalType) == 0) {
isDecimal = true;
}
return isDecimal;
}

static int getPrecisionAndScaleFromDecimalType(std::string& decimalType, int& precision,
int& scale) {
if (isDecimalType(decimalType)) {
char str[64];
sscanf(decimalType.c_str(), "%11s(%d,%d)", str, &precision, &scale);
return 0;
}
return -1;
}

class WithResultExpression : public Expression {
public:
int ExecuteWithParam(int batchSize, const std::vector<int64_t>& dataBuffers,
Expand Down Expand Up @@ -133,6 +114,7 @@ class Sum : public AggExpression {
result.data.push_back(out);
result.precision = 38; // tmp.precision;
result.scale = tmp.scale;
result.type = GetResultType(dataType);
}
};

Expand All @@ -147,6 +129,7 @@ class Min : public AggExpression {
result.data.push_back(out);
result.precision = tmp.precision;
result.scale = tmp.scale;
result.type = GetResultType(dataType);
}
};

Expand All @@ -161,25 +144,33 @@ class Max : public AggExpression {
result.data.push_back(out);
result.precision = tmp.precision;
result.scale = tmp.scale;
result.type = GetResultType(dataType);
}
};

class Count : public AggExpression {
public:
~Count() {}
void getResult(DecimalVector& result) override {
auto tmp = DecimalVector();
child->getResult(tmp);
result.data.push_back(arrow::BasicDecimal128(tmp.data.size()));
result.precision = tmp.precision;
result.scale = tmp.scale;
result.data.push_back(arrow::BasicDecimal128(count));
result.type = ResultType::LongType;
}
int ExecuteWithParam(int batchSize, const std::vector<int64_t>& dataBuffers,
const std::vector<int64_t>& nullBuffers,
std::vector<int8_t>& outBuffers) override {
count = batchSize;
return 0;
}

private:
int count = 0;
};

class Avg : public AggExpression {
public:
~Avg() {}
void getResult(DecimalVector& result) override {
// should never be called
auto tmp = DecimalVector();
child->getResult(tmp);
arrow::BasicDecimal128 sum;
Expand All @@ -190,6 +181,7 @@ class Avg : public AggExpression {
result.data.push_back(arrow::BasicDecimal128(tmp.data.size()));
result.precision = 38; // tmp.precision;
result.scale = tmp.scale;
result.type = GetResultType(dataType);
}
};

Expand Down
77 changes: 76 additions & 1 deletion oap-ape/ape-native/src/utils/DecimalConvertor.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,93 @@
#include <arrow/result.h>
#include <arrow/util/decimal.h>

#include "ApeDecimal.h"
#include "src/utils/ApeDecimal.h"

namespace ape {

using Decimal128Vector = std::vector<arrow::BasicDecimal128>;

enum ResultType {
IntType,
LongType,
FloatType,
DoubleType,
Decimal64Type,
Decimal128Type,
ErrorType
};

struct DecimalVector {
Decimal128Vector data;
int32_t precision;
int32_t scale;
ResultType type;
};

static inline bool isDecimalType(std::string& dataType) {
bool isDecimal = false;
std::string decimalType("DecimalType");
if (dataType.compare(0, decimalType.length(), decimalType) == 0) {
isDecimal = true;
}
return isDecimal;
}

static int getPrecisionAndScaleFromDecimalType(std::string& decimalType, int& precision,
int& scale) {
if (isDecimalType(decimalType)) {
char str[64];
sscanf(decimalType.c_str(), "%11s(%d,%d)", str, &precision, &scale);
return 0;
}
return -1;
}

static ResultType GetResultType(std::string s) {
if (s.compare("IntType") == 0) return IntType;
if (s.compare("LongType") == 0) return LongType;
if (s.compare("FloatType") == 0) return FloatType;
if (s.compare("DoubleType") == 0) return DoubleType;
int a, b;
if (getPrecisionAndScaleFromDecimalType(s, a, b) == 0) {
return Decimal128Type;
}
return ErrorType;
}

static void dumpToJavaBuffer(uint8_t* bufferAddr, DecimalVector& result) {
switch (result.type) {
case ResultType::IntType: {
*((int32_t*)bufferAddr) = static_cast<int32_t>(result.data[0].low_bits());
break;
}
case ResultType::LongType: {
*((int64_t*)bufferAddr) = static_cast<int64_t>(result.data[0].low_bits());
break;
}
case ResultType::FloatType: {
// TODO: convert
break;
}
case ResultType::DoubleType: {
// TODO: convert
break;
}
case ResultType::Decimal64Type: {
*((int64_t*)bufferAddr) = static_cast<int64_t>(result.data[0].low_bits());
break;
}
case ResultType::Decimal128Type: {
decimalToBytes(result.data[0], result.precision, (uint8_t*)(bufferAddr));
break;
}
default: {
ARROW_LOG(WARNING) << "Type not support!";
break;
}
}
}

class DecimalConvertor {
public:
template <typename ParquetIntegerType>
Expand Down
19 changes: 15 additions & 4 deletions oap-ape/ape-native/src/utils/JsonConvertor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace ape {

std::shared_ptr<Expression> JsonConvertor::parseToFilterExpression(
std::string jsonString) {
ARROW_LOG(INFO) << "json string " << jsonString;
ARROW_LOG(DEBUG) << "json string " << jsonString;
auto json = nlohmann::json::parse(jsonString);
return parseToFilterExpression(json);
}
Expand Down Expand Up @@ -100,7 +100,7 @@ std::shared_ptr<Expression> JsonConvertor::parseToFilterExpression(nlohmann::jso

std::vector<std::shared_ptr<Expression>> JsonConvertor::parseToGroupByExpressions(
std::string jsonString) {
ARROW_LOG(INFO) << "json string " << jsonString;
ARROW_LOG(DEBUG) << "json string " << jsonString;
nlohmann::json json;
try {
json = nlohmann::json::parse(jsonString);
Expand All @@ -127,7 +127,7 @@ std::vector<std::shared_ptr<Expression>> JsonConvertor::parseToGroupByExpression

std::vector<std::shared_ptr<Expression>> JsonConvertor::parseToAggExpressions(
std::string jsonString) {
ARROW_LOG(INFO) << "json string " << jsonString;
ARROW_LOG(DEBUG) << "json string " << jsonString;
nlohmann::json json;
try {
json = nlohmann::json::parse(jsonString);
Expand All @@ -146,7 +146,18 @@ std::vector<std::shared_ptr<Expression>> JsonConvertor::parseToAggExpressions(
auto exprs = root["aggregateExprs"];
for (int i = 0; i < exprs.size(); i++) {
auto expr = exprs[i];
v.push_back(parseToAggExpressionsHelper(expr));
// convert avg to sum + count
if ((((std::string)expr["exprName"]).compare("RootAgg") == 0) &&
((std::string(expr["child"]["exprName"])).compare("Average") == 0)) {
expr["child"]["exprName"] = "Sum";
v.push_back(parseToAggExpressionsHelper(expr));

expr["child"]["exprName"] = "Count";
v.push_back(parseToAggExpressionsHelper(expr));

} else {
v.push_back(parseToAggExpressionsHelper(expr));
}
}
std::chrono::duration<double> duration = std::chrono::steady_clock::now() - start;
ARROW_LOG(INFO) << "Parsing json takes " << duration.count() * 1000 << " ms.";
Expand Down
2 changes: 0 additions & 2 deletions oap-ape/ape-native/src/utils/UnaryFilter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ void NotEq<NullStruct>::execute(NullStruct* buffer, NullStruct value, int batchS
// TODO: add AVX/LIBXXX integration.
template <typename T>
void Gt<T>::execute(T* buffer, T value, int batchSize, std::vector<int8_t>& out) {
ARROW_LOG(DEBUG) << "gt";
#ifdef USE_LIB_XXX
// use XXX to evalute
#elif USE_AVX
Expand All @@ -105,7 +104,6 @@ void Gt<T>::execute(T* buffer, T value, int batchSize, std::vector<int8_t>& out)

template <typename T>
void GtEq<T>::execute(T* buffer, T value, int batchSize, std::vector<int8_t>& out) {
ARROW_LOG(DEBUG) << "gteq";
#ifdef USE_LIB_XXX
// use XXX to evalute
#elif USE_AVX
Expand Down

0 comments on commit 197bed0

Please sign in to comment.