Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementation of Order by #537

Merged
merged 10 commits into from
Jul 1, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/dataman/RowSetReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@ class RowSetReader {

virtual ~RowSetReader() = default;

meta::SchemaProviderIf const* schema() const {
return schema_.get();
std::shared_ptr<const meta::SchemaProviderIf> schema() const {
return schema_;
}

Iterator begin() const noexcept;
Expand Down
1 change: 1 addition & 0 deletions src/graph/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ add_library(
DescribeSpaceExecutor.cpp
ShowExecutor.cpp
YieldExecutor.cpp
OrderByExecutor.cpp
SchemaHelper.cpp
)
add_dependencies(
Expand Down
4 changes: 4 additions & 0 deletions src/graph/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "graph/DescribeSpaceExecutor.h"
#include "graph/DropSpaceExecutor.h"
#include "graph/YieldExecutor.h"
#include "graph/OrderByExecutor.h"

namespace nebula {
namespace graph {
Expand Down Expand Up @@ -102,6 +103,9 @@ std::unique_ptr<Executor> Executor::makeExecutor(Sentence *sentence) {
case Sentence::Kind::kYield:
executor = std::make_unique<YieldExecutor>(sentence, ectx());
break;
case Sentence::Kind::kOrderBy:
executor = std::make_unique<OrderByExecutor>(sentence, ectx());
break;
case Sentence::Kind::kUnknown:
LOG(FATAL) << "Sentence kind unknown";
break;
Expand Down
55 changes: 55 additions & 0 deletions src/graph/InterimResult.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,61 @@ std::vector<VertexID> InterimResult::getVIDs(const std::string &col) const {
return result;
}

std::vector<cpp2::RowValue> InterimResult::getRows() const {
DCHECK(rsReader_ != nullptr);
auto schema = rsReader_->schema();
auto columnCnt = schema->getNumFields();
std::vector<cpp2::RowValue> rows;
folly::StringPiece piece;
using nebula::cpp2::SupportedType;
auto rowIter = rsReader_->begin();
while (rowIter) {
std::vector<cpp2::ColumnValue> row;
row.reserve(columnCnt);
auto fieldIter = schema->begin();
while (fieldIter) {
auto type = fieldIter->getType().type;
auto field = fieldIter->getName();
row.emplace_back();
switch (type) {
case SupportedType::VID: {
int64_t v;
auto rc = rowIter->getVid(field, v);
CHECK(rc == ResultType::SUCCEEDED);
row.back().set_integer(v);
break;
}
case SupportedType::DOUBLE: {
double v;
auto rc = rowIter->getDouble(field, v);
CHECK(rc == ResultType::SUCCEEDED);
row.back().set_double_precision(v);
break;
}
case SupportedType::BOOL: {
bool v;
auto rc = rowIter->getBool(field, v);
CHECK(rc == ResultType::SUCCEEDED);
row.back().set_bool_val(v);
break;
}
case SupportedType::STRING: {
auto rc = rowIter->getString(field, piece);
CHECK(rc == ResultType::SUCCEEDED);
row.back().set_str(piece.toString());
break;
}
default:
LOG(FATAL) << "Unknown Type: " << static_cast<int32_t>(type);
}
++fieldIter;
}
rows.emplace_back();
rows.back().set_columns(std::move(row));
++rowIter;
}
return rows;
}

} // namespace graph
} // namespace nebula
5 changes: 5 additions & 0 deletions src/graph/InterimResult.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,13 @@ class InterimResult final {
explicit InterimResult(std::unique_ptr<RowSetWriter> rsWriter);
explicit InterimResult(std::vector<VertexID> vids);

// Returns the schema reported by the underlying row-set reader.
// NOTE: rsReader_ is dereferenced without a null check here.
std::shared_ptr<const meta::SchemaProviderIf> schema() const {
return rsReader_->schema();
}

std::vector<VertexID> getVIDs(const std::string &col) const;

std::vector<cpp2::RowValue> getRows() const;
// TODO(dutor) iterating interfaces on rows and columns

private:
Expand Down
191 changes: 191 additions & 0 deletions src/graph/OrderByExecutor.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,191 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "base/Base.h"
#include "graph/OrderByExecutor.h"

namespace nebula {
namespace graph {
namespace cpp2 {

// Less-than for two ColumnValue objects: compares the member selected by this
// object's type_ tag with the same member of rhs, using that member's own
// operator <. Returns false for any tag not listed in the switch.
bool ColumnValue::operator < (const ColumnValue& rhs) const {
// Both sides are expected to carry the same type tag; this is only verified
// through DCHECK_EQ.
DCHECK_EQ(type_, rhs.type_);
auto& lhs = *this;
switch (lhs.type_) {
case Type::bool_val:
{
return lhs.value_.bool_val < rhs.value_.bool_val;
}
CPWstatic marked this conversation as resolved.
Show resolved Hide resolved
case Type::integer:
{
return lhs.value_.integer < rhs.value_.integer;
}
case Type::id:
{
return lhs.value_.id < rhs.value_.id;
}
case Type::single_precision:
{
return lhs.value_.single_precision < rhs.value_.single_precision;
}
case Type::double_precision:
{
return lhs.value_.double_precision < rhs.value_.double_precision;
}
case Type::str:
{
return lhs.value_.str < rhs.value_.str;
}
case Type::timestamp:
{
return lhs.value_.timestamp < rhs.value_.timestamp;
}
case Type::year:
{
return lhs.value_.year < rhs.value_.year;
}
case Type::month:
{
return lhs.value_.month < rhs.value_.month;
}
case Type::date:
{
return lhs.value_.date < rhs.value_.date;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To date, datetime, are you comparing by string or by corresponding value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

by value, how do u think? it depends on the implementation of storage actually. Date and Datetime is not implement in storage. For now, i am doing the sort with the union type of ColumnValue, because it is comparable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Although date and datetime is not implemented, what I really care about is how to store these two types, whether they are stored as strings or converted to timestamps.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do u mean that how to store in storage?

case Type::datetime:
{
return lhs.value_.datetime < rhs.value_.datetime;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

// Any tag without a case above compares as "not less than".
default:
{
return false;
}
}
return false;
}
} // namespace cpp2

/**
 * Creates an executor bound to the given sentence.
 *
 * @param sentence stored as an OrderBySentence via static_cast, i.e. without
 *                 any runtime type check.
 * @param ectx     forwarded to the TraverseExecutor base.
 */
OrderByExecutor::OrderByExecutor(Sentence *sentence, ExecutionContext *ectx)
    : TraverseExecutor(ectx),
      sentence_(static_cast<OrderBySentence*>(sentence)) {
}

// Nothing is prepared ahead of execution; always reports Status::OK().
Status OrderByExecutor::prepare() {
return Status::OK();
}

/**
 * Accepts the upstream result, materializes its rows and resolves the
 * sentence's order factors into (column index, order type) pairs.
 *
 * A null result is ignored and leaves the executor's state untouched. A factor
 * whose property is not found in the input schema (index -1) is logged and
 * skipped, so it does not take part in the sort.
 *
 * @param result the interim result to sort; ownership is taken over.
 */
void OrderByExecutor::feedResult(std::unique_ptr<InterimResult> result) {
    if (result == nullptr) {
        return;
    }
    DCHECK(sentence_ != nullptr);
    inputs_ = std::move(result);
    rows_ = inputs_->getRows();

    auto schema = inputs_->schema();
    // Bound by const reference so it works the same whether factors() returns a
    // value or a reference.
    const auto &factors = sentence_->factors();
    sortFactors_.reserve(factors.size());
    for (auto &factor : factors) {
        // Each factor expression is treated as an input property reference;
        // the cast itself performs no runtime check.
        auto expr = static_cast<InputPropertyExpression*>(factor->expr());
        folly::StringPiece field = *(expr->prop());
        auto fieldIndex = schema->getFieldIndex(field);
        if (fieldIndex == -1) {
            LOG(INFO) << "Field(" << field << ") not exist in input schema.";
            continue;
        }
        // Reuse the index resolved above instead of looking the field up again.
        sortFactors_.emplace_back(fieldIndex, factor->orderType());
    }
}

void OrderByExecutor::execute() {
FLOG_INFO("Executing Order By: %s", sentence_->toString().c_str());
auto comparator = [this] (cpp2::RowValue& lhs, cpp2::RowValue& rhs) {
const auto &lhsColumns = lhs.get_columns();
const auto &rhsColumns = rhs.get_columns();
for (auto &factor : this->sortFactors_) {
auto fieldIndex = factor.first;
auto orderType = factor.second;
if (lhsColumns[fieldIndex] == rhsColumns[fieldIndex]) {
continue;
}

if (orderType == OrderFactor::OrderType::ASCEND) {
return lhsColumns[fieldIndex] < rhsColumns[fieldIndex];
} else if (orderType == OrderFactor::OrderType::DESCEND) {
return lhsColumns[fieldIndex] > rhsColumns[fieldIndex];
} else {
LOG(FATAL) << "Unkown Order Type: " << orderType;
}
}
return false;
};

if (!sortFactors_.empty()) {
std::sort(rows_.begin(), rows_.end(), comparator);
}

if (onResult_) {
onResult_(setupInterimResult());
}
DCHECK(onFinish_);
onFinish_();
}

/**
 * Re-encodes the buffered rows, in their current order, into a new
 * InterimResult that uses the input schema.
 *
 * Only integer, double_precision, bool_val and str column values can be
 * written; any other column type hits LOG(FATAL).
 *
 * @return the encoded result, or nullptr when there are no rows.
 */
std::unique_ptr<InterimResult> OrderByExecutor::setupInterimResult() {
    if (rows_.empty()) {
        return nullptr;
    }

    auto schema = inputs_->schema();
    auto rsWriter = std::make_unique<RowSetWriter>(schema);
    using Type = cpp2::ColumnValue::Type;
    for (auto &row : rows_) {
        RowWriter writer(schema);
        // Bind by const reference: the columns are only read here, so there is
        // no need to copy the whole vector (and its strings) for every row.
        const auto &columns = row.get_columns();
        for (const auto &column : columns) {
            switch (column.getType()) {
                case Type::integer:
                    writer << column.get_integer();
                    break;
                case Type::double_precision:
                    writer << column.get_double_precision();
                    break;
                case Type::bool_val:
                    writer << column.get_bool_val();
                    break;
                case Type::str:
                    writer << column.get_str();
                    break;
                default:
                    LOG(FATAL) << "Not Support: " << column.getType();
            }
        }
        rsWriter->addRow(writer);
    }

    return std::make_unique<InterimResult>(std::move(rsWriter));
}

/**
 * Fills the response with the input schema's column names and the buffered
 * rows. The rows are moved into the response, so rows_ is left moved-from.
 * When there are no rows the response is not touched.
 */
void OrderByExecutor::setupResponse(cpp2::ExecutionResponse &resp) {
    if (!rows_.empty()) {
        auto schema = inputs_->schema();

        std::vector<std::string> names;
        names.reserve(schema->getNumFields());
        for (auto iter = schema->begin(); iter; ++iter) {
            names.emplace_back(iter->getName());
        }

        resp.set_column_names(std::move(names));
        resp.set_rows(std::move(rows_));
    }
}

} // namespace graph
} // namespace nebula
43 changes: 43 additions & 0 deletions src/graph/OrderByExecutor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#ifndef GRAPH_ORDERBYEXECUTOR_H_
#define GRAPH_ORDERBYEXECUTOR_H_

#include "base/Base.h"
#include "graph/TraverseExecutor.h"

namespace nebula {
namespace graph {

// Executor for ORDER BY sentences: buffers the rows of the result fed to it,
// sorts them by the sentence's order factors and passes them on.
class OrderByExecutor final : public TraverseExecutor {
public:
// `sentence` is stored as an OrderBySentence (static_cast, no runtime check).
OrderByExecutor(Sentence *sentence, ExecutionContext *ectx);

const char* name() const override {
return "OrderByExecutor";
}

// Always returns Status::OK(); there is nothing to prepare.
Status MUST_USE_RESULT prepare() override;

// Sorts the buffered rows, then invokes onResult_ (if set) and onFinish_.
void execute() override;

// Takes the result to sort; a null result is ignored.
void feedResult(std::unique_ptr<InterimResult> result) override;

// Writes column names and the buffered rows into `resp`; does nothing when
// there are no rows.
void setupResponse(cpp2::ExecutionResponse &resp) override;

private:
// Re-encodes rows_ into an InterimResult; returns nullptr when rows_ is empty.
std::unique_ptr<InterimResult> setupInterimResult();

private:
// The ORDER BY sentence this executor runs.
OrderBySentence *sentence_{nullptr};
// The result handed to feedResult(); also the source of the schema.
std::unique_ptr<InterimResult> inputs_;
// Rows decoded from inputs_, sorted in place by execute().
std::vector<cpp2::RowValue> rows_;
// (field index in the input schema, order type) for each resolved factor.
std::vector<std::pair<int64_t, OrderFactor::OrderType>> sortFactors_;
CPWstatic marked this conversation as resolved.
Show resolved Hide resolved
};
} // namespace graph
} // namespace nebula
#endif // GRAPH_ORDERBYEXECUTOR_H_
4 changes: 4 additions & 0 deletions src/graph/TraverseExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include "parser/TraverseSentences.h"
#include "graph/GoExecutor.h"
#include "graph/PipeExecutor.h"
#include "graph/OrderByExecutor.h"

namespace nebula {
namespace graph {
Expand All @@ -30,6 +31,9 @@ TraverseExecutor::makeTraverseExecutor(Sentence *sentence, ExecutionContext *ect
case Sentence::Kind::kPipe:
executor = std::make_unique<PipeExecutor>(sentence, ectx);
break;
case Sentence::Kind::kOrderBy:
executor = std::make_unique<OrderByExecutor>(sentence, ectx);
break;
case Sentence::Kind::kUnknown:
LOG(FATAL) << "Sentence kind unknown";
break;
Expand Down
20 changes: 20 additions & 0 deletions src/graph/test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,23 @@ nebula_link_libraries(
gtest
)
nebula_add_test(data_test)

# order_by_test: executable built from OrderByTest.cpp together with the
# TestMain/TestEnv/TestBase sources, the client_cpp_obj and adHocSchema_obj
# object libraries, and whatever ${GRAPH_TEST_LIBS} expands to.
add_executable(
order_by_test
TestMain.cpp
TestEnv.cpp
TestBase.cpp
OrderByTest.cpp
$<TARGET_OBJECTS:client_cpp_obj>
$<TARGET_OBJECTS:adHocSchema_obj>
${GRAPH_TEST_LIBS}
)

# Libraries linked into order_by_test.
nebula_link_libraries(
order_by_test
${THRIFT_LIBRARIES}
${ROCKSDB_LIBRARIES}
wangle
gtest
)
# nebula_add_test is a project-defined helper; presumably it registers the
# target with the test runner -- confirm against its definition.
nebula_add_test(order_by_test)
Loading