Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Yield sentence #745

Merged
merged 16 commits into from
Oct 30, 2019
6 changes: 6 additions & 0 deletions src/common/base/Base.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ using VariantType = boost::variant<int64_t, double, bool, std::string>;
#define VAR_STR 3
#endif

// reserved property names
constexpr char _ID[] = "_id";
constexpr char _SRC[] = "_src";
constexpr char _TYPE[] = "_type";
constexpr char _RANK[] = "_rank";
constexpr char _DST[] = "_dst";
// Useful type traits

// Tell if `T' is copy-constructible
Expand Down
4 changes: 4 additions & 0 deletions src/common/filter/Expressions.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/* Copyright (c) 2018 vesoft inc. All rights reserved.
*
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
Expand Down Expand Up @@ -111,6 +112,9 @@ std::string AliasPropertyExpression::toString() const {
std::string buf;
buf.reserve(64);
buf += *ref_;
if (*ref_ != "" && *ref_ != VAR_REF) {
CPWstatic marked this conversation as resolved.
Show resolved Hide resolved
buf += ".";
}
buf += *alias_;
if (*alias_ != "") {
buf += ".";
Expand Down
25 changes: 17 additions & 8 deletions src/common/filter/Expressions.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ enum class ColumnType {

std::string columnTypeToString(ColumnType type);

constexpr char INPUT_REF[] = "$-";
constexpr char VAR_REF[] = "$";
constexpr char SRC_REF[] = "$^";
constexpr char DST_REF[] = "$$";

class ExpressionContext final {
public:
using EdgeInfo = boost::variant<std::string, EdgeType>;
Expand Down Expand Up @@ -91,6 +96,10 @@ class ExpressionContext final {
return variables_;
}

std::vector<std::string> inputProps() const {
return std::vector<std::string>(inputProps_.begin(), inputProps_.end());
}

bool hasSrcTagProp() const {
return !srcTagProps_.empty();
}
Expand Down Expand Up @@ -437,7 +446,7 @@ class InputPropertyExpression final : public AliasPropertyExpression {

explicit InputPropertyExpression(std::string *prop) {
kind_ = kInputProp;
ref_.reset(new std::string("$-."));
ref_.reset(new std::string(INPUT_REF));
alias_.reset(new std::string(""));
prop_.reset(prop);
}
Expand All @@ -462,7 +471,7 @@ class DestPropertyExpression final : public AliasPropertyExpression {

DestPropertyExpression(std::string *tag, std::string *prop) {
kind_ = kDestProp;
ref_.reset(new std::string("$$."));
ref_.reset(new std::string(DST_REF));
alias_.reset(tag);
prop_.reset(prop);
}
Expand All @@ -487,7 +496,7 @@ class VariablePropertyExpression final : public AliasPropertyExpression {

VariablePropertyExpression(std::string *var, std::string *prop) {
kind_ = kVariableProp;
ref_.reset(new std::string("$"));
ref_.reset(new std::string(VAR_REF));
alias_.reset(var);
prop_.reset(prop);
}
Expand All @@ -514,7 +523,7 @@ class EdgeTypeExpression final : public AliasPropertyExpression {
kind_ = kEdgeType;
ref_.reset(new std::string(""));
alias_.reset(alias);
prop_.reset(new std::string("_type"));
prop_.reset(new std::string(_TYPE));
}

OptVariantType eval() const override;
Expand All @@ -539,7 +548,7 @@ class EdgeSrcIdExpression final : public AliasPropertyExpression {
kind_ = kEdgeSrcId;
ref_.reset(new std::string(""));
alias_.reset(alias);
prop_.reset(new std::string("_src"));
prop_.reset(new std::string(_SRC));
}

OptVariantType eval() const override;
Expand All @@ -564,7 +573,7 @@ class EdgeDstIdExpression final : public AliasPropertyExpression {
kind_ = kEdgeDstId;
ref_.reset(new std::string(""));
alias_.reset(alias);
prop_.reset(new std::string("_dst"));
prop_.reset(new std::string(_DST));
}

OptVariantType eval() const override;
Expand All @@ -589,7 +598,7 @@ class EdgeRankExpression final : public AliasPropertyExpression {
kind_ = kEdgeRank;
ref_.reset(new std::string(""));
alias_.reset(alias);
prop_.reset(new std::string("_rank"));
prop_.reset(new std::string(_RANK));
}

OptVariantType eval() const override;
Expand All @@ -612,7 +621,7 @@ class SourcePropertyExpression final : public AliasPropertyExpression {

SourcePropertyExpression(std::string *tag, std::string *prop) {
kind_ = kSourceProp;
ref_.reset(new std::string("$^."));
ref_.reset(new std::string(SRC_REF));
alias_.reset(tag);
prop_.reset(prop);
}
Expand Down
68 changes: 36 additions & 32 deletions src/graph/FetchEdgesExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ Status FetchEdgesExecutor::prepareClauses() {
if (!status.ok()) {
break;
}

expCtx_ = std::make_unique<ExpressionContext>();
expCtx_->setStorageClient(ectx()->getStorageClient());
spaceId_ = ectx()->rctx()->session()->space();
Expand All @@ -53,55 +54,53 @@ Status FetchEdgesExecutor::prepareClauses() {
if (!status.ok()) {
break;
}

// Save the type
auto iter = colTypes_.begin();
for (auto i = 0u; i < colNames_.size(); i++) {
auto type = labelSchema_->getFieldType(colNames_[i]);
if (type == CommonConstants::kInvalidValueType()) {
iter++;
continue;
}
*iter = type.type;
iter++;
}
} while (false);
return status;
}

Status FetchEdgesExecutor::prepareEdgeKeys() {
Status status = Status::OK();
if (sentence_->isRef()) {
auto *edgeKeyRef = sentence_->ref();
do {
if (sentence_->isRef()) {
auto *edgeKeyRef = sentence_->ref();

srcid_ = edgeKeyRef->srcid();
DCHECK_NOTNULL(srcid_);
srcid_ = edgeKeyRef->srcid();
DCHECK_NOTNULL(srcid_);
CPWstatic marked this conversation as resolved.
Show resolved Hide resolved

dstid_ = edgeKeyRef->dstid();
DCHECK_NOTNULL(dstid_);
dstid_ = edgeKeyRef->dstid();
DCHECK_NOTNULL(dstid_);

rank_ = edgeKeyRef->rank();
auto ret = edgeKeyRef->varname();
if (!ret.ok()) {
status = std::move(ret).status();
rank_ = edgeKeyRef->rank();

if ((*srcid_ == "*")
|| (*dstid_ == "*")
|| (rank_ != nullptr && *rank_ == "*")) {
status = Status::Error("Can not use `*' to reference a vertex id column.");
break;
}

auto ret = edgeKeyRef->varname();
if (!ret.ok()) {
status = std::move(ret).status();
break;
}
varname_ = std::move(ret).value();
}
varname_ = ret.value();
}
} while (false);

return status;
}

void FetchEdgesExecutor::execute() {
DCHECK(onError_);
FLOG_INFO("Executing FetchEdges: %s", sentence_->toString().c_str());
auto status = prepareClauses();
if (!status.ok()) {
DCHECK(onError_);
onError_(std::move(status));
return;
}
status = setupEdgeKeys();
if (!status.ok()) {
DCHECK(onError_);
onError_(std::move(status));
return;
}
Expand Down Expand Up @@ -326,21 +325,20 @@ void FetchEdgesExecutor::processResult(RpcResponse &&result) {
outputSchema = std::make_shared<SchemaWriter>();
auto status = getOutputSchema(eschema.get(), &*iter, outputSchema.get());
if (!status.ok()) {
LOG(ERROR) << "Get getOutputSchema failed" << status;
LOG(ERROR) << "Get getOutputSchema failed: " << status;
DCHECK(onError_);
onError_(Status::Error("Internal error."));
return;
}
rsWriter = std::make_unique<RowSetWriter>(outputSchema);
}
while (iter) {
auto collector = std::make_unique<Collector>(eschema.get());
auto writer = std::make_unique<RowWriter>(outputSchema);

auto &getters = expCtx_->getters();
getters.getAliasProp = [&](const std::string &,
const std::string &prop) -> OptVariantType {
return collector->getProp(prop, &*iter);
getters.getAliasProp = [&] (const std::string&,
CPWstatic marked this conversation as resolved.
Show resolved Hide resolved
const std::string &prop) -> OptVariantType {
return Collector::getProp(eschema.get(), prop, &*iter);
};
for (auto *column : yields_) {
auto *expr = column->expr();
Expand All @@ -349,7 +347,13 @@ void FetchEdgesExecutor::processResult(RpcResponse &&result) {
onError_(value.status());
return;
}
collector->collect(value.value(), writer.get());
auto status = Collector::collect(value.value(), writer.get());
if (!status.ok()) {
LOG(ERROR) << "Collect prop error: " << status;
DCHECK(onError_);
onError_(std::move(status));
return;
}
}

// TODO Consider float/double, and need to reduce mem copy.
Expand Down
55 changes: 13 additions & 42 deletions src/graph/FetchExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,29 @@ Status FetchExecutor::prepareYield() {
// such as YIELD 1+1, it has not type in schema, the type from the eval()
colTypes_.emplace_back(nebula::cpp2::SupportedType::UNKNOWN);
if (col->expr()->isAliasExpression()) {
colNames_.emplace_back(*dynamic_cast<AliasPropertyExpression*>(col->expr())->prop());
continue;
auto prop = *static_cast<AliasPropertyExpression*>(col->expr())->prop();
auto type = labelSchema_->getFieldType(prop);
if (type != CommonConstants::kInvalidValueType()) {
colTypes_.back() = type.get_type();
}
} else if (col->expr()->isTypeCastingExpression()) {
// type cast
auto exprPtr = dynamic_cast<TypeCastingExpression*>(col->expr());
colTypes_.back() = ColumnTypeToSupportedType(exprPtr->getType());
}

colNames_.emplace_back(col->expr()->toString());
}

if (expCtx_->hasSrcTagProp() || expCtx_->hasDstTagProp()) {
return Status::SyntaxError(
"tag.prop and edgetype.prop are supported in fetch sentence.");
}

if (expCtx_->hasInputProp() || expCtx_->hasVariableProp()) {
// TODO: support yield input and variable props
return Status::SyntaxError(
"`$-' and `$variable' not supported in fetch yet.");
}

auto aliasProps = expCtx_->aliasProps();
for (auto pair : aliasProps) {
if (pair.first != *labelName_) {
Expand Down Expand Up @@ -105,10 +112,9 @@ Status FetchExecutor::getOutputSchema(
if (expCtx_ == nullptr || resultColNames_.empty()) {
return Status::Error("Input is empty.");
}
auto collector = std::make_unique<Collector>(schema);
auto &getters = expCtx_->getters();
getters.getAliasProp = [&] (const std::string&, const std::string &prop) {
return collector->getProp(prop, reader);
return Collector::getProp(schema, prop, reader);
CPWstatic marked this conversation as resolved.
Show resolved Hide resolved
};
std::vector<VariantType> record;
for (auto *column : yields_) {
Expand All @@ -120,42 +126,7 @@ Status FetchExecutor::getOutputSchema(
record.emplace_back(std::move(value.value()));
}

if (colTypes_.size() != record.size()) {
return Status::Error("Input size is not equal to output");
}
using nebula::cpp2::SupportedType;
auto index = 0u;
for (auto &it : colTypes_) {
SupportedType type;
if (it == SupportedType::UNKNOWN) {
switch (record[index].which()) {
case VAR_INT64:
// all integers in InterimResult are regarded as type of INT
type = SupportedType::INT;
break;
case VAR_DOUBLE:
type = SupportedType::DOUBLE;
break;
case VAR_BOOL:
type = SupportedType::BOOL;
break;
case VAR_STR:
type = SupportedType::STRING;
break;
default:
std::string msg = folly::stringPrintf(
"Unknown VariantType: %d", record[index].which());
LOG(ERROR) << msg;
return Status::Error(msg);
}
} else {
type = it;
}

outputSchema->appendCol(resultColNames_[index], type);
index++;
}
return Status::OK();
return Collector::getSchema(record, resultColNames_, colTypes_, outputSchema);
}

void FetchExecutor::finishExecution(std::unique_ptr<RowSetWriter> rsWriter) {
Expand Down
2 changes: 0 additions & 2 deletions src/graph/FetchExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ class FetchExecutor : public TraverseExecutor {
std::unique_ptr<InterimResult> inputs_;
std::vector<std::string> resultColNames_;
std::unique_ptr<cpp2::ExecutionResponse> resp_;
using ColNameType = std::unordered_map<std::string, nebula::cpp2::SupportedType>;
std::vector<std::string> colNames_;
std::vector<nebula::cpp2::SupportedType> colTypes_;
};
} // namespace graph
Expand Down
Loading