Skip to content

Commit

Permalink
Implementation of set op. (vesoft-inc#696)
Browse files Browse the repository at this point in the history
* Implement union.

* Add distincter.

* Pipe have priority over set operator.

* Ajust error handling.

* Add distinct and implicit type casting for union.

* Inplement intersect and minus.

* Fix format.

* Adrress @monabobo's comment.

* Add result column names.

* Delete distincter and rebase.

* Rebase and fix conflict.

* Address @whitewum's comment.

* Address @laura-ding's comment and rebase.
  • Loading branch information
CPWstatic authored and dutor committed Aug 12, 2019
1 parent ebbb500 commit 23c975e
Show file tree
Hide file tree
Showing 22 changed files with 1,234 additions and 36 deletions.
3 changes: 3 additions & 0 deletions src/dataman/RowSetWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,8 @@ void RowSetWriter::addRow(const std::string& data) {
data_.append(data);
}

void RowSetWriter::addAll(const std::string& data) {
data_.append(data);
}
} // namespace nebula

2 changes: 2 additions & 0 deletions src/dataman/RowSetWriter.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class RowSetWriter {
void addRow(RowWriter& writer);
// Append the encoded row data
void addRow(const std::string& data);
// Copy existed rows
void addAll(const std::string& data);

private:
std::shared_ptr<const meta::SchemaProviderIf> schema_;
Expand Down
1 change: 1 addition & 0 deletions src/graph/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ add_library(
FetchVerticesExecutor.cpp
FetchEdgesExecutor.cpp
FetchExecutor.cpp
SetExecutor.cpp
)
add_dependencies(
graph_obj
Expand Down
5 changes: 5 additions & 0 deletions src/graph/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
#include "graph/ConfigExecutor.h"
#include "graph/FetchVerticesExecutor.h"
#include "graph/FetchEdgesExecutor.h"
#include "graph/ConfigExecutor.h"
#include "graph/SetExecutor.h"

namespace nebula {
namespace graph {
Expand Down Expand Up @@ -126,6 +128,9 @@ std::unique_ptr<Executor> Executor::makeExecutor(Sentence *sentence) {
case Sentence::Kind::kFetchEdges:
executor = std::make_unique<FetchEdgesExecutor>(sentence, ectx());
break;
case Sentence::Kind::kSet:
executor = std::make_unique<SetExecutor>(sentence, ectx());
break;
case Sentence::Kind::kUnknown:
LOG(FATAL) << "Sentence kind unknown";
break;
Expand Down
167 changes: 166 additions & 1 deletion src/graph/InterimResult.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
namespace nebula {
namespace graph {

constexpr char NotSupported[] = "Type not supported yet";

InterimResult::InterimResult(std::unique_ptr<RowSetWriter> rsWriter) {
rsWriter_ = std::move(rsWriter);
rsReader_ = std::make_unique<RowSetReader>(rsWriter_->schema(), rsWriter_->data());
Expand Down Expand Up @@ -119,7 +121,6 @@ std::vector<cpp2::RowValue> InterimResult::getRows() const {
return rows;
}


std::unique_ptr<InterimResult::InterimResultIndex>
InterimResult::buildIndex(const std::string &vidColumn) const {
using nebula::cpp2::SupportedType;
Expand Down Expand Up @@ -211,6 +212,170 @@ VariantType InterimResult::InterimResultIndex::getColumnWithVID(VertexID id,
return rows_[rowIndex][columnIndex];
}

Status InterimResult::castTo(cpp2::ColumnValue *col,
const nebula::cpp2::SupportedType &type) {
using nebula::cpp2::SupportedType;
switch (type) {
case SupportedType::VID:
return castToInt(col);
case SupportedType::DOUBLE:
return castToDouble(col);
case SupportedType::BOOL:
return castToBool(col);
case SupportedType::STRING:
return castToStr(col);
default:
// Notice: if we implement some other type,
// we should update here.
LOG(ERROR) << NotSupported << static_cast<int32_t>(type);
return Status::Error(NotSupported);
}
}

Status InterimResult::castToInt(cpp2::ColumnValue *col) {
switch (col->getType()) {
case cpp2::ColumnValue::Type::integer:
break;
case cpp2::ColumnValue::Type::double_precision: {
auto d2i = static_cast<int64_t>(col->get_double_precision());
col->set_integer(d2i);
break;
}
case cpp2::ColumnValue::Type::bool_val: {
auto b2i = static_cast<int64_t>(col->get_bool_val());
col->set_integer(b2i);
break;
}
case cpp2::ColumnValue::Type::str: {
auto r = folly::tryTo<int64_t>(col->get_str());
if (r.hasValue()) {
col->set_integer(r.value());
break;
} else {
return Status::Error(
"Casting from string %s to double failed.", col->get_str().c_str());
}
}
default:
LOG(ERROR) << NotSupported << static_cast<int32_t>(col->getType());
return Status::Error(NotSupported);
}
return Status::OK();
}

Status InterimResult::castToDouble(cpp2::ColumnValue *col) {
switch (col->getType()) {
case cpp2::ColumnValue::Type::integer: {
auto i2d = static_cast<double>(col->get_integer());
col->set_double_precision(i2d);
break;
}
case cpp2::ColumnValue::Type::double_precision:
break;
case cpp2::ColumnValue::Type::bool_val: {
auto b2d = static_cast<double>(col->get_bool_val());
col->set_double_precision(b2d);
break;
}
case cpp2::ColumnValue::Type::str: {
auto r = folly::tryTo<double>(col->get_str());
if (r.hasValue()) {
col->set_double_precision(r.value());
break;
} else {
return Status::Error(
"Casting from string %s to double failed.", col->get_str().c_str());
}
}
default:
LOG(ERROR) << NotSupported << static_cast<int32_t>(col->getType());
return Status::Error(NotSupported);
}
return Status::OK();
}

Status InterimResult::castToBool(cpp2::ColumnValue *col) {
switch (col->getType()) {
case cpp2::ColumnValue::Type::integer: {
auto i2b = col->get_integer() != 0;
col->set_bool_val(i2b);
break;
}
case cpp2::ColumnValue::Type::double_precision: {
auto d2b = col->get_double_precision() != 0.0;
col->set_bool_val(d2b);
break;
}
case cpp2::ColumnValue::Type::bool_val:
break;
case cpp2::ColumnValue::Type::str: {
auto s2b = col->get_str().empty();
col->set_bool_val(s2b);
break;
}
default:
LOG(ERROR) << NotSupported << static_cast<int32_t>(col->getType());
return Status::Error(NotSupported);
}
return Status::OK();
}

Status InterimResult::castToStr(cpp2::ColumnValue *col) {
switch (col->getType()) {
case cpp2::ColumnValue::Type::integer: {
auto i2s = folly::to<std::string>(col->get_integer());
col->set_str(std::move(i2s));
break;
}
case cpp2::ColumnValue::Type::double_precision: {
auto d2s = folly::to<std::string>(col->get_double_precision());
col->set_str(std::move(d2s));
break;
}
case cpp2::ColumnValue::Type::bool_val: {
auto b2s = folly::to<std::string>(col->get_bool_val());
col->set_str(std::move(b2s));
break;
}
case cpp2::ColumnValue::Type::str:
break;
default:
LOG(ERROR) << NotSupported << static_cast<int32_t>(col->getType());
return Status::Error(NotSupported);
}
return Status::OK();
}

std::unique_ptr<InterimResult> InterimResult::getInterim(
std::shared_ptr<const meta::SchemaProviderIf> resultSchema,
std::vector<cpp2::RowValue> &rows) {
auto rsWriter = std::make_unique<RowSetWriter>(resultSchema);
for (auto &r : rows) {
RowWriter writer(resultSchema);
auto &cols = r.get_columns();
for (auto &col : cols) {
switch (col.getType()) {
case cpp2::ColumnValue::Type::integer:
writer << col.get_integer();
break;
case cpp2::ColumnValue::Type::double_precision:
writer << col.get_double_precision();
break;
case cpp2::ColumnValue::Type::bool_val:
writer << col.get_bool_val();
break;
case cpp2::ColumnValue::Type::str:
writer << col.get_str();
break;
default:
LOG(ERROR) << NotSupported << static_cast<int32_t>(col.getType());
return nullptr;
}
}
rsWriter->addRow(writer);
}

return std::make_unique<InterimResult>(std::move(rsWriter));
}
} // namespace graph
} // namespace nebula
21 changes: 16 additions & 5 deletions src/graph/InterimResult.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,11 @@
#include "dataman/RowSetWriter.h"
#include "dataman/SchemaWriter.h"

namespace nebula {
namespace graph {
/**
* The intermediate form of execution result, used in pipeline and variable.
*/


namespace nebula {
namespace graph {

class InterimResult final {
public:
InterimResult() = default;
Expand All @@ -34,10 +31,24 @@ class InterimResult final {
explicit InterimResult(std::unique_ptr<RowSetWriter> rsWriter);
explicit InterimResult(std::vector<VertexID> vids);

static std::unique_ptr<InterimResult> getInterim(
std::shared_ptr<const meta::SchemaProviderIf> resultSchema,
std::vector<cpp2::RowValue> &rows);
static Status castTo(cpp2::ColumnValue *col,
const nebula::cpp2::SupportedType &type);
static Status castToInt(cpp2::ColumnValue *col);
static Status castToDouble(cpp2::ColumnValue *col);
static Status castToBool(cpp2::ColumnValue *col);
static Status castToStr(cpp2::ColumnValue *col);

std::shared_ptr<const meta::SchemaProviderIf> schema() const {
return rsReader_->schema();
}

std::string& data() const {
return rsWriter_->data();
}

StatusOr<std::vector<VertexID>> getVIDs(const std::string &col) const;

StatusOr<std::vector<VertexID>> getDistinctVIDs(const std::string &col) const;
Expand Down
17 changes: 16 additions & 1 deletion src/graph/PipeExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ PipeExecutor::PipeExecutor(Sentence *sentence,


Status PipeExecutor::prepare() {
auto status = syntaxPreCheck();
if (!status.ok()) {
return status;
}

left_ = makeTraverseExecutor(sentence_->left());
right_ = makeTraverseExecutor(sentence_->right());
DCHECK(left_ != nullptr);
Expand Down Expand Up @@ -69,7 +74,7 @@ Status PipeExecutor::prepare() {
right_->setOnError(onError);
}

auto status = left_->prepare();
status = left_->prepare();
if (!status.ok()) {
FLOG_ERROR("Prepare executor `%s' failed: %s",
left_->name(), status.toString().c_str());
Expand All @@ -87,6 +92,16 @@ Status PipeExecutor::prepare() {
return Status::OK();
}

Status PipeExecutor::syntaxPreCheck() {
// Set op not support input,
// because '$-' would be ambiguous in such a situation:
// Go | (Go | Go $- UNION GO)
if (sentence_->right()->kind() == Sentence::Kind::kSet) {
return Status::SyntaxError("Set op not support input.");
}

return Status::OK();
}

void PipeExecutor::execute() {
left_->execute();
Expand Down
3 changes: 3 additions & 0 deletions src/graph/PipeExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ class PipeExecutor final : public TraverseExecutor {

void setupResponse(cpp2::ExecutionResponse &resp) override;

private:
Status syntaxPreCheck();

private:
PipedSentence *sentence_{nullptr};
std::unique_ptr<TraverseExecutor> left_;
Expand Down
Loading

0 comments on commit 23c975e

Please sign in to comment.