diff --git a/src/kvstore/raftex/RaftPart.cpp b/src/kvstore/raftex/RaftPart.cpp index 79bd7af85d0..07a4064052a 100644 --- a/src/kvstore/raftex/RaftPart.cpp +++ b/src/kvstore/raftex/RaftPart.cpp @@ -697,19 +697,19 @@ folly::Future RaftPart::appendLogAsync(ClusterID source // until majority accept the logs, the leadership changes, or // the partition stops VLOG(2) << idStr_ << "Calling appendLogsInternal()"; - AppendLogsIterator it(firstId, - termId, - std::move(swappedOutLogs), - [this](AtomicOp opCB) -> folly::Optional { - CHECK(opCB != nullptr); - auto opRet = opCB(); - if (!opRet.hasValue()) { - // Failed - sendingPromise_.setOneSingleValue( - nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED); - } - return opRet; - }); + AppendLogsIterator it( + firstId, + termId, + std::move(swappedOutLogs), + [this](AtomicOp opCB) -> folly::Optional { + CHECK(opCB != nullptr); + auto opRet = opCB(); + if (!opRet.hasValue()) { + // Failed + sendingPromise_.setOneSingleValue(nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED); + } + return opRet; + }); appendLogsInternal(std::move(it), termId); return retFuture; @@ -964,19 +964,18 @@ void RaftPart::processAppendLogResponses(const AppendLogResponses& resps, // continue to replicate the logs sendingPromise_ = std::move(cachingPromise_); cachingPromise_.reset(); - iter = AppendLogsIterator( - firstLogId, - currTerm, - std::move(logs_), - [this](AtomicOp op) -> folly::Optional { - auto opRet = op(); - if (!opRet.hasValue()) { - // Failed - sendingPromise_.setOneSingleValue( - nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED); - } - return opRet; - }); + iter = AppendLogsIterator(firstLogId, + currTerm, + std::move(logs_), + [this](AtomicOp op) -> folly::Optional { + auto opRet = op(); + if (!opRet.hasValue()) { + // Failed + sendingPromise_.setOneSingleValue( + nebula::cpp2::ErrorCode::E_RAFT_ATOMIC_OP_FAILED); + } + return opRet; + }); logs_.clear(); bufferOverFlow_ = false; } diff --git a/src/meta/processors/index/FTIndexProcessor.cpp b/src/meta/processors/index/FTIndexProcessor.cpp index 6928acd7844..3bf899d6280 100644 --- a/src/meta/processors/index/FTIndexProcessor.cpp +++ b/src/meta/processors/index/FTIndexProcessor.cpp @@ -100,7 +100,9 @@ void CreateFTIndexProcessor::process(const cpp2::CreateFTIndexReq& req) { onFinished(); return; } - if (index.get_depend_schema() == indexItem.get_depend_schema()) { + // Because tagId/edgeType is the space range, judge the spaceId and schemaId + if (index.get_space_id() == indexItem.get_space_id() && + index.get_depend_schema() == indexItem.get_depend_schema()) { LOG(ERROR) << "Depends on the same schema , index : " << indexName; handleErrorCode(nebula::cpp2::ErrorCode::E_EXISTED); onFinished(); diff --git a/src/meta/test/IndexProcessorTest.cpp b/src/meta/test/IndexProcessorTest.cpp index a6fdd518372..8fb05308872 100644 --- a/src/meta/test/IndexProcessorTest.cpp +++ b/src/meta/test/IndexProcessorTest.cpp @@ -1558,6 +1558,15 @@ void mockSchemas(kvstore::KVStore* kv) { schemas.emplace_back(MetaKeyUtils::schemaEdgeKey(1, edgeType, ver), MetaKeyUtils::schemaVal("test_edge", srcsch)); + // space 2 + schemas.emplace_back(MetaKeyUtils::indexTagKey(2, "test_tag"), tagIdVal); + schemas.emplace_back(MetaKeyUtils::schemaTagKey(2, tagId, ver), + MetaKeyUtils::schemaVal("test_tag", srcsch)); + + schemas.emplace_back(MetaKeyUtils::indexEdgeKey(2, "test_edge"), edgeTypeVal); + schemas.emplace_back(MetaKeyUtils::schemaEdgeKey(2, edgeType, ver), + MetaKeyUtils::schemaVal("test_edge", srcsch)); + folly::Baton baton; kv->asyncMultiPut(0, 0, std::move(schemas), [&](nebula::cpp2::ErrorCode code) { ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, code); @@ -1570,6 +1579,7 @@ TEST(IndexProcessorTest, CreateFTIndexTest) { fs::TempDir rootPath("/tmp/CreateFTIndexTest.XXXXXX"); auto kv = MockCluster::initMetaKV(rootPath.path()); TestUtils::assembleSpace(kv.get(), 1, 1); + TestUtils::assembleSpace(kv.get(), 2, 1, 1, 1, true); mockSchemas(kv.get()); for (auto id : {5, 6}) { // expected error. column col_fixed_string_2 is fixed_string, @@ -1627,7 +1637,7 @@ TEST(IndexProcessorTest, CreateFTIndexTest) { } else { schemaId.edge_type_ref() = 6; } - index.space_id_ref() = 2; + index.space_id_ref() = 3; index.depend_schema_ref() = std::move(schemaId); index.fields_ref() = {"col_string"}; req.fulltext_index_name_ref() = "test_ft_index"; @@ -1825,6 +1835,63 @@ TEST(IndexProcessorTest, CreateFTIndexTest) { auto resp = std::move(f).get(); ASSERT_EQ(nebula::cpp2::ErrorCode::E_INDEX_NOT_FOUND, resp.get_code()); } + + // expected success + // Different spaces, the same tag name(same tagId), create full-text indexes with different names. + { + { + for (auto i = 0; i < 2; ++i) { + cpp2::CreateFTIndexReq req; + cpp2::FTIndex index; + nebula::cpp2::SchemaID schemaId; + schemaId.tag_id_ref() = 5; + index.space_id_ref() = i + 1; + index.depend_schema_ref() = std::move(schemaId); + index.fields_ref() = {"col_string", "col_fixed_string_1"}; + req.fulltext_index_name_ref() = folly::stringPrintf("ft_tag_index_space%d", i + 1); + req.index_ref() = std::move(index); + + auto* processor = CreateFTIndexProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code()); + } + } + { + cpp2::ListFTIndexesReq req; + auto* processor = ListFTIndexesProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code()); + auto indexes = resp.get_indexes(); + ASSERT_EQ(2, indexes.size()); + for (auto i = 0u; i < indexes.size(); ++i) { + auto key = folly::stringPrintf("ft_tag_index_space%u", i + 1); + auto iter = indexes.find(key); + ASSERT_NE(indexes.end(), iter); + std::vector fields = {"col_string", "col_fixed_string_1"}; + ASSERT_EQ(fields, iter->second.get_fields()); + ASSERT_EQ(i + 1, iter->second.get_space_id()); + nebula::cpp2::SchemaID schemaId; + schemaId.tag_id_ref() = 5; + ASSERT_EQ(schemaId, iter->second.get_depend_schema()); + } + } + { + for (auto i = 0; i < 2; ++i) { + cpp2::DropFTIndexReq req; + req.space_id_ref() = i + 1; + req.fulltext_index_name_ref() = folly::stringPrintf("ft_tag_index_space%d", i + 1); + auto* processor = DropFTIndexProcessor::instance(kv.get()); + auto f = processor->getFuture(); + processor->process(req); + auto resp = std::move(f).get(); + ASSERT_EQ(nebula::cpp2::ErrorCode::SUCCEEDED, resp.get_code()); + } + } + } } TEST(IndexProcessorTest, DropWithFTIndexTest) { @@ -2282,7 +2349,6 @@ TEST(ProcessorTest, IndexIdInSpaceRangeTest) { ASSERT_EQ(14, resp.get_id().get_index_id()); } } - } // namespace meta } // namespace nebula diff --git a/src/meta/test/TestUtils.h b/src/meta/test/TestUtils.h index ba95e4bcbf7..903840e34eb 100644 --- a/src/meta/test/TestUtils.h +++ b/src/meta/test/TestUtils.h @@ -175,16 +175,26 @@ class TestUtils { GraphSpaceID id, int32_t partitionNum, int32_t replica = 1, - int32_t totalHost = 1) { + int32_t totalHost = 1, + bool multispace = false) { // mock the part distribution like create space cpp2::SpaceDesc properties; - properties.space_name_ref() = "test_space"; + if (multispace) { + properties.space_name_ref() = folly::stringPrintf("test_space_%d", id); + } else { + properties.space_name_ref() = "test_space"; + } properties.partition_num_ref() = partitionNum; properties.replica_factor_ref() = replica; auto spaceVal = MetaKeyUtils::spaceVal(properties); std::vector data; - data.emplace_back(MetaKeyUtils::indexSpaceKey("test_space"), - std::string(reinterpret_cast(&id), sizeof(GraphSpaceID))); + if (multispace) { + data.emplace_back(MetaKeyUtils::indexSpaceKey(folly::stringPrintf("test_space_%d", id)), + std::string(reinterpret_cast(&id), sizeof(GraphSpaceID))); + } else { + data.emplace_back(MetaKeyUtils::indexSpaceKey("test_space"), + std::string(reinterpret_cast(&id), sizeof(GraphSpaceID))); + } data.emplace_back(MetaKeyUtils::spaceKey(id), MetaKeyUtils::spaceVal(properties)); std::vector allHosts;