Skip to content

Commit

Permalink
add list wrapper
Browse files Browse the repository at this point in the history
  • Loading branch information
nevermore3 committed Jan 12, 2023
1 parent a025f92 commit 669fe2d
Show file tree
Hide file tree
Showing 12 changed files with 199 additions and 92 deletions.
2 changes: 1 addition & 1 deletion src/graph/context/ast/CypherAstContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ struct Path final {
// True for pattern expression, to collect path to list
bool rollUpApply{false};
// vector ["v"] in (v)-[:like]->()
std::vector<std::string> compareVariables;
std::vector<std::pair<std::string, bool>> compareVariables;
// "(v)-[:like]->()" in (v)-[:like]->()
std::string collectVariable;

Expand Down
92 changes: 92 additions & 0 deletions src/graph/executor/Executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,98 @@ namespace nebula {
namespace graph {
class PlanNode;
class QueryContext;

class ListWrapper {
public:
ListWrapper() = default;

void emplace(const Value &val, bool type) {
pairs_.emplace_back(std::make_pair(val, type));
}

std::string toString() const {
std::string str;
for (auto &v : pairs_) {
str += v.first.toString();
str += " ";
str += v.second ? " true " : " false ";
}
return str;
}

std::vector<std::pair<Value, bool>> pairs() const {
return pairs_;
}

private:
// pair.second true : normal, false : edgeList
std::vector<std::pair<Value, bool>> pairs_;
};

struct WrapperHash {
std::size_t operator()(const ListWrapper &wrapper) const {
size_t seed = 0;
for (const auto &pair : wrapper.pairs()) {
if (pair.second) {
seed ^= std::hash<nebula::Value>()(pair.first);
} else {
const auto &values = pair.first.getList().values;
for (const auto &v : values) {
seed ^= std::hash<nebula::Value>()(v);
}
}
}
DLOG(ERROR) << "hashValue : " << wrapper.toString() << " values : " << seed;
return seed;
}
};

struct WrapperEqual {
bool operator()(const ListWrapper &lhs, const ListWrapper &rhs) const {
const auto &pairs = lhs.pairs();
size_t size = pairs.size();
const auto &otherPairs = rhs.pairs();
if (size != otherPairs.size()) {
return false;
}
for (size_t i = 0; i < size; ++i) {
auto type = pairs[i].second;
if (type != otherPairs[i].second) {
return false;
}
if (type) {
if (pairs[i].first != otherPairs[i].first) {
return false;
} else {
continue;
}
}
const auto &values = pairs[i].first.getList().values;
const auto &otherValues = otherPairs[i].first.getList().values;
size_t listSize = values.size();
if (listSize != otherValues.size()) {
return false;
}
if (values.front() == otherValues.front()) {
for (size_t j = 1; j < listSize; ++j) {
if (values[j] != otherValues[j]) {
return false;
}
}
} else if (values.front() == otherValues.back()) {
for (size_t j = 1; j < listSize; ++j) {
if (values[j] != otherValues[listSize - 1 - j]) {
return false;
}
}
} else {
return false;
}
}
return true;
}
};

class Executor : private boost::noncopyable, private cpp::NonMovable {
public:
// Create executor according to plan node
Expand Down
52 changes: 26 additions & 26 deletions src/graph/executor/query/PatternApplyExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,18 @@ Status PatternApplyExecutor::checkBiInputDataSets() {
return Status::OK();
}

void PatternApplyExecutor::collectValidKeys(const std::vector<Expression*>& keyCols,
Iterator* iter,
std::unordered_set<List>& validKeys) const {
void PatternApplyExecutor::collectValidKeys(
const std::vector<std::pair<Expression*, bool>>& pairs,
Iterator* iter,
std::unordered_set<ListWrapper, WrapperHash, WrapperEqual>& validKeys) const {
QueryExpressionContext ctx(ectx_);
for (; iter->valid(); iter->next()) {
List list;
list.values.reserve(keyCols.size());
for (auto& col : keyCols) {
Value val = col->eval(ctx(iter));
list.values.emplace_back(std::move(val));
ListWrapper listWrapper;
for (auto& pair : pairs) {
Value val = pair.first->eval(ctx(iter));
listWrapper.emplace(std::move(val), pair.second);
}
validKeys.emplace(std::move(list));
validKeys.emplace(std::move(listWrapper));
}
}

Expand Down Expand Up @@ -93,21 +93,21 @@ DataSet PatternApplyExecutor::applySingleKey(Expression* appliedKey,
return ds;
}

DataSet PatternApplyExecutor::applyMultiKey(std::vector<Expression*> appliedKeys,
Iterator* appliedIter,
const std::unordered_set<List>& validKeys) {
DataSet PatternApplyExecutor::applyMultiKey(
std::vector<std::pair<Expression*, bool>> pairs,
Iterator* appliedIter,
const std::unordered_set<ListWrapper, WrapperHash, WrapperEqual>& validKeys) {
DataSet ds;
ds.rows.reserve(appliedIter->size());
QueryExpressionContext ctx(ectx_);
for (; appliedIter->valid(); appliedIter->next()) {
List list;
list.values.reserve(appliedKeys.size());
for (auto& col : appliedKeys) {
Value val = col->eval(ctx(appliedIter));
list.values.emplace_back(std::move(val));
ListWrapper listWrapper;
for (auto& pair : pairs) {
Value val = pair.first->eval(ctx(appliedIter));
listWrapper.emplace(std::move(val), pair.second);
}

bool applyFlag = (validKeys.find(list) != validKeys.end()) ^ isAntiPred_;
bool applyFlag = (validKeys.find(listWrapper) != validKeys.end()) ^ isAntiPred_;
if (applyFlag) {
Row row = mv_ ? appliedIter->moveRow() : *appliedIter->row();
ds.rows.emplace_back(std::move(row));
Expand All @@ -128,20 +128,20 @@ folly::Future<Status> PatternApplyExecutor::patternApply() {
applyZeroKey(lhsIter_.get(), (rhsIter_->size() > 0) ^ isAntiPred_);
} else if (keyCols.size() == 1) {
std::unordered_set<Value> validKey;
collectValidKey(keyCols[0]->clone(), rhsIter_.get(), validKey);
result = applySingleKey(keyCols[0]->clone(), lhsIter_.get(), validKey);
collectValidKey(keyCols[0].first->clone(), rhsIter_.get(), validKey);
result = applySingleKey(keyCols[0].first->clone(), lhsIter_.get(), validKey);
} else {
// Copy the keyCols to refresh the inside propIndex_ cache
auto cloneExpr = [](std::vector<Expression*> exprs) {
std::vector<Expression*> applyColsCopy;
applyColsCopy.reserve(exprs.size());
for (auto& expr : exprs) {
applyColsCopy.emplace_back(expr->clone());
auto cloneExpr = [](std::vector<std::pair<Expression*, bool>> pairs) {
std::vector<std::pair<Expression*, bool>> applyColsCopy;
applyColsCopy.reserve(pairs.size());
for (auto& pair : pairs) {
applyColsCopy.emplace_back(std::make_pair(pair.first->clone(), pair.second));
}
return applyColsCopy;
};

std::unordered_set<List> validKeys;
std::unordered_set<ListWrapper, WrapperHash, WrapperEqual> validKeys;
collectValidKeys(cloneExpr(keyCols), rhsIter_.get(), validKeys);
result = applyMultiKey(cloneExpr(keyCols), lhsIter_.get(), validKeys);
}
Expand Down
14 changes: 8 additions & 6 deletions src/graph/executor/query/PatternApplyExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ class PatternApplyExecutor : public Executor {
protected:
Status checkBiInputDataSets();

void collectValidKeys(const std::vector<Expression*>& keyCols,
Iterator* iter,
std::unordered_set<List>& validKeys) const;
void collectValidKeys(
const std::vector<std::pair<Expression*, bool>>& keyCols,
Iterator* iter,
std::unordered_set<ListWrapper, WrapperHash, WrapperEqual>& validKeys) const;

void collectValidKey(Expression* keyCol,
Iterator* iter,
Expand All @@ -34,9 +35,10 @@ class PatternApplyExecutor : public Executor {
Iterator* appliedIter,
const std::unordered_set<Value>& validKey);

DataSet applyMultiKey(std::vector<Expression*> appliedKeys,
Iterator* appliedIter,
const std::unordered_set<List>& validKeys);
DataSet applyMultiKey(
std::vector<std::pair<Expression*, bool>> appliedKeys,
Iterator* appliedIter,
const std::unordered_set<ListWrapper, WrapperHash, WrapperEqual>& validKeys);

folly::Future<Status> patternApply();
std::unique_ptr<Iterator> lhsIter_;
Expand Down
57 changes: 30 additions & 27 deletions src/graph/executor/query/RollUpApplyExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,22 @@ Status RollUpApplyExecutor::checkBiInputDataSets() {
return Status::OK();
}

void RollUpApplyExecutor::buildHashTable(const std::vector<Expression*>& compareCols,
const InputPropertyExpression* collectCol,
Iterator* iter,
std::unordered_map<List, List>& hashTable) const {
void RollUpApplyExecutor::buildHashTable(
const std::vector<std::pair<Expression*, bool>>& compareCols,
const InputPropertyExpression* collectCol,
Iterator* iter,
std::unordered_map<ListWrapper, List, WrapperHash, WrapperEqual>& hashTable) const {
QueryExpressionContext ctx(ectx_);

for (; iter->valid(); iter->next()) {
List list;
list.values.reserve(compareCols.size());
for (auto& col : compareCols) {
Value val = col->eval(ctx(iter));
list.values.emplace_back(std::move(val));
ListWrapper listWrapper;
for (auto& pair : compareCols) {
Value val = pair.first->eval(ctx(iter));
listWrapper.emplace(val, pair.second);
}
DLOG(ERROR) << "Hash Key : " << listWrapper.toString();

auto& vals = hashTable[list];
auto& vals = hashTable[listWrapper];
vals.emplace_back(const_cast<InputPropertyExpression*>(collectCol)->eval(ctx(iter)));
}
}
Expand Down Expand Up @@ -112,24 +113,26 @@ DataSet RollUpApplyExecutor::probeSingleKey(Expression* probeKey,
return ds;
}

DataSet RollUpApplyExecutor::probe(std::vector<Expression*> probeKeys,
Iterator* probeIter,
const std::unordered_map<List, List>& hashTable) {
DataSet RollUpApplyExecutor::probe(
std::vector<std::pair<Expression*, bool>> probeKeys,
Iterator* probeIter,
const std::unordered_map<ListWrapper, List, WrapperHash, WrapperEqual>& hashTable) {
DataSet ds;
ds.rows.reserve(probeIter->size());
QueryExpressionContext ctx(ectx_);
for (; probeIter->valid(); probeIter->next()) {
List list;
list.values.reserve(probeKeys.size());
for (auto& col : probeKeys) {
Value val = col->eval(ctx(probeIter));
list.values.emplace_back(std::move(val));
ListWrapper listWrapper;
for (auto& pair : probeKeys) {
Value val = pair.first->eval(ctx(probeIter));
listWrapper.emplace(std::move(val), pair.second);
}

List vals;
auto found = hashTable.find(list);
DLOG(ERROR) << "Hash Find : " << listWrapper.toString();
auto found = hashTable.find(listWrapper);
if (found != hashTable.end()) {
vals = found->second;
DLOG(ERROR) << " FOUND !!! ";
}
Row row = mv_ ? probeIter->moveRow() : *probeIter->row();
row.emplace_back(std::move(vals));
Expand All @@ -153,20 +156,20 @@ folly::Future<Status> RollUpApplyExecutor::rollUpApply() {
} else if (compareCols.size() == 1) {
std::unordered_map<Value, List> hashTable;
buildSingleKeyHashTable(
compareCols[0]->clone(), rollUpApplyNode->collectCol(), rhsIter_.get(), hashTable);
result = probeSingleKey(compareCols[0]->clone(), lhsIter_.get(), hashTable);
compareCols[0].first->clone(), rollUpApplyNode->collectCol(), rhsIter_.get(), hashTable);
result = probeSingleKey(compareCols[0].first->clone(), lhsIter_.get(), hashTable);
} else {
// Copy the compareCols to make sure the propIndex_ is not cached in the expr
auto cloneExpr = [](std::vector<Expression*> exprs) {
std::vector<Expression*> collectColsCopy;
collectColsCopy.reserve(exprs.size());
for (auto& expr : exprs) {
collectColsCopy.emplace_back(expr->clone());
auto cloneExpr = [](std::vector<std::pair<Expression*, bool>> pairs) {
std::vector<std::pair<Expression*, bool>> collectColsCopy;
collectColsCopy.reserve(pairs.size());
for (auto& pair : pairs) {
collectColsCopy.emplace_back(std::make_pair(pair.first->clone(), pair.second));
}
return collectColsCopy;
};

std::unordered_map<List, List> hashTable;
std::unordered_map<ListWrapper, List, WrapperHash, WrapperEqual> hashTable;
buildHashTable(
cloneExpr(compareCols), rollUpApplyNode->collectCol(), rhsIter_.get(), hashTable);

Expand Down
15 changes: 7 additions & 8 deletions src/graph/executor/query/RollUpApplyExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,13 @@ class RollUpApplyExecutor : public Executor {
folly::Future<Status> execute() override;

protected:
void buildHashTable(const std::vector<Expression*>& compareCols, Iterator* iter);

Status checkBiInputDataSets();

void buildHashTable(const std::vector<Expression*>& compareCols,
const InputPropertyExpression* collectCol,
Iterator* iter,
std::unordered_map<List, List>& hashTable) const;
void buildHashTable(
const std::vector<std::pair<Expression*, bool>>& compareCols,
const InputPropertyExpression* collectCol,
Iterator* iter,
std::unordered_map<ListWrapper, List, WrapperHash, WrapperEqual>& hashTable) const;

void buildSingleKeyHashTable(Expression* compareCol,
const InputPropertyExpression* collectCol,
Expand All @@ -42,9 +41,9 @@ class RollUpApplyExecutor : public Executor {
Iterator* probeIter,
const std::unordered_map<Value, List>& hashTable);

DataSet probe(std::vector<Expression*> probeKeys,
DataSet probe(std::vector<std::pair<Expression*, bool>> probeKeys,
Iterator* probeIter,
const std::unordered_map<List, List>& hashTable);
const std::unordered_map<ListWrapper, List, WrapperHash, WrapperEqual>& hashTable);

folly::Future<Status> rollUpApply();

Expand Down
18 changes: 10 additions & 8 deletions src/graph/planner/match/SegmentsConnector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,11 @@ SubPlan SegmentsConnector::rollUpApply(CypherClauseContextBase* ctx,
auto* qctx = ctx->qctx;

SubPlan newPlan = left;
std::vector<Expression*> compareProps;
for (const auto& col : path.compareVariables) {
compareProps.emplace_back(FunctionCallExpression::make(
qctx->objPool(), "_joinkey", {InputPropertyExpression::make(qctx->objPool(), col)}));
std::vector<std::pair<Expression*, bool>> compareProps;
for (const auto& pair : path.compareVariables) {
auto expr = FunctionCallExpression::make(
qctx->objPool(), "_joinkey", {InputPropertyExpression::make(qctx->objPool(), pair.first)});
compareProps.emplace_back(std::make_pair(expr, pair.second));
}
InputPropertyExpression* collectProp = InputPropertyExpression::make(qctx->objPool(), collectCol);
auto* rollUpApply = RollUpApply::make(
Expand All @@ -101,10 +102,11 @@ SubPlan SegmentsConnector::rollUpApply(CypherClauseContextBase* ctx,
const graph::Path& path) {
SubPlan newPlan = left;
auto qctx = ctx->qctx;
std::vector<Expression*> keyProps;
for (const auto& col : path.compareVariables) {
keyProps.emplace_back(FunctionCallExpression::make(
qctx->objPool(), "_joinkey", {InputPropertyExpression::make(qctx->objPool(), col)}));
std::vector<std::pair<Expression*, bool>> keyProps;
for (const auto& pair : path.compareVariables) {
auto* expr = FunctionCallExpression::make(
qctx->objPool(), "_joinkey", {InputPropertyExpression::make(qctx->objPool(), pair.first)});
keyProps.emplace_back(std::make_pair(expr, pair.second));
}
auto* patternApply = PatternApply::make(
qctx, left.root, DCHECK_NOTNULL(right.root), std::move(keyProps), path.isAntiPred);
Expand Down
Loading

0 comments on commit 669fe2d

Please sign in to comment.