diff --git a/src/codec/NebulaCodecImpl.cpp b/src/codec/NebulaCodecImpl.cpp index 8e894071a4a..293140f8ad5 100644 --- a/src/codec/NebulaCodecImpl.cpp +++ b/src/codec/NebulaCodecImpl.cpp @@ -23,7 +23,7 @@ namespace nebula { std::string NebulaCodecImpl::encode(std::vector values, - std::shared_ptr schema) { + std::shared_ptr schema) { RowWriter writer(schema); for (auto& value : values) { if (value.type() == typeid(int32_t)) { @@ -47,7 +47,7 @@ std::string NebulaCodecImpl::encode(std::vector values, } StatusOr> NebulaCodecImpl::decode( - std::string encoded, std::shared_ptr schema) { + std::string encoded, std::shared_ptr schema) { if (encoded.empty()) { return Status::Error("encoded string is empty"); } diff --git a/src/codec/RowReaderV2.cpp b/src/codec/RowReaderV2.cpp index bbb52de58c2..9c9ef343edc 100644 --- a/src/codec/RowReaderV2.cpp +++ b/src/codec/RowReaderV2.cpp @@ -9,7 +9,7 @@ namespace nebula { using nebula::cpp2::PropertyType; -bool RowReaderV2::resetImpl(meta::SchemaProviderIf const* schema, folly::StringPiece row) { +bool RowReaderV2::resetImpl(meta::NebulaSchemaProvider const* schema, folly::StringPiece row) { schema_ = schema; data_ = row; diff --git a/src/codec/RowReaderV2.h b/src/codec/RowReaderV2.h index 7ceeba30b69..d3ce9ec4897 100644 --- a/src/codec/RowReaderV2.h +++ b/src/codec/RowReaderV2.h @@ -9,7 +9,7 @@ #include #include "common/base/Base.h" -#include "common/meta/SchemaProviderIf.h" +#include "common/meta/NebulaSchemaProvider.h" namespace nebula { @@ -34,7 +34,7 @@ class RowReaderV2 { return headerLen_; } - const meta::SchemaProviderIf* getSchema() const { + const meta::NebulaSchemaProvider* getSchema() const { return schema_; } @@ -51,10 +51,10 @@ class RowReaderV2 { } private: - bool resetImpl(meta::SchemaProviderIf const* schema, folly::StringPiece row); + bool resetImpl(meta::NebulaSchemaProvider const* schema, folly::StringPiece row); private: - meta::SchemaProviderIf const* schema_; + meta::NebulaSchemaProvider const* schema_; folly::StringPiece data_; size_t headerLen_; size_t numNullBytes_; diff --git a/src/codec/RowReaderWrapper.cpp b/src/codec/RowReaderWrapper.cpp index 7fdd10e6103..431145dfbac 100644 --- a/src/codec/RowReaderWrapper.cpp +++ b/src/codec/RowReaderWrapper.cpp @@ -52,7 +52,7 @@ RowReaderWrapper RowReaderWrapper::getEdgePropReader(meta::SchemaManager* schema } // static -RowReaderWrapper RowReaderWrapper::getRowReader(const meta::SchemaProviderIf* schema, +RowReaderWrapper RowReaderWrapper::getRowReader(const meta::NebulaSchemaProvider* schema, folly::StringPiece row) { SchemaVer schemaVer; int32_t readerVer; @@ -77,7 +77,7 @@ RowReaderWrapper RowReaderWrapper::getRowReader( return RowReaderWrapper(schemas[schemaVer].get(), row, readerVer); } -RowReaderWrapper::RowReaderWrapper(const meta::SchemaProviderIf* schema, +RowReaderWrapper::RowReaderWrapper(const meta::NebulaSchemaProvider* schema, const folly::StringPiece& row, int32_t& readerVer) { CHECK_EQ(readerVer, 2); @@ -86,7 +86,7 @@ RowReaderWrapper::RowReaderWrapper(const meta::SchemaProviderIf* schema, currReader_ = &readerV2_; } -bool RowReaderWrapper::reset(meta::SchemaProviderIf const* schema, +bool RowReaderWrapper::reset(meta::NebulaSchemaProvider const* schema, folly::StringPiece row, int32_t readerVer) { CHECK_EQ(readerVer, 2); @@ -96,7 +96,7 @@ bool RowReaderWrapper::reset(meta::SchemaProviderIf const* schema, return true; } -bool RowReaderWrapper::reset(meta::SchemaProviderIf const* schema, folly::StringPiece row) { +bool RowReaderWrapper::reset(meta::NebulaSchemaProvider const* schema, folly::StringPiece row) { currReader_ = nullptr; if (schema == nullptr) { return false; diff --git a/src/codec/RowReaderWrapper.h b/src/codec/RowReaderWrapper.h index af8425aa21f..d914cbff625 100644 --- a/src/codec/RowReaderWrapper.h +++ b/src/codec/RowReaderWrapper.h @@ -11,8 +11,8 @@ #include "codec/RowReaderV2.h" #include "common/base/Base.h" #include "common/datatypes/Value.h" +#include "common/meta/NebulaSchemaProvider.h" #include "common/meta/SchemaManager.h" -#include "common/meta/SchemaProviderIf.h" namespace nebula { @@ -86,7 +86,7 @@ class RowReaderWrapper { * @param row * @return RowReaderWrapper */ - static RowReaderWrapper getRowReader(meta::SchemaProviderIf const* schema, + static RowReaderWrapper getRowReader(meta::NebulaSchemaProvider const* schema, folly::StringPiece row); /** @@ -109,7 +109,7 @@ class RowReaderWrapper { * @param row * @param readerVer Row reader version */ - RowReaderWrapper(const meta::SchemaProviderIf* schema, + RowReaderWrapper(const meta::NebulaSchemaProvider* schema, const folly::StringPiece& row, int32_t& readerVer); @@ -121,7 +121,7 @@ class RowReaderWrapper { * @param readVer * @return Whether reset succeed */ - bool reset(meta::SchemaProviderIf const* schema, folly::StringPiece row, int32_t readVer); + bool reset(meta::NebulaSchemaProvider const* schema, folly::StringPiece row, int32_t readVer); /** * @brief Reset current row reader wrapper to of given schema and data @@ -130,7 +130,7 @@ class RowReaderWrapper { * @param row * @return Whether reset succeed */ - bool reset(meta::SchemaProviderIf const* schema, folly::StringPiece row); + bool reset(meta::NebulaSchemaProvider const* schema, folly::StringPiece row); /** * @brief Reset current row reader wrapper of given schemas and data, the schemas are stored in @@ -174,7 +174,7 @@ class RowReaderWrapper { return currReader_->numFields(); } - const meta::SchemaProviderIf* getSchema() const { + const meta::NebulaSchemaProvider* getSchema() const { DCHECK(!!currReader_); return currReader_->getSchema(); } diff --git a/src/codec/RowWriterV2.cpp b/src/codec/RowWriterV2.cpp index 5337254a421..d25506d84bf 100644 --- a/src/codec/RowWriterV2.cpp +++ b/src/codec/RowWriterV2.cpp @@ -16,7 +16,7 @@ namespace nebula { using nebula::cpp2::PropertyType; -RowWriterV2::RowWriterV2(const meta::SchemaProviderIf* schema) +RowWriterV2::RowWriterV2(const meta::NebulaSchemaProvider* schema) : schema_(schema), numNullBytes_(0), approxStrLen_(0), finished_(false), outOfSpaceStr_(false) { CHECK(!!schema_); @@ -88,14 +88,14 @@ RowWriterV2::RowWriterV2(const meta::SchemaProviderIf* schema) isSet_.resize(schema_->getNumFields(), false); } -RowWriterV2::RowWriterV2(const meta::SchemaProviderIf* schema, std::string&& encoded) +RowWriterV2::RowWriterV2(const meta::NebulaSchemaProvider* schema, std::string&& encoded) : schema_(schema), finished_(false), outOfSpaceStr_(false) { auto len = encoded.size(); buf_ = std::move(encoded).substr(0, len - sizeof(int64_t)); processV2EncodedStr(); } -RowWriterV2::RowWriterV2(const meta::SchemaProviderIf* schema, const std::string& encoded) +RowWriterV2::RowWriterV2(const meta::NebulaSchemaProvider* schema, const std::string& encoded) : schema_(schema), buf_(encoded.substr(0, encoded.size() - sizeof(int64_t))), finished_(false), diff --git a/src/codec/RowWriterV2.h b/src/codec/RowWriterV2.h index 02d711babee..2d46d6b505a 100644 --- a/src/codec/RowWriterV2.h +++ b/src/codec/RowWriterV2.h @@ -8,7 +8,7 @@ #include "codec/RowReaderWrapper.h" #include "common/base/Base.h" -#include "common/meta/SchemaProviderIf.h" +#include "common/meta/NebulaSchemaProvider.h" namespace nebula { @@ -85,11 +85,11 @@ enum class WriteResult { ********************************************************************************/ class RowWriterV2 { public: - explicit RowWriterV2(const meta::SchemaProviderIf* schema); + explicit RowWriterV2(const meta::NebulaSchemaProvider* schema); // This constructor only takes a V2 encoded string - RowWriterV2(const meta::SchemaProviderIf* schema, std::string&& encoded); + RowWriterV2(const meta::NebulaSchemaProvider* schema, std::string&& encoded); // This constructor only takes a V2 encoded string - RowWriterV2(const meta::SchemaProviderIf* schema, const std::string& encoded); + RowWriterV2(const meta::NebulaSchemaProvider* schema, const std::string& encoded); // This constructor can handle constructed from RowReaderWrapper, which is V2 reader explicit RowWriterV2(RowReaderWrapper& reader); @@ -107,9 +107,9 @@ class RowWriterV2 { /** * @brief Return the related schema * - * @return const meta::SchemaProviderIf* + * @return const meta::NebulaSchemaProvider* */ - const meta::SchemaProviderIf* schema() const { + const meta::NebulaSchemaProvider* schema() const { return schema_; } @@ -213,7 +213,7 @@ class RowWriterV2 { WriteResult setNull(const std::string& name); private: - const meta::SchemaProviderIf* schema_; + const meta::NebulaSchemaProvider* schema_; std::string buf_; std::vector isSet_; // The number of bytes occupied by header and the schema version diff --git a/src/codec/include/NebulaCodec.h b/src/codec/include/NebulaCodec.h index 3e289e89462..5316be35898 100644 --- a/src/codec/include/NebulaCodec.h +++ b/src/codec/include/NebulaCodec.h @@ -7,7 +7,7 @@ #define CODEC_INCLUDE_NEBULACODEC_H #include "common/base/StatusOr.h" -#include "common/meta/SchemaProviderIf.h" +#include "common/meta/NebulaSchemaProvider.h" namespace nebula { @@ -17,11 +17,12 @@ class NebulaCodec { virtual ~NebulaCodec() = default; - virtual std::string encode(std::vector values, - std::shared_ptr schema = nullptr) = 0; + virtual std::string encode( + std::vector values, + std::shared_ptr schema = nullptr) = 0; virtual StatusOr> decode( - std::string encoded, std::shared_ptr schema) = 0; + std::string encoded, std::shared_ptr schema) = 0; }; } // namespace nebula diff --git a/src/codec/test/CMakeLists.txt b/src/codec/test/CMakeLists.txt index cb14a698fb4..42349626752 100644 --- a/src/codec/test/CMakeLists.txt +++ b/src/codec/test/CMakeLists.txt @@ -2,15 +2,8 @@ # # This source code is licensed under Apache 2.0 License. -nebula_add_library( - codec_test_obj OBJECT - ResultSchemaProvider.cpp - SchemaWriter.cpp -) - set(CODEC_TEST_LIBS $ - $ $ $ $ @@ -83,10 +76,3 @@ nebula_add_executable( wangle boost_regex ) - -#nebula_add_test( -# NAME nebula_codec_test -# SOURCES NebulaCodecTest.cpp -# OBJECTS ${CODEC_TEST_LIBS} -# LIBRARIES ${THRIFT_LIBRARIES} wangle gtest -#) diff --git a/src/codec/test/NebulaCodecTest.cpp b/src/codec/test/NebulaCodecTest.cpp deleted file mode 100644 index 806342d9944..00000000000 --- a/src/codec/test/NebulaCodecTest.cpp +++ /dev/null @@ -1,163 +0,0 @@ -/* Copyright (c) 2018 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include - -#include "codec/NebulaCodecImpl.h" -#include "codec/RowReaderWrapper.h" -#include "codec/test/SchemaWriter.h" -#include "common/base/Base.h" - -namespace nebula { - -TEST(NebulaCodec, encode) { - std::vector v; - v.emplace_back(1); - v.emplace_back(false); - v.emplace_back(3.14F); - v.emplace_back(3.14); - v.emplace_back(std::string("hi")); - - EXPECT_EQ(std::any_cast(v[0]), 1); - EXPECT_EQ(std::any_cast(v[1]), false); - EXPECT_EQ(std::any_cast(v[2]), 3.14F); - EXPECT_EQ(std::any_cast(v[3]), 3.14); - EXPECT_EQ(std::any_cast(v[4]), "hi"); - - SchemaWriter schemaWriter; - schemaWriter.appendCol("i_field", cpp2::SupportedType::INT); - schemaWriter.appendCol("b_field", cpp2::SupportedType::BOOL); - schemaWriter.appendCol("f_field", cpp2::SupportedType::FLOAT); - schemaWriter.appendCol("d_field", cpp2::SupportedType::DOUBLE); - schemaWriter.appendCol("s_field", cpp2::SupportedType::STRING); - - auto schema = std::make_shared(schemaWriter.moveSchema()); - NebulaCodecImpl codec; - std::string encoded = codec.encode(v, schema); - - auto reader = RowReaderWrapper::getRowReader(encoded, schema); - EXPECT_EQ(5, reader->numFields()); - - // check int field - int32_t iVal; - EXPECT_EQ(ResultType::SUCCEEDED, reader->getInt(0, iVal)); - EXPECT_EQ(1, iVal); - iVal = 0; - EXPECT_EQ(ResultType::SUCCEEDED, reader->getInt("i_field", iVal)); - EXPECT_EQ(1, iVal); - - // check bool field - bool bVal; - EXPECT_EQ(ResultType::SUCCEEDED, reader->getBool(1, bVal)); - EXPECT_FALSE(bVal); - bVal = true; - EXPECT_EQ(ResultType::SUCCEEDED, reader->getBool("b_field", bVal)); - EXPECT_FALSE(bVal); - - // check float field - float fVal; - EXPECT_EQ(ResultType::SUCCEEDED, reader->getFloat(2, fVal)); - EXPECT_FLOAT_EQ(3.14, fVal); - fVal = 0.0; - EXPECT_EQ(ResultType::SUCCEEDED, reader->getFloat("f_field", fVal)); - EXPECT_FLOAT_EQ(3.14, fVal); - - // check double field - double dVal; - EXPECT_EQ(ResultType::SUCCEEDED, reader->getDouble(3, dVal)); - EXPECT_DOUBLE_EQ(3.14, dVal); - dVal = 0.0; - EXPECT_EQ(ResultType::SUCCEEDED, reader->getDouble("d_field", dVal)); - EXPECT_DOUBLE_EQ(3.14, dVal); - - // check string field - folly::StringPiece sVal; - EXPECT_EQ(ResultType::SUCCEEDED, reader->getString(4, sVal)); - EXPECT_EQ("hi", sVal.toString()); - sVal.clear(); - EXPECT_EQ(ResultType::SUCCEEDED, reader->getString("s_field", sVal)); - EXPECT_EQ("hi", sVal.toString()); - - // check empty values - std::vector emptyV; - std::string emptyEncoded = codec.encode(emptyV); - - SchemaWriter emptyWriter; - auto emptySchema = std::make_shared(emptyWriter.moveSchema()); - auto emptyReader = RowReaderWrapper::getRowReader(emptyEncoded, emptySchema); - EXPECT_EQ(0, emptyReader->numFields()); -} - -TEST(NebulaCodec, decode) { - std::string encoded; - // Single byte header (Schema version is 0, no offset) - encoded.append(1, 0x00); - - // bool column - encoded.append(1, 0x01); - - // int column - uint8_t buffer[10]; - size_t i_size = folly::encodeVarint(64, buffer); - encoded.append(reinterpret_cast(buffer), i_size); - - // vid column - int64_t vid = 0x1122334455667788L; - encoded.append(reinterpret_cast(&vid), sizeof(int64_t)); - - // float column - float pi = 3.14F; - encoded.append(reinterpret_cast(&pi), sizeof(float)); - - // double column - double e = 2.718; - encoded.append(reinterpret_cast(&e), sizeof(double)); - - // string column - const char* str_value = "Hello World!"; - size_t s_size = folly::encodeVarint(strlen(str_value), buffer); - encoded.append(reinterpret_cast(buffer), s_size); - encoded.append(str_value, strlen(str_value)); - - SchemaWriter schemaWriter; - schemaWriter.appendCol("b_field", cpp2::SupportedType::BOOL); - schemaWriter.appendCol("i_field", cpp2::SupportedType::INT); - schemaWriter.appendCol("v_field", cpp2::SupportedType::VID); - schemaWriter.appendCol("f_field", cpp2::SupportedType::FLOAT); - schemaWriter.appendCol("d_field", cpp2::SupportedType::DOUBLE); - schemaWriter.appendCol("s_field", cpp2::SupportedType::STRING); - auto schema = std::make_shared(schemaWriter.moveSchema()); - - NebulaCodecImpl codec; - auto result = codec.decode(encoded, schema); - - EXPECT_TRUE(std::any_cast(result.value()["b_field"])); - EXPECT_EQ(std::any_cast(result.value()["i_field"]), 64); - EXPECT_EQ(std::any_cast(result.value()["v_field"]), 0x1122334455667788L); - EXPECT_EQ(std::any_cast(result.value()["f_field"]), 3.14F); - EXPECT_EQ(std::any_cast(result.value()["d_field"]), 2.718); - EXPECT_EQ(std::any_cast(result.value()["s_field"]), "Hello World!"); - - // check empty encoded string - auto empty_encoded = codec.decode("", schema); - ASSERT_FALSE(empty_encoded.ok()); - ASSERT_FALSE(empty_encoded.status().ok()); - ASSERT_EQ("encoded string is empty", empty_encoded.status().toString()); - - // check empty schema - auto empty_schema = codec.decode(encoded, nullptr); - ASSERT_FALSE(empty_schema.ok()); - ASSERT_FALSE(empty_schema.status().ok()); - ASSERT_EQ("schema is not set", empty_schema.status().toString()); -} -} // namespace nebula - -int main(int argc, char** argv) { - testing::InitGoogleTest(&argc, argv); - folly::init(&argc, &argv, true); - google::SetStderrLogging(google::INFO); - - return RUN_ALL_TESTS(); -} diff --git a/src/codec/test/ResultSchemaProvider.cpp b/src/codec/test/ResultSchemaProvider.cpp deleted file mode 100644 index 20acd6dd071..00000000000 --- a/src/codec/test/ResultSchemaProvider.cpp +++ /dev/null @@ -1,141 +0,0 @@ -/* Copyright (c) 2018 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include "codec/test/ResultSchemaProvider.h" - -namespace nebula { - -using folly::hash::SpookyHashV2; -using meta::cpp2::Schema; -using nebula::cpp2::PropertyType; - -/*********************************** - * - * ResultSchemaField - * - **********************************/ -ResultSchemaProvider::ResultSchemaField::ResultSchemaField(std::string name, - PropertyType type, - int16_t size, - bool nullable, - int32_t offset, - size_t nullFlagPos, - std::string defaultValue, - meta::cpp2::GeoShape geoShape) - : name_(std::move(name)), - type_(type), - size_(size), - nullable_(nullable), - offset_(offset), - nullFlagPos_(nullFlagPos), - defaultValue_(defaultValue), - geoShape_(geoShape) {} - -const char* ResultSchemaProvider::ResultSchemaField::name() const { - return name_.c_str(); -} - -PropertyType ResultSchemaProvider::ResultSchemaField::type() const { - return type_; -} - -bool ResultSchemaProvider::ResultSchemaField::hasDefault() const { - return defaultValue_ != ""; -} - -bool ResultSchemaProvider::ResultSchemaField::nullable() const { - return nullable_; -} - -const std::string& ResultSchemaProvider::ResultSchemaField::defaultValue() const { - return defaultValue_; -} - -size_t ResultSchemaProvider::ResultSchemaField::size() const { - return size_; -} - -size_t ResultSchemaProvider::ResultSchemaField::offset() const { - return offset_; -} - -size_t ResultSchemaProvider::ResultSchemaField::nullFlagPos() const { - return nullFlagPos_; -} - -meta::cpp2::GeoShape ResultSchemaProvider::ResultSchemaField::geoShape() const { - return geoShape_; -} - -/*********************************** - * - * ResultSchemaProvider - * - **********************************/ -size_t ResultSchemaProvider::getNumFields() const noexcept { - return columns_.size(); -} - -size_t ResultSchemaProvider::getNumNullableFields() const noexcept { - return numNullableFields_; -} - -size_t ResultSchemaProvider::size() const noexcept { - if (columns_.size() > 0) { - auto& last = columns_.back(); - return last.offset() + last.size(); - } else { - return 0; - } -} - -int64_t ResultSchemaProvider::getFieldIndex(const std::string& name) const { - uint64_t hash = SpookyHashV2::Hash64(name.data(), name.size(), 0); - auto iter = nameIndex_.find(hash); - if (iter == nameIndex_.end()) { - return -1; - } - return iter->second; -} - -const char* ResultSchemaProvider::getFieldName(int64_t index) const { - if (index < 0 || index >= static_cast(columns_.size())) { - return nullptr; - } - return columns_[index].name(); -} - -PropertyType ResultSchemaProvider::getFieldType(int64_t index) const { - if (index < 0 || index >= static_cast(columns_.size())) { - return PropertyType::UNKNOWN; - } - - return columns_[index].type(); -} - -PropertyType ResultSchemaProvider::getFieldType(const std::string& name) const { - auto index = getFieldIndex(name); - if (index < 0) { - return PropertyType::UNKNOWN; - } - return columns_[index].type(); -} - -const meta::SchemaProviderIf::Field* ResultSchemaProvider::field(int64_t index) const { - if (index < 0 || index >= static_cast(columns_.size())) { - return nullptr; - } - return &(columns_[index]); -} - -const meta::SchemaProviderIf::Field* ResultSchemaProvider::field(const std::string& name) const { - auto index = getFieldIndex(name); - if (index < 0) { - return nullptr; - } - return &(columns_[index]); -} - -} // namespace nebula diff --git a/src/codec/test/ResultSchemaProvider.h b/src/codec/test/ResultSchemaProvider.h deleted file mode 100644 index c780cf34eb5..00000000000 --- a/src/codec/test/ResultSchemaProvider.h +++ /dev/null @@ -1,83 +0,0 @@ -/* Copyright (c) 2018 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef CODEC_TEST_RESULTSCHEMAPROVIDER_H_ -#define CODEC_TEST_RESULTSCHEMAPROVIDER_H_ - -#include "common/base/Base.h" -#include "common/meta/SchemaProviderIf.h" - -namespace nebula { - -class ResultSchemaProvider : public meta::SchemaProviderIf { - public: - class ResultSchemaField : public meta::SchemaProviderIf::Field { - public: - ResultSchemaField(std::string name, - nebula::cpp2::PropertyType type, - int16_t size, - bool nullable, - int32_t offset, - size_t nullFlagPos, - std::string defaultValue = "", - meta::cpp2::GeoShape = meta::cpp2::GeoShape::ANY); - - const char* name() const override; - nebula::cpp2::PropertyType type() const override; - bool nullable() const override; - bool hasDefault() const override; - const std::string& defaultValue() const override; - size_t size() const override; - size_t offset() const override; - size_t nullFlagPos() const override; - meta::cpp2::GeoShape geoShape() const override; - - private: - std::string name_; - nebula::cpp2::PropertyType type_; - int16_t size_; - bool nullable_; - int32_t offset_; - size_t nullFlagPos_; - std::string defaultValue_; - meta::cpp2::GeoShape geoShape_; - }; - - public: - virtual ~ResultSchemaProvider() = default; - - SchemaVer getVersion() const noexcept override { - return schemaVer_; - } - - size_t getNumFields() const noexcept override; - - size_t getNumNullableFields() const noexcept override; - - size_t size() const noexcept override; - - int64_t getFieldIndex(const std::string& name) const override; - const char* getFieldName(int64_t index) const override; - - nebula::cpp2::PropertyType getFieldType(int64_t index) const override; - nebula::cpp2::PropertyType getFieldType(const std::string& name) const override; - - const meta::SchemaProviderIf::Field* field(int64_t index) const override; - const meta::SchemaProviderIf::Field* field(const std::string& name) const override; - - protected: - SchemaVer schemaVer_{0}; - - std::vector columns_; - // Map of Hash64(field_name) -> array index - std::unordered_map nameIndex_; - size_t numNullableFields_{0}; - - // Default constructor, only used by SchemaWriter - explicit ResultSchemaProvider(SchemaVer ver = 0) : schemaVer_(ver) {} -}; - -} // namespace nebula -#endif // CODEC_TEST_RESULTSCHEMAPROVIDER_H_ diff --git a/src/codec/test/RowReaderV2Test.cpp b/src/codec/test/RowReaderV2Test.cpp index 328abbf5ee6..13aeea91c83 100644 --- a/src/codec/test/RowReaderV2Test.cpp +++ b/src/codec/test/RowReaderV2Test.cpp @@ -6,7 +6,6 @@ #include #include "codec/RowReaderWrapper.h" -#include "codec/test/SchemaWriter.h" #include "common/base/Base.h" #include "common/datatypes/Value.h" @@ -17,7 +16,7 @@ using nebula::cpp2::PropertyType; TEST(RowReaderV2, headerInfo) { // Simplest row, nothing in it char data1[] = {0x08}; - SchemaWriter schema1; + meta::NebulaSchemaProvider schema1; auto reader = RowReaderWrapper::getRowReader(&schema1, folly::StringPiece(data1, sizeof(data1))); ASSERT_TRUE(!!reader); EXPECT_EQ(0, reader->schemaVer()); @@ -25,15 +24,15 @@ TEST(RowReaderV2, headerInfo) { // With schema version char data2[] = {0x0A, 0x01, static_cast(0xFF)}; - SchemaWriter schema2(0x00FF01); + meta::NebulaSchemaProvider schema2(0x00FF01); ASSERT_TRUE(reader->reset(&schema2, folly::StringPiece(data2, sizeof(data2)))); EXPECT_EQ(0x0000FF01, reader->schemaVer()); EXPECT_EQ(sizeof(data2), reader->headerLen()); // Insert 33 fields into schema, so we will get 2 offsets - SchemaWriter schema3(0x00FFFF01); + meta::NebulaSchemaProvider schema3(0x00FFFF01); for (int i = 0; i < 33; i++) { - schema3.appendCol(folly::stringPrintf("Column%02d", i), PropertyType::INT64); + schema3.addField(folly::stringPrintf("Column%02d", i), PropertyType::INT64); } // With schema version and offsets @@ -43,9 +42,9 @@ TEST(RowReaderV2, headerInfo) { EXPECT_EQ(sizeof(data3), reader->headerLen()); // No schema version, with offsets - SchemaWriter schema4; + meta::NebulaSchemaProvider schema4; for (int i = 0; i < 33; i++) { - schema4.appendCol(folly::stringPrintf("Column%02d", i), PropertyType::INT64); + schema4.addField(folly::stringPrintf("Column%02d", i), PropertyType::INT64); } char data4[] = {0x08}; @@ -54,40 +53,40 @@ TEST(RowReaderV2, headerInfo) { EXPECT_EQ(sizeof(data4), reader->headerLen()); // Empty row, return illegal schema version - SchemaWriter schema5; + meta::NebulaSchemaProvider schema5; auto reader2 = RowReaderWrapper::getRowReader(&schema5, folly::StringPiece("")); ASSERT_FALSE(!!reader2); ASSERT_FALSE(reader2->reset(&schema5, folly::StringPiece(""))); } TEST(RowReaderV2, encodedData) { - SchemaWriter schema; + meta::NebulaSchemaProvider schema; // Col 0: bool_col1 -- BOOL - schema.appendCol("bool_col1", PropertyType::BOOL); + schema.addField("bool_col1", PropertyType::BOOL); // Col 1: str_col1 -- FIXED_STRING - schema.appendCol("fixed_str_col", PropertyType::FIXED_STRING, 12); + schema.addField("fixed_str_col", PropertyType::FIXED_STRING, 12); // Col 2: int_col1 -- INT32 - schema.appendCol("int32_col", PropertyType::INT32); + schema.addField("int32_col", PropertyType::INT32); // Col 3: int_col2 -- INT64 - schema.appendCol("int64_col", PropertyType::INT64); + schema.addField("int64_col", PropertyType::INT64); // Col 4: vid_col -- VID - schema.appendCol("vid_col", PropertyType::VID); + schema.addField("vid_col", PropertyType::VID); // Col 5: str_col2 -- STRING - schema.appendCol("str_col", PropertyType::STRING); + schema.addField("str_col", PropertyType::STRING); // Col 6: bool_col2 -- BOOL - schema.appendCol("bool_col2", PropertyType::BOOL); + schema.addField("bool_col2", PropertyType::BOOL); // Col 7: float_col -- FLOAT - schema.appendCol("float_col", PropertyType::FLOAT); + schema.addField("float_col", PropertyType::FLOAT); // Col 8: double_col -- DOUBLE - schema.appendCol("double_col", PropertyType::DOUBLE); + schema.addField("double_col", PropertyType::DOUBLE); // Col 9: timestamp_col -- TIMESTAMP - schema.appendCol("timestamp_col", PropertyType::TIMESTAMP); + schema.addField("timestamp_col", PropertyType::TIMESTAMP); // Col 10: date_col -- DATE - schema.appendCol("date_col", PropertyType::DATE); + schema.addField("date_col", PropertyType::DATE); // Col 11: datetime_col -- DATETIME - schema.appendCol("datetime_col", PropertyType::DATETIME); + schema.addField("datetime_col", PropertyType::DATETIME); // Col 12: time_col -- TIME - schema.appendCol("time_col", PropertyType::TIME); + schema.addField("time_col", PropertyType::TIME); std::string encoded; // Single byte header (Schema version is 0, no offset) diff --git a/src/codec/test/RowWriterBenchmark.cpp b/src/codec/test/RowWriterBenchmark.cpp index 9e687ff7e66..0832df2efeb 100644 --- a/src/codec/test/RowWriterBenchmark.cpp +++ b/src/codec/test/RowWriterBenchmark.cpp @@ -7,34 +7,33 @@ #include "codec/RowWriterV2.h" #include "codec/test/RowWriterV1.h" -#include "codec/test/SchemaWriter.h" #include "common/base/Base.h" using nebula::RowWriterV1; using nebula::RowWriterV2; -using nebula::SchemaWriter; using nebula::cpp2::PropertyType; +using nebula::meta::NebulaSchemaProvider; -SchemaWriter schemaShort; -SchemaWriter schemaLong; +NebulaSchemaProvider schemaShort; +NebulaSchemaProvider schemaLong; const double e = 2.71828182845904523536028747135266249775724709369995; const float pi = 3.14159265358979; const std::string str = "Hello world!"; // NOLINT -void prepareSchema(SchemaWriter* schema, size_t numRepeats) { +void prepareSchema(NebulaSchemaProvider* schema, size_t numRepeats) { int32_t index = 1; for (size_t i = 0; i < numRepeats; i++) { - schema->appendCol(folly::stringPrintf("col%02d", index++), PropertyType::BOOL); - schema->appendCol(folly::stringPrintf("col%02d", index++), PropertyType::INT64); - schema->appendCol(folly::stringPrintf("col%02d", index++), PropertyType::TIMESTAMP); - schema->appendCol(folly::stringPrintf("col%02d", index++), PropertyType::FLOAT); - schema->appendCol(folly::stringPrintf("col%02d", index++), PropertyType::DOUBLE); - schema->appendCol(folly::stringPrintf("col%02d", index++), PropertyType::STRING); + schema->addField(folly::stringPrintf("col%02d", index++), PropertyType::BOOL); + schema->addField(folly::stringPrintf("col%02d", index++), PropertyType::INT64); + schema->addField(folly::stringPrintf("col%02d", index++), PropertyType::TIMESTAMP); + schema->addField(folly::stringPrintf("col%02d", index++), PropertyType::FLOAT); + schema->addField(folly::stringPrintf("col%02d", index++), PropertyType::DOUBLE); + schema->addField(folly::stringPrintf("col%02d", index++), PropertyType::STRING); } } -void writeDataV1(SchemaWriter* schema, int32_t iters) { +void writeDataV1(NebulaSchemaProvider* schema, int32_t iters) { for (int32_t i = 0; i < iters; i++) { RowWriterV1 writer(schema); for (size_t j = 0; j < schema->getNumFields() / 6; j++) { @@ -45,7 +44,7 @@ void writeDataV1(SchemaWriter* schema, int32_t iters) { } } -void writeDataV2(SchemaWriter* schema, int32_t iters) { +void writeDataV2(NebulaSchemaProvider* schema, int32_t iters) { for (int32_t i = 0; i < iters; i++) { RowWriterV2 writer(schema); size_t idx = 0; diff --git a/src/codec/test/RowWriterV1.cpp b/src/codec/test/RowWriterV1.cpp index 9906ef7d2f5..324728185c2 100644 --- a/src/codec/test/RowWriterV1.cpp +++ b/src/codec/test/RowWriterV1.cpp @@ -7,11 +7,11 @@ namespace nebula { -using meta::SchemaProviderIf; +using meta::NebulaSchemaProvider; using meta::cpp2::Schema; using nebula::cpp2::PropertyType; -RowWriterV1::RowWriterV1(const SchemaProviderIf* schema) : schema_(schema) { +RowWriterV1::RowWriterV1(const NebulaSchemaProvider* schema) : schema_(schema) { CHECK(!!schema_); } diff --git a/src/codec/test/RowWriterV1.h b/src/codec/test/RowWriterV1.h index 62cc782e6cb..c911feac161 100644 --- a/src/codec/test/RowWriterV1.h +++ b/src/codec/test/RowWriterV1.h @@ -6,9 +6,9 @@ #ifndef CODEC_TEST_ROWWRITERV1_H_ #define CODEC_TEST_ROWWRITERV1_H_ -#include "codec/test/SchemaWriter.h" #include "common/base/Base.h" #include "common/base/ICord.h" +#include "common/meta/NebulaSchemaProvider.h" namespace nebula { @@ -36,7 +36,7 @@ class RowWriterV1 { }; public: - explicit RowWriterV1(const meta::SchemaProviderIf* schema); + explicit RowWriterV1(const meta::NebulaSchemaProvider* schema); // Encode into a binary array std::string encode() noexcept; @@ -48,7 +48,7 @@ class RowWriterV1 { // Calculate the exact length of the encoded binary array int64_t size() const noexcept; - const meta::SchemaProviderIf* schema() const { + const meta::NebulaSchemaProvider* schema() const { return schema_; } @@ -68,7 +68,7 @@ class RowWriterV1 { RowWriterV1& operator<<(Skip&& skip) noexcept; private: - const meta::SchemaProviderIf* schema_; + const meta::NebulaSchemaProvider* schema_; ICord<> cord_; int64_t colNum_ = 0; diff --git a/src/codec/test/RowWriterV2Test.cpp b/src/codec/test/RowWriterV2Test.cpp index 5d15ae4e514..03e4eec79c6 100644 --- a/src/codec/test/RowWriterV2Test.cpp +++ b/src/codec/test/RowWriterV2Test.cpp @@ -7,7 +7,6 @@ #include "codec/RowReaderWrapper.h" #include "codec/RowWriterV2.h" -#include "codec/test/SchemaWriter.h" #include "common/base/Base.h" #include "common/expression/ConstantExpression.h" #include "common/time/WallClock.h" @@ -46,30 +45,27 @@ const Geography geogPolygon = Polygon( const Duration du = Duration(1, 2, 3); TEST(RowWriterV2, NoDefaultValue) { - SchemaWriter schema(12 /*Schema version*/); - schema.appendCol("Col01", PropertyType::BOOL); - schema.appendCol("Col02", PropertyType::INT8); - schema.appendCol("Col03", PropertyType::INT16); - schema.appendCol("Col04", PropertyType::INT32); - schema.appendCol("Col05", PropertyType::INT64); - schema.appendCol("Col06", PropertyType::FLOAT); - schema.appendCol("Col07", PropertyType::DOUBLE); - schema.appendCol("Col08", PropertyType::STRING); - schema.appendCol("Col09", PropertyType::FIXED_STRING, 12); - schema.appendCol("Col10", PropertyType::TIMESTAMP); - schema.appendCol("Col11", PropertyType::DATE); - schema.appendCol("Col12", PropertyType::TIME); - schema.appendCol("Col13", PropertyType::DATETIME); - schema.appendCol("Col14", PropertyType::INT64, 0, true); - schema.appendCol("Col15", PropertyType::INT32, 0, true); - schema.appendCol( - "Col16", PropertyType::GEOGRAPHY, 0, false, nullptr, meta::cpp2::GeoShape::POINT); - schema.appendCol( - "Col17", PropertyType::GEOGRAPHY, 0, false, nullptr, meta::cpp2::GeoShape::LINESTRING); - schema.appendCol( - "Col18", PropertyType::GEOGRAPHY, 0, false, nullptr, meta::cpp2::GeoShape::POLYGON); - schema.appendCol("Col19", PropertyType::GEOGRAPHY, 0, true, nullptr, meta::cpp2::GeoShape::ANY); - schema.appendCol("Col20", PropertyType::DURATION); + meta::NebulaSchemaProvider schema(12 /*Schema version*/); + schema.addField("Col01", PropertyType::BOOL); + schema.addField("Col02", PropertyType::INT8); + schema.addField("Col03", PropertyType::INT16); + schema.addField("Col04", PropertyType::INT32); + schema.addField("Col05", PropertyType::INT64); + schema.addField("Col06", PropertyType::FLOAT); + schema.addField("Col07", PropertyType::DOUBLE); + schema.addField("Col08", PropertyType::STRING); + schema.addField("Col09", PropertyType::FIXED_STRING, 12); + schema.addField("Col10", PropertyType::TIMESTAMP); + schema.addField("Col11", PropertyType::DATE); + schema.addField("Col12", PropertyType::TIME); + schema.addField("Col13", PropertyType::DATETIME); + schema.addField("Col14", PropertyType::INT64, 0, true); + schema.addField("Col15", PropertyType::INT32, 0, true); + schema.addField("Col16", PropertyType::GEOGRAPHY, 0, false, "", meta::cpp2::GeoShape::POINT); + schema.addField("Col17", PropertyType::GEOGRAPHY, 0, false, "", meta::cpp2::GeoShape::LINESTRING); + schema.addField("Col18", PropertyType::GEOGRAPHY, 0, false, "", meta::cpp2::GeoShape::POLYGON); + schema.addField("Col19", PropertyType::GEOGRAPHY, 0, true, "", meta::cpp2::GeoShape::ANY); + schema.addField("Col20", PropertyType::DURATION); ASSERT_EQ(Value::Type::STRING, sVal.type()); ASSERT_EQ(Value::Type::INT, iVal.type()); @@ -270,12 +266,17 @@ TEST(RowWriterV2, WithDefaultValue) { ObjectPool objPool; auto pool = &objPool; - SchemaWriter schema(7 /*Schema version*/); - schema.appendCol("Col01", PropertyType::BOOL, 0, true); - schema.appendCol("Col02", PropertyType::INT64, 0, false, ConstantExpression::make(pool, 12345)); - schema.appendCol("Col03", PropertyType::STRING, 0, true, ConstantExpression::make(pool, str)); - schema.appendCol( - "Col04", PropertyType::FIXED_STRING, 12, false, ConstantExpression::make(pool, fixed)); + meta::NebulaSchemaProvider schema(7 /*Schema version*/); + schema.addField("Col01", PropertyType::BOOL, 0, true); + schema.addField( + "Col02", PropertyType::INT64, 0, false, ConstantExpression::make(pool, 12345)->encode()); + schema.addField( + "Col03", PropertyType::STRING, 0, true, ConstantExpression::make(pool, str)->encode()); + schema.addField("Col04", + PropertyType::FIXED_STRING, + 12, + false, + ConstantExpression::make(pool, fixed)->encode()); RowWriterV2 writer(&schema); ASSERT_EQ(WriteResult::SUCCEEDED, writer.finish()); @@ -312,12 +313,12 @@ TEST(RowWriterV2, WithDefaultValue) { } TEST(RowWriterV2, DoubleSet) { - SchemaWriter schema(3 /*Schema version*/); - schema.appendCol("Col01", PropertyType::BOOL, 0, true); - schema.appendCol("Col02", PropertyType::INT64); - schema.appendCol("Col03", PropertyType::STRING); - schema.appendCol("Col04", PropertyType::STRING, 0, true); - schema.appendCol("Col05", PropertyType::FIXED_STRING, 12); + meta::NebulaSchemaProvider schema(3 /*Schema version*/); + schema.addField("Col01", PropertyType::BOOL, 0, true); + schema.addField("Col02", PropertyType::INT64); + schema.addField("Col03", PropertyType::STRING); + schema.addField("Col04", PropertyType::STRING, 0, true); + schema.addField("Col05", PropertyType::FIXED_STRING, 12); RowWriterV2 writer(&schema); EXPECT_EQ(WriteResult::SUCCEEDED, writer.set("Col01", false)); @@ -372,12 +373,12 @@ TEST(RowWriterV2, DoubleSet) { } TEST(RowWriterV2, Update) { - SchemaWriter schema(2 /*Schema version*/); - schema.appendCol("Col01", PropertyType::BOOL, 0, true); - schema.appendCol("Col02", PropertyType::INT64); - schema.appendCol("Col03", PropertyType::STRING); - schema.appendCol("Col04", PropertyType::STRING, 0, true); - schema.appendCol("Col05", PropertyType::FIXED_STRING, 12); + meta::NebulaSchemaProvider schema(2 /*Schema version*/); + schema.addField("Col01", PropertyType::BOOL, 0, true); + schema.addField("Col02", PropertyType::INT64); + schema.addField("Col03", PropertyType::STRING); + schema.addField("Col04", PropertyType::STRING, 0, true); + schema.addField("Col05", PropertyType::FIXED_STRING, 12); RowWriterV2 writer(&schema); EXPECT_EQ(WriteResult::SUCCEEDED, writer.set("Col01", true)); @@ -449,8 +450,8 @@ TEST(RowWriterV2, Update) { } TEST(RowWriterV2, Timestamp) { - SchemaWriter schema(20 /*Schema version*/); - schema.appendCol("Col01", PropertyType::TIMESTAMP); + meta::NebulaSchemaProvider schema(20 /*Schema version*/); + schema.addField("Col01", PropertyType::TIMESTAMP); RowWriterV2 writer(&schema); EXPECT_EQ(WriteResult::SUCCEEDED, writer.set("Col01", 1582183355)); @@ -477,8 +478,8 @@ TEST(RowWriterV2, Timestamp) { } TEST(RowWriterV2, EmptyString) { - SchemaWriter schema(0 /*Schema version*/); - schema.appendCol("Col01", PropertyType::STRING); + meta::NebulaSchemaProvider schema(0 /*Schema version*/); + schema.addField("Col01", PropertyType::STRING); RowWriterV2 writer(&schema); EXPECT_EQ(WriteResult::SUCCEEDED, writer.set("Col01", "")); @@ -495,13 +496,13 @@ TEST(RowWriterV2, EmptyString) { } TEST(RowWriterV2, NumericLimit) { - SchemaWriter schema(1 /*Schema version*/); - schema.appendCol("Col01", PropertyType::INT8); - schema.appendCol("Col03", PropertyType::INT16); - schema.appendCol("Col06", PropertyType::INT32); - schema.appendCol("Col07", PropertyType::INT64); - schema.appendCol("Col09", PropertyType::FLOAT); - schema.appendCol("Col11", PropertyType::DOUBLE); + meta::NebulaSchemaProvider schema(1 /*Schema version*/); + schema.addField("Col01", PropertyType::INT8); + schema.addField("Col03", PropertyType::INT16); + schema.addField("Col06", PropertyType::INT32); + schema.addField("Col07", PropertyType::INT64); + schema.addField("Col09", PropertyType::FLOAT); + schema.addField("Col11", PropertyType::DOUBLE); { RowWriterV2 writer(&schema); @@ -697,8 +698,8 @@ TEST(RowWriterV2, NumericLimit) { TEST(RowWriterV2, TimestampTest) { { - SchemaWriter schema(1); - schema.appendCol("Col01", PropertyType::STRING); + meta::NebulaSchemaProvider schema(1); + schema.addField("Col01", PropertyType::STRING); RowWriterV2 writer(&schema); EXPECT_EQ(WriteResult::SUCCEEDED, writer.set("Col01", "")); @@ -716,8 +717,8 @@ TEST(RowWriterV2, TimestampTest) { EXPECT_TRUE(ret); } { - SchemaWriter schema(1); - schema.appendCol("Col01", PropertyType::STRING); + meta::NebulaSchemaProvider schema(1); + schema.addField("Col01", PropertyType::STRING); RowWriterV2 writer(&schema); EXPECT_EQ(WriteResult::SUCCEEDED, writer.set("Col01", "")); @@ -737,8 +738,8 @@ TEST(RowWriterV2, TimestampTest) { EXPECT_TRUE(ret); } { - SchemaWriter schema(1); - schema.appendCol("Col01", PropertyType::STRING); + meta::NebulaSchemaProvider schema(1); + schema.addField("Col01", PropertyType::STRING); RowWriterV2 writer(&schema); EXPECT_EQ(WriteResult::SUCCEEDED, writer.set("Col01", "")); @@ -755,8 +756,8 @@ TEST(RowWriterV2, TimestampTest) { encoded2.substr(0, encoded2.size() - sizeof(int64_t))); } { - SchemaWriter schema(1); - schema.appendCol("Col01", PropertyType::INT64); + meta::NebulaSchemaProvider schema(1); + schema.addField("Col01", PropertyType::INT64); RowWriterV2 writer(&schema); EXPECT_EQ(WriteResult::SUCCEEDED, writer.set("Col01", 1)); @@ -774,9 +775,9 @@ TEST(RowWriterV2, TimestampTest) { } { // test checkUnsetFields - SchemaWriter schema(1); - schema.appendCol("Col01", PropertyType::INT64); - schema.appendCol("Col02", PropertyType::INT64); + meta::NebulaSchemaProvider schema(1); + schema.addField("Col01", PropertyType::INT64); + schema.addField("Col02", PropertyType::INT64); RowWriterV2 writer(&schema); EXPECT_EQ(WriteResult::SUCCEEDED, writer.set("Col01", 1)); diff --git a/src/codec/test/SchemaWriter.cpp b/src/codec/test/SchemaWriter.cpp deleted file mode 100644 index d5c4ddf76bf..00000000000 --- a/src/codec/test/SchemaWriter.cpp +++ /dev/null @@ -1,106 +0,0 @@ -/* Copyright (c) 2018 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include "codec/test/SchemaWriter.h" - -#include "common/base/Base.h" - -namespace nebula { - -using meta::cpp2::Schema; -using nebula::cpp2::PropertyType; - -SchemaWriter& SchemaWriter::appendCol(folly::StringPiece name, - PropertyType type, - int32_t fixedStrLen, - bool nullable, - Expression* defaultValue, - meta::cpp2::GeoShape geoShape) noexcept { - using folly::hash::SpookyHashV2; - uint64_t hash = SpookyHashV2::Hash64(name.data(), name.size(), 0); - DCHECK(nameIndex_.find(hash) == nameIndex_.end()); - - int32_t offset = 0; - if (columns_.size() > 0) { - auto& prevCol = columns_.back(); - offset = prevCol.offset() + prevCol.size(); - } else { - offset = 0; - } - - int16_t size = 0; - switch (type) { - case PropertyType::BOOL: - size = sizeof(bool); - break; - case PropertyType::INT8: - size = sizeof(int8_t); - break; - case PropertyType::INT16: - size = sizeof(int16_t); - break; - case PropertyType::INT32: - size = sizeof(int32_t); - break; - case PropertyType::INT64: - size = sizeof(int64_t); - break; - case PropertyType::VID: - size = sizeof(int64_t); - break; - case PropertyType::FLOAT: - size = sizeof(float); - break; - case PropertyType::DOUBLE: - size = sizeof(double); - break; - case PropertyType::STRING: - size = 2 * sizeof(int32_t); - break; - case PropertyType::FIXED_STRING: - CHECK_GT(fixedStrLen, 0) << "Fixed string length has to be greater than 0"; - size = fixedStrLen; - break; - case PropertyType::TIMESTAMP: - size = sizeof(Timestamp); - break; - case PropertyType::DATE: - size = sizeof(int16_t) + 2 * sizeof(int8_t); - break; - case PropertyType::TIME: - size = 3 * sizeof(int8_t) + sizeof(int32_t); - break; - case PropertyType::DATETIME: - size = sizeof(int16_t) + 5 * sizeof(int8_t) + sizeof(int32_t); - break; - case PropertyType::GEOGRAPHY: - size = 2 * sizeof(int32_t); // as same as STRING - break; - case PropertyType::DURATION: - size = sizeof(int64_t) + sizeof(int32_t) + sizeof(int32_t); - break; - default: - LOG(FATAL) << "Unknown column type"; - } - - size_t nullFlagPos = 0; - if (nullable) { - nullFlagPos = numNullableFields_++; - } - - columns_.emplace_back(name.toString(), - type, - size, - nullable, - offset, - nullFlagPos, - defaultValue ? defaultValue->encode() : "", - geoShape); - nameIndex_.emplace(std::make_pair(hash, columns_.size() - 1)); - - return *this; -} - -} // namespace nebula diff --git a/src/codec/test/SchemaWriter.h b/src/codec/test/SchemaWriter.h deleted file mode 100644 index 8b0e8228e43..00000000000 --- a/src/codec/test/SchemaWriter.h +++ /dev/null @@ -1,30 +0,0 @@ -/* Copyright (c) 2018 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#ifndef CODEC_TEST_SCHEMAWRITER_H_ -#define CODEC_TEST_SCHEMAWRITER_H_ - -#include "codec/test/ResultSchemaProvider.h" -#include "common/base/Base.h" -#include "common/expression/Expression.h" - -namespace nebula { - -class SchemaWriter : public ResultSchemaProvider { - public: - explicit SchemaWriter(SchemaVer ver = 0) : ResultSchemaProvider(ver) {} - - SchemaWriter& appendCol(folly::StringPiece name, - nebula::cpp2::PropertyType type, - int32_t fixedStrLen = 0, - bool nullable = false, - Expression* defaultValue = nullptr, - meta::cpp2::GeoShape geoShape = meta::cpp2::GeoShape::ANY) noexcept; - - private: -}; - -} // namespace nebula -#endif // CODEC_TEST_SCHEMAWRITER_H_ diff --git a/src/common/meta/NebulaSchemaProvider.cpp b/src/common/meta/NebulaSchemaProvider.cpp index 51bc08c7de5..570d931bc6e 100644 --- a/src/common/meta/NebulaSchemaProvider.cpp +++ b/src/common/meta/NebulaSchemaProvider.cpp @@ -71,7 +71,7 @@ PropertyType NebulaSchemaProvider::getFieldType(const std::string& name) const { return fields_[it->second].type(); } -const SchemaProviderIf::Field* NebulaSchemaProvider::field(int64_t index) const { +const NebulaSchemaProvider::SchemaField* NebulaSchemaProvider::field(int64_t index) const { if (index < 0) { VLOG(2) << "Invalid index " << index; return nullptr; @@ -84,7 +84,8 @@ const SchemaProviderIf::Field* NebulaSchemaProvider::field(int64_t index) const return &fields_[index]; } -const SchemaProviderIf::Field* NebulaSchemaProvider::field(const std::string& name) const { +const NebulaSchemaProvider::SchemaField* NebulaSchemaProvider::field( + const std::string& name) const { auto it = fieldNameIndex_.find(name); if (it == fieldNameIndex_.end()) { VLOG(2) << "Unknown field \"" << name << "\""; @@ -94,7 +95,7 @@ const SchemaProviderIf::Field* NebulaSchemaProvider::field(const std::string& na return &fields_[it->second]; } -void NebulaSchemaProvider::addField(folly::StringPiece name, +void NebulaSchemaProvider::addField(const std::string& name, PropertyType type, size_t fixedStrLen, bool nullable, @@ -113,16 +114,9 @@ void NebulaSchemaProvider::addField(folly::StringPiece name, nullFlagPos = numNullableFields_++; } - fields_.emplace_back(name.toString(), - type, - nullable, - defaultValue != "", - defaultValue, - size, - offset, - nullFlagPos, - geoShape); - fieldNameIndex_.emplace(name.toString(), static_cast(fields_.size() - 1)); + fields_.emplace_back( + name, type, nullable, defaultValue != "", defaultValue, size, offset, nullFlagPos, geoShape); + fieldNameIndex_.emplace(name, static_cast(fields_.size() - 1)); } /*static*/ diff --git a/src/common/meta/NebulaSchemaProvider.h b/src/common/meta/NebulaSchemaProvider.h index 34fc24075e3..48a688a1616 100644 --- a/src/common/meta/NebulaSchemaProvider.h +++ b/src/common/meta/NebulaSchemaProvider.h @@ -7,21 +7,23 @@ #define COMMON_META_NEBULASCHEMAPROVIDER_H_ #include +#include #include "common/base/Base.h" #include "common/base/StatusOr.h" -#include "common/meta/SchemaProviderIf.h" +#include "common/expression/Expression.h" +#include "interface/gen-cpp2/meta_types.h" namespace nebula { namespace meta { -class NebulaSchemaProvider : public SchemaProviderIf { - friend class FileBasedSchemaManager; +class NebulaSchemaProvider { + friend class RowReaderV2Test; public: - class SchemaField final : public SchemaProviderIf::Field { + class SchemaField final { public: - SchemaField(std::string name, + SchemaField(const std::string& name, nebula::cpp2::PropertyType type, bool nullable, bool hasDefault, @@ -30,8 +32,8 @@ class NebulaSchemaProvider : public SchemaProviderIf { size_t offset, size_t nullFlagPos, cpp2::GeoShape geoShape) - : name_(std::move(name)), - type_(std::move(type)), + : name_(name), + type_(type), nullable_(nullable), hasDefault_(hasDefault), defaultValue_(defaultValue), @@ -40,40 +42,40 @@ class NebulaSchemaProvider : public SchemaProviderIf { nullFlagPos_(nullFlagPos), geoShape_(geoShape) {} - const char* name() const override { + const char* name() const { return name_.c_str(); } - nebula::cpp2::PropertyType type() const override { + nebula::cpp2::PropertyType type() const { return type_; } - bool nullable() const override { + bool nullable() const { return nullable_; } - bool hasDefault() const override { + bool hasDefault() const { return hasDefault_; } - const std::string& defaultValue() const override { + const std::string& defaultValue() const { return defaultValue_; } - size_t size() const override { + size_t size() const { return size_; } - size_t offset() const override { + size_t offset() const { return offset_; } - size_t nullFlagPos() const override { + size_t nullFlagPos() const { DCHECK(nullable_); return nullFlagPos_; } - cpp2::GeoShape geoShape() const override { + cpp2::GeoShape geoShape() const { return geoShape_; } @@ -89,34 +91,91 @@ class NebulaSchemaProvider : public SchemaProviderIf { cpp2::GeoShape geoShape_; }; + class Iterator final { + friend class NebulaSchemaProvider; + + public: + const SchemaField& operator*() const { + return *field_; + } + + const SchemaField* operator->() const { + return field_; + } + + Iterator& operator++() { + if (field_) { + index_++; + field_ = schema_->field(index_); + } + return *this; + } + + Iterator& operator+(uint16_t steps) { + if (field_) { + index_ += steps; + field_ = schema_->field(index_); + } + return *this; + } + + operator bool() const { + return static_cast(field_); + } + + bool operator==(const Iterator& rhs) const { + return schema_ == rhs.schema_ && (index_ == rhs.index_ || (!field_ && !rhs.field_)); + } + + private: + const NebulaSchemaProvider* schema_; + size_t numFields_; + int64_t index_; + const SchemaField* field_; + + private: + explicit Iterator(const NebulaSchemaProvider* schema, int64_t idx = 0) + : schema_(schema), numFields_(schema_->getNumFields()), index_(idx) { + field_ = schema_->field(index_); + } + }; + public: explicit NebulaSchemaProvider(SchemaVer ver) : ver_(ver), numNullableFields_(0) {} - SchemaVer getVersion() const noexcept override; + NebulaSchemaProvider() : ver_(0), numNullableFields_(0) {} + + SchemaVer getVersion() const noexcept; // Returns the size of fields_ - size_t getNumFields() const noexcept override; - size_t getNumNullableFields() const noexcept override; + size_t getNumFields() const noexcept; + size_t getNumNullableFields() const noexcept; // Returns the total space in bytes occupied by the fields_ - size_t size() const noexcept override; + size_t size() const noexcept; - int64_t getFieldIndex(const std::string& name) const override; - const char* getFieldName(int64_t index) const override; + int64_t getFieldIndex(const std::string& name) const; + const char* getFieldName(int64_t index) const; - nebula::cpp2::PropertyType getFieldType(int64_t index) const override; - nebula::cpp2::PropertyType getFieldType(const std::string& name) const override; + nebula::cpp2::PropertyType getFieldType(int64_t index) const; + nebula::cpp2::PropertyType getFieldType(const std::string& name) const; - const SchemaProviderIf::Field* field(int64_t index) const override; - const SchemaProviderIf::Field* field(const std::string& name) const override; + const SchemaField* field(int64_t index) const; + const SchemaField* field(const std::string& name) const; - void addField(folly::StringPiece name, + void addField(const std::string& name, nebula::cpp2::PropertyType type, size_t fixedStrLen = 0, bool nullable = false, std::string defaultValue = "", cpp2::GeoShape geoShape = cpp2::GeoShape::ANY); - static std::size_t fieldSize(nebula::cpp2::PropertyType type, std::size_t fixedStrLimit); + Iterator begin() const { + return Iterator(this, 0); + } + + Iterator end() const { + return Iterator(this, getNumFields()); + } void setProp(cpp2::SchemaProp schemaProp); @@ -128,10 +187,10 @@ class NebulaSchemaProvider : public SchemaProviderIf { return numNullableFields_ != 0; } - protected: - NebulaSchemaProvider() = default; + private: + std::size_t fieldSize(nebula::cpp2::PropertyType type, std::size_t fixedStrLimit); - protected: + private: SchemaVer ver_{-1}; // fieldname -> index diff --git a/src/common/meta/SchemaProviderIf.cpp b/src/common/meta/SchemaProviderIf.cpp index 05b75eeef33..59c42052d63 100644 --- a/src/common/meta/SchemaProviderIf.cpp +++ b/src/common/meta/SchemaProviderIf.cpp @@ -3,9 +3,8 @@ * This source code is licensed under Apache 2.0 License. */ -#include "common/meta/SchemaProviderIf.h" - #include "common/base/Base.h" +#include "common/meta/NebulaSchemaProvider.h" namespace nebula { namespace meta {} // namespace meta diff --git a/src/common/utils/IndexKeyUtils.cpp b/src/common/utils/IndexKeyUtils.cpp index effeb07232f..0889e0b183a 100644 --- a/src/common/utils/IndexKeyUtils.cpp +++ b/src/common/utils/IndexKeyUtils.cpp @@ -174,7 +174,7 @@ Value IndexKeyUtils::parseIndexTTL(const folly::StringPiece& raw) { StatusOr> IndexKeyUtils::collectIndexValues( RowReaderWrapper* reader, const meta::cpp2::IndexItem* indexItem, - const meta::SchemaProviderIf* latestSchema) { + const meta::NebulaSchemaProvider* latestSchema) { if (reader == nullptr) { return Status::Error("Invalid row reader"); } @@ -200,9 +200,10 @@ StatusOr> IndexKeyUtils::collectIndexValues( } // static -StatusOr IndexKeyUtils::readValueWithLatestSche(RowReaderWrapper* reader, - const std::string propName, - const meta::SchemaProviderIf* latestSchema) { +StatusOr IndexKeyUtils::readValueWithLatestSche( + RowReaderWrapper* reader, + const std::string propName, + const meta::NebulaSchemaProvider* latestSchema) { auto value = reader->getValueByName(propName); if (latestSchema == nullptr || !value.isNull() || value.getNull() != NullType::UNKNOWN_PROP) { return value; diff --git a/src/common/utils/IndexKeyUtils.h b/src/common/utils/IndexKeyUtils.h index 036be7b15f2..0af8c904393 100644 --- a/src/common/utils/IndexKeyUtils.h +++ b/src/common/utils/IndexKeyUtils.h @@ -547,14 +547,14 @@ class IndexKeyUtils final { static StatusOr> collectIndexValues( RowReaderWrapper* reader, const meta::cpp2::IndexItem* indexItem, - const meta::SchemaProviderIf* latestSchema = nullptr); + const meta::NebulaSchemaProvider* latestSchema = nullptr); private: IndexKeyUtils() = delete; static StatusOr readValueWithLatestSche(RowReaderWrapper* reader, const std::string propName, - const meta::SchemaProviderIf* latestSchema); + const meta::NebulaSchemaProvider* latestSchema); static Status checkValue(const Value& v, bool isNullable); }; diff --git a/src/common/utils/test/CMakeLists.txt b/src/common/utils/test/CMakeLists.txt index 661dcd3e5af..41729c853d7 100644 --- a/src/common/utils/test/CMakeLists.txt +++ b/src/common/utils/test/CMakeLists.txt @@ -27,6 +27,16 @@ nebula_add_test( $ $ $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ LIBRARIES gtest ${THRIFT_LIBRARIES} @@ -57,6 +67,16 @@ nebula_add_test( $ $ $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ LIBRARIES gtest ${THRIFT_LIBRARIES} @@ -90,6 +110,16 @@ nebula_add_test( $ $ $ + $ + $ + $ + $ + $ + $ + $ + $ + $ + $ LIBRARIES gtest ${THRIFT_LIBRARIES} @@ -114,7 +144,6 @@ nebula_add_test( $ $ $ - # $ LIBRARIES ${THRIFT_LIBRARIES} ${PROXYGEN_LIBRARIES} diff --git a/src/graph/validator/FetchEdgesValidator.h b/src/graph/validator/FetchEdgesValidator.h index 2f652af5196..f5ee9fa1883 100644 --- a/src/graph/validator/FetchEdgesValidator.h +++ b/src/graph/validator/FetchEdgesValidator.h @@ -37,7 +37,7 @@ class FetchEdgesValidator final : public Validator { EdgeType edgeType_{0}; - std::shared_ptr edgeSchema_; + std::shared_ptr edgeSchema_; std::unique_ptr fetchCtx_; }; diff --git a/src/graph/validator/FetchVerticesValidator.h b/src/graph/validator/FetchVerticesValidator.h index bd4d81233be..ba28fcc8fe2 100644 --- a/src/graph/validator/FetchVerticesValidator.h +++ b/src/graph/validator/FetchVerticesValidator.h @@ -30,7 +30,7 @@ class FetchVerticesValidator final : public Validator { } private: - std::map> tagsSchema_; + std::map> tagsSchema_; std::vector tagIds_; std::unique_ptr fetchCtx_; diff --git a/src/graph/validator/MutateValidator.h b/src/graph/validator/MutateValidator.h index 3c3687a930c..5bcd6912567 100644 --- a/src/graph/validator/MutateValidator.h +++ b/src/graph/validator/MutateValidator.h @@ -28,7 +28,7 @@ class InsertVerticesValidator final : public Validator { Status prepareVertices(); private: - using TagSchema = std::shared_ptr; + using TagSchema = std::shared_ptr; GraphSpaceID spaceId_{-1}; std::vector rows_; std::unordered_map> tagPropNames_; @@ -57,7 +57,7 @@ class InsertEdgesValidator final : public Validator { bool ifNotExists_{false}; bool ignoreExistedIndex_{false}; EdgeType edgeType_{-1}; - std::shared_ptr schema_; + std::shared_ptr schema_; std::vector propNames_; std::vector entirePropNames_; std::vector rows_; diff --git a/src/kvstore/plugins/hbase/HBaseStore.cpp b/src/kvstore/plugins/hbase/HBaseStore.cpp index 65f3e1a8254..50e8a703284 100644 --- a/src/kvstore/plugins/hbase/HBaseStore.cpp +++ b/src/kvstore/plugins/hbase/HBaseStore.cpp @@ -40,10 +40,10 @@ std::string HBaseStore::spaceIdToTableName(GraphSpaceID spaceId) { return kHBaseTableNamePrefix + folly::to(spaceId); } -std::shared_ptr HBaseStore::getSchema(GraphSpaceID spaceId, - const std::string& key, - SchemaVer version) { - std::shared_ptr schema; +std::shared_ptr HBaseStore::getSchema(GraphSpaceID spaceId, + const std::string& key, + SchemaVer version) { + std::shared_ptr schema; folly::StringPiece rawKey = key; if (NebulaKeyUtils::isTag(key)) { TagID tagId = NebulaKeyUtils::getTagId(rawKey); diff --git a/src/kvstore/plugins/hbase/HBaseStore.h b/src/kvstore/plugins/hbase/HBaseStore.h index 7b085be7fb0..c264aa4bd1f 100644 --- a/src/kvstore/plugins/hbase/HBaseStore.h +++ b/src/kvstore/plugins/hbase/HBaseStore.h @@ -9,8 +9,8 @@ #include #include "common/base/Base.h" +#include "common/meta/NebulaSchemaProvider.h" #include "common/meta/SchemaManager.h" -#include "common/meta/SchemaProviderIf.h" #include "kvstore/KVIterator.h" #include "kvstore/KVStore.h" #include "kvstore/plugins/hbase/HBaseClient.h" @@ -223,9 +223,9 @@ class HBaseStore : public KVStore { inline std::string spaceIdToTableName(GraphSpaceID spaceId); - std::shared_ptr getSchema(GraphSpaceID spaceId, - const std::string& key, - SchemaVer version = -1); + std::shared_ptr getSchema(GraphSpaceID spaceId, + const std::string& key, + SchemaVer version = -1); std::string encode(GraphSpaceID spaceId, const std::string& key, KVMap& values); diff --git a/src/meta/test/BalanceIntegrationTest.cpp b/src/meta/test/BalanceIntegrationTest.cpp deleted file mode 100644 index 519862ef480..00000000000 --- a/src/meta/test/BalanceIntegrationTest.cpp +++ /dev/null @@ -1,337 +0,0 @@ -/* Copyright (c) 2019 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include -#include -#include - -#include "common/base/Base.h" -#include "common/fs/TempDir.h" -#include "meta/test/TestUtils.h" -#include "storage/client/StorageClient.h" -#include "storage/test/TestUtils.h" - -DECLARE_int32(heartbeat_interval_secs); -DECLARE_uint32(raft_heartbeat_interval_secs); -DECLARE_uint32(expired_time_factor); - -namespace nebula { -namespace meta { - -TEST(BalanceIntegrationTest, BalanceTest) { - FLAGS_heartbeat_interval_secs = 1; - FLAGS_raft_heartbeat_interval_secs = 1; - fs::TempDir rootPath("/tmp/balance_integration_test.XXXXXX"); - IPv4 localIp; - network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); - const nebula::ClusterID kClusterId = 10; - - uint32_t localMetaPort = network::NetworkUtils::getAvailablePort(); - LOG(INFO) << "Start meta server...."; - std::string metaPath = folly::stringPrintf("%s/meta", rootPath.path()); - auto metaServerContext = - meta::TestUtils::mockMetaServer(localMetaPort, metaPath.c_str(), kClusterId); - localMetaPort = metaServerContext->port_; - - auto adminClient = std::make_unique(metaServerContext->kvStore_.get()); - Balancer balancer(metaServerContext->kvStore_.get(), std::move(adminClient)); - - auto threadPool = std::make_shared(10); - std::vector metaAddr = {HostAddr(localIp, localMetaPort)}; - - LOG(INFO) << "Create meta client..."; - auto mClient = std::make_unique(threadPool, metaAddr); - - mClient->waitForMetadReady(); - - int partition = 1; - int replica = 3; - LOG(INFO) << "Start " << replica << " storage services, partition number " << partition; - std::vector peers; - std::vector storagePorts; - std::vector> metaClients; - std::vector> serverContexts; - for (int i = 0; i < replica; i++) { - uint32_t storagePort = network::NetworkUtils::getAvailablePort(); - HostAddr storageAddr(localIp, storagePort); - storagePorts.emplace_back(storagePort); - peers.emplace_back(storageAddr); - - VLOG(1) << "The storage server has been added to the meta service"; - - meta::MetaClientOptions options; - options.localHost_ = storageAddr; - options.clusterId_ = kClusterId; - options.role_ = meta::cpp2::HostRole::STORAGE; - auto metaClient = std::make_shared(threadPool, metaAddr, options); - metaClient->waitForMetadReady(); - metaClients.emplace_back(metaClient); - } - - for (int i = 0; i < replica; i++) { - std::string dataPath = folly::stringPrintf("%s/%d/data", rootPath.path(), i); - auto sc = storage::TestUtils::mockStorageServer( - metaClients[i].get(), dataPath.c_str(), localIp, storagePorts[i], true); - serverContexts.emplace_back(std::move(sc)); - } - - LOG(INFO) << "Create space and schema"; - SpaceDesc spaceDesc("storage", partition, replica); - auto ret = mClient->createSpace(spaceDesc).get(); - ASSERT_TRUE(ret.ok()); - auto spaceId = ret.value(); - - std::vector columns; - nebula::cpp2::ValueType vt; - vt.set_type(SupportedType::STRING); - columns.emplace_back(); - columns.back().set_name("c"); - columns.back().set_type(vt); - - cpp2::Schema schema; - schema.set_columns(std::move(columns)); - auto tagRet = mClient->createTagSchema(spaceId, "tag", std::move(schema)).get(); - ASSERT_TRUE(tagRet.ok()); - auto tagId = tagRet.value(); - sleep(FLAGS_heartbeat_interval_secs + FLAGS_raft_heartbeat_interval_secs + 3); - - LOG(INFO) << "Let's write some data"; - auto sClient = std::make_unique(threadPool, mClient.get()); - { - std::vector vertices; - for (int32_t vId = 0; vId < 10000; vId++) { - storage::cpp2::Vertex v; - v.set_id(vId); - decltype(v.tags) tags; - storage::cpp2::Tag t; - t.set_tag_id(tagId); - // Generate some tag props. - RowWriter writer; - writer << std::string(1024, 'A'); - t.set_props(writer.encode()); - tags.emplace_back(std::move(t)); - v.set_tags(std::move(tags)); - vertices.emplace_back(std::move(v)); - } - int retry = 10; - while (retry-- > 0) { - auto f = sClient->addVertices(spaceId, vertices, true); - LOG(INFO) << "Waiting for the response..."; - auto resp = std::move(f).get(); - if (resp.completeness() == 100) { - LOG(INFO) << "All requests has been processed!"; - break; - } - if (!resp.succeeded()) { - for (auto& err : resp.failedParts()) { - LOG(ERROR) << "Partition " << err.first - << " failed: " << apache::thrift::util::enumNameSafe(err.second); - } - } - LOG(INFO) << "Failed, the remaining retry times " << retry; - } - } - { - LOG(INFO) << "Check data..."; - std::vector vIds; - std::vector retCols; - for (int32_t vId = 0; vId < 10000; vId++) { - vIds.emplace_back(vId); - } - retCols.emplace_back(storage::TestUtils::vertexPropDef("c", tagId)); - auto f = sClient->getVertexProps(spaceId, std::move(vIds), std::move(retCols)); - auto resp = std::move(f).get(); - if (!resp.succeeded()) { - std::stringstream ss; - for (auto& p : resp.failedParts()) { - ss << "Part " << p.first << ": " << apache::thrift::util::enumNameSafe(p.second) << "; "; - } - VLOG(2) << "Failed partitions:: " << ss.str(); - } - ASSERT_TRUE(resp.succeeded()); - auto& results = resp.responses(); - EXPECT_EQ(partition, results.size()); - EXPECT_EQ(0, results[0].result.failed_codes.size()); - EXPECT_EQ(1, results[0].vertex_schema[tagId].columns.size()); - auto tagProvider = std::make_shared(results[0].vertex_schema[tagId]); - EXPECT_EQ(10000, results[0].vertices.size()); - } - LOG(INFO) << "Let's open a new storage service"; - std::unique_ptr newServer; - std::unique_ptr newMetaClient; - uint32_t storagePort = network::NetworkUtils::getAvailablePort(); - HostAddr storageAddr(localIp, storagePort); - { - MetaClientOptions options; - options.localHost_ = storageAddr; - options.clusterId_ = kClusterId; - options.role_ = meta::cpp2::HostRole::STORAGE; - newMetaClient = std::make_unique(threadPool, metaAddr, options); - newMetaClient->waitForMetadReady(); - std::string dataPath = folly::stringPrintf("%s/%d/data", rootPath.path(), replica + 1); - newServer = storage::TestUtils::mockStorageServer( - newMetaClient.get(), dataPath.c_str(), localIp, storagePort, true); - LOG(INFO) << "Start a new storage server on " << storageAddr; - } - LOG(INFO) << "Let's stop the last storage service " << storagePorts.back(); - { - metaClients.back()->stop(); - serverContexts.back().reset(); - // Wait for the host be expired on meta. - sleep(FLAGS_heartbeat_interval_secs * FLAGS_expired_time_factor + 1); - } - - LOG(INFO) << "Let's balance"; - auto bIdRet = balancer.balance(); - CHECK(ok(bIdRet)); - while (balancer.isRunning()) { - sleep(1); - } - - // Sleep enough time until the data committed on newly added server - sleep(FLAGS_raft_heartbeat_interval_secs + 5); - { - LOG(INFO) << "Balance Finished, check the newly added server"; - std::unique_ptr iter; - auto prefix = NebulaKeyUtils::prefix(1); - auto partRet = newServer->kvStore_->part(spaceId, 1); - CHECK(ok(partRet)); - auto part = value(partRet); - ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, part->engine()->prefix(prefix, &iter)); - int num = 0; - std::string lastKey = ""; - while (iter->valid()) { - // filter the multiple versions for data. - auto key = NebulaKeyUtils::keyWithNoVersion(iter->key()); - if (lastKey == key) { - iter->next(); - continue; - } - lastKey = key.str(); - iter->next(); - num++; - } - ASSERT_EQ(10000, num); - } - { - LOG(INFO) << "Check meta"; - auto paRet = mClient->getPartsAlloc(spaceId).get(); - ASSERT_TRUE(paRet.ok()) << paRet.status(); - ASSERT_EQ(1, paRet.value().size()); - for (auto it = paRet.value().begin(); it != paRet.value().end(); it++) { - ASSERT_EQ(3, it->second.size()); - ASSERT_TRUE(std::find(it->second.begin(), it->second.end(), storageAddr) != it->second.end()); - } - } - for (auto& c : metaClients) { - c->stop(); - } - serverContexts.clear(); - metaClients.clear(); - newMetaClient->stop(); - newServer.reset(); - newMetaClient.reset(); -} - -TEST(BalanceIntegrationTest, LeaderBalanceTest) { - FLAGS_heartbeat_interval_secs = 1; - FLAGS_raft_heartbeat_interval_secs = 1; - fs::TempDir rootPath("/tmp/balance_integration_test.XXXXXX"); - IPv4 localIp; - network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); - const nebula::ClusterID kClusterId = 10; - - uint32_t localMetaPort = network::NetworkUtils::getAvailablePort(); - LOG(INFO) << "Start meta server...."; - std::string metaPath = folly::stringPrintf("%s/meta", rootPath.path()); - auto metaServerContext = - meta::TestUtils::mockMetaServer(localMetaPort, metaPath.c_str(), kClusterId); - localMetaPort = metaServerContext->port_; - - auto adminClient = std::make_unique(metaServerContext->kvStore_.get()); - Balancer balancer(metaServerContext->kvStore_.get(), std::move(adminClient)); - - auto threadPool = std::make_shared(10); - std::vector metaAddr = {HostAddr(localIp, localMetaPort)}; - - LOG(INFO) << "Create meta client..."; - auto mClient = std::make_unique(threadPool, metaAddr); - - mClient->waitForMetadReady(); - - int partition = 9; - int replica = 3; - std::vector peers; - std::vector storagePorts; - std::vector> metaClients; - - std::vector> serverContexts; - for (int i = 0; i < replica; i++) { - uint32_t storagePort = network::NetworkUtils::getAvailablePort(); - HostAddr storageAddr(localIp, storagePort); - storagePorts.emplace_back(storagePort); - peers.emplace_back(storageAddr); - - VLOG(1) << "The storage server has been added to the meta service"; - MetaClientOptions options; - options.localHost_ = storageAddr; - options.clusterId_ = kClusterId; - options.role_ = meta::cpp2::HostRole::STORAGE; - auto metaClient = std::make_shared(threadPool, metaAddr, options); - metaClient->waitForMetadReady(); - metaClients.emplace_back(metaClient); - } - - for (int i = 0; i < replica; i++) { - std::string dataPath = folly::stringPrintf("%s/%d/data", rootPath.path(), i); - auto sc = storage::TestUtils::mockStorageServer( - metaClients[i].get(), dataPath.c_str(), localIp, storagePorts[i], true); - serverContexts.emplace_back(std::move(sc)); - } - - SpaceDesc spaceDesc("storage", partition, replica); - auto ret = mClient->createSpace(spaceDesc).get(); - ASSERT_TRUE(ret.ok()); - while (true) { - int totalLeaders = 0; - for (int i = 0; i < replica; i++) { - std::unordered_map> leaderIds; - totalLeaders += serverContexts[i]->kvStore_->allLeader(leaderIds); - } - if (totalLeaders == partition) { - break; - } - LOG(INFO) << "Waiting for leader election, current total leader number " << totalLeaders - << ", expected " << partition; - sleep(1); - } - - auto code = balancer.leaderBalance(); - ASSERT_EQ(code, cpp2::ErrorCode::SUCCEEDED); - - LOG(INFO) << "Waiting for the leader balance"; - sleep(FLAGS_raft_heartbeat_interval_secs + 1); - size_t leaderCount = 0; - for (int i = 0; i < replica; i++) { - std::unordered_map> leaderIds; - leaderCount += serverContexts[i]->kvStore_->allLeader(leaderIds); - } - EXPECT_EQ(9, leaderCount); - for (auto& c : metaClients) { - c->stop(); - } - serverContexts.clear(); - metaClients.clear(); -} - -} // namespace meta -} // namespace nebula - -int main(int argc, char** argv) { - testing::InitGoogleTest(&argc, argv); - folly::init(&argc, &argv, true); - google::SetStderrLogging(google::INFO); - return RUN_ALL_TESTS(); -} diff --git a/src/mock/AdHocSchemaManager.h b/src/mock/AdHocSchemaManager.h index 5a93f8e9ca8..ab65158a3c4 100644 --- a/src/mock/AdHocSchemaManager.h +++ b/src/mock/AdHocSchemaManager.h @@ -11,7 +11,6 @@ #include "clients/meta/MetaClient.h" #include "common/meta/NebulaSchemaProvider.h" #include "common/meta/SchemaManager.h" -#include "common/meta/SchemaProviderIf.h" namespace nebula { namespace mock { diff --git a/src/storage/BaseProcessor-inl.h b/src/storage/BaseProcessor-inl.h index ceec732b686..256a1eaaa1d 100644 --- a/src/storage/BaseProcessor-inl.h +++ b/src/storage/BaseProcessor-inl.h @@ -172,7 +172,7 @@ StatusOr BaseProcessor::encodeRowVal(const meta::NebulaSchema template nebula::cpp2::ErrorCode BaseProcessor::checkStatType( - const meta::SchemaProviderIf::Field& field, cpp2::StatType statType) { + const meta::NebulaSchemaProvider::SchemaField& field, cpp2::StatType statType) { // todo(doodle): how to deal with nullable fields? For now, null add anything // is null, if there is even one null, the result will be invalid auto fType = field.type(); diff --git a/src/storage/BaseProcessor.h b/src/storage/BaseProcessor.h index 1721be49cf3..1bde2c396c6 100644 --- a/src/storage/BaseProcessor.h +++ b/src/storage/BaseProcessor.h @@ -130,7 +130,7 @@ class BaseProcessor { void handleAsync(GraphSpaceID spaceId, PartitionID partId, nebula::cpp2::ErrorCode code); - nebula::cpp2::ErrorCode checkStatType(const meta::SchemaProviderIf::Field& field, + nebula::cpp2::ErrorCode checkStatType(const meta::NebulaSchemaProvider::SchemaField& field, cpp2::StatType statType); StatusOr encodeRowVal(const meta::NebulaSchemaProvider* schema, diff --git a/src/storage/CommonUtils.cpp b/src/storage/CommonUtils.cpp index 6952497e24b..a9ea2059ba6 100644 --- a/src/storage/CommonUtils.cpp +++ b/src/storage/CommonUtils.cpp @@ -10,7 +10,7 @@ namespace nebula { namespace storage { -bool CommonUtils::checkDataExpiredForTTL(const meta::SchemaProviderIf* schema, +bool CommonUtils::checkDataExpiredForTTL(const meta::NebulaSchemaProvider* schema, RowReaderWrapper* reader, const std::string& ttlCol, int64_t ttlDuration) { @@ -22,7 +22,7 @@ bool CommonUtils::checkDataExpiredForTTL(const meta::SchemaProviderIf* schema, return checkDataExpiredForTTL(schema, v.value(), ttlCol, ttlDuration); } -bool CommonUtils::checkDataExpiredForTTL(const meta::SchemaProviderIf* schema, +bool CommonUtils::checkDataExpiredForTTL(const meta::NebulaSchemaProvider* schema, const Value& v, const std::string& ttlCol, int64_t ttlDuration) { @@ -43,7 +43,7 @@ bool CommonUtils::checkDataExpiredForTTL(const meta::SchemaProviderIf* schema, } std::pair> CommonUtils::ttlProps( - const meta::SchemaProviderIf* schema) { + const meta::NebulaSchemaProvider* schema) { DCHECK(schema != nullptr); const auto* ns = dynamic_cast(schema); const auto sp = ns->getProp(); @@ -58,7 +58,7 @@ std::pair> CommonUtils::ttlProps( return std::make_pair(!(duration <= 0 || col.empty()), std::make_pair(duration, col)); } -StatusOr CommonUtils::ttlValue(const meta::SchemaProviderIf* schema, +StatusOr CommonUtils::ttlValue(const meta::NebulaSchemaProvider* schema, RowReaderWrapper* reader) { DCHECK(schema != nullptr); const auto* ns = dynamic_cast(schema); diff --git a/src/storage/CommonUtils.h b/src/storage/CommonUtils.h index 8598278bae6..5d8c726bcec 100644 --- a/src/storage/CommonUtils.h +++ b/src/storage/CommonUtils.h @@ -260,20 +260,21 @@ class CommonUtils final { * @param ttlDuration Ttl property duration * @return Whether data is expired */ - static bool checkDataExpiredForTTL(const meta::SchemaProviderIf* schema, + static bool checkDataExpiredForTTL(const meta::NebulaSchemaProvider* schema, RowReaderWrapper* reader, const std::string& ttlCol, int64_t ttlDuration); - static bool checkDataExpiredForTTL(const meta::SchemaProviderIf* schema, + static bool checkDataExpiredForTTL(const meta::NebulaSchemaProvider* schema, const Value& v, const std::string& ttlCol, int64_t ttlDuration); static std::pair> ttlProps( - const meta::SchemaProviderIf* schema); + const meta::NebulaSchemaProvider* schema); - static StatusOr ttlValue(const meta::SchemaProviderIf* schema, RowReaderWrapper* reader); + static StatusOr ttlValue(const meta::NebulaSchemaProvider* schema, + RowReaderWrapper* reader); }; } // namespace storage diff --git a/src/storage/CompactionFilter.h b/src/storage/CompactionFilter.h index 96fb128006d..253b82414d4 100644 --- a/src/storage/CompactionFilter.h +++ b/src/storage/CompactionFilter.h @@ -106,7 +106,8 @@ class StorageCompactionFilter final : public kvstore::KVFilter { } // TODO(panda) Optimize the method in the future - bool ttlExpired(const meta::SchemaProviderIf* schema, nebula::RowReaderWrapper* reader) const { + bool ttlExpired(const meta::NebulaSchemaProvider* schema, + nebula::RowReaderWrapper* reader) const { if (schema == nullptr) { return true; } @@ -120,7 +121,7 @@ class StorageCompactionFilter final : public kvstore::KVFilter { return CommonUtils::checkDataExpiredForTTL(schema, reader, ttl.second.second, ttl.second.first); } - bool ttlExpired(const meta::SchemaProviderIf* schema, const Value& v) const { + bool ttlExpired(const meta::NebulaSchemaProvider* schema, const Value& v) const { if (schema == nullptr) { return true; } diff --git a/src/storage/exec/IndexScanNode.cpp b/src/storage/exec/IndexScanNode.cpp index 9e75d184cd4..4742d8a1c20 100644 --- a/src/storage/exec/IndexScanNode.cpp +++ b/src/storage/exec/IndexScanNode.cpp @@ -8,7 +8,7 @@ namespace nebula { namespace storage { // Define of Path Path::Path(nebula::meta::cpp2::IndexItem* index, - const meta::SchemaProviderIf* schema, + const meta::NebulaSchemaProvider* schema, const std::vector& hints, int64_t vidLen) : index_(index), schema_(schema), hints_(hints) { @@ -38,7 +38,7 @@ Path::Path(nebula::meta::cpp2::IndexItem* index, } std::unique_ptr Path::make(nebula::meta::cpp2::IndexItem* index, - const meta::SchemaProviderIf* schema, + const meta::NebulaSchemaProvider* schema, const std::vector& hints, int64_t vidLen) { std::unique_ptr ret; @@ -110,7 +110,7 @@ const std::string& Path::toString() { // Define of RangePath RangePath::RangePath(nebula::meta::cpp2::IndexItem* index, - const meta::SchemaProviderIf* schema, + const meta::NebulaSchemaProvider* schema, const std::vector& hints, int64_t vidLen) : Path(index, schema, hints, vidLen) { @@ -307,7 +307,7 @@ std::string RangePath::encodeFloat(const Value& value, bool& isNaN) { // Define of PrefixPath PrefixPath::PrefixPath(nebula::meta::cpp2::IndexItem* index, - const meta::SchemaProviderIf* schema, + const meta::NebulaSchemaProvider* schema, const std::vector& hints, int64_t vidLen) : Path(index, schema, hints, vidLen) { diff --git a/src/storage/exec/IndexScanNode.h b/src/storage/exec/IndexScanNode.h index c271d6b8364..c47d7672146 100644 --- a/src/storage/exec/IndexScanNode.h +++ b/src/storage/exec/IndexScanNode.h @@ -352,7 +352,7 @@ class Path { public: using ColumnTypeDef = ::nebula::meta::cpp2::ColumnTypeDef; Path(nebula::meta::cpp2::IndexItem* index, - const meta::SchemaProviderIf* schema, + const meta::NebulaSchemaProvider* schema, const std::vector& hints, int64_t vidLen); virtual ~Path() = default; @@ -367,7 +367,7 @@ class Path { * @return std::unique_ptr */ static std::unique_ptr make(::nebula::meta::cpp2::IndexItem* index, - const meta::SchemaProviderIf* schema, + const meta::NebulaSchemaProvider* schema, const std::vector& hints, int64_t vidLen); @@ -437,7 +437,7 @@ class Path { /** * @brief tag/edge schema of current path */ - const meta::SchemaProviderIf* schema_; + const meta::NebulaSchemaProvider* schema_; /** * @brief IndexColumnHints of current path */ @@ -485,7 +485,7 @@ class PrefixPath : public Path { * @see Path */ PrefixPath(nebula::meta::cpp2::IndexItem* index, - const meta::SchemaProviderIf* schema, + const meta::NebulaSchemaProvider* schema, const std::vector& hints, int64_t vidLen); @@ -528,7 +528,7 @@ class RangePath : public Path { * @see Path */ RangePath(nebula::meta::cpp2::IndexItem* index, - const meta::SchemaProviderIf* schema, + const meta::NebulaSchemaProvider* schema, const std::vector& hints, int64_t vidLen); QualifiedStrategy::Result qualified(const Map& rowData) override; diff --git a/src/storage/exec/QueryUtils.h b/src/storage/exec/QueryUtils.h index 81d18f6c9bf..55b04f83c1a 100644 --- a/src/storage/exec/QueryUtils.h +++ b/src/storage/exec/QueryUtils.h @@ -74,7 +74,7 @@ class QueryUtils final { */ static StatusOr readValue(RowReaderWrapper* reader, const std::string& propName, - const meta::SchemaProviderIf::Field* field) { + const meta::NebulaSchemaProvider::SchemaField* field) { auto value = reader->getValueByName(propName); if (value.type() == Value::Type::NULLVALUE) { // read null value @@ -120,7 +120,7 @@ class QueryUtils final { */ static StatusOr readValue(RowReaderWrapper* reader, const std::string& propName, - const meta::SchemaProviderIf* schema) { + const meta::NebulaSchemaProvider* schema) { auto field = schema->field(propName); if (!field) { return Status::Error(folly::stringPrintf("Fail to read prop %s ", propName.c_str())); diff --git a/src/storage/exec/UpdateNode.h b/src/storage/exec/UpdateNode.h index 675fb655b26..ec6765c68f3 100644 --- a/src/storage/exec/UpdateNode.h +++ b/src/storage/exec/UpdateNode.h @@ -65,7 +65,7 @@ class UpdateNode : public RelNode { /** * @brief Check if Field exists */ - nebula::cpp2::ErrorCode checkField(const meta::SchemaProviderIf::Field* field) { + nebula::cpp2::ErrorCode checkField(const meta::NebulaSchemaProvider::SchemaField* field) { if (!field) { VLOG(1) << "Fail to read prop"; if (isEdge_) { @@ -84,8 +84,8 @@ class UpdateNode : public RelNode { * @return E_INVALID_FIELD_VALUE if the field can't be null and doesn't have default value, else * SUCCEEDED. */ - nebula::cpp2::ErrorCode getDefaultOrNullValue(const meta::SchemaProviderIf::Field* field, - const std::string& name) { + nebula::cpp2::ErrorCode getDefaultOrNullValue( + const meta::NebulaSchemaProvider::SchemaField* field, const std::string& name) { if (field->hasDefault()) { ObjectPool pool; auto& exprStr = field->defaultValue(); diff --git a/src/storage/mutate/AddEdgesProcessor.cpp b/src/storage/mutate/AddEdgesProcessor.cpp index 84241955b9b..32c59b8cba9 100644 --- a/src/storage/mutate/AddEdgesProcessor.cpp +++ b/src/storage/mutate/AddEdgesProcessor.cpp @@ -367,7 +367,7 @@ std::vector AddEdgesProcessor::indexKeys( RowReaderWrapper* reader, const folly::StringPiece& rawKey, std::shared_ptr index, - const meta::SchemaProviderIf* latestSchema) { + const meta::NebulaSchemaProvider* latestSchema) { auto values = IndexKeyUtils::collectIndexValues(reader, index.get(), latestSchema); if (!values.ok()) { return {}; diff --git a/src/storage/mutate/AddEdgesProcessor.h b/src/storage/mutate/AddEdgesProcessor.h index 234a609d987..d9c18ffd40d 100644 --- a/src/storage/mutate/AddEdgesProcessor.h +++ b/src/storage/mutate/AddEdgesProcessor.h @@ -48,7 +48,7 @@ class AddEdgesProcessor : public BaseProcessor { RowReaderWrapper* reader, const folly::StringPiece& rawKey, std::shared_ptr index, - const meta::SchemaProviderIf* latestSchema); + const meta::NebulaSchemaProvider* latestSchema); nebula::cpp2::ErrorCode deleteDupEdge(std::vector& edges); diff --git a/src/storage/mutate/AddVerticesProcessor.cpp b/src/storage/mutate/AddVerticesProcessor.cpp index 8bedad221c2..8e8e380506e 100644 --- a/src/storage/mutate/AddVerticesProcessor.cpp +++ b/src/storage/mutate/AddVerticesProcessor.cpp @@ -352,7 +352,7 @@ std::vector AddVerticesProcessor::indexKeys( const VertexID& vId, RowReaderWrapper* reader, std::shared_ptr index, - const meta::SchemaProviderIf* latestSchema) { + const meta::NebulaSchemaProvider* latestSchema) { auto values = IndexKeyUtils::collectIndexValues(reader, index.get(), latestSchema); if (!values.ok()) { return {}; diff --git a/src/storage/mutate/AddVerticesProcessor.h b/src/storage/mutate/AddVerticesProcessor.h index deb275595e2..1b0230cf674 100644 --- a/src/storage/mutate/AddVerticesProcessor.h +++ b/src/storage/mutate/AddVerticesProcessor.h @@ -42,7 +42,7 @@ class AddVerticesProcessor : public BaseProcessor { const VertexID& vId, RowReaderWrapper* reader, std::shared_ptr index, - const meta::SchemaProviderIf* latestSchema); + const meta::NebulaSchemaProvider* latestSchema); void deleteDupVid(std::vector& vertices); diff --git a/src/storage/query/GetNeighborsProcessor.cpp b/src/storage/query/GetNeighborsProcessor.cpp index b362ee6d60a..89f49e0c59a 100644 --- a/src/storage/query/GetNeighborsProcessor.cpp +++ b/src/storage/query/GetNeighborsProcessor.cpp @@ -452,7 +452,7 @@ nebula::cpp2::ErrorCode GetNeighborsProcessor::handleEdgeStatProps( CHECK(!iter->second.empty()); const auto& edgeSchema = iter->second.back(); - const meta::SchemaProviderIf::Field* field = nullptr; + const meta::NebulaSchemaProvider::SchemaField* field = nullptr; if (exp->kind() == Expression::Kind::kEdgeProperty) { field = edgeSchema->field(propName); if (field == nullptr) { diff --git a/src/storage/query/QueryBaseProcessor-inl.h b/src/storage/query/QueryBaseProcessor-inl.h index 23592ef9fa8..aca914bb9d2 100644 --- a/src/storage/query/QueryBaseProcessor-inl.h +++ b/src/storage/query/QueryBaseProcessor-inl.h @@ -118,7 +118,7 @@ template void QueryBaseProcessor::addReturnPropContext( std::vector& ctxs, const char* propName, - const meta::SchemaProviderIf::Field* field) { + const meta::NebulaSchemaProvider::SchemaField* field) { PropContext ctx(propName); ctx.returned_ = true; ctx.field_ = field; @@ -553,7 +553,7 @@ nebula::cpp2::ErrorCode QueryBaseProcessor::checkExp( return nebula::cpp2::ErrorCode::SUCCEEDED; } - const meta::SchemaProviderIf::Field* field = nullptr; + const meta::NebulaSchemaProvider::SchemaField* field = nullptr; if (exp->kind() == Expression::Kind::kEdgeProperty) { field = edgeSchema->field(propName); // Noexistent property will return Empty or Null if enabled @@ -638,7 +638,7 @@ void QueryBaseProcessor::addPropContextIfNotExists( int32_t entryId, const std::string& entryName, const std::string& propName, - const meta::SchemaProviderIf::Field* field, + const meta::NebulaSchemaProvider::SchemaField* field, bool returned, bool filtered, const std::pair* statInfo) { diff --git a/src/storage/query/QueryBaseProcessor.h b/src/storage/query/QueryBaseProcessor.h index 07797523dcd..42b02e81b58 100644 --- a/src/storage/query/QueryBaseProcessor.h +++ b/src/storage/query/QueryBaseProcessor.h @@ -53,7 +53,7 @@ struct PropContext { } PropContext(const char* name, - const meta::SchemaProviderIf::Field* field, + const meta::NebulaSchemaProvider::SchemaField* field, bool returned, bool filtered, const std::pair* statInfo = nullptr) @@ -98,7 +98,7 @@ struct PropContext { // prop name std::string name_; // field info, e.g. nullable, default value - const meta::SchemaProviderIf::Field* field_; + const meta::NebulaSchemaProvider::SchemaField* field_; bool returned_ = false; bool filtered_ = false; // prop type in edge key, for srcId/dstId/type/rank @@ -190,7 +190,7 @@ class QueryBaseProcessor : public BaseProcessor { void addReturnPropContext(std::vector& ctxs, const char* propName, - const meta::SchemaProviderIf::Field* field); + const meta::NebulaSchemaProvider::SchemaField* field); void addPropContextIfNotExists(std::vector>>& props, std::unordered_map& indexMap, @@ -198,7 +198,7 @@ class QueryBaseProcessor : public BaseProcessor { int32_t entryId, const std::string& entryName, const std::string& propName, - const meta::SchemaProviderIf::Field* field, + const meta::NebulaSchemaProvider::SchemaField* field, bool returned, bool filtered, const std::pair* statInfo = nullptr); diff --git a/src/storage/test/CMakeLists.txt b/src/storage/test/CMakeLists.txt index 1e24bf93db3..4838f8ff56e 100644 --- a/src/storage/test/CMakeLists.txt +++ b/src/storage/test/CMakeLists.txt @@ -405,7 +405,6 @@ nebula_add_executable( ../../codec/test/RowWriterV1.cpp OBJECTS ${storage_test_deps} - $ LIBRARIES ${ROCKSDB_LIBRARIES} ${THRIFT_LIBRARIES} diff --git a/src/storage/test/QueryStatsTest.cpp b/src/storage/test/QueryStatsTest.cpp deleted file mode 100644 index aa4e878aead..00000000000 --- a/src/storage/test/QueryStatsTest.cpp +++ /dev/null @@ -1,149 +0,0 @@ -/* Copyright (c) 2018 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include -#include - -#include "codec/RowReaderWrapper.h" -#include "common/base/Base.h" -#include "common/fs/TempDir.h" -#include "common/utils/NebulaKeyUtils.h" -#include "storage/query/QueryStatsProcessor.h" -#include "storage/test/TestUtils.h" - -namespace nebula { -namespace storage { - -void mockData(kvstore::KVStore* kv) { - for (int32_t partId = 0; partId < 3; partId++) { - std::vector data; - for (int32_t vertexId = partId * 10; vertexId < (partId + 1) * 10; vertexId++) { - for (int32_t tagId = 3001; tagId < 3010; tagId++) { - auto key = NebulaKeyUtils::tagKey(partId, vertexId, tagId, 0); - auto val = TestUtils::setupEncode(3, 6); - data.emplace_back(std::move(key), std::move(val)); - } - // Generate 7 edges for each edgeType. - for (int32_t dstId = 10001; dstId <= 10007; dstId++) { - VLOG(3) << "Write part " << partId << ", vertex " << vertexId << ", dst " << dstId; - auto key = NebulaKeyUtils::edgeKey(partId, vertexId, 101, dstId - 10001, dstId); - auto val = TestUtils::setupEncode(10, 20); - data.emplace_back(std::move(key), std::move(val)); - } - } - folly::Baton baton; - kv->asyncMultiPut(0, partId, std::move(data), [&](nebula::cpp2::ErrorCode code) { - EXPECT_EQ(code, nebula::cpp2::ErrorCode::SUCCEEDED); - baton.post(); - }); - baton.wait(); - } -} - -void buildRequest(cpp2::GetNeighborsRequest& req) { - req.set_space_id(0); - decltype(req.parts) tmpIds; - for (auto partId = 0; partId < 3; partId++) { - for (auto vertexId = partId * 10; vertexId < (partId + 1) * 10; vertexId++) { - tmpIds[partId].emplace_back(vertexId); - } - } - req.set_parts(std::move(tmpIds)); - std::vector et = {101}; - req.set_edge_types(et); - // Return tag props col_0, col_2, col_4 - decltype(req.return_columns) tmpColumns; - for (int i = 0; i < 2; i++) { - tmpColumns.emplace_back( - TestUtils::vertexPropDef(folly::stringPrintf("tag_%d_col_%d", 3001 + i * 2, i * 2), - cpp2::StatType::AVG, - 3001 + i * 2)); - } - // Return edge props col_0, col_2, col_4 ... col_18 - for (int i = 0; i < 5; i++) { - tmpColumns.emplace_back( - TestUtils::edgePropDef(folly::stringPrintf("col_%d", i * 2), cpp2::StatType::SUM, 101)); - } - req.set_return_columns(std::move(tmpColumns)); -} - -void checkResponse(const cpp2::QueryStatsResponse& resp) { - EXPECT_EQ(0, resp.result.failed_codes.size()); - - EXPECT_EQ(7, resp.schema.columns.size()); - CHECK_GT(resp.data.size(), 0); - auto provider = std::make_shared(resp.schema); - LOG(INFO) << "Check edge props..."; - - std::vector> expected; - expected.emplace_back("tag_3001_col_0", nebula::cpp2::SupportedType::DOUBLE, 0); - expected.emplace_back("tag_3003_col_2", nebula::cpp2::SupportedType::DOUBLE, 2); - expected.emplace_back("col_0", nebula::cpp2::SupportedType::INT, 0); - expected.emplace_back("col_2", nebula::cpp2::SupportedType::INT, 2); - expected.emplace_back("col_4", nebula::cpp2::SupportedType::INT, 4); - expected.emplace_back("col_6", nebula::cpp2::SupportedType::INT, 6); - expected.emplace_back("col_8", nebula::cpp2::SupportedType::INT, 8); - - auto reader = RowReaderWrapper::getRowReader(resp.data, provider); - auto numFields = provider->getNumFields(); - for (size_t i = 0; i < numFields; i++) { - const auto* name = provider->getFieldName(i); - const auto& ftype = provider->getFieldType(i); - EXPECT_EQ(name, std::get<0>(expected[i])); - EXPECT_TRUE(ftype.type == std::get<1>(expected[i])); - switch (ftype.type) { - case nebula::cpp2::SupportedType::INT: { - int64_t v; - auto ret = reader->getInt(i, v); - EXPECT_EQ(ret, ResultType::SUCCEEDED); - EXPECT_EQ(std::get<2>(expected[i]) * 210, v); - break; - } - case nebula::cpp2::SupportedType::DOUBLE: { - float v; - auto ret = reader->getFloat(i, v); - EXPECT_EQ(ret, ResultType::SUCCEEDED); - EXPECT_EQ(std::get<2>(expected[i]), v); - break; - } - default: { - LOG(FATAL) << "Should not reach here!"; - break; - } - } - } -} - -TEST(QueryStatsTest, StatsSimpleTest) { - fs::TempDir rootPath("/tmp/QueryStatsTest.XXXXXX"); - std::unique_ptr kv = TestUtils::initKV(rootPath.path()); - - LOG(INFO) << "Prepare meta..."; - auto schemaMan = TestUtils::mockSchemaMan(); - mockData(kv.get()); - - cpp2::GetNeighborsRequest req; - buildRequest(req); - - auto executor = std::make_unique(3); - auto* processor = - QueryStatsProcessor::instance(kv.get(), schemaMan.get(), nullptr, executor.get()); - auto f = processor->getFuture(); - processor->process(req); - auto resp = std::move(f).get(); - - checkResponse(resp); -} - -} // namespace storage -} // namespace nebula - -int main(int argc, char** argv) { - testing::InitGoogleTest(&argc, argv); - folly::init(&argc, &argv, true); - google::SetStderrLogging(google::INFO); - - return RUN_ALL_TESTS(); -} diff --git a/src/storage/test/StorageClientTest.cpp b/src/storage/test/StorageClientTest.cpp deleted file mode 100644 index 5951e9a1e58..00000000000 --- a/src/storage/test/StorageClientTest.cpp +++ /dev/null @@ -1,398 +0,0 @@ -/* Copyright (c) 2018 vesoft inc. All rights reserved. - * - * This source code is licensed under Apache 2.0 License. - */ - -#include - -#include "clients/storage/StorageClient.h" -#include "codec/RowReaderWrapper.h" -#include "common/base/Base.h" -#include "common/fs/TempDir.h" -#include "common/network/NetworkUtils.h" -#include "meta/test/TestUtils.h" -#include "storage/test/TestUtils.h" - -DECLARE_string(meta_server_addrs); -DECLARE_int32(heartbeat_interval_secs); - -namespace nebula { -namespace storage { - -TEST(StorageClientTest, VerticesInterfacesTest) { - FLAGS_heartbeat_interval_secs = 1; - const nebula::ClusterID kClusterId = 10; - fs::TempDir rootPath("/tmp/StorageClientTest.XXXXXX"); - GraphSpaceID spaceId = 0; - IPv4 localIp; - network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); - - // Let the system choose an available port for us - uint32_t localMetaPort = network::NetworkUtils::getAvailablePort(); - LOG(INFO) << "Start meta server...."; - std::string metaPath = folly::stringPrintf("%s/meta", rootPath.path()); - auto metaServerContext = - meta::TestUtils::mockMetaServer(localMetaPort, metaPath.c_str(), kClusterId); - localMetaPort = metaServerContext->port_; - - LOG(INFO) << "Create meta client..."; - auto threadPool = std::make_shared(1); - auto addrsRet = - network::NetworkUtils::toHosts(folly::stringPrintf("127.0.0.1:%d", localMetaPort)); - CHECK(addrsRet.ok()) << addrsRet.status(); - auto& addrs = addrsRet.value(); - uint32_t localDataPort = network::NetworkUtils::getAvailablePort(); - auto hostRet = nebula::network::NetworkUtils::toHostAddr("127.0.0.1", localDataPort); - auto& localHost = hostRet.value(); - meta::MetaClientOptions options; - options.localHost_ = localHost; - options.clusterId_ = kClusterId; - options.role_ = meta::cpp2::HostRole::STORAGE; - auto mClient = std::make_unique(threadPool, std::move(addrs), options); - LOG(INFO) << "Add hosts automatically and create space...."; - mClient->waitForMetadReady(); - VLOG(1) << "The storage server has been added to the meta service"; - - LOG(INFO) << "Start data server...."; - - // for mockStorageServer MetaServerBasedPartManager, use ephemeral port - std::string dataPath = folly::stringPrintf("%s/data", rootPath.path()); - auto sc = TestUtils::mockStorageServer(mClient.get(), - dataPath.c_str(), - localIp, - localDataPort, - // TODO We are using the memory version - // of SchemaMan We need to switch to - // Meta Server based version - false); - meta::SpaceDesc spaceDesc("default", 10, 1); - auto ret = mClient->createSpace(spaceDesc).get(); - ASSERT_TRUE(ret.ok()) << ret.status(); - spaceId = ret.value(); - LOG(INFO) << "Created space \"default\", its id is " << spaceId; - sleep(FLAGS_heartbeat_interval_secs + 1); - TestUtils::waitUntilAllElected(sc->kvStore_.get(), spaceId, 10); - auto client = std::make_unique(threadPool, mClient.get()); - - // VerticesInterfacesTest(addVertices and getVertexProps) - { - LOG(INFO) << "Prepare vertices data..."; - std::vector vertices; - for (int64_t vId = 0; vId < 10; vId++) { - cpp2::Vertex v; - v.set_id(vId); - decltype(v.tags) tags; - for (int32_t tagId = 3001; tagId < 3010; tagId++) { - cpp2::Tag t; - t.set_tag_id(tagId); - // Generate some tag props. - auto val = TestUtils::setupEncode(); - t.set_props(std::move(val)); - tags.emplace_back(std::move(t)); - } - v.set_tags(std::move(tags)); - vertices.emplace_back(std::move(v)); - } - auto f = client->addVertices(spaceId, std::move(vertices), true); - LOG(INFO) << "Waiting for the response..."; - auto resp = std::move(f).get(); - if (!resp.succeeded()) { - for (auto& err : resp.failedParts()) { - LOG(ERROR) << "Partition " << err.first - << " failed: " << apache::thrift::util::enumNameSafe(err.second); - } - ASSERT_TRUE(resp.succeeded()); - } - } - - { - std::vector vIds; - std::vector retCols; - for (int32_t vId = 0; vId < 10; vId++) { - vIds.emplace_back(vId); - } - for (int i = 0; i < 3; i++) { - retCols.emplace_back(TestUtils::vertexPropDef( - folly::stringPrintf("tag_%d_col_%d", 3001 + i * 2, i * 2), 3001 + i * 2)); - } - auto f = client->getVertexProps(spaceId, std::move(vIds), std::move(retCols)); - auto resp = std::move(f).get(); - if (VLOG_IS_ON(2)) { - if (!resp.succeeded()) { - std::stringstream ss; - for (auto& p : resp.failedParts()) { - ss << "Part " << p.first << ": " << apache::thrift::util::enumNameSafe(p.second) << "; "; - } - VLOG(2) << "Failed partitions:: " << ss.str(); - } - } - ASSERT_TRUE(resp.succeeded()); - - auto& results = resp.responses(); - ASSERT_EQ(1, results.size()); - EXPECT_EQ(0, results[0].result.failed_codes.size()); - - EXPECT_EQ(10, results[0].vertices.size()); - - auto* vschema = results[0].get_vertex_schema(); - DCHECK(vschema != nullptr); - for (auto& vp : results[0].vertices) { - auto size = std::accumulate( - vp.tag_data.cbegin(), vp.tag_data.cend(), 0, [vschema](int acc, auto& td) { - auto it = vschema->find(td.tag_id); - DCHECK(it != vschema->end()); - return acc + it->second.columns.size(); - }); - - EXPECT_EQ(3, size); - - checkTagData(vp.tag_data, 3001, "tag_3001_col_0", vschema, 0); - checkTagData(vp.tag_data, 3003, "tag_3003_col_2", vschema, 2); - checkTagData( - vp.tag_data, 3005, "tag_3005_col_4", vschema, folly::stringPrintf("string_col_4")); - } - } - - // EdgesInterfacesTest(addEdges and getEdgeProps) - { - LOG(INFO) << "Prepare edges data..."; - std::vector edges; - for (int64_t srcId = 0; srcId < 10; srcId++) { - cpp2::Edge edge; - // Set the edge key. - decltype(edge.key) edgeKey; - edgeKey.set_src(srcId); - edgeKey.set_edge_type(101); - edgeKey.set_dst(srcId * 100 + 2); - edgeKey.set_ranking(srcId * 100 + 3); - edge.set_key(std::move(edgeKey)); - // Generate some edge props. - auto val = TestUtils::setupEncode(10, 20); - edge.set_props(std::move(val)); - edges.emplace_back(std::move(edge)); - } - auto f = client->addEdges(spaceId, std::move(edges), true); - LOG(INFO) << "Waiting for the response..."; - auto resp = std::move(f).get(); - ASSERT_TRUE(resp.succeeded()); - } - - { - std::vector edgeKeys; - std::vector retCols; - for (int64_t srcId = 0; srcId < 10; srcId++) { - // Set the edge key. - cpp2::EdgeKey edgeKey; - edgeKey.set_src(srcId); - edgeKey.set_edge_type(101); - edgeKey.set_dst(srcId * 100 + 2); - edgeKey.set_ranking(srcId * 100 + 3); - edgeKeys.emplace_back(std::move(edgeKey)); - } - for (int i = 0; i < 20; i++) { - retCols.emplace_back(TestUtils::edgePropDef(folly::stringPrintf("col_%d", i), 101)); - } - auto f = client->getEdgeProps(spaceId, std::move(edgeKeys), std::move(retCols)); - auto resp = std::move(f).get(); - ASSERT_TRUE(resp.succeeded()); - - auto& results = resp.responses(); - ASSERT_EQ(1, results.size()); - EXPECT_EQ(0, results[0].result.failed_codes.size()); - EXPECT_EQ(4 + 20, results[0].schema.columns.size()); - - auto edgeProvider = std::make_shared(results[0].schema); - RowSetReader rsReader(edgeProvider, results[0].data); - auto it = rsReader.begin(); - while (it) { - EXPECT_EQ(4 + 20, it->numFields()); - auto fieldIt = it->begin(); - int index = 0; - while (fieldIt) { - if (index < 4) { // _src | _rank | _dst - int64_t vid; - EXPECT_EQ(ResultType::SUCCEEDED, fieldIt->getVid(vid)); - } else if (index >= 14) { // the last 10 STRING fields - folly::StringPiece stringCol; - EXPECT_EQ(ResultType::SUCCEEDED, fieldIt->getString(stringCol)); - EXPECT_EQ(folly::stringPrintf("string_col_%d", index - 4), stringCol); - } else { // the middle 10 INT fields - int32_t intCol; - EXPECT_EQ(ResultType::SUCCEEDED, fieldIt->getInt(intCol)); - EXPECT_EQ(index - 4, intCol); - } - ++index; - ++fieldIt; - } - EXPECT_EQ(fieldIt, it->end()); - ++it; - } - EXPECT_EQ(it, rsReader.end()); - } - { - std::unordered_map> edgeKeys; - std::vector vertices; - for (int64_t srcId = 0; srcId < 10; srcId++) { - vertices.emplace_back(srcId); - } - - // Delete all edges of a vertex - { - for (int64_t srcId = 0; srcId < 10; srcId++) { - std::vector keys; - cpp2::EdgeKey key; - key.set_src(srcId); - key.set_edge_type(101); - key.set_ranking(srcId * 100 + 3); - key.set_dst(srcId * 100 + 2); - keys.emplace_back(std::move(key)); - auto f = client->deleteEdges(spaceId, keys); - auto resp = std::move(f).get(); - ASSERT_TRUE(resp.succeeded()); - for (auto& response : std::move(resp).responses()) { - ASSERT_EQ(0, response.get_result().get_failed_codes().size()); - } - } - } - // Delete a vertex - { - auto f = client->deleteVertices(spaceId, vertices); - auto resp = std::move(f).get(); - ASSERT_TRUE(resp.succeeded()); - auto responses = std::move(resp).responses(); - for (auto& response : responses) { - ASSERT_EQ(0, response.get_result().get_failed_codes().size()); - } - - // Check that this vertex has been successfully deleted - std::vector retCols; - retCols.emplace_back( - TestUtils::vertexPropDef(folly::stringPrintf("tag_%d_col_%d", 3001, 0), 3001)); - auto cf = client->getVertexProps(spaceId, std::move(vertices), std::move(retCols)); - auto cresp = std::move(cf).get(); - ASSERT_TRUE(cresp.succeeded()); - auto& results = cresp.responses(); - ASSERT_EQ(1, results.size()); - EXPECT_EQ(0, results[0].result.failed_codes.size()); - for (auto vertex : results[0].vertices) { - EXPECT_EQ(0, vertex.tag_data.size()); - } - } - } - - { - // get not existed uuid - std::vector vIds; - for (int i = 0; i < 10; i++) { - auto status = client->getUUID(spaceId, std::to_string(i)).get(); - ASSERT_TRUE(status.ok()); - auto resp = status.value(); - vIds.emplace_back(resp.get_id()); - } - - for (int i = 0; i < 10; i++) { - auto status = client->getUUID(spaceId, std::to_string(i)).get(); - ASSERT_TRUE(status.ok()); - auto resp = status.value(); - ASSERT_EQ(resp.get_id(), vIds[i]); - } - } - LOG(INFO) << "Stop meta client"; - mClient->stop(); - LOG(INFO) << "Stop data server..."; - sc.reset(); - LOG(INFO) << "Stop data client..."; - client.reset(); - LOG(INFO) << "Stop meta server..."; - metaServerContext.reset(); - threadPool.reset(); -} - -#define RETURN_LEADER_CHANGED(req, leader) \ - UNUSED(req); \ - do { \ - folly::Promise pro; \ - auto f = pro.getFuture(); \ - storage::cpp2::QueryResponse resp; \ - storage::cpp2::ResponseCommon rc; \ - rc.failed_codes.emplace_back(); \ - auto& code = rc.failed_codes.back(); \ - code.set_part_id(1); \ - code.set_code(nebula::cpp2::ErrorCode::E_LEADER_CHANGED); \ - code.set_leader(leader); \ - resp.set_result(std::move(rc)); \ - pro.setValue(std::move(resp)); \ - return f; \ - } while (false) - -class TestStorageServiceRetry : public storage::cpp2::StorageServiceSvIf { - public: - TestStorageServiceRetry(IPv4 ip, Port port) { - leader_.set_ip(ip); - leader_.set_port(port); - } - - folly::Future future_getBound( - const cpp2::GetNeighborsRequest& req) override { - RETURN_LEADER_CHANGED(req, leader_); - } - - private: - nebula::cpp2::HostAddr leader_; -}; - -class TestStorageClient : public StorageClient { - public: - explicit TestStorageClient(std::shared_ptr ioThreadPool) - : StorageClient(ioThreadPool, nullptr) {} - - StatusOr partsNum(GraphSpaceID) const override { - return parts_.size(); - } - - StatusOr getPartMeta(GraphSpaceID, PartitionID partId) const override { - auto it = parts_.find(partId); - CHECK(it != parts_.end()); - return it->second; - } - - void loadLeader() const override {} - - std::unordered_map parts_; -}; - -TEST(StorageClientTest, LeaderChangeTest) { - IPv4 localIp; - network::NetworkUtils::ipv4ToInt("127.0.0.1", localIp); - - auto sc = std::make_unique(); - auto handler = std::make_shared(localIp, 10010); - sc->mockCommon("storage", 0, handler); - LOG(INFO) << "Start storage server on " << sc->port_; - - auto threadPool = std::make_shared(1); - TestStorageClient tsc(threadPool); - PartMeta pm; - pm.spaceId_ = 1; - pm.partId_ = 1; - pm.peers_.emplace_back(HostAddr(localIp, sc->port_)); - tsc.parts_.emplace(1, std::move(pm)); - - folly::Baton baton; - tsc.getNeighbors(1, {1, 2, 3}, {0}, "", {}).via(threadPool.get()).thenValue([&](auto&&) { - baton.post(); - }); - baton.wait(); - ASSERT_EQ(1, tsc.leaders_.size()); - ASSERT_EQ(HostAddr(localIp, 10010), tsc.leaders_[std::make_pair(1, 1)]); -} - -} // namespace storage -} // namespace nebula - -int main(int argc, char** argv) { - testing::InitGoogleTest(&argc, argv); - folly::init(&argc, &argv, true); - google::SetStderrLogging(google::INFO); - return RUN_ALL_TESTS(); -} diff --git a/src/tools/db-upgrade/DbUpgrader.cpp b/src/tools/db-upgrade/DbUpgrader.cpp index 820b86c9ff3..065d849d099 100644 --- a/src/tools/db-upgrade/DbUpgrader.cpp +++ b/src/tools/db-upgrade/DbUpgrader.cpp @@ -670,7 +670,7 @@ void UpgraderSpace::encodeVertexValue(PartitionID partId, // If the field types are inconsistent, can be converted WriteResult UpgraderSpace::convertValue(const meta::NebulaSchemaProvider* nSchema, - const meta::SchemaProviderIf* oSchema, + const meta::NebulaSchemaProvider* oSchema, std::string& name, Value& val) { auto newpropType = nSchema->getFieldType(name); @@ -1005,7 +1005,7 @@ std::vector UpgraderSpace::indexVertexKeys( VertexID& vId, RowReaderWrapper* reader, std::shared_ptr index, - const meta::SchemaProviderIf* latestSchema) { + const meta::NebulaSchemaProvider* latestSchema) { auto values = IndexKeyUtils::collectIndexValues(reader, index.get(), latestSchema); if (!values.ok()) { return {}; @@ -1062,7 +1062,7 @@ std::vector UpgraderSpace::indexEdgeKeys( EdgeRanking rank, VertexID& dstId, std::shared_ptr index, - const meta::SchemaProviderIf* latestSchema) { + const meta::NebulaSchemaProvider* latestSchema) { auto values = IndexKeyUtils::collectIndexValues(reader, index.get(), latestSchema); if (!values.ok()) { return {}; diff --git a/src/tools/db-upgrade/DbUpgrader.h b/src/tools/db-upgrade/DbUpgrader.h index d1896e400e0..aefadcc56a6 100644 --- a/src/tools/db-upgrade/DbUpgrader.h +++ b/src/tools/db-upgrade/DbUpgrader.h @@ -88,7 +88,7 @@ class UpgraderSpace { VertexID& vId, RowReaderWrapper* reader, std::shared_ptr index, - const meta::SchemaProviderIf* latestSchema); + const meta::NebulaSchemaProvider* latestSchema); void encodeEdgeValue(PartitionID partId, RowReaderWrapper* reader, @@ -106,10 +106,10 @@ class UpgraderSpace { EdgeRanking rank, VertexID& dstId, std::shared_ptr index, - const meta::SchemaProviderIf* latestSchema); + const meta::NebulaSchemaProvider* latestSchema); WriteResult convertValue(const meta::NebulaSchemaProvider* newSchema, - const meta::SchemaProviderIf* oldSchema, + const meta::NebulaSchemaProvider* oldSchema, std::string& name, Value& val); void runPartV1();